使用concurrent.futures
创建线程池然后使用asyncio
的run_in_executor
方法去利用线程池执行某异步任务,具体代码如下:
# -*- coding: utf-8 -*- import asyncio import concurrent.futures import threading import time import logging from datetime import datetime logger = logging.getLogger(__file__) logger.setLevel('INFO') async def run_task(val): logger.info(f"run_task Owning thread - {threading.currentThread().native_id} {threading.currentThread().getName()}") await asyncio.sleep(val) return datetime.now() def thread_task(loop, val): logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}") v = asyncio.run_coroutine_threadsafe(run_task(val), loop) result = v.result() logger.info(f"thread - {threading.currentThread().ident} {threading.currentThread().getName()} dOne==== {result}") async def main(loop): result = [0.1, 0.2, 0.3, 0.4, 0.5] logger.info(f"main thread :: {threading.currentThread().ident} {threading.currentThread().getName()}") with concurrent.futures.ThreadPoolExecutor() as pool: blocking_tasks = [ loop.run_in_executor( pool, thread_task, loop, val ) for val in result ] await asyncio.gather(*blocking_tasks) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close()
使用run_in_executor
调用thread_task
方法,再在子线程中使用 asyncio.run_coroutine_threadsafe
方法去调用真正的异步任务,而实际上源码中 asyncio.run_coroutine_threadsafe
方法通过ensure_future
创建了一个future
添加到主线程的 event loop 中并绑定当前线程 future,子线程的异步任务在主线程中被循环,循环完成后再从子线程中去获取结果
感觉上不如直接在主线程中使用asyncio.gather
或者 asyncio.as_completed
去并发执行这些任务,另一种办法是每个子线程再设置一个 loop 去进行事件循环,但是实际测试中这几种方案性能相差并不多
# 使用 asyncio.gather 或 asyncio.as_completed task_list = [] for val in result: task_list.append(asyncio.create_task(run_task(val))) await asyncio.gather(*task_list) for f in asyncio.as_completed(task_list, loop=loop): results = await f # 设置新的 loop def thread_task(val): logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}") loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) v = loop.run_until_complete(asyncio.gather(run_task(val))) logger.info(f"thread - {threading.currentThread().ident} {threading.currentThread().getName()} dOne==== {result}") finally: loop.close()
大家有什么好的多线程+协程的实现方案吗,这里对应的场景是同时处理多个文件的 io 任务
&nbs; 1 liuxingdeyu 2021-03-02 10:09:01 +08:00 没太搞明白,线程加协程的意义 |
![]() | 2 opengo OP @liuxingdeyu 可以多线程去进行网络请求,文件 IO,这里我想利用多线程去处理文件 IO,利用协程去提高性能,多进程+协程的方式面对大量文件 IO 操作性能更好,但是资源开销也更大,所以想尝试多线程+协程的最优方案 |
![]() | 3 linw1995 2021-03-02 11:37:47 +08:00 应该等同,但会更慢 |