目前需要从一个 FTP 服务器下载 3 万多个小文件,之前用 multiprocessing 总是下一部分之后就停了,所以尝试用异步加快下载:
class pdb: def __init__(self): self.ids = [] self.dl_id = [] self.err_id = [] async def download_file(self, session, url): try: with async_timeout.timeout(10): async with session.get(url) as remotefile: if remotefile.status == 200: data = await remotefile.read() return {"error": "", "data": data} else: return {"error": remotefile.status, "data": ""} except Exception as e: return {"error": e, "data": ""} async def unzip(self, session, work_queue): while not work_queue.empty(): queue_url = await work_queue.get() print(queue_url) data = await self.download_file(session, queue_url) id = queue_url[-11:-7] ID = id.upper() if not data["error"]: saved_pdb = os.path.join("./pdb", ID, f'{ID}.pdb') if ID not in self.dl_id: self.dl_id.append(ID) with open(f"{id}.ent.gz", 'wb') as f: f.write(data["data"].read()) with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile: shutil.copyfileobj(inFile, outFile) os.remove(f"{id}.ent.gz") else: self.err_id.append(ID) def download_queue(self, urls): loop = asyncio.get_event_loop() q = asyncio.Queue(loop=loop) [q.put_nowait(url) for url in urls] con = aiohttp.TCPConnector(limit=10) with aiohttp.ClientSession(loop=loop, cOnnector=con) as session: tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))] loop.run_until_complete(asyncio.gather(*tasks)) loop.close() if __name__ == "__main__": x = pdb() urls = ['ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/nf/pdb4nfn.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ny/pdb4nyj.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/mn/pdb2mnz.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ra/pdb4ra4.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/x5/pdb4x5w.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/dm/pdb2dmq.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/n7/pdb2n7r.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/om/pdb2omv.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/oy/pdb3oy8.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/fe/pdb3fej.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/hw/pdb2hw9.ent.gz'] x.download_queue(urls) 报错信息如下:
Traceback (most recent call last):
File "test.py", line 111, in <module>
x.download_queue(urls)
File "test.py", line 99, in download_queue
loop.run_until_complete(asyncio.gather(*tasks))
File "/home/yz/miniconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
return future.result()
File "test.py", line 73, in unzip
data = await self.download_file(session, queue_url)
File "test.py", line 65, in download_file
return {"error": remotefile.status, "data": ""}
File "/home/yz/miniconda3/lib/python3.6/site-packages/async_timeout/init.py", line 46, in exit
raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError
请大家帮忙看看。谢谢!

1