jdk8 stream与parallelStream实现源码阅读

jdk8 stream与parallelStream实现源码阅读从java8以后,jdk底层新增了StreamAPI,使我们写出更简洁、干净的代码来对集合、数组等进行操作,先简单介绍下流的几个概念:Stream(流)是一个来自数据源的元素队列并支持聚合操作元素是特定类型的对象,形成一个队列。Java中的Stream并不会存储元素,而是按需计算。 数据源流的来源。可以是集合,数组,I/Ochannel,产生器generator等。 聚…

大家好,欢迎来到IT知识分享网。

从java 8以后,jdk底层新增了Stream API,使我们写出更简洁、干净的代码来对集合、数组等进行操作,先简单介绍下流的几个概念:

Stream(流)是一个来自数据源的元素队列并支持聚合操作

  • 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
  • 数据源 流的来源。 可以是集合,数组,I/O channel, 产生器generator 等。
  • 聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。

和以前的Collection操作不同, Stream操作还有两个基础的特征:

  • Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
  • 内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。

下面首先看一下jdk源码中如何实现返回流对象(Stream),打开Collection接口源码,可以Collection增加了以下两个方法:

jdk8 stream与parallelStream实现源码阅读

其中可以看到,多出了我们经常使用的两个方法,stream和parallelStream方法,都返回了该集合的Stream对象(该接口中包含了许多操作流的方法,这里不一一列举,详细用法看API,在这里主要阅读源码理解其实现),一看名称,就能大概猜出来其中一个是串行流,另一个是并行流。并且接口中给了这两个方法的默认实现。其中采用了StreamSupport的stream方法,这个类也是java 8新增的一个底层创建和操作Stream的工具类。

jdk8 stream与parallelStream实现源码阅读

那这个StreamSupport中又是做了些啥呢?继续跟代码下去就会发现,它根据提供的元素分割器(Spliterator)和是否并行创建了一个Stream的实现类,ReferencePipeline的静态内部实现类Head类,这个类重写了ReferencePipeline类的几个方法,其中我们主要关注下图两个方法:

jdk8 stream与parallelStream实现源码阅读

forEach和forEachOrderd方法,可以看出其实现大概相同,都是先判断了该Stream是否是并行流,不是的话直接执行原分割迭代器的forEachRemaining方法,对该流的元素进行一一消费处理,是的话则调用父类(ReferencePipeline)的对应处理方法,以其中forEach为例,ReferencePipeline类中的forEach实现如下:

jdk8 stream与parallelStream实现源码阅读

jdk8 stream与parallelStream实现源码阅读

传入了TerminalOp的实现ForEachOp,其中TerminalOp有多种实现,其中包含了下图实现类:

jdk8 stream与parallelStream实现源码阅读

forEach实现类为ForEachOp的OfRef类,其中定义了对单个元素的Consumer,其中两者最大的不同为,并行流调用了evaluateParallel方法,而串行流调用了evaluateSequential方法,两个方法的实现如下:

jdk8 stream与parallelStream实现源码阅读

其中串行流的forEach方式源码实现如下:

jdk8 stream与parallelStream实现源码阅读

jdk8 stream与parallelStream实现源码阅读

可以看到关键方法还是在本线程中调用wrappedSink(为Consumer的实现类)的forEachRemaining或forEachWithCancel方法依次消费流中的元素。

并行流中相对复杂,分为有序和无序分别实现了两个基于CountedCompleter的实现类ForEachOrderedTask和ForEachTask,并调用了其共同父类ForkJoinTask的invoke方法,下图来看看这两个invoke方法中都做了些什么:

jdk8 stream与parallelStream实现源码阅读

jdk8 stream与parallelStream实现源码阅读

jdk8 stream与parallelStream实现源码阅读

可以看到invoke方法中调用了执行方法doInvoke,doInvoke方法中又调用了doExec方法同步执行CountedCompleter的exec方法,其中执行了一个compute抽象方法并返回了false,我们可以在CountedCompleter其中一个实现中,找到compute方法的实现,下面以ForEachTask为例继续阅读:

jdk8 stream与parallelStream实现源码阅读

jdk8 stream与parallelStream实现源码阅读

可以看到,其中调用了ForkJoinTask的fork方法,这就应该很熟悉了,这是ForkJoinPool线程池API推荐的用法,用来并行计算的线程池:

jdk8 stream与parallelStream实现源码阅读

该方法中,首先判断当前线程是不是ForkJoinWorkerThread,如果是,直接将该任务加入到本线程的线程的ForkJoinPool.WorkQueue中,交给本线程引用ForkJoinPool线程池执行。否则交给ForkJoinPool自带的common公用线程池执行。

jdk8 stream与parallelStream实现源码阅读

其中common的最小划分为1(等同于核心线程数),最大线程数为当前系统可用核心数-1个(–除非超过32767,应该很少见吧),读到这里可以看出,系统中所有parallelStream流执行方法底层都是通过ForkJoinPool并行计算来提高效率的,我们也可以通过自己手动创建ForkJoinTask的任务来提交到自定义ForkJoinPool线程池,合理分配线程达到提高单个集合计算的效率。

读完了stream和parallelStream的实现区别,再回过头来,看StreamSupport中的第一个传参Spliterator,数据源分割迭代处理器器,这是java 8为了支持源数据的流操作抽象出来的一个接口,其中主要包含了以下几个方法:

jdk8 stream与parallelStream实现源码阅读

tryAdvance方法取源数据中剩下的元素,存在交给consumer消费;forEachRemaining顺序迭代剩下元素并交给consumer消费;trySplit方法将集合划分方法;estimateSize为返回大概的剩余元素个数;getExactSizeIfKnown返回确切的元素个数,当Spliterator是SIZED的时候;characteristics返回该characteristics具有的特征,比如SIZED,ORDERD等;hasCharacteristics是否包含某个特征;getComparator返回比较器,当Spliterator是ORDERD的时候。了解了这个接口的方法的基本作用,就可以灵活的实现自定义数据源生成器的分割迭代器了。

下面我们可以看下ArrayList实现分割迭代器的类ArrayListSpliterator:

jdk8 stream与parallelStream实现源码阅读

jdk8 stream与parallelStream实现源码阅读

可以看出其中trySplit实现了二分法;tryAdvance直接调用了当前index的元素Consumer的accept方法,并将index+1;forEachRemaining直接使用了数组迭代的随机访问方法。

阅读后小结:

     1、自定义stream实现的关键在于实现自己的Spliterator,然后调用StreamSupport的stream方法即可实现自定义的stream运算。

     2、串行stream目前从ArrayList本身看来并不会带来明显性能提升,比起外部自随机访问法迭代需要复制源数据元素,创建stream对象会带来一定的性能消耗(不排除jdk中其他集合有更优的实现或jdk后期优化);在数据量相对较多时,可以采用并行stream,多核机器下多线程处理数据可以提高处理效率,但需要注意如果不自定义ForkJoinPool线程池,系统所有并行stream处理都用同一个common线程池,可能会降低某些集合处理速度,增大响应时间。

    3、stream操作提供了常用集合操作,可以极大的精简代码量。

    4、并行流的实现原理为ForkJoinPool,需要保证每个stream是无状态,数据源中各个元素相互无依赖的,返回join结果时为多线程操作,则调用返回的集合也必须是线程安全的。

    5、看上图ArrayList的forEachRemaining实现,stream可以修改数据源。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/23761.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信