
之前自己遇到简单的需求需要异步并发的时候,经过 v2 大佬的指点之后用的 asyncio.Queue + aiohttp 搞定了,大概步骤是这样:(在下不是专业程序员,可能写的东西看起来像玩具,大佬们见谅)
async def worker(session): while True: uri = await queue.get() uri, status_code = await delete_url(uri, session) ...... async def delete_url(uri, session): ...... async with session.delete(url, headers=headers) as response: status_code = response.status return uri, status_code async def main(): async with aiohttp.ClientSession() as session: tasks = [] for _ in range(max_worker): task = asyncio.create_task(worker(session)) ...... 早几天看到有个大佬编的那个 [ Python 潮流周刊] 里面推荐一个异步队列 saq
github 地址: https://github.com/tobymao/saq
由于没玩过异步任务队列,就很想试试,结果遇到这样一个问题
async def delete_url(ctx, *, uri, session): async with session.delete(url, headers=headers) as response: status_code = response.status return uri, status_code settings = { "functions": [delete_url], "concurrency": 50 } async def main(): async with aiohttp.ClientSession() as session: with open(filename, 'r') as f: for line in f: uri = line.strip() job = await queue.enqueue("delete_url", uri=uri, session=session) session=session 不能这样传参,因为 saq 的这个 queue.enqueue 只能接收可序列化的作为参数, 而 aiohttp.ClientSession 不是一个可以 JSON 序列化的。这个 session 又不能写全局变量,要 session 共享的话,又不能写在 delete_url 函数里面,想问问大佬,这种情况要咋处理啊?
1 NessajCN 2023-12-18 11:39:57 +08:00 写个类,session 是成员变量,delete_url 是成员函数 |
2 fzzff 2023-12-18 13:14:57 +08:00 session 为什么不能写为全局变量? |
3 Vegetable 2023-12-18 15:18:51 +08:00 草草看了一下这个 saq ,他通过 redis 通信,和你本身实现的异步并不搭配,如果你之前能用 asyncio.queue ,证明你本来只是一个单进程的程序,这时候你在程序内部共享 session 是不错的做法。 但是 saq 通过 redis 通信,就意味着他的设计目的是实现分布式结构,分布式想共享对象就需要序列化了。但分布式之下共享 session 这个事儿其实没什么必要做了,至少不是一个好的方案。 |
4 ClericPy 2023-12-18 21:56:26 +08:00 消费者那边每个进程共享一个 Session |