Class OpenParallel

java.lang.Object
cloud.opencode.base.parallel.OpenParallel

public final class OpenParallel extends Object
Open Parallel - Parallel Computing Utility Open 并行 - 并行计算工具

A facade class providing static methods for parallel task execution using JDK 25 virtual threads. Features include parallel execution, batch processing, and async pipeline composition.

使用 JDK 25 虚拟线程提供并行任务执行的静态方法的门面类。 特性包括并行执行、批处理和异步流水线组合。

Example | 示例:

// Parallel execution
OpenParallel.runAll(
    () -> sendEmail(),
    () -> sendSMS(),
    () -> pushNotification()
);

// Parallel with results
List<String> results = OpenParallel.invokeAll(
    () -> fetchFromServiceA(),
    () -> fetchFromServiceB()
);

// Parallel map with concurrency limit
List<Result> processed = OpenParallel.parallelMap(items, item -> process(item), 10);

// Async pipeline
String result = OpenParallel.pipeline(() -> fetchData())
    .then(this::transform)
    .onError(e -> "fallback")
    .get();

Security | 安全性:

  • Thread-safe: Yes (utility class, stateless) - 线程安全: 是(工具类,无状态)
Since:
JDK 25, opencode-base-parallel V1.0.0
Author:
Leon Soo www.LeonSoo.com
See Also:
  • Method Details

    • runAll

      public static void runAll(Runnable... tasks)
      Runs all tasks in parallel, waiting for completion. 并行运行所有任务,等待完成。
      Parameters:
      tasks - the tasks to run - 要运行的任务
    • runAll

      public static void runAll(Collection<Runnable> tasks)
      Runs all tasks in parallel, waiting for completion. 并行运行所有任务,等待完成。
      Parameters:
      tasks - the tasks to run - 要运行的任务
    • runAll

      public static void runAll(Collection<Runnable> tasks, Duration timeout)
      Runs all tasks in parallel with timeout. 并行运行所有任务,带超时。
      Parameters:
      tasks - the tasks to run - 要运行的任务
      timeout - the timeout - 超时
    • invokeAll

      @SafeVarargs public static <T> List<T> invokeAll(Supplier<T>... suppliers)
      Invokes all suppliers in parallel and collects results. 并行调用所有 Supplier 并收集结果。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      suppliers - the suppliers - Supplier
      Returns:
      the results - 结果
    • invokeAll

      public static <T> List<T> invokeAll(Collection<Supplier<T>> suppliers)
      Invokes all suppliers in parallel and collects results. 并行调用所有 Supplier 并收集结果。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      suppliers - the suppliers - Supplier
      Returns:
      the results - 结果
    • invokeAll

      public static <T> List<T> invokeAll(Collection<Supplier<T>> suppliers, Duration timeout)
      Invokes all suppliers in parallel with timeout. 并行调用所有 Supplier,带超时。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      suppliers - the suppliers - Supplier
      timeout - the timeout - 超时
      Returns:
      the results - 结果
    • invokeAny

      @SafeVarargs public static <T> T invokeAny(Supplier<T>... suppliers)
      Invokes all and returns the first to complete. 调用所有并返回首个完成的。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      suppliers - the suppliers - Supplier
      Returns:
      the first result - 首个结果
    • invokeAny

      public static <T> T invokeAny(Collection<Supplier<T>> suppliers)
      Invokes all and returns the first to complete. 调用所有并返回首个完成的。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      suppliers - the suppliers - Supplier
      Returns:
      the first result - 首个结果
    • parallelForEach

      public static <T> void parallelForEach(Collection<T> items, int parallelism, Consumer<T> action)
      Applies an action to each item in parallel with bounded concurrency. 使用有界并发对每个元素并行应用操作。

      Uses a Semaphore to limit concurrency, ensuring at most parallelism tasks run simultaneously.

      使用 Semaphore 限制并发,确保同时最多运行 parallelism 个任务。

      Type Parameters:
      T - the item type - 元素类型
      Parameters:
      items - the items to process - 要处理的元素
      parallelism - the maximum concurrency level - 最大并发级别
      action - the action to apply to each item - 应用于每个元素的操作
      Throws:
      OpenParallelException - if any task fails or is interrupted - 如果任何任务失败或被中断
    • parallelForEach

      public static <T> void parallelForEach(Collection<T> items, int parallelism, Consumer<T> action, Duration timeout)
      Applies an action to each item in parallel with bounded concurrency and timeout. 使用有界并发和超时对每个元素并行应用操作。

      Uses a Semaphore to limit concurrency. If the timeout expires before all tasks complete, remaining futures are cancelled.

      使用 Semaphore 限制并发。如果在所有任务完成之前超时到期, 则取消剩余的 Future。

      Type Parameters:
      T - the item type - 元素类型
      Parameters:
      items - the items to process - 要处理的元素
      parallelism - the maximum concurrency level - 最大并发级别
      action - the action to apply to each item - 应用于每个元素的操作
      timeout - the maximum time to wait - 最大等待时间
      Throws:
      OpenParallelException - if tasks time out, fail, or are interrupted - 如果任务超时、失败或被中断
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper)
      Maps items in parallel using virtual threads. 使用虚拟线程并行映射项目。
      Type Parameters:
      T - the input type - 输入类型
      R - the result type - 结果类型
      Parameters:
      items - the items - 项目
      mapper - the mapper function - 映射函数
      Returns:
      the results - 结果
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper, int parallelism)
      Maps items in parallel with concurrency limit. 并行映射项目,带并发限制。
      Type Parameters:
      T - the input type - 输入类型
      R - the result type - 结果类型
      Parameters:
      items - the items - 项目
      mapper - the mapper function - 映射函数
      parallelism - the max concurrency - 最大并发数
      Returns:
      the results - 结果
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper, int parallelism, Duration timeout)
      Maps items in parallel with concurrency limit and timeout. 并行映射项目,带并发限制和超时。
      Type Parameters:
      T - the input type - 输入类型
      R - the result type - 结果类型
      Parameters:
      items - the items - 项目
      mapper - the mapper function - 映射函数
      parallelism - the max concurrency - 最大并发数
      timeout - the timeout - 超时
      Returns:
      the results - 结果
    • parallelMapSettled

      public static <T,R> ParallelResult<R> parallelMapSettled(List<T> items, Function<T,R> mapper, int parallelism)
      Maps items in parallel, collecting both successes and failures instead of throwing. 并行映射元素,收集成功和失败结果而非抛出异常。

      Unlike parallelMap(List, Function, int), this method never throws on individual task failure. Instead, all outcomes are collected into a ParallelResult containing both successful results and failure exceptions.

      parallelMap(List, Function, int) 不同,此方法不会因单个任务失败而抛出异常。 所有结果被收集到 ParallelResult 中,包含成功结果和失败异常。

      The order of results in the returned list may differ from the input order.

      返回列表中结果的顺序可能与输入顺序不同。

      Type Parameters:
      T - the input type - 输入类型
      R - the result type - 结果类型
      Parameters:
      items - the items to map - 要映射的元素
      mapper - the mapper function - 映射函数
      parallelism - the maximum concurrency level - 最大并发级别
      Returns:
      a ParallelResult containing successes and failures - 包含成功和失败的 ParallelResult
    • forEachAsCompleted

      public static <T> void forEachAsCompleted(List<Supplier<T>> suppliers, Consumer<T> action)
      Executes suppliers in parallel and processes results in completion order. 并行执行 Supplier,按完成顺序处理结果。

      Unlike invokeAll(Collection) which returns results in submission order, this method invokes the action as each task completes, allowing earlier processing of faster tasks.

      invokeAll(Collection) 按提交顺序返回结果不同, 此方法在每个任务完成时调用操作,允许更快地处理先完成的任务。

      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      suppliers - the suppliers to execute - 要执行的 Supplier
      action - the action to apply to each result - 应用于每个结果的操作
    • forEachAsCompleted

      public static <T> void forEachAsCompleted(List<Supplier<T>> suppliers, int parallelism, Consumer<T> action)
      Executes suppliers in parallel with bounded concurrency and processes results in completion order. 使用有界并发并行执行 Supplier,按完成顺序处理结果。

      Combines concurrency control via Semaphore with completion-order processing via a blocking queue.

      通过 Semaphore 实现并发控制,通过阻塞队列实现按完成顺序处理。

      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      suppliers - the suppliers to execute - 要执行的 Supplier
      parallelism - the maximum concurrency level - 最大并发级别
      action - the action to apply to each result - 应用于每个结果的操作
    • processBatch

      public static <T> void processBatch(List<T> items, int batchSize, Consumer<List<T>> processor)
      Processes items in batches. 批量处理项目。
      Type Parameters:
      T - the item type - 项目类型
      Parameters:
      items - the items - 项目
      batchSize - the batch size - 批大小
      processor - the batch processor - 批处理器
    • processBatchAndCollect

      public static <T,R> List<R> processBatchAndCollect(List<T> items, int batchSize, Function<List<T>,List<R>> processor)
      Processes items in batches with result collection. 批量处理项目并收集结果。
      Type Parameters:
      T - the item type - 项目类型
      R - the result type - 结果类型
      Parameters:
      items - the items - 项目
      batchSize - the batch size - 批大小
      processor - the batch processor - 批处理器
      Returns:
      the results - 结果
    • pipeline

      public static <T> AsyncPipeline<T> pipeline(Supplier<T> initial)
      Creates an async pipeline from initial supplier. 从初始 Supplier 创建异步流水线。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      initial - the initial supplier - 初始 Supplier
      Returns:
      the pipeline - 流水线
    • pipeline

      public static <T> AsyncPipeline<T> pipeline(CompletableFuture<T> future)
      Creates an async pipeline from existing future. 从现有 Future 创建异步流水线。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      future - the future - Future
      Returns:
      the pipeline - 流水线
    • combine

      public static <T1,T2,R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, BiFunction<T1,T2,R> combiner)
      Combines two futures with a combiner function. 使用组合函数组合两个 Future。
      Type Parameters:
      T1 - the first type - 第一个类型
      T2 - the second type - 第二个类型
      R - the result type - 结果类型
      Parameters:
      f1 - the first future - 第一个 Future
      f2 - the second future - 第二个 Future
      combiner - the combiner function - 组合函数
      Returns:
      the combined future - 组合的 Future
    • combine

      public static <T1,T2,T3,R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, CompletableFuture<T3> f3, TriFunction<T1,T2,T3,R> combiner)
      Combines three futures with a combiner function. 使用组合函数组合三个 Future。
      Type Parameters:
      T1 - the first type - 第一个类型
      T2 - the second type - 第二个类型
      T3 - the third type - 第三个类型
      R - the result type - 结果类型
      Parameters:
      f1 - the first future - 第一个 Future
      f2 - the second future - 第二个 Future
      f3 - the third future - 第三个 Future
      combiner - the combiner function - 组合函数
      Returns:
      the combined future - 组合的 Future
    • async

      public static <T> CompletableFuture<T> async(Supplier<T> supplier)
      Creates an async supplier. 创建异步 Supplier。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      supplier - the supplier - Supplier
      Returns:
      the future - Future
    • async

      public static CompletableFuture<Void> async(Runnable runnable)
      Creates an async runnable. 创建异步 Runnable。
      Parameters:
      runnable - the runnable - Runnable
      Returns:
      the future - Future
    • delay

      public static <T> CompletableFuture<T> delay(Duration delay, Supplier<T> supplier)
      Delays execution. 延迟执行。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      delay - the delay - 延迟
      supplier - the supplier - Supplier
      Returns:
      the future - Future
    • getExecutor

      public static ExecutorService getExecutor()
      Gets the shared virtual thread executor. 获取共享虚拟线程执行器。
      Returns:
      the executor - 执行器
    • rateLimited

      public static RateLimitedExecutor rateLimited(double permitsPerSecond)
      Creates a rate limited executor with specified permits per second. 创建指定每秒许可数的限速执行器。

      Example | 示例:

      RateLimitedExecutor executor = OpenParallel.rateLimited(100);
      executor.submit(() -> callApi());
      
      Parameters:
      permitsPerSecond - the permits per second - 每秒许可数
      Returns:
      the rate limited executor - 限速执行器
    • rateLimited

      public static RateLimitedExecutor rateLimited(double permitsPerSecond, long burstCapacity)
      Creates a rate limited executor with specified rate and burst capacity. 创建指定速率和突发容量的限速执行器。

      Example | 示例:

      // 100 requests per second, burst of 20
      RateLimitedExecutor executor = OpenParallel.rateLimited(100, 20);
      
      Parameters:
      permitsPerSecond - the permits per second - 每秒许可数
      burstCapacity - the burst capacity - 突发容量
      Returns:
      the rate limited executor - 限速执行器
    • invokeAllRateLimited

      @SafeVarargs public static <T> List<T> invokeAllRateLimited(double permitsPerSecond, Supplier<T>... tasks)
      Executes tasks with rate limiting. 使用限速执行任务。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      permitsPerSecond - the permits per second - 每秒许可数
      tasks - the tasks - 任务
      Returns:
      the results - 结果