请教一个 Flink SQL 的问题,解决了星巴克感谢 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
kerie
V2EX    问与答

请教一个 Flink SQL 的问题,解决了星巴克感谢

  •  
  •   kerie 2023-06-12 11:17:41 +08:00 2913 次点击
    这是一个创建于 901 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我是一个 Flink 小白,最近有一个监控需求,想使用 Flink SQL 实现,但很多概念还没搞清楚,遇到一个问题卡壳了,在论坛里寻 Flink 大佬指点一二,解决了送一杯星巴克作为感谢!

    Flink SQL 官网用客户(customer)和订单(order)举例,但都每分钟统计流表每个客户订单的数量。我的需求是每分钟统计维表全量每个客户订单的数量,也就是就算这一分钟某个客户没有下单,也需要统计一个 0 出来,用于做监控报警。

    为了不暴露业务需求,调整为客户和订单的场景,如果有不恰当的地方还请指出,我再补充,SQL 如下:

    CREATE TEMPORARY TABLE customers ( id INT, name STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://....' ); CREATE TEMPORARY TABLE orders ( order_id STRING, customer_id INT, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = '...' ); CREATE TEMPORARY VIEW order_per_minute AS SELECT customer_id, count(*) as cnt, TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end FROM orders GROUP BY customer_id, TUMBLE(tstamp, INTERVAL '1' MINUTE); INSERT INTO destination SELECT COALESCE(window_end, CURRENT_TIMESTAMP), customer_id, COALESCE(cnt, 0), FROM customers LEFT JOIN order_per_minute ON customers.id = order_per_minute.customer_id; 

    实际执行上面的代码有问题,比如说有 3 个客户 c1/c2/c3 ,但只有 2 个客户 c1/c2 每分钟都下单, 第一次执行结果是对的:

    10:01, c1, 19 10:01, c2, 32 10:01, c3, 0 

    随后每分钟的数据,就会少掉 c3 的结果:

    10:02, c1, 18 10:02, c2, 22 // c3 没有输出 10:03, c1, 18 10:03, c2, 22 // c3 没有输出 

    我也不清楚 Flink SQL 能否这么用吗,还是得用 DataStream API 解决?请论坛的 Flink 大佬帮忙看一下,感谢!

    第 1 条附言    2023-06-12 13:09:59 +08:00
    为了防止 customer 、order 场景误导,这里说一下实际场景:
    实际是要监控 IoT 设备的数据推送,IoT 设备全量表是预先初始化好的,保存在 MySQL 数据表; IoT 数据推送是一个 kafka 流消息,理论上每个 IoT 设备每秒都有数据。需求是监控哪些 IoT 设备数据是中断或缺失的。
    32 条回复    2023-06-13 05:19:39 +08:00
    sijue
        1
    sijue  
       2023-06-12 11:47:53 +08:00   1
    上面 left join 是 regural join ,没有数据可能是因为 customer 中 c3 数据在 10:02 和 10:03 没有记录,建议使用 lookup join
    sijue
        2
    sijue  
       2023-06-12 11:49:36 +08:00
    F281M6Dh8DXpD1g2
        3
    F281M6Dh8DXpD1g2  
       2023-06-12 11:51:21 +08:00   1
    你应该用用户表左关联
    kerie
        4
    kerie  
    OP
       2023-06-12 11:55:01 +08:00
    @sijue 对的,上面的 left join 是 regular join ,没有数据是因为 customer 表没有时间戳信息,它只是一个全量客户的表,没有带时间戳信息。
    lookup join 有上面一样的问题,不能输出没有 order 数据的 customer 统计。
    kerie
        5
    kerie  
    OP
       2023-06-12 11:57:55 +08:00
    @liprais 我用的 customers left join order_per_minute ,但不能统计出没有 order 的 customer
    F281M6Dh8DXpD1g2
        6
    F281M6Dh8DXpD1g2  
       2023-06-12 11:58:23 +08:00
    "If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values. Currently, a left outer join against a lateral table requires a TRUE literal in the ON clause."
    kerie
        7
    kerie  
    OP
       2023-06-12 12:08:03 +08:00
    @liprais 这里的`table function`指的是 udf (用户自定义函数)吗,不使用 udf 可以吗
    sijue
        8
    sijue  
       2023-06-12 12:25:57 +08:00
    1.有个治标的方式:维表初始化目前存量用户数据,订单数为 0 ,订单表一般是注册之后才能下单;
    2.有个疑问点:有订单,用户表和订单表都会存数据,是否存在订单数据比用户数据迟到的点
    t3zb2xzvjm4yvmn
        9
    t3zb2xzvjm4yvmn  
       2023-06-12 13:01:37 +08:00   1
    实现过类似的需求,首先 lookup join 肯定是不行了,事实流中没有出现的用户肯定关联不到的。

    比较彻底的解决方案是使用 datastream API ,process function 。还要状态编程,因为需要将维表的状态自己维护,比如把状态放到一个 tuple2 里,t0 是 customer id ,t1 给默认值 0 ,然后拿另外一个流的 element ,每来一条就给 t1+1 ,窗口触发时把所有的 tuple2 向下游发送。

    需要额外考虑的一点是,有可能某个时间窗口内 1 个下单的都没有(比如半夜),那么该窗口无法触发,没有任何输出,所以默认的滚动窗口\事件时间语义就不太行。还需要再实现一个窗口,事件时间、处理时间混合语义,保证即使没有事件仍然可以触发,输出所有用户下单数均为 0 的情况。

    Flink SQL 没有实现过,但是有一个简单粗暴的想法,定时把维表的数据全量发到 flink ,构造出所有包含用户的事实流,这样你只需要改造 customers 维表那里就够了。
    缺点是需要不断地读维表,对 MySQL 增加压力; source 端不断地向下游发维度信息,实际上不符合事件驱动和流式计算的原则。

    我猜测这个需求数据量不大,实时性要求也没那么高,使用 spark streaming 可能是更好的选择。
    kerie
        10
    kerie  
    OP
       2023-06-12 13:03:24 +08:00
    @sijue 1. 不行,需求就是监控哪些用户没有下订单(这里的场景我做了更改,实际是监控哪些 IoT 设备没有上传数据,我在主题里也 append 更新下)
    2. 用户表是提前全量初始化好的,不考虑更新问题,用户表没有时间信息;订单表,理论每个用户每分钟都有数据。
    David1119
        11
    David1119  
       2023-06-12 13:22:19 +08:00   1
    参考 9 楼的,process function 最方便,sql 做基础 etl 没问题,复杂一点的逻辑用 datastream 更灵活方便,存一下上一秒的 state ,做比较,可以判断类似连续 2 秒没数据然后报警推送,甚至连续 2s 没数据,但是 5s 内能上传上来就算正常这样的场景,随意发挥
    kerie
        12
    kerie  
    OP
       2023-06-12 13:24:19 +08:00
    @t3zb2xzvjm4yvmn 我感觉看到希望了,我的数据量不大,你知道如何粗暴的每分钟把维表的数据全量发到 flink ?
    用 DataStream API ,你说的额外考虑的场景暂时不用考虑,我们是 IoT 设备,理论不会出现 1 分钟全部设备没有数据的情况,后续优化再额外每分钟加一些 mock 数据,强制触发窗口。
    kerie
        13
    kerie  
    OP
       2023-06-12 13:28:49 +08:00
    @David1119 我再挣扎下,新手感觉 Flink SQL 更直观一些,但如果实在做不了就放弃 SQL 的方案,转 DataStream API process function 。
    fuyufjh
        14
    fuyufjh  
       2023-06-12 13:42:19 +08:00   1
    这是一个典型的 micro batch 需求吧,1 分钟执行一次。用时间条件做过滤能起到很好的过滤效果,执行很快的
    kerie
        15
    kerie  
    OP
       2023-06-12 13:51:57 +08:00
    @fuyufjh 大佬能讲一下用 Flink 具体怎么做吗
    t3zb2xzvjm4yvmn
        16
    t3zb2xzvjm4yvmn  
       2023-06-12 14:16:55 +08:00   1
    @kerie Flink SQL 好像没有现成的方法,可以自定义 source table ,你研究一下吧
    或者不在 Flink 里做,在外部写一个 Java/Python 程序用 JDBC 和 kafka API ,定时把数据推到 kafka ,用 Flink SQL 接 kafka 就比较方便了。
    kerie
        17
    kerie  
    OP
       2023-06-12 14:26:33 +08:00
    @t3zb2xzvjm4yvmn 我研究一下,感谢!
    leonhao
        18
    leonhao  
       2023-06-12 14:28:29 +08:00   1
    Flink SQL 无法实现,需要自己写 stream api 。如果可能出现这种情况,根本不需要 Flink ,把数据写到数据库在算就行
    kerie
        19
    kerie  
    OP
       2023-06-12 14:43:41 +08:00
    @leonhao 数据都写到数据库,还是得一个额外的程序,每分钟做一次计算吧
    leonhao
        20
    leonhao  
       2023-06-12 14:55:09 +08:00
    @kerie 用 timescaledb 之类的时间序列数据库,使用物化视图,每分钟聚合一次,都是自带的功能,很 Flink 简单多了
    kerie
        21
    kerie  
    OP
       2023-06-12 15:10:49 +08:00
    @leonhao 谢谢,时序数据库虽然有物化视图功能,但时序数据库本身不能做监控报警,还是需要外部程序做定时查询,将聚合数据推送给监控报警系统。
    weakbd
        22
    weakbd  
       2023-06-12 15:19:14 +08:00   1
    时间语义试着用 ProcessingTime,再结合滚动窗口去实现你的操作
    leonhao
        23
    leonhao  
       2023-06-12 15:27:26 +08:00
    @kerie grafana
    kerie
        24
    kerie  
    OP
       2023-06-12 15:31:26 +08:00
    @weakbd Flink 的时间属性和 join 语法理解有点困难,可以给个示例,我测试一把吗
    fuyufjh
        25
    fuyufjh  
       2023-06-12 15:33:54 +08:00   2
    @kerie 我是觉得不应该用 Flink 做。Streaming 擅长增量计算,这个问题本质是一个全量计算,“全量”指最近 1min 的全量数据。

    我觉得用数仓、定时 1min 查一次是最合适的。

    Flink 的理念是 event-driven ,反过来说,如果没有 event 则不该触发任何计算。举个简化的例子,如果这 1min 内没有任何订单,你应该也希望得到一个全部 count=0 的结果吧(而不是什么也不输出),那么这就与 event-driven 相违背了。
    BiggerLonger
        26
    BiggerLonger  
       2023-06-12 16:18:35 +08:00 via iPad   1
    要不时时 time window ?
    weakbd
        27
    weakbd  
       2023-06-12 16:50:19 +08:00   1
    #25 是的,仔细看了下 op 的需求,你不应该以你的维表为基准,以实现 count=0 的结果,在 flink sql 中我是没想到能实现的方式。不过 datastream api 更灵活是可以做到的,你可以尝试下试试使用 AggregateFunction 实现你的需求呢
    kerie
        28
    kerie  
    OP
       2023-06-12 17:27:38 +08:00
    @fuyufjh 感谢答复,你对 event-driven 的理解让我很有启发,单这个需求来看确实用数仓更合适一些,
    @weakbd 也感谢你
    kerie
        29
    kerie  
    OP
       2023-06-12 17:31:21 +08:00
    感谢各位 xdjm 的热心答复,我最终决定采纳 @t3zb2xzvjm4yvmn 的方案,大佬留下 vx ,我转你一杯咖啡的钱
    t3zb2xzvjm4yvmn
        30
    t3zb2xzvjm4yvmn  
       2023-06-12 18:08:37 +08:00
    @kerie QnVubnk1NDE=
    kerie
        31
    kerie  
    OP
       2023-06-12 18:21:16 +08:00
    alwaysdazz
        32
    alwaysdazz  
       2023-06-13 05:19:39 +08:00 via Android   1
    经典维表驱动流表的案例,之前遇到过该问题,用 Python udf 解决掉的。。。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2736 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 28ms UTC 06:35 PVG 14:35 LAX 22:35 JFK 01:35
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86