Spark Streaming 的优化之从 Receiver 模式到 Direct 模式 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
getui
V2EX    互联网

Spark Streaming 的优化之从 Receiver 模式到 Direct 模式

  •  
  •   getui 2019-06-17 11:10:40 +08:00 2713 次点击
    这是一个创建于 2309 天前的主题,其中的信息可能已经有所发展或是发生改变。

    作者:个推数据研发工程师 学长

    1 业务背景

    随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架 MapReduce 已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析、决策。Spark Streaming 是一种分布式的大数据实时计算框架,他提供了动态的,高吞吐量的,可容错的流式数据处理,不仅可以实现用户行为分析,还能在金融、舆情分析、网络监控等方面发挥作用。个推开发者服务消息推送“应景推送”正是应用了 Spark Streaming 技术,基于大数据分析人群属性,同时利用 LBS 地理围栏技术,实时触发精准消息推送,实现用户的精细化运营。此外,个推在应用 Spark Streaming 做实时处理 kafka 数据时,采用 Direct 模式代替 Receiver 模式的手段,实现了资源优化和程序稳定性提升。

    本文将从 Spark Streaming 获取 kafka 数据的两种模式入手,结合个推实践,带你解读 Receiver 和 Direct 模式的原理和特点,以及从 Receiver 模式到 Direct 模式的优化对比。

    2 两种模式的原理和区别

    Receiver 模式

    1. Receiver 模式下的运行架构

    1)InputDStream: 从流数据源接收的输入数据。

    2)Receiver:负责接收数据流,并将数据写到本地。

    3)Streaming Context:代表 SparkStreaming,负责 Streaming 层面的任务调度,生成 jobs 发送到 Spark engine 处理。

    4)Spark Context: 代表 Spark Core,负责批处理层面的任务调度,真正执行 job 的 Spark engine。

    2. Receiver 从 kafka 拉取数据的过程

    该模式下:

    1)在 executor 上会有 receiver 从 kafka 接收数据并存储在 Spark executor 中,在到了 batch 时间后触发 job 去处理接收到的数据,1 个 receiver 占用 1 个 core ;

    2)为了不丢数据需要开启 WAL 机制,这会将 receiver 接收到的数据写一份备份到第三方系统上(如:HDFS );

    3)receiver 内部使用 kafka High Level API 去消费数据及自动更新 offset。

    Direct 模式

    1. Direct 模式下的运行架构

    与 receiver 模式类似,不同在于 executor 中没有 receiver 组件,从 kafka 拉去数据的方式不同。

    2. Direct 从 kafka 拉取数据的过程

    该模式下:

    1)没有 receiver,无需额外的 core 用于不停地接收数据,而是定期查询 kafka 中的每个 partition 的最新的 offset,每个批次拉取上次处理的 offset 和当前查询的 offset 的范围的数据进行处理;

    2)为了不丢数据,无需将数据备份落地,而只需要手动保存 offset 即可;

    3)内部使用 kafka simple Level API 去消费数据, 需要手动维护 offset,kafka zk 上不会自动更新 offset。

    Receiver 与 Direct 模式的区别

    1.前者在 executor 中有 Receiver 接受数据,并且 1 个 Receiver 占用一个 core ;而后者无 Receiver,所以不会暂用 core ;

    2.前者 InputDStream 的分区是 num_receiver *batchInterval/blockInteral,后者的分区数是 kafka topic partition 的数量。Receiver 模式下 num_receiver 的设置不合理会影响性能或造成资源浪费;如果设置太小,并行度不够,整个链路上接收数据将是瓶颈;如果设置太多,则会浪费资源;

    3.前者使用 zookeeper 来维护 consumer 的偏移量,而后者需要自己维护偏移量;

    4.为了保证不丢失数据,前者需要开启 WAL 机制,而后者不需要,只需要在程序中成功消费完数据后再更新偏移量即可。

    3 Receiver 改造成 Direct 模式

    个推使用 Spark Streaming 做实时处理 kafka 数据,先前使用的是 receiver 模式;

    receiver 有以下特点

    1.receiver 模式下,每个 receiver 需要单独占用一个 core ;

    2.为了保证不丢失数据,需要开启 WAL 机制,使用 checkpoint 保存状态;

    3.当 receiver 接受数据速率大于处理数据速率,导致数据积压,最终可能会导致程序挂掉。

    由于以上特点,receiver 模式下会造成一定的资源浪费;使用 checkpoint 保存状态, 如果需要升级程序,则会导致 checkpoint 无法使用;第 3 点 receiver 模式下会导致程序不太稳定;并且如果设置 receiver 数量不合理也会造成性能瓶颈在 receiver。为了优化资源和程序稳定性,应将 receiver 模式改造成 direct 模式。

    修改方式如下:

    1. 修改 InputDStream 的创建

    将 receiver 的:

    val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) 

    改成 direct 的:

    val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume]) 

    2. 手动维护 offset

    receiver 模式代码: ( receiver 模式不需要手动维护 offset,而是内部通过 kafka consumer high level API 提交到 kafka/zk 保存)

    kafkaStream.map { ... }.foreachRDD { rdd => // 数据处理 doCompute(rdd) } 

    direct 模式代码:

    directKafkaStream.map { ... }.foreachRDD { rdd => // 获取当前 rdd 数据对应的 offset val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 数据处理 doCompute(rdd) // 自己实现保存 offset commitOffsets(offsetRanges) } 

    4 其他优化点

    1. 在 receiver 模式下

    1)拆分 InputDStream,增加 Receiver,从而增加接收数据的并行度;

    2)调整 blockInterval,适当减小,增加 task 数量,从而增加并行度(在 core 的数量>task 数量的情况下);

    3)如果开启了 WAL 机制,数据的存储级别设置为 MOMERY_AND_DISK_SER。

    2.数据序列化使用 Kryoserializationl,相比 Java serializationl 更快,序列化后的数据更小;

    3.建议使用 CMS 垃圾回收器降低 GC 开销;

    4.选择高性能的算子(mapPartitions, foreachPartitions, aggregateByKey 等);

    5.**repartition 的使用:**在 streaming 程序中因为 batch 时间特别短,所以数据量一般较小,所以 repartition 的时间短,可以解决一些因为 topicpartition 中数据分配不均匀导致的数据倾斜问题;

    6.因为 SparkStreaming 生产的 job 最终都是在 sparkcore 上运行的,所以sparkCore 的优化也很重要;

    7.BackPressure 流控

    1)为什么引入 Backpressure ? 当 batch processing time>batchinterval 这种情况持续过长的时间,会造成数据在内存中堆积,导致 Receiver 所在 Executor 内存溢出等问题;

    2)Backpressure:根据 JobScheduler 反馈作业的执行信息来动态调整数据接收率;

    3)配置使用:

    spark.streaming.backpressure.enabled 含义: 是否启用 SparkStreaming 内部的 backpressure 机制, 默认值:false ,表示禁用 spark.streaming.backpressure.initialRate 含义:receiver 为第一个 batch 接收数据时的比率 spark.streaming.receiver.maxRate 含义:receiver 接收数据的最大比率,如果设置值<=0, 则 receiver 接收数据比率不受限制 spark.streaming.kafka.maxRatePerPartition 含义: 从每个 kafka partition 中读取数据的最大比率 

    8.speculation 机制

    spark 内置 speculation 机制,推测 job 中的运行特别慢的 task,将这些 task kill,并重新调度这些 task 执行。 默认 speculation 机制是关闭的,通过以下配置参数开启:

    spark.speculation=true 

    注意:在有些情况下,开启 speculation 反而效果不好,比如:streaming 程序消费多个 topic 时,从 kafka 读取数据直接处理,没有重新分区,这时如果多个 topic 的 partition 的数据量相差较大那么可能会导致正常执行更大数据量的 task 会被认为执行缓慢,而被中途 kill 掉,这种情况下可能导致 batch 的处理时间反而变长;可以通过 repartition 来解决这个问题,但是要衡量 repartition 的时间;而在 streaming 程序中因为 batch 时间特别短,所以数据量一般较小,所以 repartition 的时间短,不像 spark_batch 一次处理大量数据一旦 repartition 则会特别久,所以最终还是要根据具体情况测试来决定。

    5 总结

    将 Receiver 模式改成 Direct 模式,实现了资源优化,提升了程序的稳定性,缺点是需要自己管理 offset,操作相对复杂。未来,个推将不断探索和优化 Spark Streaming 技术,发挥其强大的数据处理能力,为建设实时数仓提供保障。

    目前尚无回复
    关于     帮助文档     自助推广系统     博客   /span>   API     FAQ     Solana     2733 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 12:28 PVG 20:28 LAX 05:28 JFK 08:28
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86