一起来学Java8(七)——Stream(中)我们学习了Stream.collect的用法,今天我们来学习下Stream.reduce的用法。

reduce操作可以理解成对Stream中元素累计处理,它有三个重载方法。

  • 重载1:Optional<T> reduce(BinaryOperator<T> accumulator);
  • 重载2:T reduce(T identity, BinaryOperator<T> accumulator);
  • 重载3:<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);

reduce(accumulator)

先来看下重载1方法,这个方法需要我们传入一个参数,参数名字定义为收集器,顾名思义是需要我们对元素进行收集。

下面通过一个数值累加的例子来说明reduce的基本用法。

Optional<Integer> opt = Stream.of(1, 2, 3, 4)
		.reduce((n1, n2) -> {
			int ret = n1 + n2;
			System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret);
			return ret;
		});
int sum = opt.orElse(0);
System.out.println("sum:" + sum);

打印:

1(n1) + 2(n2) = 3
3(n1) + 3(n2) = 6
6(n1) + 4(n2) = 10
sum:10

这个例子中对Stream例子中的三个数字进行相加,得到总和。最后返回一个Optional<Integer>对象是因为考虑到Stream中没有元素的情况,因此返回结果是未知的,应该由开发者来确定返回值。

在Lambda表达式中提供了两个参数n1,n2。从打印结果中可以看出,n1,n2最开始分别是Stream中第一,第二两个元素,把这两个数进行相加后返回,然后带着这个结果再次进入到Lambda表达式中,n1是前一次相加后的值,n2是下一个元素值。

这段代码效果等同于:

int[] arr = { 1, 2, 3, 4};
int sum = 0;
for (int i : arr) {
	sum += i;
}

reduce(identity, accumulator, combiner)

再来看下重载3,这个方法有三个参数,每个参数说明如下:

  • identity:给定一个初始值
  • accumulator:基于初始值,对元素进行收集归纳
  • combiner:对每个accumulator返回的结果进行合并,此参数只有在并行模式中生效。

使用Stream.parallel()的方法开启并行模式,使用Stream.sequential()开启串行模式,默认开启的是串行模式。

并行模式可以简单理解为在多线程中执行,每个线程中单独执行它的任务。串行则是在单一线程中顺序执行。

下面来看下重载3的例子:

int sum = Stream.of(1, 2, 3, 4)
		.reduce(0, (n1, n2) -> {
			int ret = n1 + n2;
			System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret);
			return ret;
		}, (s1, s2) -> {
			int ret = s1 + s2;
			System.out.println(s1 + "(s1) + " + s2 + "(s2) = " + ret);
			return ret;
		});
System.out.println("sum:" + sum);

打印:

0(n1) + 1(n2) = 1
1(n1) + 2(n2) = 3
3(n1) + 3(n2) = 6
6(n1) + 4(n2) = 10
sum:10

可以看到,在串行模式下并没运行combiner参数,只运行了accumulator参数,从给定的初始值0开始累加。

这里已经指定了初始值(identity),因此返回类型就是初始值的类型。

我们把例子改成并行模式,然后看下执行结果。

int sum = Stream.of(1, 2, 3, 4)
		.parallel() // 并行模式
		.reduce(0, (n1, n2) -> {
			int ret = n1 + n2;
			System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret);
			return ret;
		}, (s1, s2) -> {
			int ret = s1 + s2;
			System.out.println(s1 + "(s1) + " + s2 + "(s2) = " + ret);
			return ret;
		});
System.out.println("sum:" + sum);

打印:

0(n1) + 3(n2) = 3
0(n1) + 1(n2) = 1
0(n1) + 2(n2) = 2
1(s1) + 2(s2) = 3
0(n1) + 4(n2) = 4
3(s1) + 4(s2) = 7
3(s1) + 7(s2) = 10
sum:10

从打印的结果中我们可以看到几个现象:

  1. combiner参数被执行了
  2. 打印的内容是无序的,说明它们在多线程环境下执行的
  3. n1参数始终是0

因为是并行模式,前2个现象很好理解,那为什么n1参数始终是0?

因为开了并行模式后,运行reduce方法的底层是使用了ForkJoinPool(分支/合并框架)。

分支/合并框架的原理是将一个大任务拆分成多个子任务,这些子任务并行处理自己的事情,然后框架将这些子任务的结果合并起来,生成一个最终结果。

每个子任务之间是没有关联的,它们的执行状态都是一样的,因此每个子任务给到的初始值(identity)都是一样的,在本例中是0

同时需要一个合并方法用来合并每个子任务的处理结果,然后最终返回,使用数学表达式即为:

(0+1) + (0+2) + (0+3) + (0+4) = 10

再来看下重载3这个方法签名,每个参数的分工都明确了。

reduce(identity, accumulator, combiner)

  • identity:初始值
  • accumulator:每个子任务执行的操作
  • combiner:合并每个子任务的结果

注意事项

查看reduce方法文档,发现有下面一段话:

Performs a reduction on the elements of this stream,
using the provided identity value and an associative accumulation function,
and returns the reduced value. This is equivalent to:

T result = identity;
for (T element : this stream)
 result = accumulator.apply(result, element)
return result;

The identity value must be an identity for the accumulator function.
This means that for all t, accumulator.apply(identity, t) is equal to t.
The accumulator function must be an associative function.

其中有一句重要的话:This means that for all t, accumulator.apply(identity, t) is equal to t.

简单来说,必须要满足下面这个公式:

accumulator.apply(identity, t) == t

如果不满足的话,在并行模式下执行accumulator会有问题。

我们把上一个例子中的初始值改成1,然后看看执行结果

int sum = Stream.of(1, 2, 3, 4)
		.parallel()
		// 这里改成了1
		.reduce(1, (n1, n2) -> {
			int ret = n1 + n2;
			System.out.println(n1 + "(n1) + " + n2 + "(n2) = " + ret);
			return ret;
		}, (s1, s2) -> {
			int ret = s1 + s2;
			System.out.println(s1 + "(s1) + " + s2 + "(s2) = " + ret);
			return ret;
		});
System.out.println("sum:" + sum);

打印:

1(n1) + 3(n2) = 4
1(n1) + 4(n2) = 5
4(s1) + 5(s2) = 9
1(n1) + 2(n2) = 3
1(n1) + 1(n2) = 2
2(s1) + 3(s2) = 5
5(s1) + 9(s2) = 14
sum:14

理想中的结果应该是11才对,即1 + 1 + 2 + 3 + 4。可以看到在并行模式下对identity的值是有要求的。必须满足公式:accumulator.apply(identity, t) == t

这里accumulator.apply(identity, t) == t即为:accumulator.apply(1, 1) == 1,使用数学表达式表示:

1(identity) + 1 == 1

显然这个等式是不成立的,把identity改成0则公式成立:0 + 1 == 1

紧接着,对于combiner参数,需要满足另一个公式:

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

  • t:表示第一个参数
  • u:表示第二个参数

在这个例子中,我们取第一次执行combiner情况: 4(s1) + 5(s2) = 9,套用公式即为:

combiner.apply(5, accumulator.apply(1, 4)) == accumulator.apply(5, 4)

在这里u=5,identity=1,t=4

转换成数学表达式为:5 + (1 + 4) == 5 + 4

显然这个等式是不成立的,把identity改成0,等式就成立了:5 + (0 + 4) == 5 + 4

总结一下

使用reduce(identity, accumulator, combiner)方法时,必须同时满足下面两个公式:

  • 公式1,针对accumulator:accumulator.apply(identity, t) == t
  • 公式2,针对combiner:combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

reduce(identity, accumulator)

这个方法其实是reduce(identity, accumulator, combiner)的一种特殊形式,只不过是把combiner部分用accumulator来代替了,即

reduce(identity, accumulator)等同于reduce(identity, accumulator, accumulator)

因此reduce(identity, accumulator)的使用方式和注意事项是跟reduce(identity, accumulator, combiner)一样的,这里不再赘述。

小节

本篇主要讲解了Stream.reduce的使用方法及注意事项,在并行模式下,reduce是使用分支/合并框架实现的,在下一篇文章中我们开始学习分支/合并框架

11-02 19:08