• 把Stream流学透了你也能写出简洁高效的代码,来看看吧(建议收藏)


    最近刚好有空给大家整理下JDK8的特性,这个在实际开发中的作用也是越来越重了,本文重点讲解下Stream API

    文章目录

     

    Stream API

    1.集合处理数据的弊端

      当我们在需要对集合中的元素进行操作的时候,除了必需的添加,删除,获取外,最典型的操作就是集合遍历,

    1. package com.bobo.jdk.stream;
    2. import java.util.ArrayList;
    3. import java.util.Arrays;
    4. import java.util.List;
    5. public class StreamTest01 {
    6. public static void main(String[] args) {
    7. // 定义一个List集合
    8. List<String> list = Arrays.asList("张三","张三丰","成龙","周星驰");
    9. // 1.获取所有 姓张的信息
    10. List<String> list1 = new ArrayList<>();
    11. for (String s : list) {
    12. if(s.startsWith("张")){
    13. list1.add(s);
    14. }
    15. }
    16. // 2.获取名称长度为3的用户
    17. List<String> list2 = new ArrayList<>();
    18. for (String s : list1) {
    19. if(s.length() == 3){
    20. list2.add(s);
    21. }
    22. }
    23. // 3. 输出所有的用户信息
    24. for (String s : list2) {
    25. System.out.println(s);
    26. }
    27. }
    28. }

      上面的代码针对与我们不同的需求总是一次次的循环循环循环.这时我们希望有更加高效的处理方式,这时我们就可以通过JDK8中提供的Stream API来解决这个问题了。

      Stream更加优雅的解决方案:

    1. package com.bobo.jdk.stream;
    2. import java.util.ArrayList;
    3. import java.util.Arrays;
    4. import java.util.List;
    5. public class StreamTest02 {
    6. public static void main(String[] args) {
    7. // 定义一个List集合
    8. List<String> list = Arrays.asList("张三","张三丰","成龙","周星驰");
    9. // 1.获取所有 姓张的信息
    10. // 2.获取名称长度为3的用户
    11. // 3. 输出所有的用户信息
    12. list.stream()
    13. .filter(s->s.startsWith("张"))
    14. .filter(s->s.length() == 3)
    15. .forEach(s->{
    16. System.out.println(s);
    17. });
    18. System.out.println("----------");
    19. list.stream()
    20. .filter(s->s.startsWith("张"))
    21. .filter(s->s.length() == 3)
    22. .forEach(System.out::println);
    23. }
    24. }

      上面的SteamAPI代码的含义:获取流,过滤张,过滤长度,逐一打印。代码相比于上面的案例更加的简洁直观

    2. Steam流式思想概述

      注意:Stream和IO流(InputStream/OutputStream)没有任何关系,请暂时忘记对传统IO流的固有印象!

    Stream流式思想类似于工厂车间的“生产流水线”,Stream流不是一种数据结构,不保存数据,而是对数据进行加工

    处理。Stream可以看作是流水线上的一个工序。在流水线上,通过多个工序让一个原材料加工成一个商品。

     

     

     

    Stream API能让我们快速完成许多复杂的操作,如筛选、切片、映射、查找、去除重复,统计,匹配和归约。

    3. Stream流的获取方式

    3.1 根据Collection获取

      首先,java.util.Collection 接口中加入了default方法 stream,也就是说Collection接口下的所有的实现都可以通过steam方法来获取Stream流。

    1. public static void main(String[] args) {
    2. List<String> list = new ArrayList<>();
    3. list.stream();
    4. Set<String> set = new HashSet<>();
    5. set.stream();
    6. Vector vector = new Vector();
    7. vector.stream();
    8. }

      但是Map接口别没有实现Collection接口,那这时怎么办呢?这时我们可以根据Map获取对应的key value的集合。

    1. public static void main(String[] args) {
    2. Map<String,Object> map = new HashMap<>();
    3. Stream<String> stream = map.keySet().stream(); // key
    4. Stream<Object> stream1 = map.values().stream(); // value
    5. Stream<Map.Entry<String, Object>> stream2 = map.entrySet().stream(); // entry
    6. }

    3.1 通过Stream的of方法

      在实际开发中我们不可避免的还是会操作到数组中的数据,由于数组对象不可能添加默认方法,所有Stream接口中提供了静态方法of

    1. public class StreamTest05 {
    2. public static void main(String[] args) {
    3. Stream<String> a1 = Stream.of("a1", "a2", "a3");
    4. String[] arr1 = {"aa","bb","cc"};
    5. Stream<String> arr11 = Stream.of(arr1);
    6. Integer[] arr2 = {1,2,3,4};
    7. Stream<Integer> arr21 = Stream.of(arr2);
    8. arr21.forEach(System.out::println);
    9. // 注意:基本数据类型的数组是不行的
    10. int[] arr3 = {1,2,3,4};
    11. Stream.of(arr3).forEach(System.out::println);
    12. }
    13. }

    4.Stream常用方法介绍

      Stream常用方法

      Stream流模型的操作很丰富,这里介绍一些常用的API。这些方法可以被分成两种:

     

    终结方法:返回值类型不再是 Stream 类型的方法,不再支持链式调用。本小节中,终结方法包括 count 和 forEach 方法。

    非终结方法:返回值类型仍然是 Stream 类型的方法,支持链式调用。(除了终结方法外,其余方法均为非终结方法。)

    Stream注意事项(重要)

    1. Stream只能操作一次
    2. Stream方法返回的是新的流
    3. Stream不调用终结方法,中间的操作不会执行

    4.1 forEach

       forEach用来遍历流中的数据的

    void forEach(Consumer<? super T> action);

      该方法接受一个Consumer接口,会将每一个流元素交给函数处理

    1. public static void main(String[] args) {
    2. Stream.of("a1", "a2", "a3").forEach(System.out::println);;
    3. }

    4.2 count

      Stream流中的count方法用来统计其中的元素个数的

    long count();

      该方法返回一个long值,代表元素的个数。

    1. public static void main(String[] args) {
    2. long count = Stream.of("a1", "a2", "a3").count();
    3. System.out.println(count);
    4. }

    4.3 filter

      filter方法的作用是用来过滤数据的。返回符合条件的数据

     

    可以通过filter方法将一个流转换成另一个子集流

    Stream<T> filter(Predicate<? super T> predicate);

      该接口接收一个Predicate函数式接口参数作为筛选条件

    1. public static void main(String[] args) {
    2. Stream.of("a1", "a2", "a3","bb","cc","aa","dd")
    3. .filter((s)->s.contains("a"))
    4. .forEach(System.out::println);
    5. }

    输出:

    1. a1
    2. a2
    3. a3
    4. aa

    4.4 limit

     

     

    limit方法可以对流进行截取处理,支取前n个数据,

    1. Stream<T> limit(long maxSize);
    2.   参数是一个long类型的数值,如果集合当前长度大于参数就进行截取,否则不操作:
    1. public static void main(String[] args) {
    2. Stream.of("a1", "a2", "a3","bb","cc","aa","dd")
    3. .limit(3)
    4. .forEach(System.out::println);
    5. }

    输出:

    1. a1
    2. a2
    3. a3

    4.5 skip

     

    如果希望跳过前面几个元素,可以使用skip方法获取一个截取之后的新流:

       Stream<T> skip(long n);

    操作:

    1. public static void main(String[] args) {
    2. Stream.of("a1", "a2", "a3","bb","cc","aa","dd")
    3. .skip(3)
    4. .forEach(System.out::println);
    5. }

    输出:

    1. bb
    2. cc
    3. aa
    4. dd

    4.6 map

      如果我们需要将流中的元素映射到另一个流中,可以使用map方法:

    <R> Stream<R> map(Function<? super T, ? extends R> mapper);

    该接口需要一个Function函数式接口参数,可以将当前流中的T类型数据转换为另一种R类型的数据

    1. public static void main(String[] args) {
    2. Stream.of("1", "2", "3","4","5","6","7")
    3. //.map(msg->Integer.parseInt(msg))
    4. .map(Integer::parseInt)
    5. .forEach(System.out::println);
    6. }

    4.7 sorted

      如果需要将数据排序,可以使用sorted方法:

        Stream<T> sorted();

      在使用的时候可以根据自然规则排序,也可以通过比较强来指定对应的排序规则

    1. public static void main(String[] args) {
    2. Stream.of("1", "3", "2","4","0","9","7")
    3. //.map(msg->Integer.parseInt(msg))
    4. .map(Integer::parseInt)
    5. //.sorted() // 根据数据的自然顺序排序
    6. .sorted((o1,o2)->o2-o1) // 根据比较强指定排序规则
    7. .forEach(System.out::println);
    8. }

    4.8 distinct

      如果要去掉重复数据,可以使用distinct方法:

        Stream<T> distinct();

     

    使用:

    1. public static void main(String[] args) {
    2. Stream.of("1", "3", "3","4","0","1","7")
    3. //.map(msg->Integer.parseInt(msg))
    4. .map(Integer::parseInt)
    5. //.sorted() // 根据数据的自然顺序排序
    6. .sorted((o1,o2)->o2-o1) // 根据比较强指定排序规则
    7. .distinct() // 去掉重复的记录
    8. .forEach(System.out::println);
    9. System.out.println("--------");
    10. Stream.of(
    11. new Person("张三",18)
    12. ,new Person("李四",22)
    13. ,new Person("张三",18)
    14. ).distinct()
    15. .forEach(System.out::println);
    16. }

      Stream流中的distinct方法对于基本数据类型是可以直接出重的,但是对于自定义类型,我们是需要重写hashCode和equals方法来移除重复元素。

    4.9 match

      如果需要判断数据是否匹配指定的条件,可以使用match相关的方法

    1. boolean anyMatch(Predicate<? super T> predicate); // 元素是否有任意一个满足条件
    2. boolean allMatch(Predicate<? super T> predicate); // 元素是否都满足条件
    3. boolean noneMatch(Predicate<? super T> predicate); // 元素是否都不满足条件

    使用

    1. public static void main(String[] args) {
    2. boolean b = Stream.of("1", "3", "3", "4", "5", "1", "7")
    3. .map(Integer::parseInt)
    4. //.allMatch(s -> s > 0)
    5. //.anyMatch(s -> s >4)
    6. .noneMatch(s -> s > 4)
    7. ;
    8. System.out.println(b);
    9. }

      注意match是一个终结方法

    4.10 find

      如果我们需要找到某些数据,可以使用find方法来实现

    1. Optional<T> findFirst();
    2. Optional<T> findAny();

     

    使用:

    1. public static void main(String[] args) {
    2. Optional<String> first = Stream.of("1", "3", "3", "4", "5", "1", "7").findFirst();
    3. System.out.println(first.get());
    4. Optional<String> any = Stream.of("1", "3", "3", "4", "5", "1", "7").findAny();
    5. System.out.println(any.get());
    6. }

    4.11 max和min

     

    如果我们想要获取最大值和最小值,那么可以使用max和min方法

    1. Optional<T> min(Comparator<? super T> comparator);
    2. Optional<T> max(Comparator<? super T> comparator);

    使用

    1. public static void main(String[] args) {
    2. Optional<Integer> max = Stream.of("1", "3", "3", "4", "5", "1", "7")
    3. .map(Integer::parseInt)
    4. .max((o1,o2)->o1-o2);
    5. System.out.println(max.get());
    6. Optional<Integer> min = Stream.of("1", "3", "3", "4", "5", "1", "7")
    7. .map(Integer::parseInt)
    8. .min((o1,o2)->o1-o2);
    9. System.out.println(min.get());
    10. }

    4.12 reduce方法

     

    如果需要将所有数据归纳得到一个数据,可以使用reduce方法

    T reduce(T identity, BinaryOperator<T> accumulator);

    使用:

    1. public static void main(String[] args) {
    2. Integer sum = Stream.of(4, 5, 3, 9)
    3. // identity默认值
    4. // 第一次的时候会将默认值赋值给x
    5. // 之后每次会将 上一次的操作结果赋值给x y就是每次从数据中获取的元素
    6. .reduce(0, (x, y) -> {
    7. System.out.println("x="+x+",y="+y);
    8. return x + y;
    9. });
    10. System.out.println(sum);
    11. // 获取 最大值
    12. Integer max = Stream.of(4, 5, 3, 9)
    13. .reduce(0, (x, y) -> {
    14. return x > y ? x : y;
    15. });
    16. System.out.println(max);
    17. }

    4.13 map和reduce的组合

      在实际开发中我们经常会将map和reduce一块来使用

    1. public static void main(String[] args) {
    2. // 1.求出所有年龄的总和
    3. Integer sumAge = Stream.of(
    4. new Person("张三", 18)
    5. , new Person("李四", 22)
    6. , new Person("张三", 13)
    7. , new Person("王五", 15)
    8. , new Person("张三", 19)
    9. ).map(Person::getAge) // 实现数据类型的转换
    10. .reduce(0, Integer::sum);
    11. System.out.println(sumAge);
    12. // 2.求出所有年龄中的最大值
    13. Integer maxAge = Stream.of(
    14. new Person("张三", 18)
    15. , new Person("李四", 22)
    16. , new Person("张三", 13)
    17. , new Person("王五", 15)
    18. , new Person("张三", 19)
    19. ).map(Person::getAge) // 实现数据类型的转换,符合reduce对数据的要求
    20. .reduce(0, Math::max); // reduce实现数据的处理
    21. System.out.println(maxAge);
    22. // 3.统计 字符 a 出现的次数
    23. Integer count = Stream.of("a", "b", "c", "d", "a", "c", "a")
    24. .map(ch -> "a".equals(ch) ? 1 : 0)
    25. .reduce(0, Integer::sum);
    26. System.out.println(count);
    27. }

    输出结果

    1. 87
    2. 22
    3. 3

    4.14 mapToInt

      如果需要将Stream中的Integer类型转换成int类型,可以使用mapToInt方法来实现

     

    使用

    1. public static void main(String[] args) {
    2. // Integer占用的内存比int多很多,在Stream流操作中会自动装修和拆箱操作
    3. Integer arr[] = {1,2,3,5,6,8};
    4. Stream.of(arr)
    5. .filter(i->i>0)
    6. .forEach(System.out::println);
    7. System.out.println("---------");
    8. // 为了提高程序代码的效率,我们可以先将流中Integer数据转换为int数据,然后再操作
    9. IntStream intStream = Stream.of(arr)
    10. .mapToInt(Integer::intValue);
    11. intStream.filter(i->i>3)
    12. .forEach(System.out::println);
    13. }

    4.15 concat

      如果有两个流,希望合并成为一个流,那么可以使用Stream接口的静态方法concat

    1. public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
    2. Objects.requireNonNull(a);
    3. Objects.requireNonNull(b);
    4. @SuppressWarnings("unchecked")
    5. Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
    6. (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
    7. Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
    8. return stream.onClose(Streams.composedClose(a, b));
    9. }

    使用:

    1. public static void main(String[] args) {
    2. Stream<String> stream1 = Stream.of("a","b","c");
    3. Stream<String> stream2 = Stream.of("x", "y", "z");
    4. // 通过concat方法将两个流合并为一个新的流
    5. Stream.concat(stream1,stream2).forEach(System.out::println);
    6. }

    4.16 综合案例

      定义两个集合,然后在集合中存储多个用户名称。然后完成如下的操作:

    1. 第一个队伍只保留姓名长度为3的成员
    2. 第一个队伍筛选之后只要前3个人
    3. 第二个队伍只要姓张的成员
    4. 第二个队伍筛选之后不要前两个人
    5. 将两个队伍合并为一个队伍
    6. 根据姓名创建Person对象
    7. 打印整个队伍的Person信息
    1. package com.bobo.jdk.stream;
    2. import com.bobo.jdk.lambda.domain.Person;
    3. import java.util.Arrays;
    4. import java.util.List;
    5. import java.util.stream.Stream;
    6. public class StreamTest21Demo {
    7. /**
    8. * 1. 第一个队伍只保留姓名长度为3的成员
    9. * 2. 第一个队伍筛选之后只要前3个人
    10. * 3. 第二个队伍只要姓张的成员
    11. * 4. 第二个队伍筛选之后不要前两个人
    12. * 5. 将两个队伍合并为一个队伍
    13. * 6. 根据姓名创建Person对象
    14. * 7. 打印整个队伍的Person信息
    15. * @param args
    16. */
    17. public static void main(String[] args) {
    18. List<String> list1 = Arrays.asList("迪丽热巴", "宋远桥", "苏星河", "老子", "庄子", "孙子", "洪七 公");
    19. List<String> list2 = Arrays.asList("古力娜扎", "张无忌", "张三丰", "赵丽颖", "张二狗", "张天爱", "张三");
    20. // 1. 第一个队伍只保留姓名长度为3的成员
    21. // 2. 第一个队伍筛选之后只要前3个人
    22. Stream<String> stream1 = list1.stream().filter(s -> s.length() == 3).limit(3);
    23. // 3. 第二个队伍只要姓张的成员
    24. // 4. 第二个队伍筛选之后不要前两个人
    25. Stream<String> stream2 = list2.stream().filter(s -> s.startsWith("张")).skip(2);
    26. // 5. 将两个队伍合并为一个队伍
    27. // 6. 根据姓名创建Person对象
    28. // 7. 打印整个队伍的Person信息
    29. Stream.concat(stream1,stream2)
    30. //.map(n-> new Person(n))
    31. .map(Person::new)
    32. .forEach(System.out::println);
    33. }
    34. }
    35. 输出结果:
    1. Person{name='宋远桥', age=null, height=null}
    2. Person{name='苏星河', age=null, height=null}
    3. Person{name='张二狗', age=null, height=null}
    4. Person{name='张天爱', age=null, height=null}
    5. Person{name='张三', age=null, height=null}

    5.Stream结果收集

    5.1 结果收集到集合中

    1. /**
    2. * Stream结果收集
    3. * 收集到集合中
    4. */
    5. @Test
    6. public void test01(){
    7. // Stream<String> stream = Stream.of("aa", "bb", "cc");
    8. List<String> list = Stream.of("aa", "bb", "cc","aa")
    9. .collect(Collectors.toList());
    10. System.out.println(list);
    11. // 收集到 Set集合中
    12. Set<String> set = Stream.of("aa", "bb", "cc", "aa")
    13. .collect(Collectors.toSet());
    14. System.out.println(set);
    15. // 如果需要获取的类型为具体的实现,比如:ArrayList HashSet
    16. ArrayList<String> arrayList = Stream.of("aa", "bb", "cc", "aa")
    17. //.collect(Collectors.toCollection(() -> new ArrayList<>()));
    18. .collect(Collectors.toCollection(ArrayList::new));
    19. System.out.println(arrayList);
    20. HashSet<String> hashSet = Stream.of("aa", "bb", "cc", "aa")
    21. .collect(Collectors.toCollection(HashSet::new));
    22. System.out.println(hashSet);
    23. }

    输出:

    1. [aa, bb, cc, aa]
    2. [aa, bb, cc]
    3. [aa, bb, cc, aa]
    4. [aa, bb, cc]

    5.2 结果收集到数组中

      Stream中提供了toArray方法来将结果放到一个数组中,返回值类型是Object[],如果我们要指定返回的类型,那么可以使用另一个重载的toArray(IntFunction f)方法

    1. /**
    2. * Stream结果收集到数组中
    3. */
    4. @Test
    5. public void test02(){
    6. Object[] objects = Stream.of("aa", "bb", "cc", "aa")
    7. .toArray(); // 返回的数组中的元素是 Object类型
    8. System.out.println(Arrays.toString(objects));
    9. // 如果我们需要指定返回的数组中的元素类型
    10. String[] strings = Stream.of("aa", "bb", "cc", "aa")
    11. .toArray(String[]::new);
    12. System.out.println(Arrays.toString(strings));
    13. }

    5.3 对流中的数据做聚合计算

      当我们使用Stream流处理数据后,可以像数据库的聚合函数一样对某个字段进行操作,比如获得最大值,最小值,求和,平均值,统计数量。

    1. /**
    2. * Stream流中数据的聚合计算
    3. */
    4. @Test
    5. public void test03(){
    6. // 获取年龄的最大值
    7. Optional<Person> maxAge = Stream.of(
    8. new Person("张三", 18)
    9. , new Person("李四", 22)
    10. , new Person("张三", 13)
    11. , new Person("王五", 15)
    12. , new Person("张三", 19)
    13. ).collect(Collectors.maxBy((p1, p2) -> p1.getAge() - p2.getAge()));
    14. System.out.println("最大年龄:" + maxAge.get());
    15. // 获取年龄的最小值
    16. Optional<Person> minAge = Stream.of(
    17. new Person("张三", 18)
    18. , new Person("李四", 22)
    19. , new Person("张三", 13)
    20. , new Person("王五", 15)
    21. , new Person("张三", 19)
    22. ).collect(Collectors.minBy((p1, p2) -> p1.getAge() - p2.getAge()));
    23. System.out.println("最新年龄:" + minAge.get());
    24. // 求所有人的年龄之和
    25. Integer sumAge = Stream.of(
    26. new Person("张三", 18)
    27. , new Person("李四", 22)
    28. , new Person("张三", 13)
    29. , new Person("王五", 15)
    30. , new Person("张三", 19)
    31. )
    32. //.collect(Collectors.summingInt(s -> s.getAge()))
    33. .collect(Collectors.summingInt(Person::getAge))
    34. ;
    35. System.out.println("年龄总和:" + sumAge);
    36. // 年龄的平均值
    37. Double avgAge = Stream.of(
    38. new Person("张三", 18)
    39. , new Person("李四", 22)
    40. , new Person("张三", 13)
    41. , new Person("王五", 15)
    42. , new Person("张三", 19)
    43. ).collect(Collectors.averagingInt(Person::getAge));
    44. System.out.println("年龄的平均值:" + avgAge);
    45. // 统计数量
    46. Long count = Stream.of(
    47. new Person("张三", 18)
    48. , new Person("李四", 22)
    49. , new Person("张三", 13)
    50. , new Person("王五", 15)
    51. , new Person("张三", 19)
    52. ).filter(p->p.getAge() > 18)
    53. .collect(Collectors.counting());
    54. System.out.println("满足条件的记录数:" + count);
    55. }

    5.4 对流中数据做分组操作

      当我们使用Stream流处理数据后,可以根据某个属性将数据分组

    1. /**
    2. * 分组计算
    3. */
    4. @Test
    5. public void test04(){
    6. // 根据账号对数据进行分组
    7. Map<String, List<Person>> map1 = Stream.of(
    8. new Person("张三", 18, 175)
    9. , new Person("李四", 22, 177)
    10. , new Person("张三", 14, 165)
    11. , new Person("李四", 15, 166)
    12. , new Person("张三", 19, 182)
    13. ).collect(Collectors.groupingBy(Person::getName));
    14. map1.forEach((k,v)-> System.out.println("k=" + k +"\t"+ "v=" + v));
    15. System.out.println("-----------");
    16. // 根据年龄分组 如果大于等于18 成年否则未成年
    17. Map<String, List<Person>> map2 = Stream.of(
    18. new Person("张三", 18, 175)
    19. , new Person("李四", 22, 177)
    20. , new Person("张三", 14, 165)
    21. , new Person("李四", 15, 166)
    22. , new Person("张三", 19, 182)
    23. ).collect(Collectors.groupingBy(p -> p.getAge() >= 18 ? "成年" : "未成年"));
    24. map2.forEach((k,v)-> System.out.println("k=" + k +"\t"+ "v=" + v));
    25. }

    输出结果:

    1. k=李四 v=[Person{name='李四', age=22, height=177}, Person{name='李四', age=15, height=166}]
    2. k=张三 v=[Person{name='张三', age=18, height=175}, Person{name='张三', age=14, height=165}, Person{name='张三', age=19, height=182}]
    3. -----------
    4. k=未成年 v=[Person{name='张三', age=14, height=165}, Person{name='李四', age=15, height=166}]
    5. k=成年 v=[Person{name='张三', age=18, height=175}, Person{name='李四', age=22, height=177}, Person{name='张三', age=19, height=182}]

    多级分组: 先根据name分组然后根据年龄分组

    1. /**
    2. * 分组计算--多级分组
    3. */
    4. @Test
    5. public void test05(){
    6. // 先根据name分组,然后根据age(成年和未成年)分组
    7. Map<String,Map<Object,List<Person>>> map = Stream.of(
    8. new Person("张三", 18, 175)
    9. , new Person("李四", 22, 177)
    10. , new Person("张三", 14, 165)
    11. , new Person("李四", 15, 166)
    12. , new Person("张三", 19, 182)
    13. ).collect(Collectors.groupingBy(
    14. Person::getName
    15. ,Collectors.groupingBy(p->p.getAge()>=18?"成年":"未成年"
    16. )
    17. ));
    18. map.forEach((k,v)->{
    19. System.out.println(k);
    20. v.forEach((k1,v1)->{
    21. System.out.println("\t"+k1 + "=" + v1);
    22. });
    23. });
    24. }

    输出结果:

    1. 李四
    2. 未成年=[Person{name='李四', age=15, height=166}]
    3. 成年=[Person{name='李四', age=22, height=177}]
    4. 张三
    5. 未成年=[Person{name='张三', age=14, height=165}]
    6. 成年=[Person{name='张三', age=18, height=175}, Person{name='张三', age=19, height=182}]

    5.5 对流中的数据做分区操作

      Collectors.partitioningBy会根据值是否为true,把集合中的数据分割为两个列表,一个true列表,一个false列表

     

    1. /**
    2. * 分区操作
    3. */
    4. @Test
    5. public void test06(){
    6. Map<Boolean, List<Person>> map = Stream.of(
    7. new Person("张三", 18, 175)
    8. , new Person("李四", 22, 177)
    9. , new Person("张三", 14, 165)
    10. , new Person("李四", 15, 166)
    11. , new Person("张三", 19, 182)
    12. ).collect(Collectors.partitioningBy(p -> p.getAge() > 18));
    13. map.forEach((k,v)-> System.out.println(k+"\t" + v));
    14. }

     

    输出结果:

    1. false [Person{name='张三', age=18, height=175}, Person{name='张三', age=14, height=165}, Person{name='李四', age=15, height=166}]
    2. true [Person{name='李四', age=22, height=177}, Person{name='张三', age=19, height=182}]

    5.6 对流中的数据做拼接

      Collectors.joining会根据指定的连接符,将所有的元素连接成一个字符串

    1. /**
    2. * 对流中的数据做拼接操作
    3. */
    4. @Test
    5. public void test07(){
    6. String s1 = Stream.of(
    7. new Person("张三", 18, 175)
    8. , new Person("李四", 22, 177)
    9. , new Person("张三", 14, 165)
    10. , new Person("李四", 15, 166)
    11. , new Person("张三", 19, 182)
    12. ).map(Person::getName)
    13. .collect(Collectors.joining());
    14. // 张三李四张三李四张三
    15. System.out.println(s1);
    16. String s2 = Stream.of(
    17. new Person("张三", 18, 175)
    18. , new Person("李四", 22, 177)
    19. , new Person("张三", 14, 165)
    20. , new Person("李四", 15, 166)
    21. , new Person("张三", 19, 182)
    22. ).map(Person::getName)
    23. .collect(Collectors.joining("_"));
    24. // 张三_李四_张三_李四_张三
    25. System.out.println(s2);
    26. String s3 = Stream.of(
    27. new Person("张三", 18, 175)
    28. , new Person("李四", 22, 177)
    29. , new Person("张三", 14, 165)
    30. , new Person("李四", 15, 166)
    31. , new Person("张三", 19, 182)
    32. ).map(Person::getName)
    33. .collect(Collectors.joining("_", "###", "$$$"));
    34. // ###张三_李四_张三_李四_张三$$$
    35. System.out.println(s3);
    36. }

    6. 并行的Stream流

    6.1 串行的Stream流

      我们前面使用的Stream流都是串行,也就是在一个线程上面执行。

    1. /**
    2. * 串行流
    3. */
    4. @Test
    5. public void test01(){
    6. Stream.of(5,6,8,3,1,6)
    7. .filter(s->{
    8. System.out.println(Thread.currentThread() + "" + s);
    9. return s > 3;
    10. }).count();
    11. }

    输出:

    1. Thread[main,5,main]5
    2. Thread[main,5,main]6
    3. Thread[main,5,main]8
    4. Thread[main,5,main]3
    5. Thread[main,5,main]1
    6. Thread[main,5,main]6

    6.2 并行流

      parallelStream其实就是一个并行执行的流,它通过默认的ForkJoinPool,可以提高多线程任务的速度。

    6.2.1 获取并行流

      我们可以通过两种方式来获取并行流。

    1. 通过List接口中的parallelStream方法来获取
    2. 通过已有的串行流转换为并行流(parallel)

    实现:

    1. /**
    2. * 获取并行流的两种方式
    3. */
    4. @Test
    5. public void test02(){
    6. List<Integer> list = new ArrayList<>();
    7. // 通过List 接口 直接获取并行流
    8. Stream<Integer> integerStream = list.parallelStream();
    9. // 将已有的串行流转换为并行流
    10. Stream<Integer> parallel = Stream.of(1, 2, 3).parallel();
    11. }

    6.2.2 并行流操作

    1. /**
    2. * 并行流操作
    3. */
    4. @Test
    5. public void test03(){
    6. Stream.of(1,4,2,6,1,5,9)
    7. .parallel() // 将流转换为并发流,Stream处理的时候就会通过多线程处理
    8. .filter(s->{
    9. System.out.println(Thread.currentThread() + " s=" +s);
    10. return s > 2;
    11. }).count();
    12. }

    效果

    1. Thread[main,5,main] s=1
    2. Thread[ForkJoinPool.commonPool-worker-2,5,main] s=9
    3. Thread[ForkJoinPool.commonPool-worker-6,5,main] s=6
    4. Thread[ForkJoinPool.commonPool-worker-13,5,main] s=2
    5. Thread[ForkJoinPool.commonPool-worker-9,5,main] s=4
    6. Thread[ForkJoinPool.commonPool-worker-4,5,main] s=5
    7. Thread[ForkJoinPool.commonPool-worker-11,5,main] s=1

    6.3 并行流和串行流对比

      我们通过for循环,串行Stream流,并行Stream流来对500000000亿个数字求和。来看消耗时间

    1. package com.bobo.jdk.res;
    2. import org.junit.After;
    3. import org.junit.Before;
    4. import org.junit.Test;
    5. import java.util.stream.LongStream;
    6. public class Test03 {
    7. private static long times = 500000000;
    8. private long start;
    9. @Before
    10. public void befor(){
    11. start = System.currentTimeMillis();
    12. }
    13. @After
    14. public void end(){
    15. long end = System.currentTimeMillis();
    16. System.out.println("消耗时间:" + (end - start));
    17. }
    18. /**
    19. * 普通for循环 消耗时间:138
    20. */
    21. @Test
    22. public void test01(){
    23. System.out.println("普通for循环:");
    24. long res = 0;
    25. for (int i = 0; i < times; i++) {
    26. res += i;
    27. }
    28. }
    29. /**
    30. * 串行流处理
    31. * 消耗时间:203
    32. */
    33. @Test
    34. public void test02(){
    35. System.out.println("串行流:serialStream");
    36. LongStream.rangeClosed(0,times)
    37. .reduce(0,Long::sum);
    38. }
    39. /**
    40. * 并行流处理 消耗时间:84
    41. */
    42. @Test
    43. public void test03(){
    44. LongStream.rangeClosed(0,times)
    45. .parallel()
    46. .reduce(0,Long::sum);
    47. }
    48. }

      通过案例我们可以看到parallelStream的效率是最高的。

      Stream并行处理的过程会分而治之,也就是将一个大的任务切分成了多个小任务,这表示每个任务都是一个线程操作。

    6.4 线程安全问题

      在多线程的处理下,肯定会出现数据安全问题。如下:

    1. @Test
    2. public void test01(){
    3. List<Integer> list = new ArrayList<>();
    4. for (int i = 0; i < 1000; i++) {
    5. list.add(i);
    6. }
    7. System.out.println(list.size());
    8. List<Integer> listNew = new ArrayList<>();
    9. // 使用并行流来向集合中添加数据
    10. list.parallelStream()
    11. //.forEach(s->listNew.add(s));
    12. .forEach(listNew::add);
    13. System.out.println(listNew.size());
    14. }

    运行效果:

    839

    或者直接抛异常

    1. java.lang.ArrayIndexOutOfBoundsException
    2. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    3. at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    4. at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    5. at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    6. at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    7. ....
    8. Caused by: java.lang.ArrayIndexOutOfBoundsException: 366
    9. at java.util.ArrayList.add(ArrayList.java:463)

      针对这个问题,我们的解决方案有哪些呢?

    1. 加同步锁
    2. 使用线程安全的容器
    3. 通过Stream中的toArray/collect操作

    实现:

    1. /**
    2. * 加同步锁
    3. */
    4. @Test
    5. public void test02(){
    6. List<Integer> listNew = new ArrayList<>();
    7. Object obj = new Object();
    8. IntStream.rangeClosed(1,1000)
    9. .parallel()
    10. .forEach(i->{
    11. synchronized (obj){
    12. listNew.add(i);
    13. }
    14. });
    15. System.out.println(listNew.size());
    16. }
    17. /**
    18. * 使用线程安全的容器
    19. */
    20. @Test
    21. public void test03(){
    22. Vector v = new Vector();
    23. Object obj = new Object();
    24. IntStream.rangeClosed(1,1000)
    25. .parallel()
    26. .forEach(i->{
    27. synchronized (obj){
    28. v.add(i);
    29. }
    30. });
    31. System.out.println(v.size());
    32. }
    33. /**
    34. * 将线程不安全的容器转换为线程安全的容器
    35. */
    36. @Test
    37. public void test04(){
    38. List<Integer> listNew = new ArrayList<>();
    39. // 将线程不安全的容器包装为线程安全的容器
    40. List<Integer> synchronizedList = Collections.synchronizedList(listNew);
    41. Object obj = new Object();
    42. IntStream.rangeClosed(1,1000)
    43. .parallel()
    44. .forEach(i->{
    45. synchronizedList.add(i);
    46. });
    47. System.out.println(synchronizedList.size());
    48. }
    49. /**
    50. * 我们还可以通过Stream中的 toArray方法或者 collect方法来操作
    51. * 就是满足线程安全的要求
    52. */
    53. @Test
    54. public void test05(){
    55. List<Integer> listNew = new ArrayList<>();
    56. Object obj = new Object();
    57. List<Integer> list = IntStream.rangeClosed(1, 1000)
    58. .parallel()
    59. .boxed()
    60. .collect(Collectors.toList());
    61. System.out.println(list.size());
    62. }

    7.Fork/Join框架

      parallelStream使用的是Fork/Join框架。Fork/Join框架自JDK 7引入。Fork/Join框架可以将一个大任务拆分为很多小任务来异步执行。 Fork/Join框架主要包含三个模块:

    1. 线程池:ForkJoinPool
    2. 任务对象:ForkJoinTask
    3. 执行任务的线程:ForkJoinWorkerThread

    7.1 Fork/Join原理-分治法

      ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

     

    7.2 Fork/Join原理-工作窃取算法

      Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念Fork/Join工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

     

     

    那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

      工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,

    比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool的ParallelStream。

      对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。可以通过设置系统属性:
    java.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量),来调整ForkJoinPool的线程数量,可以尝试调整成不同的参数来观察每次的输出结果。

    7.3 Fork/Join案例

      需求:使用Fork/Join计算1-10000的和,当一个任务的计算数量大于3000的时候拆分任务。数量小于3000的时候就计算

    案例的实现 

    1. package com.bobo.jdk.res;
    2. import java.util.concurrent.ForkJoinPool;
    3. import java.util.concurrent.RecursiveTask;
    4. public class Test05 {
    5. /**
    6. * 使用Fork/Join计算1-10000的和,
    7. * 当一个任务的计算数量大于3000的时候拆分任务。
    8. * 数量小于3000的时候就计算
    9. * @param args
    10. */
    11. public static void main(String[] args) {
    12. long start = System.currentTimeMillis();
    13. ForkJoinPool pool = new ForkJoinPool();
    14. SumRecursiveTask task = new SumRecursiveTask(1,10000l);
    15. Long result = pool.invoke(task);
    16. System.out.println("result="+result);
    17. long end = System.currentTimeMillis();
    18. System.out.println("总的耗时:" + (end-start));
    19. }
    20. }
    21. class SumRecursiveTask extends RecursiveTask<Long>{
    22. // 定义一个拆分的临界值
    23. private static final long THRESHOLD = 3000l;
    24. private final long start;
    25. private final long end;
    26. public SumRecursiveTask(long start, long end) {
    27. this.start = start;
    28. this.end = end;
    29. }
    30. @Override
    31. protected Long compute() {
    32. long length = end -start;
    33. if(length <= THRESHOLD){
    34. // 任务不用拆分,可以计算
    35. long sum = 0;
    36. for(long i=start ; i <= end ;i++){
    37. sum += i;
    38. }
    39. System.out.println("计算:"+ start+"-->" + end +",的结果为:" + sum);
    40. return sum;
    41. }else{
    42. // 数量大于预定的数量,那说明任务还需要继续拆分
    43. long middle = (start+end)/2;
    44. System.out.println("拆分:左边 " + start+"-->" + middle+", 右边" + (middle+1) + "-->" + end);
    45. SumRecursiveTask left = new SumRecursiveTask(start, middle);
    46. left.fork();
    47. SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
    48. right.fork();
    49. return left.join()+right.join();
    50. }
    51. }
    52. }

    输出结果:

    1. 拆分:左边 1-->5000, 右边5001-->10000
    2. 拆分:左边 5001-->7500, 右边7501-->10000
    3. 拆分:左边 1-->2500, 右边2501-->5000
    4. 计算:1-->2500,的结果为:3126250
    5. 计算:5001-->7500,的结果为:15626250
    6. 计算:2501-->5000,的结果为:9376250
    7. 计算:7501-->10000,的结果为:21876250
    8. result=50005000
    9. 总的耗时:19

    ~好了,Stream流的内容就介绍到这儿,如果对你有帮助,欢迎点赞关注加收藏哦 V_V

  • 相关阅读:
    linux 查看并统计进程、线程数量: awk 分组统计
    vs2010 webapi开发http请求以及website中如何实现http请求
    如何解决Maven依赖冲突?
    floating-ui react-dom-interactions:如何将浮动元素宽度设置为与参考宽度相同
    如何使用nginx部署https网站(亲测可行)
    C++基础知识(十九)--- 函数对象
    VS2022 安装.NET 3.5/.NET 4/.NET 4.5/.NET 4.5.1目标包的方法
    KY34 Is It A Tree?
    彩虹商城知识付费程序
    SpringBoot整合Redis,redis连接池和RedisTemplate序列化
  • 原文地址:https://blog.csdn.net/m0_67698950/article/details/125599687