0%

Reactor响应式编程阅读源码有感

官方参考文档
baeldung参考文章

Flux

an Asynchronous Sequence of 0-N Items(是一个异步的处理序列,它有0或者多个条目)
官方对Flux的原理解析图解:

Spring Gateway的源码中重度依赖Reactor Project,比如:

1
return responseFlux.then(chain.filter(exchange));

这句代码的逻辑会这么走:要等到responseFlux被subscribe了之后,chain.filter(exchange)(这是一个Mono),才会被继续流转运行。

Mono

Mono.just

I mean that calling Mono.just(System.currentTimeMillis()) will immediately invoke the currentTimeMillis() method and capture the result

1
2
3
4
5
6
7
8
9
10
Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0(代表这个时候获取clock的实际时间依然还是上面第一次调用System.currentTimeMillis()拿到的时间,下面雷同)

Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0

Mono.defer

The defer operator is there to make this source lazy, re-evaluating the content of the lambda each time there is a new subscriber:

1
2
3
4
5
6
7
8
9
10
Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //invoked currentTimeMillis() here and returns t10(这里标识获取到的时间是线程等待了10秒之后的时间,表明defer()这个方法会延时参数里面声明的lambda表达式,在需要的时候再去实际调用计算结果,下面雷同)

Thread.sleep(7_000);
//time == t17
clock.block(); //invoke currentTimeMillis() once again here and returns t17