博客
关于我
Java流式操作中Collect收集器的源码分析
阅读量:321 次
发布时间:2019-03-04

本文共 15173 字,大约阅读时间需要 50 分钟。

环境

MacBook Pro

java8+

前言

最近公司升级JDK11,导致一些代码不兼容,好奇的看到了下面这么一段代码;

Collector.of(() -> {                       Pagination
pagination = new Pagination<>(); pagination.setPage(page); pagination.setSize(size); pagination.setTotal(total); return pagination; }, (pagination, elm) -> pagination.getContent().add(elm), (left, right) -> { left.getContent().addAll(right.getContent()); return left; })

看着上面的代码,心中有了些疑惑:

① 因为自己没有自定义过,很好奇,Collector.of()方法的这些参数都有什么作用?
② 第二个参数和第三个参数,单看的话,感觉都是类似的。官方为什么这样设计?

最近花时间研究了一下,也算了理解了stream和 parallelStream底层原理;

下面我简单来分析下,记录下自己的理解

Collector

通过IDEA看查看源码,Collector.of()里面有两个静态方法,先看参数少的:

public static
Collector
of(Supplier
supplier, BiConsumer
accumulator, BinaryOperator
combiner, Characteristics... characteristics) { Objects.requireNonNull(supplier); Objects.requireNonNull(accumulator); Objects.requireNonNull(combiner); Objects.requireNonNull(characteristics); Set
cs = (characteristics.length == 0) ? Collectors.CH_ID : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH, characteristics)); return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);}

可以看出,前面三个参数都是lambda表达式形式的参数:

参数 说明
Supplier 技能:无中生有;作用:结果容器的提供者
BiConsumer 技能:万箭齐发;作用:累计元素
BinaryOperator 合并器

上面技能 只是为了方便自己记忆,比如无中生有,对应就是 无参,一个返回值 – Supplier

万箭齐发:对应就是 有一个参数 ,无返回值 – BiCunsumer;
正常情况下,其实有前面两个参数就够了,第三个参数是为了并行的情况,具体下面讲解;

第四个参数:当没有传characteristics的时候,其默认为IDENTITY_FINISH

这个Collector 对象创建好了之后,就要给list.parallelStream().collect(xxCollector);或者list.stream().collect(xxCollector);这样的方法来使用了;

collect()

先写个测试代码;

import com.google.common.collect.Lists;import com.google.common.collect.Maps;import com.xingren.common.data.Pagination;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collector;import static java.util.stream.Collector.Characteristics.*;/** * @author yutao * @since 2020/7/23 3:02 下午 */public class CollTest {       public static void main(String[] args) {           Collector
, Map
, Map
> mapSupplier = Collector.of(() -> { Map
map = new HashMap<>(); map.put("page", "1"); map.put("size", "2"); System.out.println("我是第一个参数"); return map; }, (t1, t2) -> { System.out.println("我是第二个参数"); t2.putAll(t1); }, (m1, m2) -> { Map
map = new HashMap<>(); map.putAll(m1); map.putAll(m2); System.out.println("我是第三个参数"); return map; }, UNORDERED); List
> ll = Lists.newArrayList(); Map
map = new HashMap<>(); map.put("page", "7"); map.put("size", "2"); map.put("yy", "3"); map.put("tt", "10"); map.put("ss", "100"); ll.add(map); Map
collect = ll.stream().collect(mapSupplier); System.out.println(collect); }}

在打断点的过程中,我发现,第三个参数居然没有执行,想知道答案就只能看源码了;

执行结果:

我是第一个参数我是第二个参数{   size=2, page=1}

我个人的猜测:可能是被某个参数控制了,需要设置相应的参数,才会执行第三个参数;

断点调试

public final 
R collect(Collector
collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { // 满足if里面三个条件才会执行 // parallelStream,并且设置CONCURRENT和UNORDERED,才会执行; container = collector.supplier().get(); BiConsumer
accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { // 不符合上面的特殊情况就执行这个(既可以串行stream也可以是parallelStream) container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);}

按照我的测试代码,很显然,是执行container = evaluate(ReduceOps.makeRef(collector));这段代码;

具体含义看代码注释;

ReduceOps.makeRef(collector)这个方法,可以说做了非常关键的前期工作:

makeRef(Collector
collector) { Supplier supplier = Objects.requireNonNull(collector).supplier(); BiConsumer
accumulator = collector.accumulator(); BinaryOperator
combiner = collector.combiner(); class ReducingSink extends Box implements AccumulatingSink
{ @Override // 说明后面的执行过程中,要执行第一个lambda,就得执行这个begin方法 public void begin(long size) { state = supplier.get(); } @Override // 要执行第二个lambda,就得执行这个accept方法 public void accept(T t) { accumulator.accept(state, t); } @Override // 要执行第三个lambda,就得执行这个combine方法 public void combine(ReducingSink other) { state = combiner.apply(state, other.state); } } return new ReduceOp
(StreamShape.REFERENCE) { @Override // 因为三个参数都通过ReducingSink 类封装起来了, // 那么要使用,就得通过ReducingSink类,而这里刚好创建出来了; public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; } };}

evaluate()源码如下:

final 
R evaluate(TerminalOp
terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) // 流已经被打开或提前关闭的情况下,抛出异常 throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}

按照我的测试代码,这里是执行串行代码terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()))

这里面有个sourceSpliterator(terminalOp.getOpFlags())方法,具体我也说不上来,大体看上去,感觉是记录操作的标记;

什么样的操作呢?比如combined合并操作,去重操作DISTINCT,排序操作ORDERED,这些操作都记录在StreamOpFlag枚举类中,

这些不是我这次重点要研究的,只要知道,这个方法并没有执行那三个lambda表达式;

那回到evaluateSequential()方法,其有很多类重载了:

在这里插入图片描述
这里选择ReduceOp类中的重载方法:

@Overridepublic 
R evaluateSequential(PipelineHelper
helper, Spliterator
spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get();}

这里wrapAndCopyInto()这个方法有两个参数,其中makeSink()被调用的是源码刚开始入口evaluate()方法中的作为参数的ReduceOps.makeRef(collector)方法,这个方法在new ReduceOp的时候,重载了makeSink();看源码得知,其创建了一个new ReducingSink()对象;

// 这个是在源码入口处,创建好的new ReduceOp
(StreamShape.REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; }};

接着看wrapAndCopyInto源码:

@Overridefinal 
> S wrapAndCopyInto(S sink, Spliterator
spliterator) { // 实际干活的方法 copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink;}

这里面copyInto()这个方法很关键,因为它将会实际去调,测试代码中的lambda参数了;

final 
void copyInto(Sink
wrappedSink, Spliterator
spliterator) { Objects.requireNonNull(wrappedSink); // 这是短路判断,findAny、limit等类似操作,即为短路操作 if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { // 调用第一个参数的lambda式 wrappedSink.begin(spliterator.getExactSizeIfKnown()); // 调用第二个参数的lambda式 spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { // 执行短路操作 copyIntoWithCancel(wrappedSink, spliterator); }}

这段代码很简单,上面已经写了注释;执行完了之后,基本就结束了;

但是却没有看到第三个调用方法在哪里?
这我就很疑惑了,原以为第三个参数是某个参数控制的,看来是想错了,并不是;

但是我们已经知道,在执行第三个lambda,就得调用之前ReducingSink

类中重载方法combine(ReducingSink other);我们按住ctrl+ 单击 这个重载方法;

我们会在ReduceOps类里有么一段代码:

@Overridepublic void onCompletion(CountedCompleter
caller) { if (!isLeaf()) { S leftResult = leftChild.getLocalResult(); leftResult.combine(rightChild.getLocalResult()); setLocalResult(leftResult); } // GC spliterator, left and right child super.onCompletion(caller);}

也就是说,要执行第三个参数,就得执行onCompletion()这个方法;

具体看到leftResult.combine(rightChild.getLocalResult());这行代码,字面意思就是:将左边的结果与右边的结果进行合并;

为什么要合并呢?

无意中看到了这个方法所属的类ReduceTask上面有行注释:

/**  * // 大体的意思就是这个操作是用于parallel的  * A {@code ForkJoinTask} for performing a parallel reduce operation.  */@SuppressWarnings("serial")private static final class ReduceTask
> extends AbstractTask
> {

看到这句注释,恍然大悟,原来第三个参数是用于并行操作的;

难怪 第二个参数和第三个参数有点类似;

因为其开多线程去进行计算,计算完成后,当然得合并结果啦;

所以第三个参数的就是:结果容器的合并器

既然是并行操作,那把测试代码改为parallelStream()方法再试一次,

然后发现居然还是没有执行第三参数;
结果依然为:

我是第一个参数我是第二个参数{size=2, page=1}

那就不得不再次调试代码,看看并发操作时,代码是如何执行的,毕竟官方注释应该不会骗人的;

并行代码调试

源码入口,和之前类似,依然是执行container = evaluate(ReduceOps.makeRef(collector));这段代码;

之后,就有所不同了,因为是并行,接着其执行的是terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))这段代码;
之后:

在这里插入图片描述

其会执行ReduceOp里面的重载方法;源码如下:

@Overridepublic 
R evaluateParallel(PipelineHelper
helper, Spliterator
spliterator) { // 这里创建了一个ReduceTask,记住了 // 后面重载方法和它有关 return new ReduceTask<>(this, helper, spliterator).invoke().get();}

可以看到其创建了一个任务对象ReduceTask;接着调用的invoke()方法,来自ForkJoinTask类;

这个类,想必有人会觉得眼熟,因为Java 流式操作中的并发操作使用的就是ForkJoin pool框架;

接下来就是一系列调用:

invoke()源码:

public final V invoke() {       int s;    // 看到doInvoke    if ((s = doInvoke() & DONE_MASK) != NORMAL)        reportException(s);    return getRawResult();}// doInvoke()源码private int doInvoke() {      int s; Thread t; ForkJoinWorkerThread wt;   // 看到doExec()方法   return (s = doExec()) < 0 ? s :       ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?       (wt = (ForkJoinWorkerThread)t).pool.       awaitJoin(wt.workQueue, this, 0L) :       externalAwaitDone();}// doExec()源码:final int doExec() {      int s; boolean completed;   if ((s = status) >= 0) {          try {          	   // 看到这里 接下来又是个重载方法,会有很多实现类           completed = exec();       } catch (Throwable rex) {              return setExceptionalCompletion(rex);       }       if (completed)           s = setCompletion(NORMAL);   }   return s;}

在这里插入图片描述

这里看到CountedCompleter这个类,进去看后,发现又是一层调用:

protected final boolean exec() {      // 这又是一个重载方法,有很多的实现类    compute();    return false;}

在这里插入图片描述

还记得之前代码步骤中创建了一个ReduceTask对象不?正常情况应该找这个类的,

但是发现没有,说明这个类没有重写这个方法,使用的是父类;那么我们就去父类AbstractTask中去找;

这个时候你会发现如下源码:

@Overridepublic void compute() {        Spliterator
rs = spliterator, ls; // right, left spliterators // 读取分割迭代器的估计总元素个数 long sizeEstimate = rs.estimateSize(); // 读取单个任务的元素上限 long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; // 按照我的测试代码,sizeEstimate = 1, sizeThreshold 也为1 // 所以我的测试代码不会走while循环 // 估计总元素数 > 阈值 && 尝试对 spliterator 进行二分 while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; // 基于leftSpliterator 创建子任务并写入 leftChild task.leftChild = leftChild = task.makeChild(ls); // 基于 rightSpliterator 创建子任务并写入 leftChild task.rightChild = rightChild = task.makeChild(rs); // 设置 task 的待完成任务数为 1【将会有一个子任务被 fork 进线程池中并行处理】 task.setPendingCount(1); // 是否 forkRight【第一次 forkRight,接着 forkLeft,轮流交替】 if (forkRight) { forkRight = false; // 更新待分割的 spliterator rs = ls; // 待处理的任务 task = leftChild; // 待 fork 进线程池并行处理的任务 taskToFork = rightChild; } else { forkRight = true; // 待处理的任务 task = rightChild; // 待 fork 进线程池并行处理的任务 taskToFork = leftChild; } taskToFork.fork(); // 读取待分割 spliterator 的估计总元素个数 sizeEstimate = rs.estimateSize(); } // 此任务已经是叶子任务,则执行计算逻辑 task.setLocalResult(task.doLeaf()); // 此任务计算完毕后尝试完成主任务 // 这个方法将会执行第三个参数 task.tryComplete(); }

根据我的测试代码,while循环是不会进入的,因为就一个任务~

接着执行程序就到了这段代码 task.setLocalResult(task.doLeaf());

其中task.doLeaf()是执行叶子任务;
在这里插入图片描述
发现又有很多重载类,看到ReduceTask类,因为之前步骤创建的就是这个类对象嘛!

@Overrideprotected S doLeaf() {       return helper.wrapAndCopyInto(op.makeSink(), spliterator);}

看到这里是不是有点熟悉了,这个wrapAndCopyInto()方法和串行里的是同一个;

按照我们之前的分析,这个方法主要就是执行前面两个参数;
接着我们重点看到tryComplete()方法;源码如下:

public final void tryComplete() {       CountedCompleter
a = this, s = a; for (int c;;) { if ((c = a.pending) == 0) { // 终于看到这个方法了, // 因为只有调用这个方法,第三个参数(lambda表达式)才有可能会被执行 a.onCompletion(s); if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) return; }}

这里我们终于看到了onCompletion()方法,按照我们之前分析的,只有执行了这个方法,第三个参数(lambda表达式)才有可能会被执行;

@Overridepublic void onCompletion(CountedCompleter
caller) { // 是否是叶子任务 // 因为测试代码就只有一条数据,所以肯定是叶子任务, // 任务并没有拆分 if (!isLeaf()) { S leftResult = leftChild.getLocalResult(); leftResult.combine(rightChild.getLocalResult()); setLocalResult(leftResult); } // GC spliterator, left and right child super.onCompletion(caller);}

继续执行代码,会发现,我的测试代码并没有执行到leftResult.combine(rightChild.getLocalResult());这段代码;

因为就没有进入到if里面去;为什么呢?因为测试代码List里面只有一条数据,即使是并行流,也不存在左右结果容器进行合并的问题,所以没有执行;

至此大体上我们终于明白底层调用过程;

大体思路:

① 串行方式,其不会创建XXXtask(如:ReduceTask),而是直接调用AbstractPipeline类中的wrapAndCopyInto()方法去执行第一个和第二个参数;也就是SupplierBiconsumer;第三个lambda表达式不会执行;

在这里插入图片描述

接上图

在这里插入图片描述
接上图
在这里插入图片描述
②并行方式,其会创建xxxTask类来执行相应任务,然后通过invoke()一系列的调用后,分解任务,并利用多线程的方式来执行;重点是doLeaf()tryComplete()方法;

接上面的第一幅图

在这里插入图片描述
接上图:

在这里插入图片描述

虽然我们已经知道tryComplete()这个方法会去调第三个参数;

但是你有没有好奇,这个方法的名称,前面有个try,表示尝试,也就是说其会尝试去完成;
那么是不是也有类似complete()这样的方法呢?

tryComplete()源码附近找找,你会发现:propagateCompletion()complete(T rawResult)等这样的方法;

其中propagateCompletion()方法,和tryComplete()方法是高度类似的,唯一的区别就是没有调用a.onCompletion(s);这个方法;

这就是说,propagateCompletion()这个方法是不会调用第三个参数的;那这是什么场景呢?

特殊情况

这个场景,其实源码已经写明了,就是入口出的那个if判断

if (isParallel()        && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))        && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {       container = collector.supplier().get();    BiConsumer
accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u));}

也就是上面这段代码里if的条件:

① 并行流parallelStream
②操作指示标识为CONCURRENT
③无序 UNORDERED

这三个条件同时成立的情况下,也不会执行第三个参数;具体实现方式,就是只使用一个结果容器;也就是第一个参数提供的容器;

第三个参数的作用就是合并多个结果容器;

我把这种情况称为特殊情况,毕竟通常情况下,我们不会去设置CONCURRENT这个参数;

特殊场景:无序,任务之间执行顺序和最终结果没有关系,并发,意思子任务之间没有关联;


最后给上我自己画的完整的流程图

在这里插入图片描述

总结

① 源码调试后,我们发现没有加锁的地方,也没有CAS那种自旋方式来保证线程安全,

所以并行流是非线程安全的;

② 基于①可以得出使用并行方式:各个子任务之间不能有依赖关系,不能有公用的东西;并且任务调用顺序不能影响到结果,否则就使用串行方式;

③ 串行执行是不会执行第三个参数

④ 并行执行里有个特殊情况,在无序和设置指示标识为CONCURRENT情况下,那么就只有一个结果容器,这时也不会执行第三个参数(lambda表达式);

参考地址:

转载地址:http://vryh.baihongyu.com/

你可能感兴趣的文章
mysql中的行转列
查看>>
shell脚本中冒泡排序、直接排序、反转排序
查看>>
WPS及Excel中Alt键的妙用 快捷键
查看>>
C - 食物链 并查集
查看>>
Pycharm 常用快捷键
查看>>
ValueError: check_hostname requires server_hostname
查看>>
基于LabVIEW的入门指南
查看>>
PCB布局系列汇总
查看>>
电阻入门知识
查看>>
电容入门知识
查看>>
C++面向对象
查看>>
专题(七)贪心——AcWing 112. 雷达设备
查看>>
深入理解JVM(一)JVM概述、类的声明周期、JVM整体架构、JMM、volatile
查看>>
2020.9.12 SSL普及组模拟(第4题)(树)(暴力邻接表80)
查看>>
2019CCPC女生专场赛_K - Tetris_打表/模拟_暴力之王
查看>>
HDU1559(二维前缀和模板 Java&C++)
查看>>
IIS express web 无法启动服务器
查看>>
“/”应用程序中的服务器错误。
查看>>
MUI之ajax获取后台接口数据
查看>>
使用sqlserver 查询不连续的数据
查看>>