一个是 asyncio run_forever (我希望它一直运行,维护一个队列)的协程函数, 用 celery 装饰成 task,这样做有效吗,代码像这样的:
import asyncio improt celery async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x) def worker(): loop = asyncio.get_event_loop() for _ in range(_concurrency): loop.create_task(do_some_work(x)) loop.run_forever() @app.task def run(msg): worker()
![]() | 1 dingyaguang117 2019-12-09 09:35:57 +08:00 - - 这样做的意义是啥? |
![]() | 2 Harlaus OP @dingyaguang117 asyncio 的代码写好了,不想改 |
3 superrichman 2019-12-09 10:05:17 +08:00 没有试过 celery, 我用 apscheduler 的 AsyncIOScheduler 可以和 asyncio 一起使用 |
![]() | 4 hustlibraco 2019-12-09 10:10:36 +08:00 run_forever 肯定不行,run_until_complete 应该可以,但是这样做很不好,还是不要偷懒了。python 代码改起来也不是很麻烦。 |
![]() | 5 rogwan 2019-12-09 10:17:10 +08:00 via iPhone 程序是拿来解决问题的,不是拿来玩死自己的。 |
![]() | 6 forrestshuang 2019-12-09 10:19:13 +08:00 celery 本身就是异步的 |
![]() | 7 ClericPy 2019-12-09 10:19:47 +08:00 协程可以挂在任何线程上跑, 非主线程上跑事件循环没什么经验的话有一定可能出问题, 还是那句, 何必呢 把多个任务用 gather 合起来, run_forever 换成 run_until_complete 放里面跑吧, 有时候 running 的 Loop 还不让, 就得 new 一个 Loop |
![]() | 8 Harlaus OP @hustlibraco 也不完全是偷懒的原因,我用 asyncio 和 concurrent.futures 写了一个 work with 队列的代码( https://gitee.com/Harlaus/pipenode/blob/master/pipenode/server.py)见笑了非常粗糙,我想用它集成我写的一些任务,同时我又不想放弃 celery,就酱 |
![]() | 9 hustlibraco 2019-12-09 10:28:44 +08:00 @Harlaus 你可以把队列的部分去掉,替换成 celery,loop 只能有一个,然后去调度 celery 的 task。 |
![]() | 10 Harlaus OP @ClericPy ‘把多个任务用 gather 合起来, run_forever 换成 run_until_complete 放里面跑吧, 有时候 running 的 Loop 还不让, 就得 new 一个 Loop' 现在已经是了 |
11 lolizeppelin 2019-12-09 15:22:52 +08:00 至少之前我看 kombu 代码的时候还不支持 asyncio asyncio 就在大量库支持之前根本没法用,还不如老老实实 eventlet |
12 18620610600 2020-04-23 17:19:03 +08:00 celery 官方要 5.0 才支持 asyncio 我的是这么实现在 celery 中跑 async def 的 ``` import asyncio def run_async(coro): return asyncio.run(coro) @app.task def celery_task(*args, **kwargs): return run_async(async_func(*args, **kwargs)) async def async_func(*args, **kwargs): rv = await sub_func() # do sth return rv async def sub_func(): return 1 # Usage: def view(request): task = celery_task.delay(request) return Response({'task_id': task.id}) ``` |