求助 Python 异步多线程下载又拍云图片问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
asuraa
V2EX    Python

求助 Python 异步多线程下载又拍云图片问题

  •  3
     
  •   asuraa 2017-09-03 21:10:06 +08:00 5053 次点击
    这是一个创建于 2965 天前的主题,其中的信息可能已经有所发展或是发生改变。

    代码

    import asyncio import base64 import os import urllib import aiohttp # ----------------------- # ----------------------- bucket = 'xxx' username = 'xxx' password = 'xxxxxx' hostname = "xxxxxx" base_save_path = 'f:' # ----------------------- headers = {} auth = base64.b64encode(f'{username}:{password}'.encode(encoding='utf-8')) headers['Authorization'] = 'Basic ' + str(auth) headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT" headers['x-list-limit'] = '300' thread_sleep = 1 def is_dic(url): """判断 key 是否是目录 根据是否有后缀名判断""" url = url.replace('http://v0.api.upyun.com/', '') # print(len(url.split('.'))) if len(url.split('.')) == 1: return True else: return False class Crawler: def __init__(self, init_key, hostname, max_tasks=10, pic_tsak=50): '''初始化爬虫''' self.loop = asyncio.get_event_loop() self.max_tries = 4 # 每个图片重试次数 self.max_tasks = max_tasks # 接口请求进程数 self.key_queue = asyncio.Queue(loop=self.loop) # 接口队列 self.pic_queue = asyncio.Queue(loop=self.loop) # 图片队列 self.session = aiohttp.ClientSession(loop=self.loop) #接口异步 http 请求 self.pic_session = aiohttp.ClientSession(loop=self.loop) #图片异步 http 请求 self.key_queue.put_nowait({'key': init_key, 'x-list-iter': None, 'hostname': hostname}) #初始化接口队列 push 需要下载的目录 self.pic_tsak = pic_tsak #图片下载进程数(接口有调用频率限制,http 下载没有限制) def close(self): """回收 http session""" self.session.close() self.pic_session.close() async def work(self): """接口请求队列消费者""" try: while True: url = await self.key_queue.get() # print('key 队列数量:' + await self.key_queue.qsize()) await self.handle(url) self.key_queue.task_done() await asyncio.sleep(1) except asyncio.CancelledError: pass async def work_pic(self): """图片请求队列消费者""" try: while True: url = await self.pic_queue.get() await self.handle_pic(url) self.pic_queue.task_done() await asyncio.sleep(1) except asyncio.CancelledError: pass async def handle_pic(self, key): """处理图片请求""" url = (lambda x: x[0] == '/' and x or '/' + x)(key['key']) url = url.encode('utf-8') url = urllib.parse.quote(url) pic_url = key['hostname'] + url + '!s400' tries = 0 while tries < self.max_tries: try: respOnse= await self.pic_session.get(pic_url) break except aiohttp.ClientError: pass tries += 1 try: if is_dic(url): # print('图片线程-目录 :{}'.format(url)) cOntent= await response.text() try: iter_header = response.headers.get('x-upyun-list-iter') except Exception as e: iter_header = 'g2gCZAAEbmV4dGQAA2VvZg' list_json_param = content + "`" + str(response.status) + "`" + str(iter_header) self.do_file(self.get_list(list_json_param), key['key'], key['hostname']) else: # print('图片线程-文件:{}'.format(key['save_path'])) with open(key['save_path'], 'wb') as f: f.write(await response.read()) finally: await response.release() async def handle(self, key): """处理接口请求""" url = '/' + bucket + (lambda x: x[0] == '/' and x or '/' + x)(key['key']) url = url.encode('utf-8') url = urllib.parse.quote(url) if key['x-list-iter'] is not None: if key['x-list-iter'] is not None or not 'g2gCZAAEbmV4dGQAA2VvZg': headers['X-List-Iter'] = key['x-list-iter'] tries = 0 while tries < self.max_tries: try: respOnse= await self.session.get("http://v0.api.upyun.com" + url, headers=headers) break except aiohttp.ClientError: pass tries += 1 try: if is_dic(url): # print('目录线程-目录 :{}'.format(url)) cOntent= await response.text() try: iter_header = response.headers.get('x-upyun-list-iter') except Exception as e: iter_header = 'g2gCZAAEbmV4dGQAA2VvZg' list_json_param = content + "`" + str(response.sttus) + "`" + str(iter_header) self.do_file(self.get_list(list_json_param), key['key'], key['hostname']) else: # print('目录线程-文件:{}'.format(key['save_path'])) with open(key['save_path'], 'wb') as f: f.write(await response.read()) finally: await response.release() def get_list(self, content): # print(content) if content: cOntent= content.split("`") items = content[0].split('\n') cOntent= [dict(zip(['name', 'type', 'size', 'time'], x.split('\t'))) for x in items] + content[1].split() + \ content[2].split() return content else: return None def do_file(self, list_json, key, hostname): """处理接口数据""" for i in list_json[:-2]: if not i['name']: continue new_key = key + i['name'] if key == '/' else key + '/' + i['name'] try: if i['type'] == 'F': self.key_queue.put_nowait({'key': new_key, 'x-list-iter': None, 'hostname': hostname}) else: try: if not os.path.exists(bucket + key): os.makedirs(bucket + key) except OSError as e: print('新建文件夹错误:' + str(e)) save_path = base_save_path + '/' + bucket + new_key if not os.path.isfile(save_path): print(f'请求图片:', new_key) self.pic_queue.put_nowait( {'key': new_key, 'save_path': save_path, 'x-list-iter': None, 'hostname': hostname}) else: print(f'文件已存在:{save_path}') except Exception as e: print('下载文件错误!:' + str(e)) with open('download_err.txt', 'a') as f: f.write(new_key + '\n') if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg': self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname}) # self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname}) async def run(self): """初始化任务进程""" workers = [asyncio.Task(self.work(), loop=self.loop) for _ in range(self.max_tasks)] workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop) for _ in range(self.pic_tsak)] await self.key_queue.join() await self.pic_queue.join() workers.append(workers_pic) for w in workers: w.cancel() if __name__ == '__main__': loop = asyncio.get_event_loop() crawler = Crawler('/', hostname, max_tasks=5, pic_tsak=150) loop.run_until_complete(crawler.run()) crawler.close() loop.close() 

    上面是代码

    问题

    1. 以上代码执行后没什么问题。但是当长时间执行后会卡主。。百思不得其解(猜测可能是队列问题?但是无法验证)。为何到达一定时间(大约 5 小时以上)脚本会卡死?
    2. 此脚本目的为了下载又拍云所有图片保存到本地。图片量非常大(大约 10T) 3 亿张左右。目前机器的下载宽带大概在 300M/下载速度大约 30M/S ,多次联系又拍云,又拍云表示只能这样下载。无法通过邮寄硬盘直接拷贝。我们也在杭州。但是又拍云无法拷贝 还有什么特殊方法可以快速下载所有图片?
    第 1 条附言    2017-09-05 05:33:00 +08:00

    说下最终解决方案吧

    1. 使用 mq 替代自带的队列,这样也方便断点续传,还有个原因是其中一个 bucket 图片大约有 7T 而我们购买的硬盘单个只有 4T 又不能做磁盘阵列,所以需要两个同时下而且不重复,于是乎就改造成了分布式的了
    2. 拆分生产者和消费者,其中获得目录的服务既是消费者又是生产者,目前按照 24 楼仁兄的几个细节建议,改进了代码另外加了多处的异常判断和处理,目前非常健壮和稳定

    速度:单机跑 100 个线程 两台机器一起跑,不加延时大约每秒 400 张,但是带宽会爆满,然后 mq 会断掉,所以加了延时,每个线程每次下载后加了 0.05 秒的延时后 下载速度大约每秒 300 到 360 张浮动,带宽占用百分之 80 到 95 浮动。目前很稳定。。

    感谢各位

    第 2 条附言    2017-09-11 09:44:12 +08:00

    再次补充下。最后的代码在这里

    python异步多线程超高性能爬虫爬取又拍云图片

    第 3 条附言    2017-09-23 12:22:12 +08:00
    33 条回复    2017-09-08 01:42:07 +08:00
    asuraa
        1
    asuraa  
    OP
       2017-09-03 21:12:28 +08:00
    asuraa
        2
    asuraa  
    OP
       2017-09-03 21:16:17 +08:00
    昂 这么硬的问题木有人给解答下吗
    asuraa
        3
    asuraa  
    OP
       2017-09-03 21:19:26 +08:00
    难道是保存文件的时候引起的?
    asuraa
        4
    asuraa  
    OP
       2017-09-03 21:19:39 +08:00
    在线等。。。。急
    asuraa
        5
    asuraa  
    OP
       2017-09-03 21:20:02 +08:00
    asuraa
        6
    asuraa  
    OP
       2017-09-03 21:23:04 +08:00
    更正:
    ```python
    def is_dic(url):
    """判断 key 是否是目录 根据是否有后缀名判断"""
    url = url.replace('http://v0.api.upyun.com/', '')
    # print(len(url.split('.')))
    if len(url.split('.')) > 1:
    return True
    else:
    return False
    ```
    asuraa
        7
    asuraa  
    OP
       2017-09-03 21:29:02 +08:00
    擦。。。看错了 帖子主题的代码是正确的。。
    Lax
        8
    Lax  
       2017-09-03 21:35:24 +08:00   1
    脚本运行时,用 strace 看一下卡在哪里了

    strace -p <pid>
    asuraa
        9
    asuraa  
    OP
       2017-09-03 21:36:14 +08:00
    @Lax windows 下有这个玩意么
    asuraa
        10
    asuraa  
    OP
       2017-09-03 21:36:54 +08:00
    @Lax 脚本是跑在 windows 下的。。因为硬盘要用 ntfs 格式给某部门送过去
    Lax
        11
    Lax  
       2017-09-03 21:40:34 +08:00
    另外一个比较值得怀疑的一点是,你的所有文件操作没有关闭,有可能用尽 open files 限制。
    可以对比一下这三个值:
    用户限制:ulimit -n
    进程限制:cat /proc/<pid>/limits
    实际使用:ls /proc/<pid>/fd | wc -l
    Lax
        12
    Lax  
       2017-09-03 21:41:11 +08:00
    当我没说。。
    Lax
        13
    Lax  
       2017-09-03 21:42:13 +08:00
    cygwin / mingw 之类的可能有 strace
    asuraa
        14
    asuraa  
    OP
       2017-09-03 21:43:51 +08:00
    @Lax 这里使用了 with 语句,应该能保证 with 语句执行完毕后已经关闭了打开的文件句柄。应该不是这个问题呀。
    mengskysama
        15
    mengskysama  
       2017-09-03 21:53:26 +08:00 via iPhone   1
    加个 timeout 试试,要等 tcp 的 timeout 机制触发要好久的
    asuraa
        16
    asuraa  
    OP
       2017-09-03 21:55:42 +08:00
    @mengskysama 是在每次 http 请求的时候加的吗?
    asuraa
        17
    asuraa  
    OP
       2017-09-03 21:58:30 +08:00
    @mengskysama 但是是异步的啊 应该会等待请求完成的啊
    mengskysama
        18
    mengskysama  
       2017-09-03 21:58:52 +08:00 via iPhone
    asuraa
        19
    asuraa  
    OP
       2017-09-03 21:59:56 +08:00
    @mengskysama 对于单个进程而言 会等待的啊
    mengskysama
        20
    mengskysama  
       2017-09-03 22:01:56 +08:00 via iPhone
    @luodaoyi 一个可能是池子里面 task 全塞死了,
    asuraa
        21
    asuraa  
    OP
       2017-09-03 22:18:22 +08:00
    @mengskysama http 请求引起的卡死吗?
    asuraa
        22
    asuraa  
    OP
       2017-09-03 23:02:01 +08:00
    我觉得好像找到问题了。当 asyncio 队列满了之后 会阻塞线程。但是我这里用的 put_nowait

    http://python.usyiyi.cn/translate/python_352/library/asyncio-queue.html
    put_nowait(item)
    将项目放入队列而不阻塞。

    如果没有可用的空位,引发 QueueFull。
    NoAnyLove
        23
    NoAnyLove  
       2017-09-03 23:10:40 +08:00
    @luodaoyi 又没有设置 queue 的 max_size,怎么会 QueueFull
    NoAnyLove
        24
    NoAnyLove  
       2017-09-03 23:20:44 +08:00   1
    一些细节的东西:

    * 用一个 ClientSession 就好,或者多个 ClientSession 用同一个 TCPConnector
    * session.get 要加 timeout,我以前遇到过卡死过在请求上
    * response 可以用 async with 打开,可靠性和可读性都有提高
    * Windows 下要确认是不是有很多文件没有关闭,可以用 OpenedFilesViw
    asuraa
        25
    asuraa  
    OP
       2017-09-03 23:25:12 +08:00
    另外写文件使用的 io 阻塞操作

    写文件使用 aiofiles 实现异步写操作

    async with aiofiles.open('download_err.txt', 'a') as f:
    await f.write(new_key + '\n')
    asuraa
        26
    asuraa  
    OP
       2017-09-03 23:25:28 +08:00
    跑跑看 有问题在此贴继续讨论
    asuraa
        27
    asuraa  
    OP
       2017-09-03 23:40:54 +08:00
    @NoAnyLove 感谢 我都试试看 谢谢
    asuraa
        28
    asuraa  
    OP
       2017-09-04 00:15:24 +08:00
    @NoAnyLove 按您说的几点
    1. ClientSession 已改为一个
    2. session.get timeout=60
    3. response 使用 async with 打开

    目前再跑 明日看看会不会还卡死
    ysc3839
        29
    ysc3839  
       2017-09-04 01:20:41 +08:00 via Android
    @luodaoyi Windows 的话,我只知道 Visual Studio 能直接附加调试 Python 代码,不知道是啥黑科技。
    mert472114271
        30
    mert472114271  
       2017-09-04 01:55:33 +08:00
    在怀疑的代码快前后加两个变量计数,程序加个中断信号控制器,打印计数的变量
    asuraa
        31
    asuraa  
    OP
       2017-09-04 12:02:22 +08:00
    @ysc3839 目前没卡死了 一台电脑因为队列太大内存爆了
    @mert472114271 目前没有卡死了 一直在运行
    asuraa
        32
    asuraa  
    OP
       2017-09-05 05:31:53 +08:00
    说下最终解决方案吧
    1. 使用 mq 替代自带的队列,这样也方便断点续传,还有个原因是其中一个 bucket 图片大约有 7T 而我们购买的硬盘单个只有 4T 又不能做磁盘阵列,所以需要两个同时下而且不重复,于是乎就改造成了分布式的了
    2. 拆分生产者和消费者,其中获得目录的服务既是消费者又是生产者,目前按照 24 楼仁兄的几个细节建议,改进了代码另外加了多处的异常判断和处理,目前非常健壮和稳定

    速度:单机跑 100 个线程 两台机器一起跑,不加延时大约每秒 400 张,但是带宽会爆满,然后 mq 会断掉,所以加了延时,每个线程每次下载后加了 0.05 秒的延时后 下载速度大约每秒 300 到 360 张浮动,带宽占用百分之 80 到 95 浮动。目前很稳定。。

    感谢各位
    xbtlin
        33
    xbtlin  
       2017-09-08 01:42:07 +08:00
    m
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1516 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 31ms UTC 16:27 PVG 00:27 LAX 09:27 JFK 12:27
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86