Project Reactor,如何实现主线程消费报错时停止 Flux 流 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
Sign Up Now
For Existing Member  Sign In
git00ll
V2EX    Java

Project Reactor,如何实现主线程消费报错时停止 Flux 流

  •  
  •   git00ll Dec 27, 2021 2397 views
    This topic created in 1584 days ago, the information mentioned may be changed or developed.

    如下代码,本意是将 flux 流发送到子线程处理,再将处理结果汇聚到主线程。如何能够实现主线程处理报错时停止 Flux 的呢。

     public static void main(String[] args) { String[] data = {"2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15"}; Iterable<Integer> integers = Flux.fromArray(data) .flatMapSequential(s -> Mono.fromSupplier(() -> Integer.parseInt(s)).subscribeOn(Schedulers.boundedElastic()), 3) .doOnNext(s -> { System.out.println(Thread.currentThread().getName() + "---------->" + s); }) .toIterable(); for (Integer i : integers) { //如何实现这里报错时,停止 Flux System.out.println((10 / i) + "------>>>>" + Thread.currentThread().getName()); } } 
    5 replies    2021-12-28 12:50:30 +08:00
    yazinnnn
        1
    yazinnnn  
       Dec 28, 2021
    fun main() {
    val data = arrayOf("2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15")

    val integers = Flux.fromArray(data)
    .flatMapSequential({ s: String ->
    Mono.fromSupplier { s.toInt() }.subscribeOn(Schedulers.boundedElastic())
    }, 3)
    val a = integers.doOnNext { s ->
    println(Thread.currentThread().name + "---------->" + s)
    }.subscribe()

    for (i in integers.toIterable()) {
    try {
    10 / i
    } catch (t: Throwable) {
    a.dispose()
    }
    }

    CountDownLatch(1).await()
    }


    不清楚你的具体需求,如果整个链路不使用 reactivestream 的话,似乎性能(吞吐)并没什么提高
    toIterable()迭代时会阻塞当前线程,这样写跟直接用线程池处理比没啥优点
    Macolor21
        2
    Macolor21  
       Dec 28, 2021
    CompletableFuture 最后 join 回主线程?
    git00ll
        3
    git00ll  
    OP
       Dec 28, 2021
    @yazinnnn 假设 toInt 这个操作是比较耗时的,可以实现将 toInt 放置在多核上运行,最终结果再汇聚到主线程上。
    因为主线程上开启了传统注解事务,需要在主线程上操作 Flux 的处理结果
    yazinnnn
        4
    yazinnnn  
       Dec 28, 2021
    @git00ll
    那直接使用 stream 的并行 api 或者 2 楼的 CompletableFuture 是否更适合你的场景?
    git00ll
        5
    git00ll  
    OP
       Dec 28, 2021
    @yazinnnn
    业务场景里有一些限制,
    1. 需要保持输入和输出的顺序一致,
    2. 流中的数据从文件中读取的,数据量非常大,无法全部加载到内存。只能边读取边处理。
    3.处理过程中其中一条处理错误时,算失败,中断流不再继续。

    一方面 stream 的并行流没有拉模式,无法精准控制载入内存的数据行数。
    且并行 stream 提供的 api 太少,相比于 reactor 提供的控制选项不足
    About     Help     Advertise     Blog     API     FAQ     Solana     3322 Online   Highest 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 33ms UTC 12:13 PVG 20:13 LAX 05:13 JFK 08:13
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86