
I'm downloading images with Python coroutines, and the number of coroutine tasks eventually gets stuck at 97, looping forever. I can't figure out what's wrong — does anyone know what's going on? It has been bothering me for ages.
This is the output of a run; it stays stuck with the worker count at 80:
```
queue empty?.... 80
queue empty?.... 80
...
queue empty?.... 80
queue empty?.... 80
```

Here is the code:
```python
from lxml import etree
import os
import pandas as pd
import asyncio
import aiohttp
from random import randint
import cchardet
import aiofiles
import logging


class sikupicture_Spider(object):
    def __init__(self):
        # self.seens_url = []
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.PriorityQueue()
        self._workers = 0  # current worker count
        self._max_workers = 150  # maximum worker count
        self.overtime = {}  # {url: times,} failure count per URL
        self.overtime_threshold = 4
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
        }
        self.list_content = []

    async def init_url(self):
        info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
        for ite in info.itertuples():
            await self.queue.put((randint(1, 5), getattr(ite, 'url')))

    async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout,
                                   proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if status_code == 403:
                    print("url-403", url)
                    if url in self.overtime:
                        self.overtime[url] += 1
                        if self.overtime[url] > self.overtime_threshold:
                            pass
                        await self.queue.put((randint(1, 5), url))
                    else:
                        self.overtime[url] = 1
                        await self.queue.put((randint(1, 5), url))
                    status_code = 0
                    html = None
                if binary:
                    text = await resp.read()
                    encoding = cchardet.detect(text)
                    html = text.encode(encoding, errors='ignore')
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(img_url, headers=_headers, timeout=timeout,
                                   proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if binary:
                    html = await resp.read()
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", img_url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                else:
                    await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    def parse_source(self, source):
        try:
            response_1 = etree.HTML(source)
        except Exception as err:
            logging.error(f'parse error:{err}')
            url = ""
        else:
            img_url = response_1.xpath("//a[@href='Javascript:;']/@supsrc")[0] if len(
                response_1.xpath("//a[@href='Javascript:;']/@supsrc")[0]) else ""
            return img_url

    async def process(self, session, url, timeout):
        status, source = await self.fetch(session, url, timeout)
        file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
        if status == 200:
            img_url = self.parse_source(source)
            img_status, img_source = await self.download_img(session, img_url, timeout, url)
            if img_status == 200:
                async with aiofiles.open("F:\\dawnzhu\\picture\\" + file_name + ".jpg", "wb") as f:
                    await f.write(img_source)
            self._workers -= 1
            print("task done", self._workers, "url_status", status, "img_status", img_status)
        else:
            self._workers -= 1
            print("task done", self._workers, "url_status", status,)

    async def loop_crawl(self):
        await self.init_url()
        timeout = aiohttp.ClientTimeout(total=20)
        conn = aiohttp.TCPConnector(loop=self.loop, limit=50,
                                    force_close=True, enable_cleanup_closed=True)
        session = aiohttp.ClientSession(connector=conn, timeout=timeout)
        while True:
            if self._workers >= self._max_workers:
                print("worker limit reached")
                await asyncio.sleep(5)
                continue
            if self.queue.empty():
                print("queue empty?....", self._workers)
                await asyncio.sleep(5)
                if self._workers == 0:
                    break
                continue
            _, url = await self.queue.get()
            asyncio.ensure_future(self.process(session, url, timeout))
            self._workers += 1
            print("queue size left", self.queue.qsize(), self._workers)
        await session.close()

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            self.loop.close()


if __name__ == '__main__':
    sp = sikupicture_Spider()
    sp.run()
```

1 itskingname May 27, 2020
Try changing the `limit` parameter in `conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)` to 500.
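For reference, that suggestion amounts to a one-line change; here it is as a sketch. The value 500 comes from the reply above and is hypothetical tuning, not a known fix. The relevant tension is that the original `limit=50` is well below `_max_workers = 150`, so up to 100 coroutines can sit waiting for one of the 50 pooled connections inside the connector:

```python
import aiohttp

# Hypothetical tuning: 500 is the figure suggested above. With the original
# limit=50 and up to 150 spawned tasks, most tasks block inside the connector
# waiting for a free connection rather than actually downloading.
conn = aiohttp.TCPConnector(limit=500, force_close=True,
                            enable_cleanup_closed=True)
```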
2 dawnzhu OP
@itskingname Thanks! I'll give it a try. What would be the reason, though — not enough concurrency?
3 dawnzhu OP
@itskingname That didn't work.
4 Vegetable May 27, 2020
`fetch` catches the timeout, but other exceptions can still propagate upward, and an unhandled exception inside a coroutine doesn't stop the program — it only prints a message like "Task exception was never retrieved". `process` catches nothing at all, so the moment an exception escapes, `_workers` is never decremented and the `while` loop can never exit. That's the only cause I can see so far. Your code is fun to read — very tidy, but rather low-level in places: you manage the worker count by hand instead of using a Semaphore, and use aiohttp instead of httpx.
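To make that failure mode concrete, below is a minimal, self-contained sketch of the two fixes hinted at above — the names (`process`, `worker`, `crawl`) and the simulated failure are illustrative stand-ins, not the OP's real logic. The key point is that the concurrency permit is released and the exception is retrieved on every path, so no failed task can leak a slot:

```python
import asyncio
import random


async def process(url: str) -> None:
    """Stand-in for the real fetch + parse + download; may raise anywhere."""
    await asyncio.sleep(random.random())
    if random.random() < 0.1:
        raise RuntimeError(f"simulated failure for {url}")


async def worker(sem: asyncio.Semaphore, url: str) -> None:
    async with sem:  # the permit is released even if process() raises
        try:
            await process(url)
        except Exception as err:
            # Retrieve the exception here instead of letting it die silently
            # inside the Task ("Task exception was never retrieved").
            print("task failed:", url, err)


async def crawl(urls, max_workers: int = 150) -> None:
    # The Semaphore replaces the hand-maintained _workers counter: at most
    # max_workers tasks run at once, and gather() waits for all of them,
    # so no busy-wait loop is needed to decide when to stop.
    sem = asyncio.Semaphore(max_workers)
    await asyncio.gather(*(worker(sem, u) for u in urls))


if __name__ == "__main__":
    urls = [f"http://item.secoo.com/{i}.shtml" for i in range(300)]
    asyncio.run(crawl(urls))
```

Applied to the original class, the same idea would be a try/except/finally inside `process` with `self._workers -= 1` moved into the `finally` block, so that no exception path can skip the decrement and leave `loop_crawl` spinning forever.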