Class OpenStream

java.lang.Object
cloud.opencode.base.core.OpenStream

public final class OpenStream extends Object
Stream Utility Class - Enhanced Stream operations with JDK 25 Gatherers support Stream工具类 - 增强的Stream操作,支持JDK 25 Gatherers

Provides utility methods for working with Java Streams, including creation, transformation, collection, and parallel processing.

提供Java Stream的实用方法,包括创建、转换、收集和并行处理。

Features | 主要功能:

  • Stream creation (of, from, range, generate) - Stream创建
  • Stream transformation (batch, window, sliding) - Stream转换
  • Parallel processing utilities - 并行处理工具
  • Collectors extensions - Collectors扩展
  • Stream combination (zip, merge, concat) - Stream组合

Usage Examples | 使用示例:

// Batch processing - 批量处理
List<List<User>> batches = OpenStream.batch(users.stream(), 100);

// Sliding window - 滑动窗口
Stream<List<Integer>> windows = OpenStream.slidingWindow(numbers, 3);

// Zip streams - 合并流
Stream<Pair<A, B>> zipped = OpenStream.zip(streamA, streamB, Pair::of);

// Parallel with concurrency limit - 限制并发的并行处理
results = OpenStream.parallelWithLimit(items, 10, item -> process(item));

Security | 安全性:

  • Thread-safe: Yes (stateless) - 线程安全: 是 (无状态)
  • Null-safe: Yes - 空值安全: 是
Since:
JDK 25, opencode-base-core V1.0.0
Author:
Leon Soo www.LeonSoo.com
See Also:
  • Method Details

    • of

      @SafeVarargs public static <T> Stream<T> of(T... elements)
      Creates a stream from varargs 从可变参数创建Stream
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      elements - elements | 元素
      Returns:
      stream | 流
    • from

      public static <T> Stream<T> from(Iterable<T> iterable)
      Creates a stream from an iterable 从Iterable创建Stream
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      iterable - the iterable | 可迭代对象
      Returns:
      stream | 流
    • from

      public static <T> Stream<T> from(Iterator<T> iterator)
      Creates a stream from an iterator 从Iterator创建Stream
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      iterator - the iterator | 迭代器
      Returns:
      stream | 流
    • from

      public static <T> Stream<T> from(Optional<T> optional)
      Creates a stream from an optional 从Optional创建Stream
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      optional - the optional | 可选值
      Returns:
      stream with 0 or 1 element | 包含0或1个元素的流
    • range

      public static IntStream range(int startInclusive, int endExclusive)
      Creates an int range stream 创建整数范围Stream
      Parameters:
      startInclusive - start (inclusive) | 起始值(包含)
      endExclusive - end (exclusive) | 结束值(不包含)
      Returns:
      int stream | 整数流
    • rangeClosed

      public static IntStream rangeClosed(int startInclusive, int endInclusive)
      Creates an int range stream (closed) 创建闭区间整数范围Stream
      Parameters:
      startInclusive - start (inclusive) | 起始值(包含)
      endInclusive - end (inclusive) | 结束值(包含)
      Returns:
      int stream | 整数流
    • range

      public static LongStream range(long startInclusive, long endExclusive)
      Creates a long range stream 创建长整数范围Stream
      Parameters:
      startInclusive - start (inclusive) | 起始值(包含)
      endExclusive - end (exclusive) | 结束值(不包含)
      Returns:
      long stream | 长整数流
    • generate

      public static <T> Stream<T> generate(Supplier<T> supplier)
      Generates an infinite stream 生成无限Stream
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      supplier - element supplier | 元素提供者
      Returns:
      infinite stream | 无限流
    • iterate

      public static <T> Stream<T> iterate(T seed, UnaryOperator<T> next)
      Generates an infinite stream with seed 使用种子生成无限Stream
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      seed - initial value | 初始值
      next - next value function | 下一个值函数
      Returns:
      infinite stream | 无限流
    • iterate

      public static <T> Stream<T> iterate(T seed, Predicate<T> hasNext, UnaryOperator<T> next)
      Generates a finite stream with seed and predicate 使用种子和谓词生成有限Stream
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      seed - initial value | 初始值
      hasNext - continue predicate | 继续条件
      next - next value function | 下一个值函数
      Returns:
      finite stream | 有限流
    • batch

      public static <T> List<List<T>> batch(Stream<T> stream, int batchSize)
      Splits stream into batches of fixed size 将Stream分割成固定大小的批次
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      batchSize - batch size | 批次大小
      Returns:
      list of batches | 批次列表
    • batchStream

      public static <T> Stream<List<T>> batchStream(Collection<T> source, int batchSize)
      Creates a stream of batches 创建批次流
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      source - source collection | 源集合
      batchSize - batch size | 批次大小
      Returns:
      stream of batches | 批次流
    • slidingWindow

      public static <T> Stream<List<T>> slidingWindow(Collection<T> source, int windowSize)
      Creates a sliding window stream 创建滑动窗口流
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      source - source collection | 源集合
      windowSize - window size | 窗口大小
      Returns:
      stream of windows | 窗口流
    • slidingWindow

      public static <T> Stream<List<T>> slidingWindow(Collection<T> source, int windowSize, int step)
      Creates a sliding window stream with custom step 创建自定义步长的滑动窗口流
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      source - source collection | 源集合
      windowSize - window size | 窗口大小
      step - step size | 步长
      Returns:
      stream of windows | 窗口流
    • tumblingWindow

      public static <T> Stream<List<T>> tumblingWindow(Collection<T> source, int windowSize)
      Creates a tumbling window stream (non-overlapping) 创建翻滚窗口流(不重叠)
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      source - source collection | 源集合
      windowSize - window size | 窗口大小
      Returns:
      stream of windows | 窗口流
    • zip

      public static <A,B,R> Stream<R> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A,B,R> zipper)
      Zips two streams into pairs 将两个流合并成对
      Type Parameters:
      A - first element type | 第一个元素类型
      B - second element type | 第二个元素类型
      R - result type | 结果类型
      Parameters:
      streamA - first stream | 第一个流
      streamB - second stream | 第二个流
      zipper - zip function | 合并函数
      Returns:
      zipped stream | 合并后的流
    • zipWithIndex

      public static <T> Stream<OpenStream.IndexedValue<T>> zipWithIndex(Stream<T> stream)
      Zips stream with indices 将流与索引合并
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      Returns:
      stream of indexed elements | 带索引的元素流
    • merge

      @SafeVarargs public static <T> Stream<T> merge(Stream<T>... streams)
      Merges multiple streams 合并多个流
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      streams - streams to merge | 要合并的流
      Returns:
      merged stream | 合并后的流
    • interleave

      public static <T> Stream<T> interleave(Stream<T> streamA, Stream<T> streamB)
      Interleaves two streams 交错合并两个流
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      streamA - first stream | 第一个流
      streamB - second stream | 第二个流
      Returns:
      interleaved stream | 交错后的流
    • filterNulls

      public static <T> Stream<T> filterNulls(Stream<T> stream)
      Filters nulls from stream 从流中过滤空值
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      Returns:
      stream without nulls | 不含空值的流
    • distinctBy

      public static <T,K> Stream<T> distinctBy(Stream<T> stream, Function<T,K> keyExtractor)
      Distinct by key 按键去重
      Type Parameters:
      T - element type | 元素类型
      K - key type | 键类型
      Parameters:
      stream - source stream | 源流
      keyExtractor - key extractor | 键提取器
      Returns:
      distinct stream | 去重后的流
    • takeWhile

      public static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<T> predicate)
      Takes elements while predicate is true 在谓词为真时获取元素
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      predicate - condition | 条件
      Returns:
      filtered stream | 过滤后的流
    • dropWhile

      public static <T> Stream<T> dropWhile(Stream<T> stream, Predicate<T> predicate)
      Drops elements while predicate is true 在谓词为真时跳过元素
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      predicate - condition | 条件
      Returns:
      filtered stream | 过滤后的流
    • findFirst

      public static <T> Optional<T> findFirst(Stream<T> stream, Predicate<T> predicate)
      Finds first matching element 查找第一个匹配元素
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      predicate - condition | 条件
      Returns:
      optional result | 可选结果
    • anyMatch

      public static <T> boolean anyMatch(Stream<T> stream, Predicate<T> predicate)
      Checks if any element matches 检查是否有元素匹配
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      predicate - condition | 条件
      Returns:
      true if any matches | 如果有匹配则为true
    • allMatch

      public static <T> boolean allMatch(Stream<T> stream, Predicate<T> predicate)
      Checks if all elements match 检查是否所有元素都匹配
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      predicate - condition | 条件
      Returns:
      true if all match | 如果全部匹配则为true
    • noneMatch

      public static <T> boolean noneMatch(Stream<T> stream, Predicate<T> predicate)
      Checks if no elements match 检查是否没有元素匹配
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      predicate - condition | 条件
      Returns:
      true if none match | 如果没有匹配则为true
    • toUnmodifiableList

      public static <T> List<T> toUnmodifiableList(Stream<T> stream)
      Collects to unmodifiable list 收集为不可变列表
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      Returns:
      unmodifiable list | 不可变列表
    • toUnmodifiableSet

      public static <T> Set<T> toUnmodifiableSet(Stream<T> stream)
      Collects to unmodifiable set 收集为不可变集合
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      Returns:
      unmodifiable set | 不可变集合
    • toMap

      public static <T,K> Map<K,T> toMap(Stream<T> stream, Function<T,K> keyMapper)
      Collects to map with key extractor 使用键提取器收集为Map
      Type Parameters:
      T - element type | 元素类型
      K - key type | 键类型
      Parameters:
      stream - source stream | 源流
      keyMapper - key extractor | 键提取器
      Returns:
      map | Map
    • toMap

      public static <T,K,V> Map<K,V> toMap(Stream<T> stream, Function<T,K> keyMapper, Function<T,V> valueMapper)
      Collects to map with key and value extractors 使用键和值提取器收集为Map
      Type Parameters:
      T - element type | 元素类型
      K - key type | 键类型
      V - value type | 值类型
      Parameters:
      stream - source stream | 源流
      keyMapper - key extractor | 键提取器
      valueMapper - value extractor | 值提取器
      Returns:
      map | Map
    • groupBy

      public static <T,K> Map<K,List<T>> groupBy(Stream<T> stream, Function<T,K> keyMapper)
      Groups by key 按键分组
      Type Parameters:
      T - element type | 元素类型
      K - key type | 键类型
      Parameters:
      stream - source stream | 源流
      keyMapper - key extractor | 键提取器
      Returns:
      grouped map | 分组Map
    • partitionBy

      public static <T> Map<Boolean,List<T>> partitionBy(Stream<T> stream, Predicate<T> predicate)
      Partitions by predicate 按谓词分区
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      predicate - partition condition | 分区条件
      Returns:
      partitioned map | 分区Map
    • joining

      public static <T> String joining(Stream<T> stream, CharSequence delimiter)
      Joins to string 连接为字符串
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      delimiter - delimiter | 分隔符
      Returns:
      joined string | 连接后的字符串
    • joining

      public static <T> String joining(Stream<T> stream, CharSequence delimiter, CharSequence prefix, CharSequence suffix)
      Joins to string with prefix and suffix 使用前缀和后缀连接为字符串
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      delimiter - delimiter | 分隔符
      prefix - prefix | 前缀
      suffix - suffix | 后缀
      Returns:
      joined string | 连接后的字符串
    • parallelMap

      public static <T,R> List<R> parallelMap(Collection<T> source, int parallelism, Function<T,R> mapper)
      Processes in parallel with limited concurrency 使用有限并发进行并行处理
      Type Parameters:
      T - input type | 输入类型
      R - result type | 结果类型
      Parameters:
      source - source collection | 源集合
      parallelism - parallelism level | 并行度
      mapper - transformation function | 转换函数
      Returns:
      result list | 结果列表
    • parallelMap

      public static <T,R> List<R> parallelMap(Collection<T> source, Function<T,R> mapper)
      Processes in parallel and collects results 并行处理并收集结果
      Type Parameters:
      T - input type | 输入类型
      R - result type | 结果类型
      Parameters:
      source - source collection | 源集合
      mapper - transformation function | 转换函数
      Returns:
      result list | 结果列表
    • parallelFilter

      public static <T> List<T> parallelFilter(Collection<T> source, Predicate<T> predicate)
      Filters in parallel 并行过滤
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      source - source collection | 源集合
      predicate - filter condition | 过滤条件
      Returns:
      filtered list | 过滤后的列表
    • count

      public static <T> long count(Stream<T> stream)
      Counts elements 统计元素数量
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      Returns:
      count | 数量
    • sumInt

      public static int sumInt(IntStream stream)
      Gets sum of integers 获取整数和
      Parameters:
      stream - source stream | 源流
      Returns:
      sum | 和
    • sumLong

      public static long sumLong(LongStream stream)
      Gets sum of longs 获取长整数和
      Parameters:
      stream - source stream | 源流
      Returns:
      sum | 和
    • sumDouble

      public static double sumDouble(DoubleStream stream)
      Gets sum of doubles 获取双精度数和
      Parameters:
      stream - source stream | 源流
      Returns:
      sum | 和
    • averageInt

      public static OptionalDouble averageInt(IntStream stream)
      Gets average of integers 获取整数平均值
      Parameters:
      stream - source stream | 源流
      Returns:
      optional average | 可选平均值
    • max

      public static <T> Optional<T> max(Stream<T> stream, Comparator<T> comparator)
      Gets max element 获取最大元素
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      comparator - comparator | 比较器
      Returns:
      optional max | 可选最大值
    • min

      public static <T> Optional<T> min(Stream<T> stream, Comparator<T> comparator)
      Gets min element 获取最小元素
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      comparator - comparator | 比较器
      Returns:
      optional min | 可选最小值
    • peek

      public static <T> Stream<T> peek(Stream<T> stream, Consumer<T> action)
      Peeks at each element for debugging 查看每个元素用于调试
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      action - action to perform | 要执行的操作
      Returns:
      same stream with peek | 带peek的同一个流
    • limit

      public static <T> Stream<T> limit(Stream<T> stream, long maxSize)
      Limits stream size 限制流大小
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      maxSize - max size | 最大大小
      Returns:
      limited stream | 限制后的流
    • skip

      public static <T> Stream<T> skip(Stream<T> stream, long n)
      Skips first n elements 跳过前n个元素
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      n - number to skip | 跳过数量
      Returns:
      stream after skip | 跳过后的流
    • sorted

      public static <T> Stream<T> sorted(Stream<T> stream, Comparator<T> comparator)
      Sorts stream 排序流
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream | 源流
      comparator - comparator | 比较器
      Returns:
      sorted stream | 排序后的流
    • flatten

      public static <T> Stream<T> flatten(Stream<Stream<T>> stream)
      Flattens nested streams 展平嵌套流
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream of streams | 流的源流
      Returns:
      flattened stream | 展平后的流
    • flattenCollections

      public static <T> Stream<T> flattenCollections(Stream<? extends Collection<T>> stream)
      Flattens nested collections 展平嵌套集合
      Type Parameters:
      T - element type | 元素类型
      Parameters:
      stream - source stream of collections | 集合的源流
      Returns:
      flattened stream | 展平后的流