Class OpenParallel

java.lang.Object
cloud.opencode.base.parallel.OpenParallel

public final class OpenParallel extends Object
Open Parallel - Parallel Computing Utility

A facade class that provides static methods for parallel task execution on JDK 25 virtual threads. Features include parallel execution, batch processing, rate-limited execution, and async pipeline composition.

Example:

// Parallel execution
OpenParallel.runAll(
    () -> sendEmail(),
    () -> sendSMS(),
    () -> pushNotification()
);

// Parallel with results
List<String> results = OpenParallel.invokeAll(
    () -> fetchFromServiceA(),
    () -> fetchFromServiceB()
);

// Parallel map with concurrency limit
List<Result> processed = OpenParallel.parallelMap(items, item -> process(item), 10);

// Async pipeline
String result = OpenParallel.pipeline(() -> fetchData())
    .then(this::transform)
    .onError(e -> "fallback")
    .get();

Security:

  • Thread-safe: Yes (utility class, stateless)
Since:
JDK 25, opencode-base-parallel V1.0.0
Author:
Leon Soo www.LeonSoo.com
Method Details

    • runAll

      public static void runAll(Runnable... tasks)
      Runs all tasks in parallel and waits for them to complete.
      Parameters:
      tasks - the tasks to run
    • runAll

      public static void runAll(Collection<Runnable> tasks)
      Runs all tasks in parallel and waits for them to complete.
      Parameters:
      tasks - the tasks to run
    • runAll

      public static void runAll(Collection<Runnable> tasks, Duration timeout)
      Runs all tasks in parallel, waiting no longer than the given timeout.
      Parameters:
      tasks - the tasks to run
      timeout - the maximum time to wait
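      The fork-and-wait pattern behind runAll can be sketched with plain JDK APIs. This is not the library's implementation: RunAllSketch is a hypothetical helper, and it uses a cached thread pool so that it also runs on JDKs without virtual threads (on JDK 21 or later, Executors.newVirtualThreadPerTaskExecutor() could be swapped in). How OpenParallel reports a timeout or a failed task is not stated in this page, so the sketch simply returns a boolean.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class RunAllSketch {

    // Runs every task concurrently and waits up to the timeout.
    // Returns true only if all tasks completed normally in time.
    static boolean runAll(Collection<Runnable> tasks, Duration timeout) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<Object>> callables = new ArrayList<>();
            for (Runnable task : tasks) {
                callables.add(Executors.callable(task));
            }
            // Blocks until all tasks finish or the timeout expires;
            // tasks still running at the deadline are cancelled.
            List<Future<Object>> futures =
                    executor.invokeAll(callables, timeout.toMillis(), TimeUnit.MILLISECONDS);
            for (Future<Object> future : futures) {
                if (future.isCancelled()) {
                    return false; // timed out
                }
                try {
                    future.get(); // already done; surfaces task failures
                } catch (ExecutionException e) {
                    return false; // the task threw an exception
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```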
    • invokeAll

      @SafeVarargs public static <T> List<T> invokeAll(Supplier<T>... suppliers)
      Invokes all suppliers in parallel and collects their results.
      Type Parameters:
      T - the result type
      Parameters:
      suppliers - the suppliers to invoke
      Returns:
      the results
    • invokeAll

      public static <T> List<T> invokeAll(Collection<Supplier<T>> suppliers)
      Invokes all suppliers in parallel and collects their results.
      Type Parameters:
      T - the result type
      Parameters:
      suppliers - the suppliers to invoke
      Returns:
      the results
    • invokeAll

      public static <T> List<T> invokeAll(Collection<Supplier<T>> suppliers, Duration timeout)
      Invokes all suppliers in parallel, waiting no longer than the given timeout.
      Type Parameters:
      T - the result type
      Parameters:
      suppliers - the suppliers to invoke
      timeout - the maximum time to wait
      Returns:
      the results
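      Collecting results from parallel suppliers can be sketched with CompletableFuture. InvokeAllSketch is a hypothetical stand-in, not the library code. The sketch returns results in the order the suppliers were given, which is an assumption: this page does not say how invokeAll orders its results or how it reports failures.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class InvokeAllSketch {

    // Starts every supplier at once, then gathers results in input order.
    static <T> List<T> invokeAll(Collection<Supplier<T>> suppliers) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (Supplier<T> supplier : suppliers) {
                futures.add(CompletableFuture.supplyAsync(supplier, executor));
            }
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                // join() rethrows a supplier failure as CompletionException.
                results.add(future.join());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```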
    • invokeAny

      @SafeVarargs public static <T> T invokeAny(Supplier<T>... suppliers)
      Invokes all suppliers and returns the result of the first one to complete.
      Type Parameters:
      T - the result type
      Parameters:
      suppliers - the suppliers to invoke
      Returns:
      the first result
    • invokeAny

      public static <T> T invokeAny(Collection<Supplier<T>> suppliers)
      Invokes all suppliers and returns the result of the first one to complete.
      Type Parameters:
      T - the result type
      Parameters:
      suppliers - the suppliers to invoke
      Returns:
      the first result
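      A "first result wins" race can be sketched with the JDK's own ExecutorService.invokeAny. InvokeAnySketch is hypothetical. Note that the JDK method returns the first result that completes successfully and cancels the remaining tasks; whether OpenParallel.invokeAny treats failed suppliers the same way is not stated here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class InvokeAnySketch {

    // Races the suppliers and returns the first successful result.
    static <T> T invokeAny(Collection<Supplier<T>> suppliers) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<T>> callables = new ArrayList<>();
            for (Supplier<T> supplier : suppliers) {
                Callable<T> callable = supplier::get;
                callables.add(callable);
            }
            return executor.invokeAny(callables);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("no supplier completed successfully", e);
        } finally {
            // Interrupts any supplier that is still running.
            executor.shutdownNow();
        }
    }
}
```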
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper)
      Maps items in parallel using a stream.
      Type Parameters:
      T - the input type
      R - the result type
      Parameters:
      items - the items to map
      mapper - the mapper function
      Returns:
      the results
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper, int parallelism)
      Maps items in parallel with a concurrency limit.
      Type Parameters:
      T - the input type
      R - the result type
      Parameters:
      items - the items to map
      mapper - the mapper function
      parallelism - the maximum number of concurrent mapper calls
      Returns:
      the results
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper, int parallelism, Duration timeout)
      Maps items in parallel with a concurrency limit and a timeout.
      Type Parameters:
      T - the input type
      R - the result type
      Parameters:
      items - the items to map
      mapper - the mapper function
      parallelism - the maximum number of concurrent mapper calls
      timeout - the maximum time to wait
      Returns:
      the results
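      One common way to cap concurrency when every item gets its own thread is a Semaphore with one permit per allowed concurrent call. The sketch below shows that technique under stated assumptions: ParallelMapSketch is hypothetical, it uses a cached thread pool in place of virtual threads, and it returns results in input order. The library's actual limiting mechanism is not described in this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class ParallelMapSketch {

    // Maps each item on its own thread, with at most `parallelism`
    // mapper calls running at any moment.
    static <T, R> List<R> parallelMap(List<T> items, Function<T, R> mapper, int parallelism) {
        Semaphore permits = new Semaphore(parallelism);
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (T item : items) {
                futures.add(executor.submit(() -> {
                    permits.acquire(); // wait for a free slot
                    try {
                        return mapper.apply(item);
                    } finally {
                        permits.release();
                    }
                }));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while mapping", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("mapper failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

      With platform threads a fixed-size pool would achieve the same limit more cheaply; the semaphore form is shown because it carries over directly to a thread-per-task executor.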
    • processBatch

      public static <T> void processBatch(List<T> items, int batchSize, Consumer<List<T>> processor)
      Processes items in batches of the given size.
      Type Parameters:
      T - the item type
      Parameters:
      items - the items to process
      batchSize - the maximum number of items per batch
      processor - the batch processor
    • processBatchAndCollect

      public static <T,R> List<R> processBatchAndCollect(List<T> items, int batchSize, Function<List<T>,List<R>> processor)
      Processes items in batches of the given size and collects the results.
      Type Parameters:
      T - the item type
      R - the result type
      Parameters:
      items - the items to process
      batchSize - the maximum number of items per batch
      processor - the batch processor
      Returns:
      the results
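      The batching step itself can be sketched by slicing the list with subList. BatchSketch is hypothetical and processes batches one after another; whether the library runs batches concurrently is not stated in this page, so that part is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class BatchSketch {

    // Splits items into consecutive batches of at most batchSize elements,
    // applies the processor to each batch, and concatenates the outputs.
    static <T, R> List<R> processBatchAndCollect(
            List<T> items, int batchSize, Function<List<T>, List<R>> processor) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<R> results = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // subList is a view, so no elements are copied here.
            results.addAll(processor.apply(items.subList(start, end)));
        }
        return results;
    }
}
```

      For seven items and a batch size of three, the processor in this sketch is called with batches of three, three, and one item.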
    • pipeline

      public static <T> AsyncPipeline<T> pipeline(Supplier<T> initial)
      Creates an async pipeline from an initial supplier.
      Type Parameters:
      T - the result type
      Parameters:
      initial - the initial supplier
      Returns:
      the pipeline
    • pipeline

      public static <T> AsyncPipeline<T> pipeline(CompletableFuture<T> future)
      Creates an async pipeline from an existing future.
      Type Parameters:
      T - the result type
      Parameters:
      future - the future to wrap
      Returns:
      the pipeline
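      For comparison, the fetch, transform, fallback chain from the class-level example can be written directly against CompletableFuture. PipelineSketch is hypothetical, and treating AsyncPipeline's then and onError as counterparts of thenApply and exceptionally is an assumption based only on that example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class PipelineSketch {

    // Fetches a value asynchronously, transforms it, and substitutes
    // a fallback if any earlier stage failed.
    static String fetchThenTransform(Supplier<String> fetch) {
        return CompletableFuture.supplyAsync(fetch)
                .thenApply(String::toUpperCase)      // transform step
                .exceptionally(error -> "fallback")  // error-recovery step
                .join();                             // block for the result
    }
}
```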
    • combine

      public static <T1,T2,R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, BiFunction<T1,T2,R> combiner)
      Combines the results of two futures with a combiner function.
      Type Parameters:
      T1 - the first type
      T2 - the second type
      R - the result type
      Parameters:
      f1 - the first future
      f2 - the second future
      combiner - the combiner function
      Returns:
      the combined future
    • combine

      public static <T1,T2,T3,R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, CompletableFuture<T3> f3, TriFunction<T1,T2,T3,R> combiner)
      Combines the results of three futures with a combiner function.
      Type Parameters:
      T1 - the first type
      T2 - the second type
      T3 - the third type
      R - the result type
      Parameters:
      f1 - the first future
      f2 - the second future
      f3 - the third future
      combiner - the combiner function
      Returns:
      the combined future
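      Both overloads can be approximated with standard CompletableFuture operations. In this sketch CombineSketch and its nested TriFunction interface are hypothetical stand-ins (the JDK has no TriFunction, and the library's own type is not shown in this page).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class CombineSketch {

    // Stand-in for the library's TriFunction, whose definition is not shown.
    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // Two futures: thenCombine waits for both and applies the combiner.
    static <A, B, R> CompletableFuture<R> combine(
            CompletableFuture<A> f1, CompletableFuture<B> f2, BiFunction<A, B, R> combiner) {
        return f1.thenCombine(f2, combiner);
    }

    // Three futures: allOf completes when all inputs are done, so the
    // join() calls inside the callback return immediately.
    static <A, B, C, R> CompletableFuture<R> combine(
            CompletableFuture<A> f1, CompletableFuture<B> f2, CompletableFuture<C> f3,
            TriFunction<A, B, C, R> combiner) {
        return CompletableFuture.allOf(f1, f2, f3)
                .thenApply(ignored -> combiner.apply(f1.join(), f2.join(), f3.join()));
    }
}
```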
    • async

      public static <T> CompletableFuture<T> async(Supplier<T> supplier)
      Runs the supplier asynchronously and returns a future for its result.
      Type Parameters:
      T - the result type
      Parameters:
      supplier - the supplier to run
      Returns:
      the future
    • async

      public static CompletableFuture<Void> async(Runnable runnable)
      Runs the runnable asynchronously and returns a future that completes when it finishes.
      Parameters:
      runnable - the runnable to run
      Returns:
      the future
    • delay

      public static <T> CompletableFuture<T> delay(Duration delay, Supplier<T> supplier)
      Runs the supplier after the given delay and returns a future for its result.
      Type Parameters:
      T - the result type
      Parameters:
      delay - the time to wait before running the supplier
      supplier - the supplier to run
      Returns:
      the future
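      Delayed asynchronous execution is available in the JDK through CompletableFuture.delayedExecutor (Java 9 and later). The sketch below shows that approach; DelaySketch is hypothetical, and the page does not say which executor or timer the library's async and delay methods actually use.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class DelaySketch {

    // Starts the supplier right away on the default async executor.
    static <T> CompletableFuture<T> async(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier);
    }

    // Starts the supplier only after the delay has elapsed.
    static <T> CompletableFuture<T> delay(Duration delay, Supplier<T> supplier) {
        Executor delayed =
                CompletableFuture.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(supplier, delayed);
    }
}
```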
    • getExecutor

      public static ExecutorService getExecutor()
      Gets the shared virtual thread executor.
      Returns:
      the executor
    • rateLimited

      public static RateLimitedExecutor rateLimited(double permitsPerSecond)
      Creates a rate-limited executor with the specified number of permits per second.

      Example:

      RateLimitedExecutor executor = OpenParallel.rateLimited(100);
      executor.submit(() -> callApi());
      
      Parameters:
      permitsPerSecond - the number of permits issued per second
      Returns:
      the rate-limited executor
    • rateLimited

      public static RateLimitedExecutor rateLimited(double permitsPerSecond, long burstCapacity)
      Creates a rate-limited executor with the specified rate and burst capacity.

      Example:

      // 100 requests per second, burst of 20
      RateLimitedExecutor executor = OpenParallel.rateLimited(100, 20);
      
      Parameters:
      permitsPerSecond - the number of permits issued per second
      burstCapacity - the burst capacity
      Returns:
      the rate-limited executor
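      A rate plus a burst capacity is the usual parameter pair of a token bucket, so a minimal token bucket is sketched here to show how the two values interact. This is an assumption about the general technique, not a description of RateLimitedExecutor, whose algorithm is not documented in this page. TokenBucketSketch is hypothetical and takes the current time as an argument so that its behavior is easy to follow.

```java
// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class TokenBucketSketch {
    private final double permitsPerSecond;
    private final long burstCapacity;
    private double available;      // tokens currently in the bucket
    private long lastRefillNanos;  // time of the previous refill

    TokenBucketSketch(double permitsPerSecond, long burstCapacity, long nowNanos) {
        this.permitsPerSecond = permitsPerSecond;
        this.burstCapacity = burstCapacity;
        this.available = burstCapacity; // start full, allowing an initial burst
        this.lastRefillNanos = nowNanos;
    }

    // Refills according to elapsed time (capped at burstCapacity),
    // then takes one token if at least one is available.
    synchronized boolean tryAcquire(long nowNanos) {
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1_000_000_000.0;
        available = Math.min(burstCapacity, available + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = nowNanos;
        if (available >= 1.0) {
            available -= 1.0;
            return true;
        }
        return false;
    }
}
```

      With a rate of 100 and a burst capacity of 20, this sketch admits 20 calls immediately, rejects the next one, and admits calls again once enough time has passed for tokens to refill.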
    • invokeAllRateLimited

      @SafeVarargs public static <T> List<T> invokeAllRateLimited(double permitsPerSecond, Supplier<T>... tasks)
      Executes the tasks under a rate limit and collects their results.
      Type Parameters:
      T - the result type
      Parameters:
      permitsPerSecond - the number of permits issued per second
      tasks - the tasks to execute
      Returns:
      the results
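      One simple way to keep task starts under a rate is to schedule them at fixed intervals of 1 / permitsPerSecond. The sketch below does this with a ScheduledExecutorService; PacedInvokeSketch and invokeAllPaced are hypothetical names, and evenly spaced starts are an assumption, since the library may allow bursts or pace tasks differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of opencode-base-parallel.
final class PacedInvokeSketch {

    // Starts task i no earlier than i / permitsPerSecond seconds from now
    // and returns the results in input order.
    static <T> List<T> invokeAllPaced(double permitsPerSecond, List<Supplier<T>> tasks) {
        long intervalNanos = (long) (1_000_000_000.0 / permitsPerSecond);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                Callable<T> call = tasks.get(i)::get;
                futures.add(scheduler.schedule(call, i * intervalNanos, TimeUnit.NANOSECONDS));
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```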