Python coroutine tasks stuck - V2EX
Python coroutine tasks stuck

dawnzhu · May 27, 2020 · 2696 views
This topic was created 2163 days ago; the information in it may have changed since then.

While using Python coroutines to download images, the number of coroutine tasks eventually gets stuck at 97 and just loops forever. I can't figure out where the problem is. Does anyone know what's going on? It has been bothering me for a long time.

This is the run output; it stays stuck with the task count at 80:

queue empty?.... 80
queue empty?.... 80
.
.
.
queue empty?.... 80
queue empty?.... 80

The code is below:

from lxml import etree
import os
import pandas as pd
import asyncio
import aiohttp
from random import randint
import cchardet
import aiofiles
import logging


class sikupicture_Spider(object):
    def __init__(self):
        # self.seens_url = []
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.PriorityQueue()
        self._workers = 0  # current number of workers
        self._max_workers = 150  # maximum number of workers
        self.overtime = {}  # {url: times,} failure count per URL
        self.overtime_threshold = 4
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
        }
        self.list_content = []

    async def init_url(self):
        info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
        for ite in info.itertuples():
            await self.queue.put((randint(1, 5), getattr(ite, 'url')))

    async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy,
                                   allow_redirects=False) as resp:
                status_code = resp.status
                if status_code == 403:
                    print("url-403", url)
                    if url in self.overtime:
                        self.overtime[url] += 1
                        if self.overtime[url] > self.overtime_threshold:
                            pass
                        await self.queue.put((randint(1, 5), url))
                    else:
                        self.overtime[url] = 1
                        await self.queue.put((randint(1, 5), url))
                    status_code = 0
                    html = None
                if binary:
                    text = await resp.read()
                    encoding = cchardet.detect(text)
                    html = text.encode(encoding, errors='ignore')
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy,
                                   allow_redirects=False) as resp:
                status_code = resp.status
                if binary:
                    html = await resp.read()
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", img_url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                else:
                    await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    def parse_source(self, source):
        try:
            response_1 = etree.HTML(source)
        except Exception as err:
            logging.error(f'parse error:{err}')
            url = ""
        else:
            img_url = response_1.xpath("//a[@href='Javascript:;']/@supsrc")[0] if len(
                response_1.xpath("//a[@href='Javascript:;']/@supsrc")[0]) else ""
            return img_url

    async def process(self, session, url, timeout):
        status, source = await self.fetch(session, url, timeout)
        file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
        if status == 200:
            img_url = self.parse_source(source)
            img_status, img_source = await self.download_img(session, img_url, timeout, url)
            if img_status == 200:
                async with aiofiles.open("F:\\dawnzhu\\picture\\" + file_name + ".jpg", "wb") as f:
                    await f.write(img_source)
            self._workers -= 1
            print("task done", self._workers, "url_status", status, "img_status", img_status)
        else:
            self._workers -= 1
            print("task done", self._workers, "url_status", status)

    async def loop_crawl(self):
        await self.init_url()
        timeout = aiohttp.ClientTimeout(total=20)
        conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True,
                                    enable_cleanup_closed=True)
        session = aiohttp.ClientSession(connector=conn, timeout=timeout)
        while True:
            if self._workers >= self._max_workers:
                print("worker-limit check")
                await asyncio.sleep(5)
                continue
            if self.queue.empty():
                print("queue empty?....", self._workers)
                await asyncio.sleep(5)
                if self._workers == 0:
                    break
                continue
            _, url = await self.queue.get()
            asyncio.ensure_future(self.process(session, url, timeout))
            self._workers += 1
            print("queue remaining", self.queue.qsize(), self._workers)
        await session.close()

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            self.loop.close()


if __name__ == '__main__':
    sp = sikupicture_Spider()
    sp.run()
    6 replies    2020-05-27 14:15:50 +08:00
itskingname · #1 · May 27, 2020
Try changing the limit parameter in conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True) to 500.
dawnzhu (OP) · #2 · May 27, 2020
@itskingname Thanks! I'll give it a try. What would the reason be, too few concurrent connections?
dawnzhu (OP) · #3 · May 27, 2020
@itskingname That didn't work.
Vegetable · #4 · May 27, 2020
fetch catches timeouts, but other exceptions can still propagate upward, and an unhandled exception in a coroutine will not terminate the program; it only prints a message like
Task exception was never retrieved
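This behavior can be seen in a few lines. The sketch below (hypothetical names boom/main, standing in for the spider's process()) shows that an exception inside a fire-and-forget task does not stop the program; the rest of the code keeps running, and the "Task exception was never retrieved" notice only appears later, when the dead task is garbage-collected:

```python
import asyncio

async def boom():
    # Raises inside a task that nobody awaits, like process() in the spider
    raise ValueError("oops")

async def main():
    asyncio.ensure_future(boom())  # fire-and-forget: the exception stays inside the task
    await asyncio.sleep(0.1)       # meanwhile the program carries on normally
    return "still alive"

print(asyncio.run(main()))  # prints "still alive"
```

Attaching a done-callback (or awaiting the task) is what actually surfaces such errors.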
process itself doesn't catch any exceptions, so once one occurs, _workers is never decremented correctly and the while loop can never exit.
That's the only cause I can see so far. Your code is fun to read, very neat and tidy, but a lot of it is quite low-level: manually managing the worker count instead of using a Semaphore, using aiohttp instead of httpx.
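A minimal sketch of this fix (simplified names, no real network I/O; process and _workers follow the OP's naming): decrementing the counter in a finally block means it runs even when the task body raises, so the counter can never get stuck above zero:

```python
import asyncio

workers = 0  # stands in for the spider's self._workers

async def process(url):
    global workers
    try:
        if "bad" in url:
            raise RuntimeError("parse failed")  # any unexpected error
        await asyncio.sleep(0)                  # pretend to download something
    finally:
        workers -= 1  # runs even when the body raises

async def main():
    global workers
    tasks = []
    for url in ["http://ok/1", "http://bad/2", "http://ok/3"]:
        workers += 1
        tasks.append(asyncio.ensure_future(process(url)))
    # return_exceptions=True retrieves the failure instead of re-raising it
    await asyncio.gather(*tasks, return_exceptions=True)
    return workers

print(asyncio.run(main()))  # 0: the counter balances despite one failed task
```

With asyncio.Semaphore(n) and `async with sem:` around the body of process(), the concurrency bound is enforced with no counter to leak at all.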
dawnzhu (OP) · #5 · May 27, 2020
@Vegetable Understood, that must be the problem. Thanks!
ruanimal · #6 · May 27, 2020
@Vegetable Probably a former C programmer.