创建 flux 代码如下,其中的 long consumer 可能会被下游多次调用。
Flux.create(new Consumer<FluxSink<Object>>() { @Override public void accept(FluxSink<Object> fluxSink) { fluxSink.onRequest(new LongConsumer() { @Override public void accept(long value) { log.info("我被多次调用了 request:" + value); for (long i = 0; i < value; i++) { fluxSink.next("request:" + i); } } }); } }) 也就是说,我们不能决定下游调用的时机,调用的次数,调用的所在线程。这样就很容易产生 bug 。
FluxArray 解决此问题的办法是使用 Operators.addCap(REQUESTED, this, n) == 0判断, 只有返回为 0 时,才进行处理,否则将请求的 n 叠加到 request 后就 return 。
public void request(long n) { if (Operators.validate(n)) { if (Operators.addCap(REQUESTED, this, n) == 0) { if (n == Long.MAX_VALUE) { fastPath(); } else { slowPath(n); } } } } 我们自己写 Flux.create() 时也可以借鉴 FluxArray 的处理办法,但是这样就变得麻烦了。 不知道有什么现有封装好的实现没有??
