终止流:收集 与 汇聚
管道是由终止操作结束的,上一篇文章介绍过,终止操作分为3组:搜索操作、汇聚、副作用的操作。
虽然已经简单介绍过这3组终止操作,但关于汇聚还有很多内容值得探讨,本章节就完成这个任务。
广义上来讲。所谓的汇聚就是返回单个值的操作,它以某种方式总结了流元素的值。
虽然现代Java编程实践一般来说会鼓励不变性,但大多数管道还是会以可变收集做为结束。因此,收集(也称为可变汇聚)是Stream API中最为重要的终止操作。
下面对传统的Java批处理与收集做一个简单的比较。如果将实际的数据源建模为 Iterable<Book>(这里指library),那么我们通常可以将它的值汇聚到 List<Book> 中,就像下面这样
List<Book> bookList = new ArrayList<>(); for (Book b : library) { bookList.add(b); } // 或者 library.forEach(book -> bookList.add(book));
在 Java8 中,可以通过 Stream<Book> 对数据源建模,因此如下代码效果与上面是一样的。
List<Book> bookList = library.stream().collect(Collectors.toList());
除了显而易见的简洁,可读性,这个收集器版本还有很多优势:即便不断积聚流元素的 List(当前实现中使用的是ArrayList)不是线程安全的,流操作还是能够安全的以并行方式执行。
收集器模式是非常灵活的,并且可以轻松的进行组合:下面逐个介绍使用示例
收集器模式的示例
// 根据主题对图书进行分类的Map Map<Topic, List<Book>> booksByTopic = library.stream() .collect(Collectors.groupingBy(Book::getTopic)); // 从图书标题映射到最新版本发布日期的有序Map Map<String, Year> titleToPubDate = library.stream() .collect(Collectors.toMap( Book::getTitle, Book::getPubDate, BinaryOperator.maxBy(Comparator.naturalOrder()), TreeMap::new)); // 将图书划分为小说(对应true)于非小说(false)的map Map<Boolean, List<Book>> fictionOrNon = library.stream() .collect(Collectors.partitioningBy(b -> b.getTopic() == Topic.FICTION)); // 将每个主题关联到该主题下拥有最多作者的图书上 Map<Topic, Optional<Book>> mostAuthorsByTopic = library.stream() .collect(Collectors.groupingBy( Book::getTopic, Collectors.maxBy(Comparator.comparing(b -> b.getAuthors().size())))); // 将每个主题关联到该主题总的卷书上 Map<Topic, Integer> volumeCountByTopic = library.stream() .collect(Collectors.groupingBy( Book::getTopic, Collectors.summingInt(b -> b.getPageCounts().length))); // 拥有最多图书的主题 Optional<Topic> mostPopularTopic = library.stream() .collect(Collectors.groupingBy( Book::getTopic, Collectors.counting())) .entrySet() .stream() .max(Map.Entry.comparingByValue()) .map(Map.Entry::getKey); // 将每个主题关联到该主题下所有图书标题并拼成字符串 Map<Topic, String> concatenatedTitlesByTopic = library.stream() .collect(Collectors.groupingBy( Book::getTopic, Collectors.mapping( Book::getTitle, Collectors.joining(";") )));
使用收集器
本章的后续章节会深入探索收集器模式的想法,同时会介绍何时以及如何创建自定义收集器。
在这之前,我们应该考虑其最常见和最直接的使用模式:即 Collections 类的工厂方法所提供的预定义收集器的作用。
可以将其分为两类:独立收集器(单独使用)以及与其他收集器组合使用的收集器。
独立的预定义收集器
根据功能,独立收集器可以分为3组
积聚到框架提供的容器中
积聚到自定义的集合中
将元素积聚到分类Map中
对于 toSet 来说就是 Set 的一个实现,对于 toList 来说就是 List的一个实现,对于 toMap 来说就是 Map的一个实现。这些收集器的积聚也是显而易见的:对于 Collection 实现就是 add,对于 toMap 收集器来说就是 put。
Collectors 源码 joining方法
该组中还有一个 joining 方法,它返回的收集器会将 String 对象流连接到 StringBuilder中,接下来其内容会以 String 形式返回
joining()Collector<CharSequence, ?, String>
joining(CharSequence delimiter)Collector<CharSequence, ?, String>
joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix)Collector<CharSequence, ?, String>
String title = library.stream().map(Book::getTitle).collect(Collectors.joining()); System.out.println(title); String title2 = library.stream().map(Book::getTitle).collect(Collectors.joining("::")); System.out.println(title2); String title3 = library.stream().map(Book::getTitle).collect(Collectors.joining("::","前缀","后缀")); System.out.println(title3); // 运行结果 China 's image baseComputer use 3000 askedVossNew WorldStarbucksTeslaYunnanbaiyaoProgramming guide China 's image base::Computer use 3000 asked::Voss::New World::Starbucks::Tesla::Yunnanbaiyao::Programming guide 前缀China 's image base::Computer use 3000 asked::Voss::New World::Starbucks::Tesla::Yunnanbaiyao::Programming guide后缀
我们可以通过如下代码:创建一个字符串列表,其中每个字符串都包含一本书的所有作者名。
List<String> authorsForBooks = library.stream() .map(b -> b.getAuthors().stream() .collect(Collectors.joining("、", b.getTitle() + ",作者:", "\n"))) .collect(Collectors.toList()); authorsForBooks.forEach(System.out::print); // 运行结果 China 's image base,作者:Li、Fu、Li Computer use 3000 asked,作者:Aho、Lam、听风、Ullman Voss,作者:Patrick White New World,作者:Tolkien Starbucks,作者:Li Tesla,作者:听风、man Yunnanbaiyao,作者:White Programming guide,作者:Jekl
到目前为止,我们见到的所有收集器都会将元素积聚到框架选择的集合中。很多 Collections 方法都有一些变种,可以指定容器的提供者。
Collectors 源码 toMap方法
Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper, BinaryOperator<U> mergeFunction) Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper, BinaryOperator<U> mergeFunction, Supplier<M> mapSupplier)
例如,我们之间看到合并功能中使用的 toMap 就是用于构建一个从图书映射到最新版日期的 Map,如果随后要以标题的字母表顺序显示 Map 的内容,那么将其放到有序的 Map 中就会改进性能。
Map<String, Year> titleToPubDate = library.stream() .collect(Collectors.toMap( Book::getTitle, Book::getPubDate, (x, y) -> x.isAfter(y) ? x : y, TreeMap::new)); titleToPubDate.forEach((title, year) -> System.out.println("标题:" + title + ", 年份:" + year)); // 运行结果 标题:China 's image base, 年份:2016 标题:Computer use 3000 asked, 年份:2013 标题:New World, 年份:1999 标题:Programming guide, 年份:1981 标题:Starbucks, 年份:2013 标题:Tesla, 年份:2017 标题:Voss, 年份:1980 标题:Yunnanbaiyao, 年份:1980
由于合并功能实际上是使用自然顺序选择两个 java.time.Year 值中较大的一个,因此可以替换如下代码
Map<String, Year> titleToPubDate = library.stream() .collect(Collectors.toMap( Book::getTitle, Book::getPubDate, BinaryOperator.maxBy(Comparator.naturalOrder()), TreeMap::new));
我们看到3个 toMap 重载方法的通用性越来越高:第1个只接收键值抽取函数;第2个还接收一个合并函数;第3个则又接收一个提供器。
Collectors 源码 toCollection方法
public static <T, C extends Collection<T>> Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) { return new CollectorImpl<>(collectionFactory, Collection<T>::add, (r1, r2) -> { r1.addAll(r2); return r1; }, CH_ID); }
相对于 toList 与 toSet 增加了额外的重载方法,现在已经有一个更为通用的 toCollection 方法,它不仅可以选择 Set 与 List 的任意实现,还可以选择 Collection 的任意子接口。
例如:我们可以将流元素收集到一个有序集合中或是阻塞队列中
TreeSet<String> sortedTitles = library.stream() .map(Book::getTitle) .collect(Collectors.toCollection(TreeSet::new)); sortedTitles.stream().forEach(System.out::println); System.out.println("---------------------------------"); LinkedBlockingDeque<Book> queueInPubDateOrder = library.stream() .sorted(Comparator.comparing(Book::getPubDate)) .collect(Collectors.toCollection(LinkedBlockingDeque::new)); queueInPubDateOrder.forEach(book -> System.out.println(book.getTitle() + " : " + book.getPubDate())); // 运行结果 China 's image base Computer use 3000 asked New World Programming guide Starbucks Tesla Voss Yunnanbaiyao --------------------------------- Voss : 1980 Yunnanbaiyao : 1980 Programming guide : 1981 New World : 1999 Computer use 3000 asked : 2013 Starbucks : 2013 China 's image base : 2016 Tesla : 2017
Collectors 源码 groupingBy 方法
public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier) public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream)
例如:根据主题对图书进行分类
Map<Topic, List<Book>> booksByTopic = library.stream() .collect(Collectors.groupingBy(Book::getTopic)); booksByTopic.forEach((topic, book) -> book.forEach(b -> System.out.println(topic + " : " + b.getTitle()))); // 运行结果 COMPUTING : Computer use 3000 asked COMPUTING : Tesla COMPUTING : Programming guide FICTION : Voss FICTION : New World HISTORY : Yunnanbaiyao MEDICINE : China 's image base MEDICINE : Starbucks
Collectors 源码 partitioningBy 方法
groupBy 的一个变种是便捷方法 partitioningBy,其中键类型 K 被指定为 Boolean
public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) { public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T, A, D> downstream)
例如,将 true 映射为小说列表,将 false 映射为非小说列表
Map<Boolean, List<Book>> fictionOrNon = library.stream() .collect(Collectors.partitioningBy(b -> b.getTopic() == Topic.FICTION)); fictionOrNon.forEach((x, y) -> y.forEach(b -> System.out.println(x + ",类型:" + b.getTopic().name() + ",标题: " + b.getTitle())));
组合收集器
Collector API 真正强大之处在于收集器可以协同工作,并与其他功能搭配使用。
举个例子,我想表示一组值的分布情况,例如图书馆中的图书根据每个主题的分布情况。
前面已经介绍过分类收集器,它可以生成一组映射,其中键是属性,例如图书的主题。不过映射的值类型已经确定是流元素的一个列表。想要表示分布情况,该列表还需要被替换为元素数量。这只是使用值替换映射中的元素列表的一个示例:还可以使用元素衍生属性的列表(例如图书的发布者),或是其他针对元素值的汇聚(时间最为久远的主题,页数总和等)。相对于这么多用例提供专门的分类收集器,Collector API 则提供了一个扩展点,可以将收集器组合起来。
通过组合,我们可以将多个收集器或其他操作的结果组合起来创建新的收集器。其中最为重要的一种形式就是将 groupingBy 与 第2个“下游”收集器组合起来。在该组合中,groupingBy 提供了分类功能与分类键,与给定键相关的元素则被发送给下游收集器进行下一步处理。
groupingBy 默认下游收集器就是 Collectors.toList 所返回的,所以
Map<Topic, List<Book>> booksByTopic = library.stream() .collect(Collectors.groupingBy(Book::getTopic, Collectors.toList())); // 等同于 Map<Topic, List<Book>> booksByTopic = library.stream() .collect(Collectors.groupingBy(Book::getTopic, Collectors.toList()));
1、组合 Collectors.counting,统计每种类型书籍的总数
Map<Topic, Long> distributionByTopic = library.stream().collect(Collectors.groupingBy(Book::getTopic, Collectors.counting())); distributionByTopic.forEach((t, l) -> System.out.println("类型:" + t + ",总数:" + l)); // 运行结果 类型:FICTION,总数:2 类型:COMPUTING,总数:3 类型:HISTORY,总数:1 类型:MEDICINE,总数:2
其他收集器也可以用作 groupingBy 的下游,事实上,有很多Collectors 工厂方法都针对这个目的的。
例如 counting 返回的收集器是终止操作 count,它可以用作下游收集器。还比如如下更多。
2、对于终止操作 max 与 min 是 Collectors 工厂方法 maxBy 与 minBy
例如,我们创建一个映射,其中包含每一个主题下作者最多的图书。
// 将每个主题关联到该主题下拥有最多作者的图书上 Map<Topic, Optional<Book>> mostAuthorsByTopic = library.stream() .collect(Collectors.groupingBy( Book::getTopic, Collectors.maxBy(Comparator.comparing(b -> b.getAuthors().size())))); mostAuthorsByTopic.forEach((t, b) -> System.out.println("类型:" + t + ",标题:" + b.get().getTitle())); // 运行结果 类型:MEDICINE,标题:China 's image base 类型:FICTION,标题:Voss 类型:COMPUTING,标题:Computer use 3000 asked 类型:HISTORY,标题:Yunnanbaiyao
3、对于原生流终止操作 sum 与 average 的是 summingInt、summingLong 与 summingDouble 及其平均数版本返回的收集器
例如:我们创建一个很映射,其中包含每一个主题下卷的总和(回忆上文,getPageCounts 返回的是一个 int 值数组,数组长度等于该书总卷数)
// 将每个主题关联到该主题总的卷书上 Map<Topic, Integer> volumeCountByTopic = library.stream() .collect(Collectors.groupingBy( Book::getTopic, Collectors.summingInt(b -> b.getPageCounts().length))); volumeCountByTopic.forEach((x, y) -> System.out.println("类型:" + x + ",总卷数:" + y)); // 运行结果 类型:COMPUTING,总卷数:3 类型:FICTION,总卷数:4 类型:HISTORY,总卷数:1 类型:MEDICINE,总卷数:2
还可以根据主题创建一个包含图书平均厚度的 Map
// 创建一个包含着图书平均厚度的 Map Map<Topic, Double> averageHeightByTopic = library.stream() .collect(Collectors.groupingBy(Book::getTopic, Collectors.averagingDouble(Book::getHeight))); averageHeightByTopic.forEach((x, y) -> System.out.println("类型:" + x + ",平均厚度:" + y));
4、对于终止操作 summaryStatistics 的是 summarizingInt、summarizingLong 与 summarizingDouble 返回的是收集器,它们会将原生值收集到相应的 SummaryStatistics 对象中。
例如:生成针对卷数流的一个 IntSummaryStatistics 实例(而非上一个示例那样的根据主题得到的图书卷数)
// 汇总统计 Map<Topic, IntSummaryStatistics> volumeStats = library.stream() .collect(Collectors.groupingBy(Book::getTopic, Collectors.summarizingInt(b -> b.getPageCounts().length))); volumeStats.forEach((x, y) -> System.out.println("类型:" + x + ",统计结果:" + y)); // 运行结果 类型:FICTION,统计结果:IntSummaryStatistics{count=2, sum=4, min=1, average=2.000000, max=3} 类型:MEDICINE,统计结果:IntSummaryStatistics{count=2, sum=2, min=1, average=1.000000, max=1} 类型:HISTORY,统计结果:IntSummaryStatistics{count=1, sum=1, min=1, average=1.000000, max=1} 类型:COMPUTING,统计结果:IntSummaryStatistics{count=3, sum=3, min=1, average=1.000000, max=1}
对于终止操作 reduce 的3个重载方法的是 reducing 的 3 个重载方法返回的收集器。
上述示例存在一些共同点:下游收集器(无论是统计元素数量,根据某个属性排序,还是对某个属性统计)都会接收并处理整个流元素。
不过,有时下游收集器只需要处理一个属性:例如,我想要创建一个从每个主题到该主题下所有标题字符串拼接的映射。
从每本书中抽取出标题这个操作非常类似于对流的 map 操作,不过对于该示例来说,要在下游收集器接收到之前应用到 groupingBy 所分发到流对象上。这个需求可以通过 Collectors.mapping 所创建的收集器来实现,用户可以提供映射与下游收集器。
Collectors 源码 mapping 方法
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream)
示例,借助于mapping的帮助,创建一个从标题到该主题下所有标题拼接的字符串的映射是很直接的。
// 将每个主题关联到该主题下所有图书标题并拼成字符串 Map<Topic, String> concatenatedTitlesByTopic = library.stream() .collect(Collectors.groupingBy( Book::getTopic, Collectors.mapping( Book::getTitle, Collectors.joining("、") ))); concatenatedTitlesByTopic.forEach((t, s) -> System.out.println("类型:" + t + ", 标题串:" + s)); // 运行结果 类型:MEDICINE, 标题串:China 's image base、Starbucks 类型:COMPUTING, 标题串:Computer use 3000 asked、Tesla、Programming guide 类型:HISTORY, 标题串:Yunnanbaiyao 类型:FICTION, 标题串:Voss、New World
链接管道
例如:图书数量最多的主题,我们知道如何生成一个从主题到图书数量的映射;
Map<Topic, Long> distributionByTopic = library.stream() .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting()));
不过,现在 Map<Topic, Long> 中的每个条目都需要进行比较才能找到最大的那个。在比较之前,图书馆中所有元素都要经由收集器进行处理来创建分类映射:接下来才能比较主题、数量对,从而找到最大的那个,要想将 Stream API 操作应用到下一阶段的处理,我们需要开启另一个流,它将这些配对作为自己的元素。
Stream<Map.Entry<Topic, Long>> entries = library.stream() .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting())) .entrySet() .stream(); entries.forEach(entry -> System.out.println("标题:" + entry.getKey() + ",总数:" + entry.getValue())); // 运行结果同上代码一致
现在可以通过方法 Map.Entry.getKey 与 Map.Entry.getValue 处理每个 Map.Entry 对象的组件了。
Java 8 增加了用于这种情况处理的比较器生成方法 comparingByKey 与 comparingByValue。对于找到最流行的主题这一问题,可以通过如下实现。
Optional<Topic> mostPopularTopic = entries .max(Map.Entry.comparingByValue()) .map(Map.Entry::getKey); System.out.println("最流行的主题:"+mostPopularTopic.get().name()); // 最流行的主题:COMPUTING
可以通过两个管道连接起来模仿流式管道处理:
Optional<Topic> mostPopularTopic = library.stream() .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting())) .entrySet() .stream() .max(Map.Entry.comparingByValue()) .map(Map.Entry::getKey);
对于 Java 8 的早期阶段来说,我们没办法预测惯用方式到底会如何演化,这么做的优缺点如下所示:
简洁,可读性好
如果没有注意到优化只能应用到单独的两个管道上,以及需要足够的内存来容纳中间集合这一事实,那么这么做就具有迷惑性。这样,它与某些中间操作的行为(例如sorted,在进行到下一步之前需要将内部收集流的整个内容)就没什么差别了。
尽管存在性能问题,但这么做的目的取决于其他队代码的可读性与维护性的整体影响。因此,用的人还是很多的。
并发收集
并发收集器是指具备 CONCURRENT 特性的收集器,它告诉框架可以从多个线程中对相同的结果容器调用积聚器函数。这会改进并发性能。代价则是失去了流元素的顺序。因此,对于并发收集来说,最常见的用例就是收集到 Map 中,而且 Collectors 类中的每个 toMap 方法都有一个相应的 toCurrentMap 方法,它会生成一个针对 CoucurrentMap 的收集器,与之类似,groupingBy的每个重载方法都有一个对应的 groupingByConcurrent方法,它会返回一个 ConcurrentMap而非map。
未完待补充。。。