问一个关于 asyncio 的问题,当多个 task 运行,其中一个正确返回后,怎么取消其他协程? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
dongcheng
V2EX    Python

问一个关于 asyncio 的问题,当多个 task 运行,其中一个正确返回后,怎么取消其他协程?

  •  
  •   dongcheng 2021-01-22 10:33:25 +08:00 3419 次点击
    这是一个创建于 1791 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个简单例子 import asyncio import time

    特殊函数

    async def get_request(url): print('正在下载',url) # time.sleep(2) 不支持异步的模块 会中断整个的异步效果 await asyncio.sleep(2) print('下载完成',url) return 'page_text'

    def parse(task): print(task.result())

    start = time.time() urls = ['www.xxx1.com','www.xxx2.com','www.xxx3.com']

    tasks = [] #存放多任务 for url in urls: # 调用特殊函数 func = get_request(url)

    # 创建任务对象 task = asyncio.ensure_future(func) # 给任务对象绑定回调函数 task.add_done_callback(parse) tasks.append(task) 

    创建事件循环对象

    loop = asyncio.get_event_loop()

    执行任务

    loop.run_until_complete(asyncio.wait(tasks)) print('总耗时:',time.time()-start) #2.0015313625335693

    可以在 get_request 判断任务是否成功,但是如何判断成功之后取消其他任务呢?

    谢谢

    23 条回复    2021-01-22 14:38:17 +08:00
    chanchancl
        1
    chanchancl  
       2021-01-22 10:46:51 +08:00
    将 func 包装成一个 function

    在结尾的地方,将 tasks 中,除了自己的 task 都 cancle 掉
    pursuer
        2
    pursuer  
       2021-01-22 10:52:53 +08:00
    如果想要让协程提前结束等待,可以使用 asyncio.wait(...,return_when='FIRST_COMPLETED')或者更灵活的方案 asyncio.Future,然后处理下资源释放
    dongcheng
        3
    dongcheng  
    OP
       2021-01-22 11:02:28 +08:00
    @chanchancl 这个 func 能获取到 task 吗?
    sujin190
        4
    sujin190  
       2021-01-22 11:08:05 +08:00   4
    你是不是想错了,对于协程来说,如果设计到 io,那么就算能取消协程其实并不能取消 io 操作

    比如 http 请求,就算你要取消协程其实并不能取消已经发送出去的 http 请求,取消协程完全没有意义,如果你能取消 io,比如关闭 http 连接来取消 http 请求,那么你应该通过关闭 http 请求的方式来触发协程返回,即先取消 io 操作再由 io 完成来触发协程返回,而不能倒过来

    对于非 io 请求,再整个协程完成前会独占线程,并不会调度到其他协程,所以你自然也不能用其他协程来取消当前正在运行的协程了
    keepeye
        5
    keepeye  
       2021-01-22 11:10:10 +08:00
    不是有 tasks 吗?遍历一下都 cancel 掉
    fiveelementgid
        6
    fiveelementgid  
       2021-01-22 11:10:13 +08:00 via Android
    看见 Task 和 continuation 还激动了一下以为有. NET 玩家一起研究 TAP 了,一看是 Python,告辞
    sujin190
        7
    sujin190  
       2021-01-22 11:12:28 +08:00
    对于非 io 请求协程再补充一点,似乎协程再创建的时候就会进行首次运行,没 io 操作,所以首次运行必然独占线程一直到运行完成,所以这种情况下完全做不了中途取消的操作
    dongcheng
        8
    dongcheng  
    OP
       2021-01-22 11:20:08 +08:00
    @sujin190 应该可以取消 task 吧。只是有局限。

    @keepeye asyncio.Task.all_tasks()可以获取 all task 。但是现在有个需求是嵌套的 task,可以取消所有 task,不能取消第 2 层的 task (保留第一层)
    sujin190
        9
    sujin190  
       2021-01-22 11:26:56 +08:00
    @dongcheng #8 但是没意义,不符合协程原理实现,从程序的可靠性严谨性来说也不应该出现这种设计,不关闭 io 而取消上层协程是很不科学的,很容易导致 io 、连接泄露啥的,而且还不容易排查是啥问题
    xiaoHuang3
        10
    xiaoHuang3  
       2021-01-22 11:48:58 +08:00
    @sujin190 附议~,我觉得楼主这种需求根本不适合用 asyncio 来做- -
    crclz
        11
    crclz  
       2021-01-22 12:10:44 +08:00
    @fiveelementgid .NET 的设计就很清晰,TAP 、cancellation token

    如果是.NET 的话,应该把一个 CancellationTokenSource ( cts )的 CancellationToken 传给所有异步方法,异步方法会返回 task,再等 Task.WhenAny 返回后,再取消 cts 。

    由于 CancellationToken 是层层深入的( HTTP 、TCP ),所以底层负责 TCP 的代码也会取消正在进行的操作,然后会扔出一个取消异常,最后捕获这个异常就行了(在异步方法中)。

    我想 python 的问题就是,底层的 api 都没有提供 cancellation token 的参数,导致无法让取消操作层层传播。
    abersheeran
        12
    abersheeran  
       2021-01-22 12:21:18 +08:00   1
    写过相关代码,在我的项目里使用过很久,一切良好。

    https://gist.github.com/abersheeran/bd2372bb35fec859d7fca453ca5f7826
    abersheeran
        13
    abersheeran  
       2021-01-22 12:32:01 +08:00
    刚看到楼主在 #8 说有嵌套 Task 。很遗憾,asyncio 不支持嵌套取消。可以试试 trio,宣传上说是解决了这个问题。我也没试过。
    optional
        14
    optional  
       2021-01-22 12:33:41 +08:00 via Android
    从设计上来说,这是不可取消的,如果有 transaction 对象之类,可以标记一下
    fiveelementgid
        15
    fiveelementgid  
       2021-01-22 13:01:58 +08:00 via Android
    @crclz Sure,我昨晚刚看完印象深刻,特地还去扒了一下源码。最底下就是 new 一个新的 Token 和上层传下来的 Token 然后 Link,整层整层地控制
    dongcheng
        16
    dongcheng  
    OP
       2021-01-22 13:30:11 +08:00
    @abersheeran 感谢分享代码。研究了下 asyncio.as_completed(tasks)可以实现类似的功能
    zhuangzhuang1988
        17
    zhuangzhuang1988  
       2021-01-22 13:36:50 +08:00
    @crclz 微软还是最牛的
    cancal 进度报告是 api 中设计的
    xuboying
        18
    xuboying  
       2021-01-22 13:41:24 +08:00
    @abersheeran #12 个人感觉这个代码片段用类来实现可读性可能会更好一点。loop = loop or asyncio.get_running_loop()学到了。之前写的时候还没这个函数。。。
    imn1
        19
    imn1  
       2021-01-22 13:44:33 +08:00
    刚好有类似需求
    没找到满意的解决方案
    目前是用异步队列,当满足一定条件就清空队列,但条件满足时,已经并行启动的不能取消,要继续工作,只是清空了未启动的队列
    js8510
        20
    js8510  
       2021-01-22 14:19:27 +08:00
    同 @sujin190 。 我猜测可能想的是 multiprocessing 而不是 asyncio.
    如果 pre request/ post response 有 cpu heavy 的处理。并发倒是有意义的。

    你可以 map_async 。如果 i/o 还没发生,或者已经发生,你可以 terminate 掉其他 process. 如果正在 i/o blocking 你还是要 wait 然后 kill 掉。

    这样调度有开销,要具体分析。。简单的 post 请求就返回应该不值得。
    zeroDev
        21
    zeroDev  
       2021-01-22 14:30:44 +08:00 via Android
    用协程里的 active 来判断
    zeroDev
        22
    zeroDev  
       2021-01-22 14:31:59 +08:00 via Android
    另外,我感觉对于你这个需求,不应该进行并发请求。
    muzuiget
        23
    muzuiget  
       2021-01-22 14:38:17 +08:00
    老生常谈,从外部取消相当于强杀进程,好容易造成数据一致性问题。

    正确姿势就是线程 /协程自己在某些关键点判断某个共享变量状态,然后自己优雅退出。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2926 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 33ms UTC 13:42 PVG 21:42 LAX 05:42 JFK 08:42
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86