使用生成器把 Kafka 写入速度提高 1000 倍 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
locktionc
V2EX    分享创造

使用生成器把 Kafka 写入速度提高 1000 倍

  •  1
     
  •   locktionc 2018-04-13 23:50:10 +08:00 5566 次点击
    这是一个创建于 2739 天前的主题,其中的信息可能已经有所发展或是发生改变。

    原文地址:使用生成器把 Kafka 写入速度提高 1000 倍

    疑惑

    多年以前,当我刚刚开始学习 Python 协程的时候,我看到绝大多数的文章都举了一个生产者-消费者的例子,用来表示在生产者内部可以随时调用消费者,达到和多线程相同的效果。这里凭记忆简单还原一下当年我看到的代码:

    import time def consumer(): product = None while True: if product is not None: print('consumer: {}'.format(product)) product = yield None def producer(): c = consumer() next(c) for i in range(10): c.send(i) start = time.time() producer() end = time.time() print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒') 

    运行效果如下图所示。

    这些文章的说法,就像统一好了口径一样,说这样写可以减少线程切换开销,从而大大提高程序的运行效率。但是当年我始终想不明白,这种写法与直接调用函数有什么区别,如下图所示。

    直到后来我需要操作 Kafka 的时候,我明白了使用 yield 的好处。

    探索

    为了便于理解,我会把实际场景做一些简化,以方便说明事件的产生发展和解决过程。事件的起因是我需要把一些信息写入到 Kafka 中,我的代码一开始是这样的:

    import time from pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(product): with topic.get_producer(delivery_reports=True) as producer: producer.produce(str(product).encode()) def feed(): for i in range(10): consumer(i) start = time.time() feed() end = time.time() print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒') 

    这段代码的运行效果如下图所示。

    写入 10 条数据需要 100 秒,这样的龟速显然是有问题的。问题就出在这一句代码:

    with topic.get_producer(delivery_reports=True) as producer 

    获得 Kafka 生产者对象是一个非常耗费时间的过程,每获取一次都需要 10 秒钟才能完成。所以写入 10 个数据就获取十次生产者对象。这消耗的 100 秒主要就是在获取生产者对象,而真正写入数据的时间短到可以忽略不计。

    由于生产者对象是可以复用的,于是我对代码作了一些修改:

    import time from pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] products = [] def consumer(product_list): with topic.get_producer(delivery_reports=True) as producer: for product in product_list: producer.produce(str(product).encode()) def feed(): for i in range(10): products.append(i) consumer(products) start = time.time() feed() end = time.time() print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒') 

    首先把所有数据存放在一个列表中,最后再一次性给 consumer 函数。在一个 Kafka 生产者对象中展开列表,再把数据一条一条塞入 Kafka。这样由于只需要获取一次生产者对象,所以需要耗费的时间大大缩短,如下图所示。

    这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,如果全部先放在一个列表里面的话,服务器内存就爆了。

    于是我又修改了代码。每 100 条数据保存一次,并清空暂存的列表:

    import time from pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(product_list): with topic.get_producer(delivery_reports=True) as producer: for product in product_list: producer.produce(str(product).encode()) def feed(): products = [] for i in range(1003): products.append(i) if len(products) >= 100: consumer(products) products = [] if products: consumer(products) start = time.time() feed() end = time.time() print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒') 

    由于最后一轮循环可能无法凑够 100 条数据,所以feed函数里面,循环结束以后还需要判断products列表是否为空,如果不为空,还要再消费一次。这样的写法,在上面这段代码中,一共 1003 条数据,每 100 条数据获取一次生产者对象,那么需要获取 11 次生产者对象,耗时至少为 110 秒。

    显然,要解决这个问题,最直接的办法就是减少获取 Kafka 生产者对象的次数并最大限度复用生产者对象。如果读者举一反三的能力比较强,那么根据开关文件的两种写法:

    # 写法一 with open('test.txt', 'w', encoding='utf-8') as f: f.write('xxx') # 写法二 f = open('test.txt', 'w', encoding='utf-8') f.write('xxx') f.close() 

    可以推测出获取 Kafka 生产者对象的另一种写法:

    # 写法二 producer = topic.get_producer(delivery_reports=True) producer.produce(b'xxxx') producer.close() 

    这样一来,只要获取一次生产者对象并把它作为全局变量就可以一直使用了。

    然而,pykafka 的官方文档中使用的是第一种写法,通过上下文管理器with来获得生产者对象。暂且不论第二种方式是否会报错,只从写法上来说,第二种方式必需要手动关闭对象。开发者经常会出现开了忘记关的情况,从而导致很多问题。而且如果中间出现了异常,使用上下文管理器的第一种方式会自动关闭生产者对象,但第二种方式仍然需要开发者手动关闭。

    函数 VS 生成器

    但是如果使用第一种方式,怎么能在一个上下文里面接收生产者传进来的数据呢?这个时候才是 yield 派上用场的时候。

    首先需要明白,使用 yield 以后,函数就变成了一个生成器。生成器与普通函数的不同之处可以通过下面两段代码来进行说明:

    def funciton(i): print('进入') print(i) print('结束') for i in range(5): funciton(i) 

    运行效果如下图所示。

    函数在被调用的时候,函数会从里面的第一行代码一直运行到某个return或者函数的最后一行才会退出。

    而生成器可以从中间开始运行,从中间跳出。例如下面的代码:

    def generator(): print('进入') i = None while True: if i is not None: print(i) print('跳出') i = yield None g = generator() next(g) for i in range(5): g.send(i) 

    运行效果如下图所示。

    从图中可以看到,进入只打印了一次。代码运行到i = yield None后就跳到外面,外面的数据可以通过g.send(i)的形式传进生成器,生成器内部拿到外面传进来的数据以后继续执行下一轮while循环,打印出被传进来的内容,然后到i = yield None的时候又跳出。如此反复。

    所以回到最开始的 Kafka 问题。如果把with topic.get_producer(delivery_reports=True) as producer写在上面这一段代码的print('进入')这个位置上,那岂不是只需要获取一次 Kafka 生产者对象,然后就可以一直使用了?

    根据这个逻辑,设计如下代码:

    import time from pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(): with topic.get_producer(delivery_reports=True) as producer: print('init finished..') next_data = '' while True: if next_data: producer.produce(str(next_data).encode()) next_data = yield True def feed(): c = consumer() next(c) for i in range(1000): c.send(i) start = time.time() feed() end = time.time() print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒') 

    这一次直接插入 1000 条数据,总共只需要 10 秒钟,相比于每插入一次都获取一次 Kafka 生产者对象的方法,效率提高了 1000 倍。运行效果如下图所示。

    后记

    读者如果仔细对比第一段代码和最后一段代码,就会发现他们本质上是一回事。但是第一段代码,也就是网上很多人讲 yield 的时候举的生产者-消费者的例子之所以会让人觉得毫无用处,就在于他们的消费者几乎就是秒运行,这样看不出和函数调用的差别。而我最后这一段代码,它的消费者分成两个部分,第一部分是获取 Kafka 生产者对象,这个过程非常耗时;第二部分是把数据通过 Kafka 生产者对象插入 Kafka,这一部分运行速度极快。在这种情况下,使用生成器把这个消费者代码分开,让耗时长的部分只运行一次,让耗时短的反复运行,这样就能体现出生成器的优势。

    12 条回复    2018-04-15 21:14:10 +08:00
    ligyxy
        1
    ligyxy  
    2018-04-14 02:58:15 +08:00   1
    1. 速度慢是因为它写错了,和生成器没有关系
    2. 根据这篇文章,confluent_kafka 有更好的效率 http://activisiongamescience.github.io/2016/06/15/Kafka-Client-Benchmarking/
    3. 既然你提到生成器,aiokafka 也可以了解一下
    kingname
        2
    kingname  
       2018-04-14 07:27:10 +08:00 via iPhone
    @ligyxy 这篇文章的目的是列举一个更合适的 yield 应用场景。kafka 只是一个例子而已。
    lusi1990
        3
    lusi1990  
       2018-04-14 09:43:38 +08:00 via Android
    mark 回去了解一下
    laxenade
        4
    laxenade  
       2018-04-14 10:19:48 +08:00
    脑洞:如果你把 with ... as producer 放在 feed()里,然后把 producer 传进 consumer()会怎么样,理论上也能解决问题吧。(就单线程而言)
    locktionc
        5
    locktionc  
    OP
       2018-04-14 10:38:03 +08:00
    @laxenade 是这样的,解决方法有很多种。我觉得我这篇文章的标题没有取好。Kafka 只是一个例子而已。结果大家都去关心 Kafka 去了。
    mengzx
        6
    mengzx  
       2018-04-14 11:04:29 +08:00 via Android
    mark
    freshpassport
        7
    freshpassport  
       2018-04-14 16:08:33 +08:00
    谢谢分享
    KIDJourney
        8
    KIDJourney  
       2018-04-15 12:29:06 +08:00
    。。。。这明显是代码问题。
    KIDJourney
        9
    KIDJourney  
       2018-04-15 12:29:34 +08:00
    我理解把 bug 修好不算优化。
    dbdd
        10
    dbdd  
       2018-04-15 20:10:11 +08:00 via iPhone   1
    想起了

    通过去掉代码中预先写好的 sleep 把程序速度提高 1000 倍
    locktionc
        11
    locktionc  
    OP
       2018-04-15 21:06:24 +08:00
    @KIDJourney 是我的标题取的不好。我只是想写 yield 的一个应用场景。Kafka 只是一个例子而已。这篇文章不是讲 Kafka 优化的。
    KIDJourney
        12
    KIDJourney  
       2018-04-15 21:14:10 +08:00
    @locktionc 你要解决的无非是 kafka client init 时间过长,你用生成器无非是绕了个圈子解决了这个问题。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5406 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 28ms UTC 07:42 PVG 15:42 LAX 00:42 JFK 03:42
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86