本文共 15173 字,大约阅读时间需要 50 分钟。
MacBook Pro
java8+最近公司升级JDK11,导致一些代码不兼容,好奇的看到了下面这么一段代码;
Collector.of(() -> { Paginationpagination = new Pagination<>(); pagination.setPage(page); pagination.setSize(size); pagination.setTotal(total); return pagination; }, (pagination, elm) -> pagination.getContent().add(elm), (left, right) -> { left.getContent().addAll(right.getContent()); return left; })
看着上面的代码,心中有了些疑惑:
① 因为自己没有自定义过,很好奇,Collector.of()
方法的这些参数都有什么作用? ② 第二个参数和第三个参数,单看的话,感觉都是类似的。官方为什么这样设计? 最近花时间研究了一下,也算了理解了stream和 parallelStream底层原理;
下面我简单来分析下,记录下自己的理解通过IDEA
看查看源码,Collector.of()
里面有两个静态方法,先看参数少的:
public staticCollector of(Supplier supplier, BiConsumer accumulator, BinaryOperator combiner, Characteristics... characteristics) { Objects.requireNonNull(supplier); Objects.requireNonNull(accumulator); Objects.requireNonNull(combiner); Objects.requireNonNull(characteristics); Set cs = (characteristics.length == 0) ? Collectors.CH_ID : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH, characteristics)); return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);}
可以看出,前面三个参数都是lambda
表达式形式的参数:
参数 | 说明 |
---|---|
Supplier | 技能:无中生有 ;作用:结果容器的提供者 |
BiConsumer | 技能:万箭齐发 ;作用:累计元素 |
BinaryOperator | 合并器 |
上面技能 只是为了方便自己记忆,比如无中生有,对应就是 无参,一个返回值 – Supplier
万箭齐发:对应就是 有一个参数 ,无返回值 – BiCunsumer; 正常情况下,其实有前面两个参数就够了,第三个参数是为了并行的情况,具体下面讲解;
第四个参数:当没有传characteristics
的时候,其默认为IDENTITY_FINISH
;
这个Collector
对象创建好了之后,就要给list.parallelStream().collect(xxCollector);
或者list.stream().collect(xxCollector);
这样的方法来使用了;
先写个测试代码;
import com.google.common.collect.Lists;import com.google.common.collect.Maps;import com.xingren.common.data.Pagination;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collector;import static java.util.stream.Collector.Characteristics.*;/** * @author yutao * @since 2020/7/23 3:02 下午 */public class CollTest { public static void main(String[] args) { Collector
在打断点的过程中,我发现,第三个参数居然没有执行,想知道答案就只能看源码了;
执行结果:我是第一个参数我是第二个参数{ size=2, page=1}
我个人的猜测:可能是被某个参数控制了,需要设置相应的参数,才会执行第三个参数;
public finalR collect(Collector collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { // 满足if里面三个条件才会执行 // parallelStream,并且设置CONCURRENT和UNORDERED,才会执行; container = collector.supplier().get(); BiConsumer accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { // 不符合上面的特殊情况就执行这个(既可以串行stream也可以是parallelStream) container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);}
按照我的测试代码,很显然,是执行container = evaluate(ReduceOps.makeRef(collector));
这段代码;
具体含义看代码注释;
ReduceOps.makeRef(collector)
这个方法,可以说做了非常关键的前期工作: makeRef(Collector collector) { Supplier supplier = Objects.requireNonNull(collector).supplier(); BiConsumer accumulator = collector.accumulator(); BinaryOperator combiner = collector.combiner(); class ReducingSink extends Box implements AccumulatingSink{ @Override // 说明后面的执行过程中,要执行第一个lambda,就得执行这个begin方法 public void begin(long size) { state = supplier.get(); } @Override // 要执行第二个lambda,就得执行这个accept方法 public void accept(T t) { accumulator.accept(state, t); } @Override // 要执行第三个lambda,就得执行这个combine方法 public void combine(ReducingSink other) { state = combiner.apply(state, other.state); } } return new ReduceOp (StreamShape.REFERENCE) { @Override // 因为三个参数都通过ReducingSink 类封装起来了, // 那么要使用,就得通过ReducingSink类,而这里刚好创建出来了; public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; } };}
evaluate()
源码如下:
finalR evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) // 流已经被打开或提前关闭的情况下,抛出异常 throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}
按照我的测试代码,这里是执行串行代码terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()))
;
这里面有个sourceSpliterator(terminalOp.getOpFlags())
方法,具体我也说不上来,大体看上去,感觉是记录操作的标记;
什么样的操作呢?比如
这些不是我这次重点要研究的,只要知道,这个方法并没有执行那三个combined
合并操作,去重操作DISTINCT
,排序操作ORDERED
,这些操作都记录在StreamOpFlag
枚举类中,lambda
表达式;
那回到evaluateSequential()
方法,其有很多类重载了:
ReduceOp
类中的重载方法: @OverridepublicR evaluateSequential(PipelineHelper helper, Spliterator spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get();}
这里wrapAndCopyInto()
这个方法有两个参数,其中makeSink()
被调用的是源码刚开始入口evaluate()
方法中的作为参数的ReduceOps.makeRef(collector)
方法,这个方法在new ReduceOp
的时候,重载了makeSink()
;看源码得知,其创建了一个new ReducingSink()
对象;
// 这个是在源码入口处,创建好的new ReduceOp(StreamShape.REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; }};
接着看wrapAndCopyInto
源码:
@Overridefinal> S wrapAndCopyInto(S sink, Spliterator spliterator) { // 实际干活的方法 copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink;}
这里面copyInto()
这个方法很关键,因为它将会实际去调,测试代码中的lambda
参数了;
finalvoid copyInto(Sink wrappedSink, Spliterator spliterator) { Objects.requireNonNull(wrappedSink); // 这是短路判断,findAny、limit等类似操作,即为短路操作 if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { // 调用第一个参数的lambda式 wrappedSink.begin(spliterator.getExactSizeIfKnown()); // 调用第二个参数的lambda式 spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { // 执行短路操作 copyIntoWithCancel(wrappedSink, spliterator); }}
这段代码很简单,上面已经写了注释;执行完了之后,基本就结束了;
但是却没有看到第三个调用方法在哪里? 这我就很疑惑了,原以为第三个参数是某个参数控制的,看来是想错了,并不是;但是我们已经知道,在执行第三个lambda,就得调用之前ReducingSink
combine(ReducingSink other)
;我们按住ctrl+ 单击
这个重载方法; 我们会在ReduceOps
类里有么一段代码:
@Overridepublic void onCompletion(CountedCompleter caller) { if (!isLeaf()) { S leftResult = leftChild.getLocalResult(); leftResult.combine(rightChild.getLocalResult()); setLocalResult(leftResult); } // GC spliterator, left and right child super.onCompletion(caller);}
也就是说,要执行第三个参数,就得执行onCompletion()
这个方法;
leftResult.combine(rightChild.getLocalResult());
这行代码,字面意思就是:将左边的结果与右边的结果进行合并; 为什么要合并呢?
无意中看到了这个方法所属的类ReduceTask
上面有行注释:
/** * // 大体的意思就是这个操作是用于parallel的 * A {@code ForkJoinTask} for performing a parallel reduce operation. */@SuppressWarnings("serial")private static final class ReduceTask> extends AbstractTask > {
看到这句注释,恍然大悟,原来第三个参数是用于并行操作的;
难怪 第二个参数和第三个参数有点类似;因为其开多线程去进行计算,计算完成后,当然得合并结果啦;
所以第三个参数的就是:结果容器的合并器
既然是并行操作,那把测试代码改为parallelStream()
方法再试一次,
我是第一个参数我是第二个参数{size=2, page=1}
那就不得不再次调试代码,看看并发操作时,代码是如何执行的,毕竟官方注释应该不会骗人的;
源码入口,和之前类似,依然是执行container = evaluate(ReduceOps.makeRef(collector));
这段代码;
terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
这段代码; 之后: ReduceOp
里面的重载方法;源码如下: @OverridepublicR evaluateParallel(PipelineHelper helper, Spliterator spliterator) { // 这里创建了一个ReduceTask,记住了 // 后面重载方法和它有关 return new ReduceTask<>(this, helper, spliterator).invoke().get();}
可以看到其创建了一个任务对象ReduceTask
;接着调用的invoke()
方法,来自ForkJoinTask
类;
ForkJoin pool
框架; 接下来就是一系列调用:
invoke()源码:public final V invoke() { int s; // 看到doInvoke if ((s = doInvoke() & DONE_MASK) != NORMAL) reportException(s); return getRawResult();}// doInvoke()源码private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; // 看到doExec()方法 return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone();}// doExec()源码:final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { // 看到这里 接下来又是个重载方法,会有很多实现类 completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s;}
CountedCompleter
这个类,进去看后,发现又是一层调用: protected final boolean exec() { // 这又是一个重载方法,有很多的实现类 compute(); return false;}
还记得之前代码步骤中创建了一个ReduceTask
对象不?正常情况应该找这个类的,
AbstractTask
中去找; 这个时候你会发现如下源码:
@Overridepublic void compute() { Spliteratorrs = spliterator, ls; // right, left spliterators // 读取分割迭代器的估计总元素个数 long sizeEstimate = rs.estimateSize(); // 读取单个任务的元素上限 long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; // 按照我的测试代码,sizeEstimate = 1, sizeThreshold 也为1 // 所以我的测试代码不会走while循环 // 估计总元素数 > 阈值 && 尝试对 spliterator 进行二分 while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; // 基于leftSpliterator 创建子任务并写入 leftChild task.leftChild = leftChild = task.makeChild(ls); // 基于 rightSpliterator 创建子任务并写入 leftChild task.rightChild = rightChild = task.makeChild(rs); // 设置 task 的待完成任务数为 1【将会有一个子任务被 fork 进线程池中并行处理】 task.setPendingCount(1); // 是否 forkRight【第一次 forkRight,接着 forkLeft,轮流交替】 if (forkRight) { forkRight = false; // 更新待分割的 spliterator rs = ls; // 待处理的任务 task = leftChild; // 待 fork 进线程池并行处理的任务 taskToFork = rightChild; } else { forkRight = true; // 待处理的任务 task = rightChild; // 待 fork 进线程池并行处理的任务 taskToFork = leftChild; } taskToFork.fork(); // 读取待分割 spliterator 的估计总元素个数 sizeEstimate = rs.estimateSize(); } // 此任务已经是叶子任务,则执行计算逻辑 task.setLocalResult(task.doLeaf()); // 此任务计算完毕后尝试完成主任务 // 这个方法将会执行第三个参数 task.tryComplete(); }
根据我的测试代码,while
循环是不会进入的,因为就一个任务
~
接着执行程序就到了这段代码 task.setLocalResult(task.doLeaf());
task.doLeaf()
是执行叶子任务; ReduceTask
类,因为之前步骤创建的就是这个类对象嘛! @Overrideprotected S doLeaf() { return helper.wrapAndCopyInto(op.makeSink(), spliterator);}
看到这里是不是有点熟悉了,这个wrapAndCopyInto()
方法和串行里的是同一个;
tryComplete()
方法;源码如下: public final void tryComplete() { CountedCompleter a = this, s = a; for (int c;;) { if ((c = a.pending) == 0) { // 终于看到这个方法了, // 因为只有调用这个方法,第三个参数(lambda表达式)才有可能会被执行 a.onCompletion(s); if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) return; }}
这里我们终于看到了onCompletion()
方法,按照我们之前分析的,只有执行了这个方法,第三个参数(lambda表达式)才有可能会被执行;
@Overridepublic void onCompletion(CountedCompleter caller) { // 是否是叶子任务 // 因为测试代码就只有一条数据,所以肯定是叶子任务, // 任务并没有拆分 if (!isLeaf()) { S leftResult = leftChild.getLocalResult(); leftResult.combine(rightChild.getLocalResult()); setLocalResult(leftResult); } // GC spliterator, left and right child super.onCompletion(caller);}
继续执行代码,会发现,我的测试代码并没有执行到leftResult.combine(rightChild.getLocalResult());
这段代码;
if
里面去;为什么呢?因为测试代码List
里面只有一条数据,即使是并行流,也不存在左右结果容器进行合并的问题,所以没有执行; 至此大体上我们终于明白底层调用过程;
大体思路:
① 串行方式,其不会创建XXXtask
(如:ReduceTask
),而是直接调用AbstractPipeline
类中的wrapAndCopyInto()
方法去执行第一个和第二个参数;也就是Supplier
和Biconsumer
;第三个lambda表达式不会执行;
接上图
xxxTask
类来执行相应任务,然后通过invoke()
一系列的调用后,分解任务,并利用多线程的方式来执行;重点是doLeaf()
和tryComplete()
方法; 接上面的第一幅图
虽然我们已经知道tryComplete()
这个方法会去调第三个参数;
try
,表示尝试,也就是说其会尝试去完成; 那么是不是也有类似complete()
这样的方法呢? 在tryComplete()
源码附近找找,你会发现:propagateCompletion()
、complete(T rawResult)
等这样的方法;
其中propagateCompletion()
方法,和tryComplete()
方法是高度类似的,唯一的区别就是没有调用a.onCompletion(s);
这个方法;
这就是说,propagateCompletion()
这个方法是不会调用第三个参数的;那这是什么场景呢?
这个场景,其实源码已经写明了,就是入口出的那个if
判断
if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u));}
也就是上面这段代码里if
的条件:
parallelStream
②操作指示标识为CONCURRENT
③无序 UNORDERED
这三个条件同时成立的情况下,也不会执行第三个参数;具体实现方式,就是只使用一个结果容器;也就是第一个参数提供的容器;
第三个参数的作用就是合并多个结果容器;
我把这种情况称为特殊情况,毕竟通常情况下,我们不会去设置CONCURRENT
这个参数;
特殊场景:无序,任务之间执行顺序和最终结果没有关系,并发,意思子任务之间没有关联;
最后给上我自己画的完整的流程图
① 源码调试后,我们发现没有加锁的地方,也没有CAS
那种自旋方式来保证线程安全,
② 基于①可以得出使用并行方式:各个子任务之间不能有依赖关系,不能有公用的东西;并且任务调用顺序不能影响到结果,否则就使用串行方式;
③ 串行执行是不会执行第三个参数
④ 并行执行里有个特殊情况,在无序和设置指示标识为CONCURRENT
情况下,那么就只有一个结果容器,这时也不会执行第三个参数(lambda表达式);
参考地址:
转载地址:http://vryh.baihongyu.com/