关于准实时流数据处理问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
alexfarm
V2EX    程序员

关于准实时流数据处理问题

  •  
  •   alexfarm 2020-05-05 16:45:10 +08:00 2918 次点击
    这是一个创建于 1985 天前的主题,其中的信息可能已经有所发展或是发生改变。

    场景:生产上有很多客服和用户的会话数据,上游系统通过 kafka 给到我们,我们会对数据做些处理便于运营人员查询会话数据。 实时性:生产 30 分钟后,运营能查询到数据即可,这应该算个准实时场景吧。 问题:其中一个就是调用机器学习模型产出一些标签,目前的处理方式是单通对话同步请求模型得到结果再持久化。算法工程师说并发高的时候离线模型压力比较大,希望数据能平稳点请求模型。 现在就是几个方案,因为上游数据是从 kafka 消费到的,要么控制好消费者的数量,每次请求都同步等待,但感觉比较挫且不易扩展。或者就是用流数据计算框架,但是没什么实战经验。想问问大家有没有什么好的方案。

    15 条回复    2020-05-06 22:44:50 +08:00
    MinQ
        1
    MinQ  
       2020-05-05 17:26:12 +08:00
    batch 呗,比如数据每 10s 且每 100 个一起输入模型
    alexfarm
        2
    alexfarm  
    OP
       2020-05-05 20:15:55 +08:00
    @MinQ 10 秒的时间窗口不大好控制 batch size 啊。而且这个有点 spark streaming 的意思,是不是可以用这个去实现?
    bigmercu
        3
    bigmercu  
       2020-05-05 21:47:29 +08:00
    可以考虑使用 structured streaming,参考
    https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets
    Rate Source
    应该可以实现控制消费数量或时间。
    MinQ
        4
    MinQ  
       2020-05-05 23:35:25 +08:00 via Android
    @alexfarm 时间到了去取,每次取前 100 个,不够的话全部取出
    MinQ
        5
    MinQ  
       2020-05-05 23:37:26 +08:00 via Android
    @alexfarm 本质上就是借鉴了消息队列的削峰填谷的用处
    tairan2006
        6
    tairan2006  
       2020-05-06 00:59:50 +08:00 via Android
    Spark 就行
    alexfarm
        7
    alexfarm  
    OP
       2020-05-06 11:15:18 +08:00
    @tairan2006 就是感觉没有很多计算的工作,用 spark 有点怪怪的
    alexfarm
        8
    alexfarm  
    OP
       2020-05-06 11:16:09 +08:00
    @MinQ 一开始也是这样设计的,但是请求时候是同步等待的,等下游模型处理完了再去取一些数据
    MinQ
        9
    MinQ  
       2020-05-06 11:35:15 +08:00 via Android
    所以你这个是有两个模型,第一个模型会先调用第二个模型,然后等第二个模型处理完了第一个模型才能得到输入所需要的数据?
    alexfarm
        10
    alexfarm  
    OP
       2020-05-06 12:57:35 +08:00
    @MinQ 两个模型不相关的,之所以同步等待,还是想匹配上模型处理的速度,不然会堆积许多的请求
    MinQ
        11
    MinQ  
       2020-05-06 14:25:17 +08:00
    @alexfarm 那为啥不完全设计成异步的形式,同步等待难道性能不是取决于最慢的那个模型,不是会堆积更多的请求?
    alexfarm
        12
    alexfarm  
    OP
       2020-05-06 15:35:30 +08:00
    @MinQ 这样这些数据是堆积在消息系统中的。如果异步的话,算法前面没有缓冲的地方,应该自身的线程池满了就挂了
    owenliang
        13
    owenliang  
       2020-05-06 16:49:58 +08:00
    flink,window 。
    MinQ
        14
    MinQ  
       2020-05-06 19:41:05 +08:00
    @alexfarm 不明白,消息队列本身不就是缓冲么?
    alexfarm
        15
    alexfarm  
    OP
       2020-05-06 22:44:50 +08:00
    @MinQ 我们是中间层,消费消息队列里的数据做一次处理,然后再调用算法层的,算法层之前没有消息队列做缓存的。我说的同步或者异步是中间层调用算法层的。本身生产数据和中间层已经通过消息队列解耦了
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2633 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 28ms UTC 06:04 PVG 14:04 LAX 23:04 JFK 02:04
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86