请教这段 Python 协程代码还能如何优化? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
princelai
V2EX    Python

请教这段 Python 协程代码还能如何优化?

  •  
  •   princelai 2019-12-18 15:48:49 +08:00 5535 次点击
    这是一个创建于 2124 天前的主题,其中的信息可能已经有所发展或是发生改变。

    这是一个被我修改过的是生产 /消费者模型,由于需要消费者返回数据,所以不得已用了两个 Queue,第二个用于把结果写回去,最后在 main 函数里取出返回,下面的代码在我的 python3.7 和 python3.8 环境都可以正常运行,先上代码,请大家过目。

    import asyncio import pandas as pd import random import re async def crawler(u): """ 模拟爬虫返回结果 :param u: fake url :return: """ i = int(re.search(r"\d+", u).group(0)) await asyncio.sleep(random.random()) return {'url': u, 'result': i} async def worker(qin, qout, w): """ 消费者异步函数 :param qin: Queue1,用于生产者写入和消费者读出 :param qout: Queue2,用于让消费者回写结果 :param w: worker id :return: """ while True: if qin.empty(): break u = await qin.get() print(f"Worker-{w} crawling {u}") resp = await crawler(u) await qout.put(resp) qout.task_done() async def generate_url(url, qin): """ 生产者异步函数 :param url: :param qin: Queue1,写出生产者产出的(这里为传入的) url :return: """ await qin.put(url) print(f"Queue size = {qin.qsize()}") # qin.task_done() async def main(qmax=20): q_in = asyncio.Queue(qmax) q_out = asyncio.Queue() urls = [f"url{i}" for i in range(100)] producers = [asyncio.create_task(generate_url(u, q_in)) for u in urls] # producers = [await q_in.put(u) for u in urls] cOnsumers= [asyncio.create_task(worker(q_in, q_out, i)) for i in range(1, qmax // 2 + 1)] await asyncio.gather(*consumers) await asyncio.gather(*producers) # await q_in.join() await q_out.join() for c in consumers: c.cancel() return [await q_out.get() for _ in range(q_out.qsize())] if __name__ == "__main__": result = asyncio.run(main(30)) df = pd.DataFrame(result).set_index('url') 

    目前有三个问题,

    1. 被注释掉的 qin.join,qin.task_done 是需要?还是让第二个 queue 阻塞就好了?
    2. main 函数中的 producers 是我写的异步列表推导式,想用于替代 generate_url 函数,但是不能正常运行
    3. 是否还有更优雅的实现方式?
    19 条回复    2019-12-22 11:41:26 +08:00
    princelai
        1
    princelai  
    OP
       2019-12-18 17:09:45 +08:00
    没有大佬来指点下吗?
    GoLand
        2
    GoLand  
       2019-12-18 17:34:22 +08:00
    queue 是多余的。直接:
    urls = [f"url{i}" for i in range(100)]
    tasks = [crawler(url) for url in urls]
    results = await asyncio.gather(*tasks)

    就可以。
    princelai
        3
    princelai  
    OP
       2019-12-18 17:43:34 +08:00
    @GoLand #2 谢谢,不过你说这个我知道可以,我这么设计的目的我忘说了,因为 url 是本地生成的,所以会很快,如果一次性把 url 全部创建为 task,那么 gather 后会一次性创建非常多的链接链接目标网站,我怕网站受不了,也怕自己 IP 被封,所以才不得已使用生产 /消费者,用输入的 Queue 的最大容量限制爬取速度。
    ClericPy
        4
    ClericPy  
       2019-12-18 17:53:42 +08:00
    限频可以用 semphore 和 sleep, Queue 这么用有点怪
    princelai
        5
    princelai  
    OP
       2019-12-18 18:00:53 +08:00
    @ClericPy #4 那就如 2 楼的代码,不需要用 Queue,请问信号量应该在哪里写啊?
    superrichman
        6
    superrichman  
       2019-12-18 18:20:41 +08:00 via iPhone   2
    @princelai 搜一下 aiohttp 和 async with semaphore 就知道怎么写了
    ClericPy
        7
    ClericPy  
       2019-12-18 18:29:55 +08:00   1
    @princelai #5 信号量可以粗暴的理解成并发锁, 类似 golang 里 channel 的那个数字, 也就是同时并发的个数, 只有申请到的人才有资格执行, 其他人等待, 楼上已经给你看了
    你那个 producers 是想用 async for 么
    xiaozizayang
        8
    xiaozizayang  
       2019-12-18 21:40:21 +08:00
    楼主,不知道我写的异步爬虫框架能不能帮到你:

    https://github.com/howie6879/ruia

    楼上说的信号量也支持~
    gwy15
        9
    gwy15  
       2019-12-18 22:48:59 +08:00   1
    楼主你要的这个功能能抽象出来啊,没必要写这么复杂。

    我写的库(不推荐,当时没发现成熟第三方库)

    https://github.com/gwy15/async_pool

    调用方式:
    ```
    with Pool(4) as pool:
    results = pool.map(fetch, urls)
    ```

    更好更全的第三方库:

    https://github.com/h2non/paco

    调用方式:
    ```
    respOnses= await paco.map(fetch, urls, limit=3)
    ```
    princelai
        10
    princelai  
    OP
       2019-12-19 11:39:24 +08:00
    @gwy15 #9 感谢,这个库试了下,写出来很简洁,就是可能是我的 py 版本太高,在 pycharm 里有错误提示,但是稍微修改下可以正常运行。

    ```python
    import asyncio
    import random
    import re

    import paco


    async def crawler(u):
    i = int(re.search(r"\d+", u).group(0))
    await asyncio.sleep(random.random() * 3)
    print(f"crawled {u}")
    return i


    async def main():
    urls = [f"url{i}" for i in range(100)]
    gather = await paco.map(crawler, urls, limit=20)
    return gather


    if __name__ == "__main__":
    result = asyncio.run(main())

    ```
    princelai
        11
    princelai  
    OP
       2019-12-19 11:41:06 +08:00
    @ClericPy #7
    @superrichman #6

    感谢二位,用信号量的代码写出来了,比原来好很多

    ```python
    import asyncio
    import random
    import re


    async def crawler(u, sem):
    async with sem:
    i = int(re.search(r"\d+", u).group(0))
    await asyncio.sleep(random.random() * 5)
    print(f"crawled {u}")
    return i


    async def main():
    sem = asyncio.Semaphore(20)
    urls = [f"url{i}" for i in range(100)]
    tasks = [crawler(u, sem) for u in urls]
    gather = await asyncio.gather(*tasks)
    return gather


    if __name__ == "__main__":
    result = asyncio.run(main())

    ```
    princelai
        12
    princelai  
    OP
       2019-12-19 11:41:51 +08:00
    不支持 markdown 吗,格式全乱了
    yedashuai
        13
    yedashuai  
       2019-12-19 12:59:15 +08:00
    @princelai queue 的使用是不是也有一点好处,可以限制内存的使用量,如果数据量很大,所有的 tasks 都存在 list 里
    sxd96
        14
    sxd96  
       2019-12-20 23:59:20 +08:00
    @yedashuai 这些都相当于是 generator 吧?不会把所有数据都直接放进内存的吧
    sx96
        15
    sxd96  
       2019-12-21 10:01:40 +08:00 via iPhone
    @princelai 想问下如果爬虫 async 的话,requests 支持嘛?好像是要换用 httpx 或者 aiohttp ?这俩哪个比较好用?
    princelai
        16
    princelai  
    OP
       2019-12-21 19:49:04 +08:00 via Android
    @sxd96 如果所有都并发开始了在那就不是生成器,就已经在内存中运行了,我一般都用官方的 aiohttp,没用过另一个
    sxd96
        17
    sxd96  
       2019-12-21 21:17:59 +08:00
    @princelai 哦哦是这样啊。那我的需求如果是从数据库里拿 url 出来给 crawler,也就是说那边 coroutine 在跑,然后生产者在产生新的 url,是不是还是得用 asyncio.Queue ?
    princelai
        18
    princelai  
    OP
       2019-12-21 22:37:29 +08:00 via Android
    @sxd96 应该是,生产者 aiomysql 取数据存入 queue,我这么写主要是因为我的 URL 是本地生成但又想控制速度,和你的不一样
    sxd96
        19
    sxd96  
       2019-12-22 11:41:26 +08:00
    @princelai 明白了,感谢啦。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2574 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 04:47 PVG 12:47 LAX 21:47 JFK 00:47
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86