import asyncio import base64 import os import urllib import aiohttp # ----------------------- # ----------------------- bucket = 'xxx' username = 'xxx' password = 'xxxxxx' hostname = "xxxxxx" base_save_path = 'f:' # ----------------------- headers = {} auth = base64.b64encode(f'{username}:{password}'.encode(encoding='utf-8')) headers['Authorization'] = 'Basic ' + str(auth) headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT" headers['x-list-limit'] = '300' thread_sleep = 1 def is_dic(url): """判断 key 是否是目录 根据是否有后缀名判断""" url = url.replace('http://v0.api.upyun.com/', '') # print(len(url.split('.'))) if len(url.split('.')) == 1: return True else: return False class Crawler: def __init__(self, init_key, hostname, max_tasks=10, pic_tsak=50): '''初始化爬虫''' self.loop = asyncio.get_event_loop() self.max_tries = 4 # 每个图片重试次数 self.max_tasks = max_tasks # 接口请求进程数 self.key_queue = asyncio.Queue(loop=self.loop) # 接口队列 self.pic_queue = asyncio.Queue(loop=self.loop) # 图片队列 self.session = aiohttp.ClientSession(loop=self.loop) #接口异步 http 请求 self.pic_session = aiohttp.ClientSession(loop=self.loop) #图片异步 http 请求 self.key_queue.put_nowait({'key': init_key, 'x-list-iter': None, 'hostname': hostname}) #初始化接口队列 push 需要下载的目录 self.pic_tsak = pic_tsak #图片下载进程数(接口有调用频率限制,http 下载没有限制) def close(self): """回收 http session""" self.session.close() self.pic_session.close() async def work(self): """接口请求队列消费者""" try: while True: url = await self.key_queue.get() # print('key 队列数量:' + await self.key_queue.qsize()) await self.handle(url) self.key_queue.task_done() await asyncio.sleep(1) except asyncio.CancelledError: pass async def work_pic(self): """图片请求队列消费者""" try: while True: url = await self.pic_queue.get() await self.handle_pic(url) self.pic_queue.task_done() await asyncio.sleep(1) except asyncio.CancelledError: pass async def handle_pic(self, key): """处理图片请求""" url = (lambda x: x[0] == '/' and x or '/' + x)(key['key']) url = url.encode('utf-8') url = urllib.parse.quote(url) pic_url = key['hostname'] + url + '!s400' tries = 0 while tries < self.max_tries: try: respOnse= await self.pic_session.get(pic_url) break except aiohttp.ClientError: pass tries += 1 try: if is_dic(url): # print('图片线程-目录 :{}'.format(url)) cOntent= await response.text() try: iter_header = response.headers.get('x-upyun-list-iter') except Exception as e: iter_header = 'g2gCZAAEbmV4dGQAA2VvZg' list_json_param = content + "`" + str(response.status) + "`" + str(iter_header) self.do_file(self.get_list(list_json_param), key['key'], key['hostname']) else: # print('图片线程-文件:{}'.format(key['save_path'])) with open(key['save_path'], 'wb') as f: f.write(await response.read()) finally: await response.release() async def handle(self, key): """处理接口请求""" url = '/' + bucket + (lambda x: x[0] == '/' and x or '/' + x)(key['key']) url = url.encode('utf-8') url = urllib.parse.quote(url) if key['x-list-iter'] is not None: if key['x-list-iter'] is not None or not 'g2gCZAAEbmV4dGQAA2VvZg': headers['X-List-Iter'] = key['x-list-iter'] tries = 0 while tries < self.max_tries: try: respOnse= await self.session.get("http://v0.api.upyun.com" + url, headers=headers) break except aiohttp.ClientError: pass tries += 1 try: if is_dic(url): # print('目录线程-目录 :{}'.format(url)) cOntent= await response.text() try: iter_header = response.headers.get('x-upyun-list-iter') except Exception as e: iter_header = 'g2gCZAAEbmV4dGQAA2VvZg' list_json_param = content + "`" + str(response.sttus) + "`" + str(iter_header) self.do_file(self.get_list(list_json_param), key['key'], key['hostname']) else: # print('目录线程-文件:{}'.format(key['save_path'])) with open(key['save_path'], 'wb') as f: f.write(await response.read()) finally: await response.release() def get_list(self, content): # print(content) if content: cOntent= content.split("`") items = content[0].split('\n') cOntent= [dict(zip(['name', 'type', 'size', 'time'], x.split('\t'))) for x in items] + content[1].split() + \ content[2].split() return content else: return None def do_file(self, list_json, key, hostname): """处理接口数据""" for i in list_json[:-2]: if not i['name']: continue new_key = key + i['name'] if key == '/' else key + '/' + i['name'] try: if i['type'] == 'F': self.key_queue.put_nowait({'key': new_key, 'x-list-iter': None, 'hostname': hostname}) else: try: if not os.path.exists(bucket + key): os.makedirs(bucket + key) except OSError as e: print('新建文件夹错误:' + str(e)) save_path = base_save_path + '/' + bucket + new_key if not os.path.isfile(save_path): print(f'请求图片:', new_key) self.pic_queue.put_nowait( {'key': new_key, 'save_path': save_path, 'x-list-iter': None, 'hostname': hostname}) else: print(f'文件已存在:{save_path}') except Exception as e: print('下载文件错误!:' + str(e)) with open('download_err.txt', 'a') as f: f.write(new_key + '\n') if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg': self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname}) # self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname}) async def run(self): """初始化任务进程""" workers = [asyncio.Task(self.work(), loop=self.loop) for _ in range(self.max_tasks)] workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop) for _ in range(self.pic_tsak)] await self.key_queue.join() await self.pic_queue.join() workers.append(workers_pic) for w in workers: w.cancel() if __name__ == '__main__': loop = asyncio.get_event_loop() crawler = Crawler('/', hostname, max_tasks=5, pic_tsak=150) loop.run_until_complete(crawler.run()) crawler.close() loop.close()
上面是代码
说下最终解决方案吧
速度:单机跑 100 个线程 两台机器一起跑,不加延时大约每秒 400 张,但是带宽会爆满,然后 mq 会断掉,所以加了延时,每个线程每次下载后加了 0.05 秒的延时后 下载速度大约每秒 300 到 360 张浮动,带宽占用百分之 80 到 95 浮动。目前很稳定。。
感谢各位
再次补充下。最后的代码在这里
1 asuraa OP 又拍云官方回复的脚本连接在此:[upyun-sdk-script]( https://github.com/monkey-wenjun/upyun-sdk-script/blob/master/download_file/download_file_with_iter.py) |
2 asuraa OP 昂 这么硬的问题木有人给解答下吗 |
3 asuraa OP 难道是保存文件的时候引起的? |
4 asuraa OP 在线等。。。。急 |
6 asuraa OP 更正: ```python def is_dic(url): """判断 key 是否是目录 根据是否有后缀名判断""" url = url.replace('http://v0.api.upyun.com/', '') # print(len(url.split('.'))) if len(url.split('.')) > 1: return True else: return False ``` |
7 asuraa OP 擦。。。看错了 帖子主题的代码是正确的。。 |
8 Lax 2017-09-03 21:35:24 +08:00 ![]() 脚本运行时,用 strace 看一下卡在哪里了 strace -p <pid> |
11 Lax 2017-09-03 21:40:34 +08:00 另外一个比较值得怀疑的一点是,你的所有文件操作没有关闭,有可能用尽 open files 限制。 可以对比一下这三个值: 用户限制:ulimit -n 进程限制:cat /proc/<pid>/limits 实际使用:ls /proc/<pid>/fd | wc -l |
12 Lax 2017-09-03 21:41:11 +08:00 当我没说。。 |
13 Lax 2017-09-03 21:42:13 +08:00 cygwin / mingw 之类的可能有 strace |
14 asuraa OP @Lax 这里使用了 with 语句,应该能保证 with 语句执行完毕后已经关闭了打开的文件句柄。应该不是这个问题呀。 |
![]() | 15 mengskysama 2017-09-03 21:53:26 +08:00 via iPhone ![]() 加个 timeout 试试,要等 tcp 的 timeout 机制触发要好久的 |
16 asuraa OP @mengskysama 是在每次 http 请求的时候加的吗? |
17 asuraa OP @mengskysama 但是是异步的啊 应该会等待请求完成的啊 |
![]() | 18 mengskysama 2017-09-03 21:58:52 +08:00 via iPhone @luodaoyi 对 |
19 asuraa OP @mengskysama 对于单个进程而言 会等待的啊 |
![]() | 20 mengskysama 2017-09-03 22:01:56 +08:00 via iPhone @luodaoyi 一个可能是池子里面 task 全塞死了, |
21 asuraa OP @mengskysama http 请求引起的卡死吗? |
22 asuraa OP 我觉得好像找到问题了。当 asyncio 队列满了之后 会阻塞线程。但是我这里用的 put_nowait http://python.usyiyi.cn/translate/python_352/library/asyncio-queue.html put_nowait(item) 将项目放入队列而不阻塞。 如果没有可用的空位,引发 QueueFull。 |
24 NoAnyLove 2017-09-03 23:20:44 +08:00 ![]() 一些细节的东西: * 用一个 ClientSession 就好,或者多个 ClientSession 用同一个 TCPConnector * session.get 要加 timeout,我以前遇到过卡死过在请求上 * response 可以用 async with 打开,可靠性和可读性都有提高 * Windows 下要确认是不是有很多文件没有关闭,可以用 OpenedFilesViw |
25 asuraa OP 另外写文件使用的 io 阻塞操作 写文件使用 aiofiles 实现异步写操作 async with aiofiles.open('download_err.txt', 'a') as f: await f.write(new_key + '\n') |
26 asuraa OP 跑跑看 有问题在此贴继续讨论 |
28 asuraa OP @NoAnyLove 按您说的几点 1. ClientSession 已改为一个 2. session.get timeout=60 3. response 使用 async with 打开 目前再跑 明日看看会不会还卡死 |
![]() | 29 ysc3839 2017-09-04 01:20:41 +08:00 via Android @luodaoyi Windows 的话,我只知道 Visual Studio 能直接附加调试 Python 代码,不知道是啥黑科技。 |
30 mert472114271 2017-09-04 01:55:33 +08:00 在怀疑的代码快前后加两个变量计数,程序加个中断信号控制器,打印计数的变量 |
31 asuraa OP |
32 asuraa OP 说下最终解决方案吧 1. 使用 mq 替代自带的队列,这样也方便断点续传,还有个原因是其中一个 bucket 图片大约有 7T 而我们购买的硬盘单个只有 4T 又不能做磁盘阵列,所以需要两个同时下而且不重复,于是乎就改造成了分布式的了 2. 拆分生产者和消费者,其中获得目录的服务既是消费者又是生产者,目前按照 24 楼仁兄的几个细节建议,改进了代码另外加了多处的异常判断和处理,目前非常健壮和稳定 速度:单机跑 100 个线程 两台机器一起跑,不加延时大约每秒 400 张,但是带宽会爆满,然后 mq 会断掉,所以加了延时,每个线程每次下载后加了 0.05 秒的延时后 下载速度大约每秒 300 到 360 张浮动,带宽占用百分之 80 到 95 浮动。目前很稳定。。 感谢各位 |
![]() | 33 xbtlin 2017-09-08 01:42:07 +08:00 m |