Python asyncio 求助,要被搞疯了 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Drahcir
V2EX    Python

Python asyncio 求助,要被搞疯了

  •  
  •   Drahcir 2023-02-26 12:30:30 +08:00 3804 次点击
    这是一个创建于 1034 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我最近在尝试使用 FastAPI + Prefect 2(2.8.3),目的是使用 FastAPI 构建的 RESTful API 触发 Prefect 构建的 Workflow 。 代码如下: main.py

    from fastapi import FastAPI from test_workflow import test app = FastAPI( title="Data Services", description="REST API for data services", version="0.1.0", ) @app.get("/") async def root(): return {"message": "Data Services"} @app.get("/test") def trigger(): test() return "completed" 

    test_workflow.py

    from prefect import flow, task import time @task def sleep1(): print("sleep1") time.sleep(5) //模拟需要一定运行时间的计算流 return 1 @flow() def test(): task_1 = sleep1.submit() x = task_1.result() print(x) 

    需首先运行下列命令开启服务:

    $ prefect server start

    $ uvicorn main:app --reload

    如果尝试 GET /test ,会得到如下错误:

    RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop

    如果 FastAPI 中定义为异步函数async def trigger():, 则顺利运行不会报错,但问题是这样就丧失了并行性,FastAPI 一次只能处理一个 /test 请求,显然不符合需求。

    想了很多办法都没法解决,有没有懂这方面的帮忙看看?

    8 条回复    2023-02-26 14:25:33 +08:00
    louistayd2
        1
    louistayd2  
       2023-02-26 13:06:09 +08:00
    问问 ChatGPT:


    这个错误通常发生在应用程序中存在多个事件循环时。根据你的描述,这可能是因为 FastAPI 和 Prefect 分别使用不同的事件循环,导致了该错误。

    为了解决这个问题,你可以尝试使用一个事件循环来管理整个应用程序,这样就可以避免出现多个事件循环的情况。以下是一个可能的解决方案:

    main.py 中创建一个事件循环对象并将其作为参数传递给 test() 函数,确保 FastAPI 和 Prefect 使用相同的事件循环。

    ```python
    import asyncio

    loop = asyncio.get_event_loop()

    @app.get("/test")
    def trigger():
    loop.run_until_complete(test(loop))
    return "completed"
    ```

    test_workflow.py 中,将任务函数装饰器从 @task 改为 @task(run_on_executor=True),以确保它在一个新的线程中运行,从而不会与 FastAPI 使用的事件循环发生冲突。

    ```python
    from prefect import flow, task
    import time

    @task(run_on_executor=True)
    def sleep1():
    print("sleep1")
    time.sleep(5)
    return 1

    @flow
    def test(loop):
    with loop:
    task_1 = sleep1.submit()
    x = task_1.result()
    print(x)
    ```

    这样,通过将事件循环对象传递给 test() 函数并使用 @task(run_on_executor=True) 装饰器,你可以确保 FastAPI 和 Prefect 使用相同的事件循环,并且在运行任务时不会发生事件循环冲突。
    NCZkevin
        2
    NCZkevin  
       2023-02-26 13:19:16 +08:00   2
    没用过 Prefect 2(2.8.3),出现这种情况大概率是因为在异步里使用了同步。 可以考虑把 test() 放到 loop.run_in_executor 去异步调用。
    ```
    @app.get("/test")
    async def trigger():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, test)
    return "completed"
    ```
    Drahcir
        3
    Drahcir  
    OP
       2023-02-26 13:40:22 +08:00
    @louistayd2 虽然 AI 给的细节不对,比如 @task 并不存在 run_on_executor 这个参数。但是我感觉思路是对的,如果能让 Prefect 2 使用 FastAPI 的事件循环或许能解决这个问题。不过我不太了解 Prefect 2 源代码,暂时无解。
    slowgen
        4
    slowgen  
       2023-02-26 13:48:45 +08:00 via Android   1
    常见错误了,异步循环里别用同步的库,time.sleep 改 asyncio.sleep
    Drahcir
        5
    Drahcir  
    OP
       2023-02-26 13:50:58 +08:00
    @NCZkevin 你好,谢谢,用这种解决方案的话,确实 FastAPI 本身不会被阻塞。比如说如果我执行了 GET /test, 在 sleep 的同时,我可以继续访问 /docs 页面。
    不过,问题是如果我想触发同一个请求的话,还是会按顺序执行。比如说,如果我同时执行两个 GET /test ,那么还是会一个接一个执行,不能并行。
    实际上,我试了试 uvicorn 指定多个 worker ,发现即使是多进程下仍有这个问题。

    这次,直接抛开 Prefect 测试:

    ```Python
    def sleep():
    print("sleep", time.time())
    time.sleep(5)

    @app.get("/test")
    async def trigger():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sleep)
    return "completed"

    ```
    Drahcir
        6
    Drahcir  
    OP
       2023-02-26 13:54:19 +08:00
    @shuimugan 我用 time.sleep 只是为了模拟一个需要一定运行时间的 blocking 计算过程。这个计算过程是阻塞的,如果用 async def 就会在这段时间内无响应。
    理论上,直接用 def 的话 FastAPI 会自动用多线程运行,但问题是出错 RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop
    NCZkevin
        7
    NCZkevin  
       2023-02-26 14:03:07 +08:00   1
    @Drahcir 我试了下,就用这个代码,用 ab 测试同时发 10 个请求看了下结果是并行的呀。
    Drahcir
        8
    Drahcir  
    OP
       2023-02-26 14:25:33 +08:00
    @NCZkevin 不好意思,是我的问题
    我用 /docs 页面手动测试时有这个问题。不过我在命令行里面测试,发现的确是并行的。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2619 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 31ms UTC 14:46 PVG 22:46 LAX 06:46 JFK 09:46
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86