什么是Stream流

Java 8 引入的 Stream API 是一个革命性的特性,它为集合操作提供了一种更高效、更简洁的方式。配合同版本的lambda表达式的出现,我们操作集合(Collection)提供了极大的便利。Stream 是 Java 8 中处理集合数据的抽象概念,它不是一个数据结构,而是一个来自数据源的元素队列并支持聚合操作。

Stream 将要处理的元素集合看作一种流,在流的过程中,借助 Stream API 对流中的元素进行操作,比如:筛选、排序、聚合等。

Stream 允许你以声明式方式处理数据集合,将复杂的数据处理任务转化为一系列简单的操作链。

这里的流指的是一系列元素的序列,它可以在一次遍历的过程中逐个处理这些元素。在Java中,流是对数据的抽象,可以操作各种不同类型的数据源,如集合、数组、文件等。

Stream可以由数组或集合创建,对流的操作分为两种:

  • 中间操作,每次返回一个新的流,可以有多个。(筛选 filter、映射 map、排序 sorted、去重组合 skip—limit)

  • 终端操作,每个流只能进行一次终端操作,终端操作结束后流无法再次使用。终端操作会产生一个新的集合或值。(遍历 foreach、匹配 find–match、规约 reduce、聚合 max–min–count、收集 collect)

Stream 的特点

  • 不存储数据:Stream 是对数据源的视图,不存储元素。
  • 函数式编程:所有操作都返回 Stream 或最终结果,不修改原始数据。
  • 延迟执行:中间操作(如 filtermap)是延迟的,直到终止操作(如 collectforEach)才执行。
  • 可消费性:Stream 只能被消费一次,消费后需要重新创建。

Stream与传统遍历对比

几乎所有的集合(如 Collection 接口或 Map 接口等)都支持直接或间接的遍历操作。而当我们需要对集合中的元素进行操作的时候,除了必需的添加、删除、获取外,最典型的就是集合遍历。

这里的遍历实际上不只是一一列举,可以是对其中的元素一一操作

如果我们从列表中筛选出长度大于 3 的字符串,并转换为大写

普通的写法就是如下

1
2
3
4
5
6
7
8
9
10
List<String> names = Arrays.asList("Tom", "Alice", "Bob", "Charlie");
List<String> result = new ArrayList<>();

for (String name : names) {
if (name.length() > 3) {
result.add(name.toUpperCase());
}
}

System.out.println(result); // [ALICE, CHARLIE]

使用 Stream 流效果如下

1
2
3
4
5
6
7
List<String> names = Arrays.asList("Tom", "Alice", "Bob", "Charlie");
List<String> result = names.stream()
.filter(name -> name.length() > 3) // 过滤长度>3的字符串
.map(String::toUpperCase) // 转换为大写
.collect(Collectors.toList()); // 收集结果

System.out.println(result); // [ALICE, CHARLIE]

在正常情况下,如果数值可能出现null,我们肯定需要处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
List<User> users = Arrays.asList(
new User("Alice", 25),
null,
new User("Bob", 30)
);

List<String> names = new ArrayList<>();
for (User user : users) {
if (user != null && user.getName() != null) {
names.add(user.getName());
}
}

System.out.println(names); // [Alice, Bob]

而在 Stream 中,这个处理十分简便而且清晰,代码更健壮

1
2
3
4
5
6
7
List<String> names = users.stream()
.filter(Objects::nonNull) // 过滤 null 元素
.map(User::getName)
.filter(Objects::nonNull) // 过滤 name 为 null 的情况
.collect(Collectors.toList());

System.out.println(names); // [Alice, Bob]

Stream 的创建

  1. 从集合创建

    1
    2
    3
    List<String> list = Arrays.asList("a", "b", "c");
    Stream<String> stream = list.stream(); // 顺序流
    Stream<String> parallelStream = list.parallelStream(); // 并行流
  2. 从数组创建

    1
    2
    int[] array = {1, 2, 3};
    IntStream intStream = Arrays.stream(array);
  3. 使用 Stream.of()

    1
    Stream<String> stream = Stream.of("a", "b", "c");
  4. 创建无限流:iterate()generate()

    1
    2
    3
    4
    5
    // 迭代生成无限流(从 0 开始,每次加 2)
    Stream<Integer> evenNumbers = Stream.iterate(0, n -> n + 2);

    // 生成随机数无限流
    Stream<Double> randoms = Stream.generate(Math::random);
  5. 从文件创建

    1
    2
    3
    4
    5
    try (Stream<String> lines = Files.lines(Paths.get("file.txt"))) {
    lines.forEach(System.out::println);
    } catch (IOException e) {
    e.printStackTrace();
    }

Stream 的各种操作

我建议大家学会 Optional 类再来,那也在这里简单介绍一下吧,Optional类是一个可以为null的容器对象。如果值存在则isPresent()方法会返回true,调用get()方法会返回该对象。

Stream 操作分为中间操作终止操作

中间操作(返回新的 Stream)

  • 过滤filter(Predicate<T>)

    按照一定的规则校验流中的元素,将符合条件的元素提取到新的流中的操作。

    1
    2
    3
    4
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
    names.stream()
    .filter(name -> name.length() > 4) // 保留长度大于 4 的元素
    .forEach(System.out::println); // 输出:Alice, Charlie
  • 映射map(Function<T, R>)

    接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。

    1
    2
    3
    4
    List<Integer> numbers = Arrays.asList(1, 2, 3);
    numbers.stream()
    .map(n -> n * 2) // 将每个元素乘以 2
    .forEach(System.out::println); // 输出:2, 4, 6
  • 扁平化flatMap(Function<T, Stream<R>>)

    接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。

    1
    2
    3
    4
    5
    6
    7
    List<List<Integer>> nestedList = Arrays.asList(
    Arrays.asList(1, 2),
    Arrays.asList(3, 4)
    );
    nestedList.stream()
    .flatMap(List::stream) // 将嵌套列表展开为单个流
    .forEach(System.out::println); // 输出:1, 2, 3, 4
  • 排序sorted()sorted(Comparator<T>)

    1
    2
    3
    4
    List<String> names = Arrays.asList("Charlie", "Alice", "Bob");
    names.stream()
    .sorted() // 自然排序
    .forEach(System.out::println); // 输出:Alice, Bob, Charlie
  • 去重distinct()

    1
    2
    3
    4
    List<Integer> numbers = Arrays.asList(1, 1, 2, 2, 3);
    numbers.stream()
    .distinct() // 去重
    .forEach(System.out::println); // 输出:1, 2, 3
  • 截断limit(long n)skip(long n)

    1
    2
    3
    4
    5
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    numbers.stream()
    .skip(2) // 跳过前 2 个元素
    .limit(2) // 取接下来的 2 个元素
    .forEach(System.out::println); // 输出:3, 4
  • 原始类型映射

    将元素映射为原始类型(如intlongdouble)的流,避免装箱拆箱开销。

    1
    2
    3
    List<String> strings = Arrays.asList("1", "2", "3");
    IntStream intStream = strings.stream()
    .mapToInt(Integer::parseInt); // 映射为 IntStream
  • 匹配转换(Optional 映射)

    1
    2
    3
    4
    5
    6
    List<String> names = Arrays.asList("Alice", null, "Bob");
    names.stream()
    .flatMap(name -> Optional.ofNullable(name)
    .map(Stream::of)
    .orElseGet(Stream::empty)) // 过滤 null
    .forEach(System.out::println);
  • 截断(分页)skip()limit()

    1
    2
    3
    4
    5
    6
    7
    8
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    List<Integer> limited = numbers.stream()
    .limit(3) // 取前3个元素
    .collect(Collectors.toList()); // [1, 2, 3]

    List<Integer> skipped = numbers.stream()
    .skip(2) // 跳过前2个元素
    .collect(Collectors.toList()); // [3, 4, 5]

终止操作(返回最终结果或 void)

  • 遍历forEach(Consumer<T>)

    此时Stream中的元素是以Optional类型存在的。Stream的遍历、匹配非常简单。

    1
    2
    List<String> names = Arrays.asList("Alice", "Bob");
    names.stream().forEach(System.out::println); // 输出:Alice, Bob
  • 收集collect(Collector<T, A, R>)

    把一个流收集起来,最终可以是收集成一个值也可以收集成一个新的集合。

    1
    2
    3
    4
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
    List<String> filteredNames = names.stream()
    .filter(name -> name.length() > 4)
    .collect(Collectors.toList());
  • 统计count(), min(Comparator<T>), max(Comparator<T>)

    1
    2
    3
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    long count = numbers.stream().count(); // 5
    Optional<Integer> max = numbers.stream().max(Integer::compareTo); // 5
  • 匹配anyMatch(Predicate<T>), allMatch(Predicate<T>), noneMatch(Predicate<T>)

    1
    2
    List<Integer> numbers = Arrays.asList(1, 2, 3);
    boolean hasEven = numbers.stream().anyMatch(n -> n % 2 == 0); // true
  • 查找findFirst(), findAny()

    1
    2
    List<Integer> numbers = Arrays.asList(1, 2, 3);
    Optional<Integer> first = numbers.stream().findFirst(); // 1
  • 归约reduce(T identity, BinaryOperator<T>)

    1
    2
    List<Integer> numbers = Arrays.asList(1, 2, 3);
    int sum = numbers.stream().reduce(0, (a, b) -> a + b); // 6

Collectors 工具类

Collectors 提供了常用的收集器实现,用于将 Stream 结果转换为集合或其他数据结构:

  1. 收集到集合

    1
    2
    List<String> list = stream.collect(Collectors.toList());
    Set<String> set = stream.collect(Collectors.toSet());
  2. 连接字符串

    1
    String joined = stream.collect(Collectors.joining(", ")); // "a, b, c"
  3. 分组

    1
    2
    3
    Map<Integer, List<String>> lengthGroups = names.stream()
    .collect(Collectors.groupingBy(String::length));
    // 结果:{3=[Bob], 5=[Alice], 7=[Charlie]}
  4. 分区(Partitioning)

    1
    2
    3
    Map<Boolean, List<Integer>> evenOdd = numbers.stream()
    .collect(Collectors.partitioningBy(n -> n % 2 == 0));
    // 结果:{false=[1, 3], true=[2, 4]}
  5. 统计

    1
    2
    3
    IntSummaryStatistics stats = numbers.stream()
    .collect(Collectors.summarizingInt(Integer::intValue));
    // 包含 count, sum, min, max, average

流的并行化—Stream和parallelStream

stream是顺序流,由主线程按顺序对流执行操作,Stream 上的所有操作按顺序在单线程中执行。处理少量数据时,顺序流的开销更低。

parallelStream是并行流,内部以多线程并行执行的方式对流进行操作,利用 Java 的ForkJoinPool将数据分成多个块,在多个线程中并行处理,但前提是流中的数据处理没有顺序要求,除非显式调用sequential()或使用forEachOrdered()。而且操作不依赖元素顺序或外部状态(如filtermap)。

特性 Stream(顺序流) parallelStream(并行流)
执行方式 单线程顺序执行 多线程并行执行
线程池 无需线程池 默认使用ForkJoinPool.commonPool()
元素顺序 保持顺序 不保证顺序(除非显式调用forEachOrdered()
性能 小数据量或 IO 密集型任务更优 大数据量且 CPU 密集型任务更优
线程安全 无需考虑 需确保操作线程安全(如避免共享可变状态)
适用场景 简单操作、依赖顺序、IO 密集型 复杂计算、大数据量、CPU 密集型

例如筛选集合中的奇数,两者的处理不同之处:

image-20250714142427023

使用 Collection.parallelStream() 方法直接创建并行流,元素可能由不同线程并行处理,顺序不固定。

1
2
3
4
5
6
7
List<String> list = Arrays.asList("apple", "banana", "cherry", "date");

// 创建并行流并打印每个元素的线程名
list.parallelStream()
.forEach(element -> {
System.out.println("处理元素: " + element + ",线程: " + Thread.currentThread().getName());
});

同理, Arrays.parallelStream() 方法也能创建并行流

1
2
3
4
5
6
int[] array = {1, 2, 3, 4, 5};

// 创建并行流并计算平方和
long sum = Arrays.parallelStream(array)
.map(n -> n * n)
.sum();

通过 Stream.parallel() 方法将已有的顺序流转换为并行流。

1
2
3
4
5
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

numbers.stream() // 创建顺序流
.parallel() // 转换为并行流
.forEach(System.out::println);

无限流也能并性化,无限流必须配合 limit() 等终止操作,否则会无限运行。

1
2
3
4
5
6
7
Random random = new Random();

// 生成10个随机数的并行流
Stream.generate(random::nextInt)
.limit(10)
.parallel()
.forEach(System.out::println);

默认情况下,并行流使用公共的 ForkJoinPool.commonPool(),你也可以指定自定义线程池:

1
2
3
4
5
6
7
8
9
10
11
12
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// 创建自定义线程池(3个线程)
ForkJoinPool customPool = new ForkJoinPool(3);

// 在自定义线程池中执行并行流
customPool.submit(() -> {
numbers.parallelStream()
.forEach(n -> System.out.println(Thread.currentThread().getName() + ": " + n));
}).join();

customPool.shutdown();

并行流的注意事项

  1. 线程安全问题

并行流在多线程环境下执行,若操作共享可变状态会导致数据竞争。

1
2
3
4
5
6
7
8
9
10
// 错误示例:共享可变变量
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = 0;
numbers.parallelStream()
.forEach(n -> sum += n); // 非线程安全,结果可能错误

// 正确示例:使用原子类或线程安全集合
AtomicInteger sum = new AtomicInteger(0);
numbers.parallelStream()
.forEach(n -> sum.addAndGet(n)); // 线程安全
  1. 避免阻塞操作

并行流的线程池是共享的,若执行 IO 阻塞操作会导致所有并行任务阻塞。

1
2
3
4
5
6
// 反例:并行流中执行IO操作
urls.parallelStream()
.map(url -> fetchData(url)) // 阻塞操作,可能耗尽线程池
.collect(Collectors.toList());

// 正确做法:使用专门的异步IO库(如CompletableFuture)
  1. 并行效率与开销
    • 并行流的启动和线程切换有开销,小数据量时可能比顺序流更慢。
    • 使用 peek() 监控并行流的执行线程:
1
2
3
4
numbers.parallelStream()
.peek(n -> System.out.println(Thread.currentThread().getName()))
.filter(n -> n % 2 == 0)
.collect(Collectors.toList());
  1. 终止操作的选择
    • *无序终止操作(如forEachcollect)在并行流中效率更高。
    • 有序终止操作**(如forEachOrderedfindFirst)会强制按顺序执行,抵消并行优势。

流的性能相关

  1. 延迟执行(Lazy Evaluation)

    Stream 的中间操作不会立即执行,只有遇到终止操作时才会触发计算。这种特性避免了不必要的计算,提升效率。

    1
    2
    3
    4
    5
    6
    7
    8
    // 示例:中间操作不会执行,直到遇到终止操作
    Stream<Integer> stream = Stream.of(1, 2, 3, 4)
    .filter(n -> {
    System.out.println("过滤: " + n);
    return n % 2 == 0;
    }); // 此时不会打印任何内容

    stream.forEach(System.out::println); // 终止操作触发执行
  2. 短路操作

    某些操作可以在满足条件时提前终止流的处理,例如:

    • anyMatch:任意元素满足条件即返回
    • allMatch:所有元素满足条件才返回
    • findFirst:找到第一个元素即返回
    1
    2
    3
    // 示例:找到第一个偶数后立即终止
    boolean hasEven = Stream.of(1, 3, 5, 6, 7)
    .anyMatch(n -> n % 2 == 0); // 遇到6时立即返回true
  3. 原始类型流

    使用 IntStream、LongStream 等避免装箱拆箱开销。

    1
    2
    3
    // 示例:计算整数列表的和(避免Integer装箱)
    int sum = IntStream.of(1, 2, 3, 4)
    .sum();

    优先使用原始类型流,或通过 mapToInt/mapToLong 等转换。

    1
    2
    3
    4
    5
    // 示例:将对象流转换为原始类型流
    List<Integer> list = Arrays.asList(1, 2, 3);
    int sum = list.stream()
    .mapToInt(Integer::intValue) // 避免自动装箱拆箱
    .sum();
  4. 避免重复创建流

    流只能消费一次,重复使用需重新创建。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 错误示例:流已被消费
    Stream<Integer> stream = Stream.of(1, 2, 3);
    stream.forEach(System.out::println);
    stream.forEach(System.out::println); // 抛出异常

    // 正确示例:每次创建新流
    Supplier<Stream<Integer>> streamSupplier = () -> Stream.of(1, 2, 3);
    streamSupplier.get().forEach(System.out::println);
    streamSupplier.get().forEach(System.out::println); // 正常执行

Stream 与 Lambda 表达式结合

Stream API 与 Lambda 表达式紧密结合,使代码更简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 传统方式:使用匿名内部类
list.forEach(new Consumer<Integer>() {
@Override
public void accept(Integer n) {
System.out.println(n);
}
});

// Lambda方式:
list.forEach(n -> System.out.println(n));

// 方法引用方式:
list.forEach(System.out::println);

更简洁的 Lambda 表达式写法就是引用已有的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 示例:使用方法引用
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");

// 对象::实例方法
names.forEach(System.out::println); // 等同于 n -> System.out.println(n)

// 类::静态方法
List<Integer> numbers = Arrays.asList("1", "2", "3");
List<Integer> ints = numbers.stream()
.map(Integer::parseInt) // 等同于 s -> Integer.parseInt(s)
.collect(Collectors.toList());

// 类::实例方法
List<String> upperCaseNames = names.stream()
.map(String::toUpperCase) // 等同于 s -> s.toUpperCase()
.collect(Collectors.toList());

而且多个 Lambda 可以组合使用,这就是我们现在使用 Stream 流的最多的最常见的方式

1
2
3
4
// 示例:按长度排序,长度相同按字母序排序
List<String> names = Arrays.asList("Charlie", "Bob", "Alice");
names.sort(Comparator.comparingInt(String::length)
.thenComparing(String::compareTo));

Stream 的操作接受各种函数式接口作为参数:

  • Predicate<T>:过滤条件
  • Function<T, R>:映射转换
  • Consumer<T>:消费元素
  • Supplier<T>:提供元素
1
2
3
4
5
6
7
8
9
// 示例:使用Predicate过滤偶数
List<Integer> evenNumbers = list.stream()
.filter(n -> n % 2 == 0) // Predicate<Integer>
.collect(Collectors.toList());

// 示例:使用Function映射元素
List<String> strings = list.stream()
.map(n -> "Number: " + n) // Function<Integer, String>
.collect(Collectors.toList());

可以闭包,Lambda 可以捕获外部变量,但要求变量必须是 final 或有效 final。

1
2
3
4
int factor = 2; // 有效final变量
List<Integer> result = list.stream()
.map(n -> n * factor) // 捕获外部变量factor
.collect(Collectors.toList());