
使用 Scrapy 爬取内容,使用 Pipeline 将处理后内容 POST 到远程。
使用request是同步的会有阻塞。
使用scrapy.FormRequest受限于全局DOWNLOAD_DELAY限制,每次 POST 都会有延迟。
使用 treq ,不确定如何获取内容
async def process_post(self, url, data): req = treq.post(url, data=data) res = await deferred_to_future(req) return res 如何获取 res 的内容,打印是<treq.response._Response 200 'text/html; charset=' unknown size>
1 encro 2023-12-16 13:46:47 +08:00 不看源码? 看 treq.response._Response 的源码啊!!! |
3 kekeones OP |
4 ayugesheng 2023-12-27 17:56:19 +08:00 @kekeones 既然都 async 了,推荐直接 aiomysql + aiohttp ,给出一个 aiohttp 的 pipeline 示例: import asyncio import aiohttp from scrapy.utils.defer import deferred_from_coro class DemoPipeline: def __init__(self) -> None: # 一些参数初始化 pass def open_spider(self, spider): # 这里可以写入一些非 async 的预备操作,把比如默认参数设置和日志配置等 return deferred_from_coro(self._open_spider(spider)) async def _open_spider(self, spider): # 这里一般是连接池,async 连接等预备操作 await asyncio.sleep(0.1) async def process_item(self, item, spider): # 这里可以使用一些 async 存储库来实现存储逻辑 ... # 看你想 post 到 data 还是 form # post_data = json.dumps('{"content": "test"}') post_data = {"content": "test"} async with aiohttp.ClientSession() as session: async with session.post( "http://httpbin.org/post", data=post_data ) as additional_response: # 获取响应内容 additional_data = await additional_response.text() print("additional_data:", additional_data) return item async def _close_spider(self): # 这里一般是 async 连接或连接池关闭逻辑 await asyncio.sleep(0.1) def close_spider(self, spider): return deferred_from_coro(self._close_spider()) 注意: 使用以上代码时,需要在 settings.py 中或者 custom_settings 中配置 "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor" |
5 ayugesheng 2023-12-27 17:58:27 +08:00 @kekeones 推荐直接 aiomysql + aiohttp ,给出一个 aiohttp 的 pipeline 示例: ``` import asyncio import aiohttp from scrapy.utils.defer import deferred_from_coro class DemoPipeline: def __init__(self) -> None: # 一些参数初始化 pass def open_spider(self, spider): # 这里可以写入一些非 async 的预备操作,把比如默认参数设置和日志配置等 return deferred_from_coro(self._open_spider(spider)) async def _open_spider(self, spider): # 这里一般是连接池,async 连接等预备操作 await asyncio.sleep(0.1) async def process_item(self, item, spider): # 这里可以使用一些 async 存储库来实现存储逻辑 ... # 看你想 post 到 data 还是 form # post_data = json.dumps('{"content": "test"}') post_data = {"content": "test"} async with aiohttp.ClientSession() as session: async with session.post( "http://httpbin.org/post", data=post_data ) as additional_response: # 获取响应内容 additional_data = await additional_response.text() print("additional_data:", additional_data) return item async def _close_spider(self): # 这里一般是 async 连接或连接池关闭逻辑 await asyncio.sleep(0.1) def close_spider(self, spider): return deferred_from_coro(self._close_spider()) ``` 注意: 使用以上代码时,需要在 settings.py 中或者 custom_settings 中配置 "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor" 以上代码乱了,无语,重发一次。 |
6 ayugesheng 2023-12-27 18:00:06 +08:00 不好意思,v2ex 是不支持 markdown 吗,不怎么在论坛发东西。 |
7 kekeones OP 好的,谢谢了哈,后面用了 treq 来处理了。 |