2022-06-28

Java

Reactor

ReactiveX/RxJava文档中文版 (推荐) reactor-core github Reactor 指南中文版 v2.0 Reactor 指南 v3.0 中文翻译 Reactor 3 参考指南 N_WebFlux

Reactor 是一个基于 JVM 之上的异步应用基础库。为 Java 、Groovy 和其他 JVM 语言提供了构建基于事件和数据驱动应用的抽象库。Reactor 性能相当高,在最新的硬件平台上,使用无堵塞分发器每秒钟可处理 1500 万事件。

通常,Java开发人员通过使用阻塞代码来编写程序。在没有遇到瓶颈之前,这种做法收效甚好。然后是时候引入其他线程,(也就是多线程编程)运行类似的阻塞代码了。 但是这种形式的资源利用会迅速引发竞争和并发问题。更糟糕的是,阻塞会浪费资源。如果仔细观察,程序一旦遇到一些耗时操作(特别是I/O,例如数据库请求或网络调用),就会浪费资源,因为线程(可能有很多线程)现在处于空闲状态,啥也不干,就呆呆地等待数据。因此,并行化方法不是灵丹妙药。我们有必要榨干硬件全部性能。

核心概念

RxJava 有四个基本概念:

  1. Observable (可观察者/被观察者)
  2. Observer (观察者)
  3. subscribe (订阅)、事件。

Observable(被观察者)Observer(观察者) 通过 subscribe (订阅) 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

参考: 17_行为型 - 观察者模式 (Observer Pattern) 观察者模式

一个场景

考虑这样一种情景:在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容)

userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  1. 我们 异步地转换 它们(ID) 为 Favorite 对象(使用 flatMap),现在我们有了 Favorite流。
  2. 一旦 Favorite 为空,切换到 suggestionService。
  3. 我们只关注流中的最多5个元素。
  4. 最后,我们希望在 UI 线程中进行处理。
  5. 通过描述对数据的最终处理(在 UI 中显示)和对错误的处理(显示在 popup 中)来触发(subscribe)。

in short 本质上将所有逻辑处理成流式的; 核心优点是在多线程的情况下, 将所有复杂逻辑都能穿成一条线

从命令式到响应式编程

可编排性与可读性

可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。

这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。

Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)

像流水线

你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。

原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。

操作符(Operators)

在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。

不要试图从流中获取数据出来,而是先思考需要对流中元素做什么, 需要对流中的数据进行操作时,都应该使用操作符来处理

最终,一个订阅者 (Subscriber) 终结这个过程请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生(其中间所有op都是无效的)。

背压

向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。

在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。

Reactor API

Reactor 引入了可组合的响应式类型,这些类型实现了Publisher,但也提供了丰富的操作工集合:Flux和Mono。

Flux 和 Mono 是 Reactor 中的两个基本概念

  1. Flux 对象表示0..N个元素的响应序列
  2. Mono 对象表示单值或空(0..1)序列。

关键就是函数定义和其代表处理逻辑

Flux/Mono 创建和订阅

生成

Flux#just(T...)Mono#just(T)Mono#justOrEmpty(T) just 方法就是显式指定序列, 类似Stream#of

Flux#from(Publisher)Flux#fromArray Flux#fromIterableFlux#rangeFlux#fromStream(Supplier<Stream>) 从Publisher、数组、集合、数据范围、流中生产序列

Mono#from(Publisher)Mono#fromSupplier Mono#fromRunnableMono#fromCallableMono#fromFuture 从其他地方获取结果,其中Mono#from(Publisher)是截取 Publisher 第一个元素

Flux#defer 延迟加载,参数需要传一个Supplier(无参数,1个返回值),返回值就是数据流,只有订阅了才会去初始化数据源 类似

//源代码方法签名
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
//使用例子
public static void deferTest() {
    Flux flux = Flux.defer(() -> {
    	return Flux.range(1, 3);
    });
    flux.subscribe(i -> System.out.println(i)); //这一步才会去执行Supplier#get代码
}

Mono 转为 Flux

// 创建一个 Mono 对象
Mono<String> mono = Mono.just("Hello");
// 将 Mono 转换为 Flux
Flux<String> flux = mono.flux();
// 订阅 Flux 来处理元素
flux.subscribe(System.out::println);

推送

Flux#generate Flux#generate是一个同步的简易的程序化生成序列的方法,你需要提供一个初始值以及一个控制sink的回调方法

其完整参数序列是generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)

Flux<String> flux = Flux.generate(
() -> 0,//初始值 (更接近 流处理上下文, 没有就不知道什么时候结束)
(state, sink) -> {// SynchronousSink (生产数据,同步回调推送出去)
    sink.next("3 x " + state + " = " + 3*state);
    if (state == 10) sink.complete();//结束
    return state + 1;
});
System.out.println( flux.collectList().blockOptional() ); //获取结果

Flux#create 与 generate 不同的是,create 不需要状态值, create 有个好处就是可以将现有的 API 转为响应式,比如监听器的异步方法。

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(
      new MyEventListener<String>() { 
        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s);//生成并推送
          }
        }
        public void processComplete() {
            sink.complete();//完成结束
        }
    });
});
 

每一个 chunk 的数据转化为 Flux 中的一个元素。 processComplete 事件转换为 onComplete。

Mono 也有一个用于 create 的生成器(generator)—— MonoSink,它不能生成多个元素, 因此会抛弃第一个元素之后的所有元素。

背压

此外,既然 create 可以是异步地,并且能够控制背压,你可以通过提供一个 OverflowStrategy 来定义背压行为。 IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。 ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号。 DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。 LATEST:让下游只得到上游最新的元素。 BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)。

flux.create(sink->{
            //do generate
        },  FluxSink.OverflowStrategy.DROP);//ERROR 下游处理不及的时候 抛错

Flux#push (单线程) create 的一个变体是 push,适合生成事件流。与 create类似,push 也可以是异步地, 并且能够使用以上各种溢出策略(overflow strategies)管理背压。

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { 
        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s);
          }
        }
        public void processComplete() {
            sink.complete();
        }
        public void processError(Throwable e) {
            sink.error(e);
        }
    });
});

事件通过调用 next 被推送到 sink。 complete 事件也在同一个线程中。 error 事件也在同一个线程中。

订阅/消费

订阅(subscribe)的时候,Flux 和 Mono 使用 Java 8 lambda 表达式。 .subscribe() 方法有多种不同的方法签名,你可以传入各种不同的 lambda 形式的参数来定义回调

subscribe(); 
subscribe(Consumer<? super T> consumer); 
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);

以上方法会返回一个 Subscription 的引用,如果不再需要更多元素你可以通过它来取消订阅。 取消订阅时, 源头会停止生成新的数据,并清理相关资源。取消和清理的操作在 Reactor 中是在 接口 Disposable 中定义的。

pull

Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
    new MyMessageListener<String>() {
    public void onMessage(List<String> messages) {
        for(String s : messages) {
        sink.next(s);
        }
    }
});
sink.onRequest(n -> {//此回调由下游调用;  下游可以从上游拉取已经就绪的数据
    List<String> messages = myMessageProcessor.request(n);
    for(String s : message) {
        sink.next(s);
    }
});

更多可以参考4.4. 可编程式地创建一个序列

行为

flatMap() 和 map()

flatMap() 和 map() 的区别在于:

  1. flatMap() 中的入参 Function 的返回值要求是一个 Mono 对象 转换流中的元素为新的流
  2. 而 map 的入参 Function 只要求返回一个普通对象

then…

then() 看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。 then() 方法的参数只是一个 Mono,无从接受上一步的执行结果。而 flatMap() 和 map() 的参数都是一个 Function,入参是上一步的执行结果。

then() 上游流完成后执行其他的操作.

…并且我希望用 Mono 来表示序列已经结束:then …并且我想在序列结束后等待另一个任务完成:thenEmpty …并且我想在序列结束之后返回一个 Mono:Mono#then(mono) …并且我想在序列结束之后返回一个值:Mono#thenReturn(T) …并且我想在序列结束之后返回一个 Flux:thenMany

zip

zip

 
// zip 组装多个 mono
Mono.zip(
        Mono.just(0),
        Mono.just(6),
        Mono.just(9)
).flatMap(tp->{
    tp.getT1()//0
    tp.getT2()//6
    tp.getT3()//9
})
 

zip():组装多个Mono(类型可以不一样)。

zipWith

zipWith 操作符把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为Tuple2的流;也可以通过一个BiFunction函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。 in short 像拉链一样

Flux.just("a", "b").zipWith(Flux.just("c", "d"))
		.subscribe(System.out::println);
Flux.just("a", "b")
		.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
		.subscribe(System.out::println);

zipWith():不需要上一个Mono的结果(类型可以不一样)。

zipWhen

先等待当前 Mono 发出值,然后用该值作为输入,动态生成一个新的 Mono(通过 rightGenerator 函数)。 然后将原始值新生成的 Mono 的结果组合成 Tuple2

zipWhen():需要上一个Mono的结果(类型可以不一样)。

 
// zipWhen 组合两个Mono, 并返回新的Mono类型; 
 
// 例一个假期, 你需要预约酒店,再预约滴滴, 然后你才知道去哪里
Mono<HotelBooking> first = bookHotel(startDate, EndDate);
 
// you need the chosen hotel location before you can book a taxi,
// but you still need both bookings in the end
Mono<Tuple2<HotelBooking, TaxiBooking>> result = first
    .zipWhen(booking -> bookTaxi(airport, booking.getLocation());//根据酒店的结果 预约DD
 
//其结果是个元组对象
result.map((tuple)->{
	tuple.getT1();// first
	tuple.getT2();// taxi
})
 

Mone 合并为 Flux

如果你有多个 Mono 对象,想要将它们合并成一个 Flux 对象,你可以使用 Flux.concatFlux.merge 操作符。这两个操作符都可以将多个 Mono 序列连接成一个 Flux 序列

Flux.concat: 保持原始 Mono 的顺序,一个 Mono 完成后再处理下一个。适用于有序的场景。

Mono<String> mono1 = Mono.just("Mono 1");
Mono<String> mono2 = Mono.just("Mono 2");
 
Flux<String> flux = Flux.concat(mono1, mono2);
 
 

Flux.merge: 并行处理多个 Mono,无法保证原始 Mono 的顺序。适用于无序的场景。

Mono<String> mono1 = Mono.just("Mono 1");
Mono<String> mono2 = Mono.just("Mono 2");
 
Flux<String> flux = Flux.merge(mono1, mono2);

doOn… 只读/通知

在不对序列造成改变的情况下

得到通知或执行一些操作:

发出元素:doOnNext 序列完成:Flux#doOnComplete,Mono#doOnSuccess 因错误终止:doOnError 取消:doOnCancel 订阅时:doOnSubscribe 请求时:doOnRequest 完成或错误终止:doOnTerminate(Mono的方法可能包含有结果) 但是在终止信号向下游传递 之后 :doAfterTerminate 所有类型的信号(Signal):Flux#doOnEach 所有结束的情况(完成complete、错误error、取消cancel):doFinally 记录日志:log

doOnNext 会在 onXxx, doOnXxx 之前调用; 优先级: doOnNext > onNext > onComplete > doFinally/doAfterTerminate

doFinallydoAfterTerminate执行顺序不固定,由onComplete方法执行万抽后向上游依次执行,离得”近”的优先执行。

handle

handle 用于一个 “映射 + 过滤 null” 的场景

例如,下边的方法可以用于 Integer 序列,映射为字母或 null

public String alphabet(int letterNumber) {
    if (letterNumber < 1 || letterNumber > 26) {
            return null;
    }
    int letterIndexAscii = 'A' + letterNumber - 1;
    return "" + (char) letterIndexAscii;
}
 
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i);
        //如果返回的是 null …
        //就不调用 sink.next 从而过滤掉。
        if (letter != null)
            sink.next(letter);
    });
alphabet.subscribe(System.out::println);

遍历元素及元素中的Mono

Flux.fromIterable(operators)
	.flatMap(operator->{
		Mono<Byte> stateMono = operator.getState();
		String deviceId = operator.getDeviceId();
		String thirdId = DevicceIdUtils.getThirdId(deviceId);
		return Mono.zip(Mono.just(operator), stateMono, getDeviceState(thirdId));//用zip 全部 Mono 合在一起
	}).flatMap(tp->{
		DeviceOperator operator = tp.getT1();
		Byte stat = tp.getT2();
		JsonNode result = tp.getT3();
		/////////
		DeviceMessage deviceMessage = ConvetMessageUtils.convert_getRealTimeStatus(result);
		if(deviceMessage instanceof DeviceOfflineMessage){
			if (stat.equals(DeviceState.offline)) {
				return Mono.empty();
			}
		}
		return handleMessage(deviceMessage);
	});

流截止的问题

Mono<Integer> res = Mono.just(0)
	.flatMap(e -> {
		System.out.println("  out 1111 ");
		return Mono.just(1111);
	}).flatMap(f -> {
		System.out.println("  out 1122 ");
		return Mono.just(1122);
	});
 
	res.flatMap(f -> {//这部分语句不会输出, 不会执行;
		System.out.println("  out 2222 ");
		return Mono.just(2222);
	});// .block() // 除非这里触发, 将 1111 1122 2222 组成一个新的流
 
Flux flx = res.flatMap(f -> {
	System.out.println("  out 3333 ");
	return Mono.just(3333);
}).flux();
flx.subscribe();//总订阅

错误处理

当订阅的时候,位于链结尾的 onError 回调方法和 catch 块类似,一旦有异常,执行过程会跳入到 catch:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v))//第一次 map 映射
    .map(v -> doSecondTransform(v));//第二次 map 映射
s.subscribe(//订阅
    value -> System.out.println("RECEIVED " + value),//正常的情况
    error -> System.err.println("CAUGHT " + error)//一旦有错误,序列(sequence)终止, 并回调
);
 

onErrorReturn, onErrorResum, doOnError …

Flux.just(10)
    .map(this::doSomethingDangerous)
    //在错误发生时 返回这个做为结果
    //.onErrorReturn("RECOVERED");
 
    //在错误发生时, 尝试使用下面的逻辑,决定是否终止流
    .onErrorResume(error -> {
        if (error instanceof TimeoutException)
            return getFromCache(k);//处理1
        else if (error instanceof UnknownKeyException)
            return registerNewEntry(k, "DEFAULT");//处理2 
        else
            return Flux.error(error);//处理不了, 将问题'重新抛出'
    })
 
    //如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用 doOnError 方法。
    .doOnError(e -> {
        log("uh oh, falling  ");
    })
 
    // doFinally 在序列终止(无论是 onComplete、onError还是取消)的时候被执行, 并且能够判断是什么类型的终止事件(完成、错误还是取消?)。
    .doFinally(type -> {
        if (type == SignalType.CANCEL)
          statsCancel.increment();
    })
    // ... 
    
;
 

简略列表

map: 转换流中的元素: flux.map(UserEntity::getId) mapNotNull: 转换流中的元素,并忽略null值.(reactor 3.4提供) flatMap: 转换流中的元素为新的流: flux.flatMap(this::findById) flatMapMany: 转换Mono中的元素为Flux(1个转多个): mono.flatMapMany(this::findChildren) concat: 将多个流连接在一起组成一个流(按顺序订阅) : Flux.concat(header,body) merge: 将多个流合并在一起,同时订阅流: Flux.merge(save(info),saveDetail(detail)) zip: 压缩多个流中的元素: Mono.zip(getData(id),getDetail(id),UserInfo::of) then: 上游流完成后执行其他的操作. doOnNext: 流中产生数据时执行. doOnError: 发送错误时执行. doOnCancel: 流被取消时执行.如: http未响应前,客户端断开了连接. onErrorContinue: 流发生错误时,继续处理数据而不是终止整个流. defaultIfEmpty: 当流为空时,使用默认值. switchIfEmpty: 当流为空时,切换为另外一个流. as: 将流作为参数,转为另外一个结果:flux.as(this::save)

empty, error, never: 1)empty 指的是一个直接完成的序列 2)error 指的是一个直接报错的序列 3)never 值的是一个不做任何事情的序列,比如发出数据、报告完成、报告错误等等

延迟执行

delay(Duration duration)和 delayMillis(long duration) 创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。

消息队列 (Sinks.Many)

类似消息队列

官文

private final Sinks.Many<Message> replaySink = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE,false);

Sinks.many().multicast() 如果没有订阅者,那么接收的消息直接丢弃 Sinks.many().unicast() 如果没有订阅者,那么保存接收的消息直到第一个订阅者订阅 Sinks.many().replay() 不管有多少订阅者,都保存所有消息

one(): a sink that will play a single element to its subscribers empty(): a sink that will play a terminal signal only to its subscribers (error or complete), but can still be viewed as a Mono<T> (notice the generic type <T>).

提供数据

/thread1
replaySink.emitNext(1, FAIL_FAST);
 
//thread2, later
replaySink.emitNext(2, FAIL_FAST);
 
//thread3, concurrently with thread 2
EmitResult result = replaySink.tryEmitNext(3);

数据提交有两个方法: emitNext 指定提交失败策略同步提交 tryEmitNext 异步提交,返回提交成功、失败状态

下游消费

Flux<Integer> fluxView = replaySink.asFlux();
fluxView
	.takeWhile(i -> i < 10)
	.log()
	.blockLast();

指定调度 (SCHEDULERS, publishOn, subscribeOn)

如果无法避免阻塞操作(如执行JDBC), 如果要切换执行线程怎么办?可以使用 publishOn 和 SubscribeOn

subscribeOn() 方法适用于订阅过程。我们可以把它放在响应链条中的任意位置。它接收 Scheduler 参数,且在提供的线程池中选择线程执行。

publishOn(Schedulers.elastic()) 方法跟 subscribeOn() 类似

publishOn 影响在其之后的 operator 执行的线程池,而 subscribeOn 则会从源头影响整个执行过程。所以,publishOn 的影响范围和它的位置有关,而 subscribeOn 的影响范围则和位置无关。 subscribeOn 方法使发布者使用给定的线程池来发布值。管道中可能有N个subscribeOn方法。最接近的将会生效。

RxJava内置的几种线程调度器

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
 
Flux.fromIterable(operators)  
.flatMap(operator->{  
	Mono<Byte> stateMono = operator.getState();  
	String deviceId = operator.getDeviceId();  
	String thirdId = DevicceIdUtils.getThirdId(deviceId);
	// 主 响应式流程
	return Mono.zip(Mono.just(operator), stateMono, getRealTimeStatus(thirdId) );  
})  
.flatMap(tp->{  
	DeviceOperator operator = tp.getT1();  
	Byte stat = tp.getT2();  
	JsonNode result = tp.getT3();
	})
....
 
 
 
 
 private Mono<JsonNode> getRealTimeStatus(String thirdId) {
		// 如果想生效 必须从主流程 中断开
        return Mono.fromCallable(() -> {
            JsonNode realTimeStatus = operator.getRealTimeStatus(thirdId);
            if (NjjacOperator.IS_DEBUG){
                log.info("[D] thirdId ={}, getDeviceState = {}",thirdId, realTimeStatus);
            }
            return realTimeStatus;
        }).subscribeOn(Schedulers.boundedElastic());
    }
 

实例 Spring WebFlux

https://docs.spring.io/spring-framework/reference/web/webflux.html

依赖

dependencies {  
    implementation("org.springframework.boot:spring-boot-starter")  
//    implementation("org.springframework.boot:spring-boot-starter-web")  //MVC 
    implementation("org.springframework.boot:spring-boot-starter-webflux")//响应式
}

Enabling WebFlux Config

See equivalent in the Servlet stack

You can use the @EnableWebFlux annotation in your Java config, as the following example shows:

@Configuration
@EnableWebFlux
public class WebConfig {
 
}

In your Java configuration, you can implement the WebFluxConfigurer interface, as the following example shows:

@Configuration
public class WebConfig implements WebFluxConfigurer {
 
	// Implement configuration methods...
}

测试代码

package cn.simae.llm.tny.web;  
  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.boot.context.event.ApplicationReadyEvent;  
import org.springframework.context.event.EventListener;  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  
import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping;  
import reactor.core.publisher.Mono;  
  
import java.util.Map;  
  
/**  
 * @author yangfh  
 * @date 2025/6/14 11:34  
 **/@RestController  
@RequestMapping(value = "/index")  
public class IndexRest {  
  
    private static final Logger log = LoggerFactory.getLogger(IndexRest.class);  
    private final RequestMappingHandlerMapping handlerMapping;  
    public IndexRest(RequestMappingHandlerMapping handlerMapping) {  
        this.handlerMapping = handlerMapping;  
    }  
    @GetMapping(value = "")  
    public Mono<Map<String, String>> index() {  
        log.info("index");  
        return Mono.just(Map.of("result", "000") );  
    }  
  
    @EventListener(ApplicationReadyEvent.class)  
    protected void ApplicationReady() {  
        handlerMapping.getHandlerMethods().forEach((info, method) ->  
//                log.info("{} -> {}.{}",  
//                        info.getPatternsCondition().getPatterns(),  
//                        method.getBeanType().getSimpleName(),  
//                        method.getMethod().getName())  
                        System.out.println("web url mapping ["+ info.getPatternsCondition().getPatterns()  
                                +  "] -> " + method.getBeanType().getSimpleName()  
                                + "." + method.getMethod().getName())  
        );  
    }  
}

调试

全是流式, 异步 API, 对于调试是个噩梦!!!

即便 stack trace 能够对有些许经验的开发者传递一些信息,但是在一些复杂的情况下, 这并不是一种理想的方式。 幸运的是,Reactor 内置了一种面向调试的能力—— 操作期测量(assembly-time instrumentation)。 这通过 在应用启动的时候 (或至少在有问题的 Flux 或 Mono 实例化之前) 加入自定义的 Hook.onOperator 钩子(hook),如下:

开启调试钩子

reactor.core.publisher.Hooks.onOperatorDebug(); 开启 hook 在报错时会多打印一些 stack

一个例子

java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
...
...
	at reactor.core.publisher.Mono.subscribe(Mono.java:2582)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
...
	org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s):
	|_	Flux.single(TestWatcher.java:55)//有用的 就这一行

触发消费, 阻塞获取结果

不提倡阻塞获取, 虽然它这个理念很好, 但有时候调试要用到

对于单个元素(即 Mono)

Mono<String> singleSeq = Mono.just("foo");
Optional<String> s = singleSeq.blockOptional();
System.out.println(s.get());//blockOptional
System.out.println(singleSeq.block());//block

对于多个元素的(即 Flux)

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> fluxList = Flux.fromIterable(iterable);
System.out.println( fluxList
    .collectList()//先收集
    .blockOptional() //再阻塞获得结果
);