接上一篇:

实战SpringCloud响应式微服务系列教程(第一章)

实战SpringCloud响应式微服务系列教程(第二章)

实战SpringCloud响应式微服务系列教程(第三章)

实战SpringCloud响应式微服务系列教程(第四章)

 

1.1.5 创建Flux和Mono

在引入Fluxhe Mono之后,本节将关注于如何创建这两个核心组件。我们将介绍多种创建Flux和Mono的方法,并提供相应的代码示例。

1.1.5.1 创建Flux

创建Flux的方法非常多,这些方式一般可以分为两大类,一类是充分利用Flux的静态方法,另一类则是动态创建Flux。

1.通过静态方法创建Flux。

just()

just()方法可以指定序列中包含的全部元素,创建出来的Flux序列在发布这些元素之后会自动结束。一般情况下,在已知元素数量和内容时,使用just()方法是最简单的做法。(更加详细的图文介绍可在Reactor官网了解)

fromArray()、fromIterable()、fromStream()

如果已经有了一个数组,一个Iterable对象或者Stream对象,那么可以通过Flux提供的fromArray()、fromIterable()、fromStream()方法从这些对象中自动创建Flux。(更加详细的图文介绍可在Reactor官网了解)

empty()、error()和never()

根据上一节我们介绍的关于Reactor一步序列的语义,可以使用empty()方法创建一个不包含任何元素而只发布消息的序列,也可以使用error()方法创建一个只包含错误消息的序列,还可以使用never()方法创建一个不包含任何消息的通知序列。(更加详细的图文介绍可在Reactor官网了解)

range()

使用range(int start,int count)方法可以创建包含从start开始的count个对象的序列,序列中的所有对象类型都是Integer,这在有些场景下非常有用。(更加详细的图文介绍可在Reactor官网了解)

interval()

在Reactor框架中,interval()方法表现为一个方法系列,其中interval(Duration period)方法用来创建一个包含从0开始递增的Long对象的序列,序列中的元素按照指定的时间间隔来发布。

而interval(Duration delay,Duration period)方法除了可以指定时间间隔,还可以指定起始元素发布之前的延迟时间。另外的intervalMillis(Long period)和intervalMillis(Long delay,Long period)与前面两个方法的作用相同,只不过这两个方法通过毫秒数来指定时间间隔和延迟时间。(更加详细的图文介绍可在Reactor官网了解)

通过静态方法创建Flux的一些代码示例:所有的代码的订阅者都统一指向System.out.println()方法,将结果打印在控制台。

Flux.just("Hello","World").subscribe(System.out::println());
System.out.println("------------------------------------------")

Flux.fromArray(new Integer[] {1,2,3}).subscribe(System.out::println());
System.out.println("------------------------------------------")

Flux.empty().subscribe(System.out::println());
System.out.println("------------------------------------------")

Flux.range(1,5).subscribe(System.out::println());
System.out.println("------------------------------------------")

执行以上代码我们可以控制台看到如下结果:

Hello
World
------------------------------------------
1
2
3
------------------------------------------
------------------------------------------
1
2
3
4
5
------------------------------------------

以上介绍的这些静态方法只适合于简单的序列生成,当生成的序列包含复杂的逻辑时,就需要采用动态方法来创建Flux。

2.动态创建Flux示例如下

动态创建Flux可以使用常见的generate()方法和create()方法。

generate()

generate()方法通过同步和逐一的方式来产生Flux序列,序列的产生依赖于Reactor框架的SynchronousSink组件。SynchronousSink组件包括next()、complete()和error(Throwable)三个核心方法。从SynchronousSink组件的命名上可以看到“同步”的含义,而“逐一”的含义是在具体的元素生成逻辑中,next()方法最多只能被调用一次。使用generate()方法创建Flux的代码示例如下:

Flux.generate(sink -> {
    sink.next("Hello");
    sink.complete();
}).subscribe(System.out::println());

控制台结果:

Hello

create()

create()方法和generate()方法不同之处在于前者使用的是FluxSink组件。FluxSink支持同步和异步的消息产生方式,并且可以在一次调用中产生多个元素。create()创建Flux的示例代码如下:

Flux.create(sink -> {
   for(int i =0; i<10; i++){
      sink.next(i);
   }
   sink.complete();
}).subscribe(System.out::println());

运行该程序我们会在控制台得到0到9的数字序列。

1.1.5.2 创建Mono

对于Mono而言很多创建Flux的方法同样适用,Mono组件也包含了与Flux组件相同的静态方法,比如just()、empty()、error()和never()等。除了这些方法之外,Mono还有一些特有的静态方法,比较常见的有delay()和justOrEmpty()等。

delay()

delay(Duration duration)和delayMillis(Long duration)方法可以用于创建一个Mono序列。他们的特点是,在置顶的延迟时间之后会产生数字0作为唯一值。(更加详细的图文介绍可在Reactor官网了解)

justOrEmpty()

justOrEmpty(Optional data)方法从一个Optional对象创建Mono,只有当Optional对象中包含值时,Mono序列才产生对应的元素。而justOrEmpty(T data)方法从一个可能为null的对象中创建Mono,只有对象补位null时,Mono序列才产生对应的元素。

justOrEmpty() 传入一个Optional对象实例代码:

Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println());

想要动态创建Mno,同样可以通过create()方法并使用MonoSink组件,示例代码如下:

Mono.create(sink ->  sink.success("Hello")).subscribe(System.out::println());

 


 

推荐阅读(点击即可跳转阅读)

1.SpringBoot内容聚合

2.面试题内容聚合

3.设计模式内容聚合

4.Mybatis内容聚合

5.多线程内容聚合

09-11 17:45