Class OpenParallel
A facade class that provides static methods for parallel task execution on JDK 25 virtual threads. Features include parallel execution, batch processing, and async pipeline composition.
Example:
// Parallel execution
OpenParallel.runAll(
    () -> sendEmail(),
    () -> sendSMS(),
    () -> pushNotification()
);

// Parallel with results
List<String> results = OpenParallel.invokeAll(
    () -> fetchFromServiceA(),
    () -> fetchFromServiceB()
);

// Parallel map with concurrency limit
List<Result> processed = OpenParallel.parallelMap(items, item -> process(item), 10);

// Async pipeline
String result = OpenParallel.pipeline(() -> fetchData())
    .then(this::transform)
    .onError(e -> "fallback")
    .get();
Security:
- Thread-safe: Yes (utility class, stateless)
- Since:
- JDK 25, opencode-base-parallel V1.0.0
- Author:
- Leon Soo www.LeonSoo.com
-
Method Summary
Modifier and Type | Method | Description
static CompletableFuture<Void> | async(runnable) | Creates an async runnable.
static <T> CompletableFuture<T> | async(supplier) | Creates an async supplier.
static <T1, T2, T3, R> CompletableFuture<R> | combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, CompletableFuture<T3> f3, TriFunction<T1, T2, T3, R> combiner) | Combines three futures with a combiner function.
static <T1, T2, R> CompletableFuture<R> | combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, BiFunction<T1, T2, R> combiner) | Combines two futures with a combiner function.
static <T> CompletableFuture<T> | delay(delay, supplier) | Delays execution.
static <T> void | forEachAsCompleted(List<Supplier<T>> suppliers, int parallelism, Consumer<T> action) | Executes suppliers in parallel with bounded concurrency and processes results in completion order.
static <T> void | forEachAsCompleted(List<Supplier<T>> suppliers, Consumer<T> action) | Executes suppliers in parallel and processes results in completion order.
static ExecutorService | getExecutor() | Gets the shared virtual thread executor.
static <T> List<T> | invokeAll(Collection<Supplier<T>> suppliers) | Invokes all suppliers in parallel and collects results.
static <T> List<T> | invokeAll(Collection<Supplier<T>> suppliers, Duration timeout) | Invokes all suppliers in parallel with timeout.
static <T> List<T> | invokeAll(suppliers) | Invokes all suppliers in parallel and collects results.
static <T> List<T> | invokeAllRateLimited(double permitsPerSecond, Supplier<T>... tasks) | Executes tasks with rate limiting.
static <T> T | invokeAny(Collection<Supplier<T>> suppliers) | Invokes all and returns the first to complete.
static <T> T | invokeAny(suppliers) | Invokes all and returns the first to complete.
static <T> void | parallelForEach(Collection<T> items, int parallelism, Consumer<T> action) | Applies an action to each item in parallel with bounded concurrency.
static <T> void | parallelForEach(Collection<T> items, int parallelism, Consumer<T> action, Duration timeout) | Applies an action to each item in parallel with bounded concurrency and timeout.
static <T, R> List<R> | parallelMap(List<T> items, Function<T, R> mapper) | Maps items in parallel using virtual threads.
static <T, R> List<R> | parallelMap(List<T> items, Function<T, R> mapper, int parallelism) | Maps items in parallel with concurrency limit.
static <T, R> List<R> | parallelMap(List<T> items, Function<T, R> mapper, int parallelism, Duration timeout) | Maps items in parallel with concurrency limit and timeout.
static <T, R> ParallelResult<R> | parallelMapSettled(List<T> items, Function<T, R> mapper, int parallelism) | Maps items in parallel, collecting both successes and failures instead of throwing.
static <T> AsyncPipeline<T> | pipeline(CompletableFuture<T> future) | Creates an async pipeline from existing future.
static <T> AsyncPipeline<T> | pipeline(initial) | Creates an async pipeline from initial supplier.
static <T> void | processBatch(List<T> items, int batchSize, Consumer<List<T>> processor) | Processes items in batches.
static <T, R> List<R> | processBatchAndCollect(List<T> items, int batchSize, Function<List<T>, List<R>> processor) | Processes items in batches with result collection.
static RateLimitedExecutor | rateLimited(double permitsPerSecond) | Creates a rate limited executor with specified permits per second.
static RateLimitedExecutor | rateLimited(double permitsPerSecond, long burstCapacity) | Creates a rate limited executor with specified rate and burst capacity.
static void | runAll(tasks) | Runs all tasks in parallel, waiting for completion.
static void | runAll(Collection<Runnable> tasks) | Runs all tasks in parallel, waiting for completion.
static void | runAll(Collection<Runnable> tasks, Duration timeout) | Runs all tasks in parallel with timeout.
-
Method Details
-
runAll
Runs all tasks in parallel, waiting for completion.
- Parameters:
tasks - the tasks to run
-
runAll
Runs all tasks in parallel, waiting for completion.
- Parameters:
tasks - the tasks to run
-
runAll
Runs all tasks in parallel with a timeout.
- Parameters:
tasks - the tasks to run
timeout - the timeout
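The sketch below illustrates the general shape of "run everything, then wait with a deadline" using only JDK classes. It is not the library's implementation: `RunAllSketch` is a hypothetical name, the tasks run on `CompletableFuture`'s default executor rather than virtual threads, and returning `false` on timeout is an assumption, since the documentation above does not say how a timeout is reported.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of opencode-base-parallel.
final class RunAllSketch {

    /** Starts every task, then blocks until all finish or the timeout elapses. */
    static boolean runAll(List<Runnable> tasks, Duration timeout) {
        CompletableFuture<?>[] futures = tasks.stream()
                .map(task -> CompletableFuture.runAsync(task))
                .toArray(CompletableFuture[]::new);
        try {
            CompletableFuture.allOf(futures).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Marks unfinished futures as cancelled; CompletableFuture does not
            // interrupt the thread that is already running a task.
            for (CompletableFuture<?> f : futures) {
                f.cancel(true);
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        List<Runnable> tasks = List.of(counter::incrementAndGet, counter::incrementAndGet);
        boolean finished = runAll(tasks, Duration.ofSeconds(5));
        System.out.println("finished=" + finished + ", runs=" + counter.get());
    }
}
```

On JDK 21 or later, passing `Executors.newVirtualThreadPerTaskExecutor()` as the second argument of `runAsync` would move the tasks onto virtual threads.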
-
invokeAll
Invokes all suppliers in parallel and collects results.
- Type Parameters:
T - the result type
- Parameters:
suppliers - the suppliers
- Returns:
- the results
-
invokeAll
Invokes all suppliers in parallel and collects results.
- Type Parameters:
T - the result type
- Parameters:
suppliers - the suppliers
- Returns:
- the results
-
invokeAll
Invokes all suppliers in parallel with a timeout.
- Type Parameters:
T - the result type
- Parameters:
suppliers - the suppliers
timeout - the timeout
- Returns:
- the results
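As a rough illustration of collecting results from parallel suppliers in submission order, the following sketch starts every supplier first and only then joins the futures in the order they were submitted. `InvokeAllSketch` and its `after` helper are hypothetical names, and the default `CompletableFuture` executor stands in for the library's virtual thread executor.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, not part of opencode-base-parallel.
final class InvokeAllSketch {

    /** Runs all suppliers concurrently and returns their results in submission order. */
    static <T> List<T> invokeAll(Collection<Supplier<T>> suppliers) {
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (Supplier<T> supplier : suppliers) {
            futures.add(CompletableFuture.supplyAsync(supplier)); // start everything first
        }
        List<T> results = new ArrayList<>(futures.size());
        for (CompletableFuture<T> future : futures) {
            results.add(future.join()); // join() rethrows a failure as CompletionException
        }
        return results;
    }

    /** Test helper: a supplier that sleeps for the given time, then returns the value. */
    static <T> Supplier<T> after(long millis, T value) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        List<Supplier<String>> suppliers = List.of(after(150, "slow"), after(0, "fast"));
        System.out.println(invokeAll(suppliers)); // submission order, not completion order
    }
}
```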
-
invokeAny
Invokes all suppliers and returns the result of the first one to complete.
- Type Parameters:
T - the result type
- Parameters:
suppliers - the suppliers
- Returns:
- the first result
-
invokeAny
Invokes all suppliers and returns the result of the first one to complete.
- Type Parameters:
T - the result type
- Parameters:
suppliers - the suppliers
- Returns:
- the first result
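One way to express "first to complete" with plain JDK classes is `CompletableFuture.anyOf`, sketched below. `InvokeAnySketch` and `after` are hypothetical names. Whether the library treats a failed supplier as "complete" is not stated above; in this sketch the first completion wins even when it is a failure.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, not part of opencode-base-parallel.
final class InvokeAnySketch {

    /** Starts all suppliers and returns whichever result arrives first. */
    static <T> T invokeAny(Collection<Supplier<T>> suppliers) {
        CompletableFuture<?>[] futures = suppliers.stream()
                .map(supplier -> CompletableFuture.supplyAsync(supplier))
                .toArray(CompletableFuture[]::new);
        // anyOf completes as soon as any input completes, normally or exceptionally.
        @SuppressWarnings("unchecked")
        T first = (T) CompletableFuture.anyOf(futures).join();
        for (CompletableFuture<?> f : futures) {
            f.cancel(true); // the losers are no longer needed
        }
        return first;
    }

    /** Test helper: a supplier that sleeps for the given time, then returns the value. */
    static <T> Supplier<T> after(long millis, T value) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        List<Supplier<String>> suppliers = List.of(after(300, "slow"), after(0, "fast"));
        System.out.println(invokeAny(suppliers));
    }
}
```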
-
parallelForEach
Applies an action to each item in parallel with bounded concurrency.
Uses a Semaphore to limit concurrency, ensuring that at most parallelism tasks run simultaneously.
- Type Parameters:
T - the item type
- Parameters:
items - the items to process
parallelism - the maximum concurrency level
action - the action to apply to each item
- Throws:
OpenParallelException - if any task fails or is interrupted
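The Semaphore-based limit described above can be sketched as follows. This is an illustration under assumptions, not the library's code: `BoundedForEachSketch` is a hypothetical name, a cached thread pool stands in for the virtual thread executor so the sketch runs on older JDKs, and `IllegalStateException` stands in for `OpenParallelException`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical helper, not part of opencode-base-parallel.
final class BoundedForEachSketch {

    /** Applies the action to every item with at most 'parallelism' tasks in flight. */
    static <T> void parallelForEach(Collection<T> items, int parallelism, Consumer<T> action) {
        Semaphore permits = new Semaphore(parallelism);
        // Stand-in executor; on JDK 21+ Executors.newVirtualThreadPerTaskExecutor() fits here.
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (T item : items) {
                permits.acquire(); // blocks the submitter while 'parallelism' tasks are running
                futures.add(pool.submit(() -> {
                    try {
                        action.accept(item);
                    } finally {
                        permits.release(); // always hand the permit back, even on failure
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // surfaces the first task failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        parallelForEach(List.of("a", "b", "c", "d"), 2,
                item -> System.out.println(Thread.currentThread().getName() + " -> " + item));
    }
}
```

Acquiring the permit in the submitting thread keeps the number of live worker threads bounded as well, which matters for platform threads; with virtual threads the permit is often acquired inside the task instead.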
-
parallelForEach
public static <T> void parallelForEach(Collection<T> items, int parallelism, Consumer<T> action, Duration timeout)
Applies an action to each item in parallel with bounded concurrency and a timeout.
Uses a Semaphore to limit concurrency. If the timeout expires before all tasks complete, the remaining futures are cancelled.
- Type Parameters:
T - the item type
- Parameters:
items - the items to process
parallelism - the maximum concurrency level
action - the action to apply to each item
timeout - the maximum time to wait
- Throws:
OpenParallelException - if tasks time out, fail, or are interrupted
-
parallelMap
Maps items in parallel using virtual threads.
- Type Parameters:
T - the input type
R - the result type
- Parameters:
items - the items
mapper - the mapper function
- Returns:
- the results
-
parallelMap
Maps items in parallel with a concurrency limit.
- Type Parameters:
T - the input type
R - the result type
- Parameters:
items - the items
mapper - the mapper function
parallelism - the maximum concurrency
- Returns:
- the results
-
parallelMap
public static <T,R> List<R> parallelMap(List<T> items, Function<T, R> mapper, int parallelism, Duration timeout)
Maps items in parallel with a concurrency limit and a timeout.
- Type Parameters:
T - the input type
R - the result type
- Parameters:
items - the items
mapper - the mapper function
parallelism - the maximum concurrency
timeout - the timeout
- Returns:
- the results
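A minimal sketch of a bounded, time-limited parallel map is shown below. It swaps in a different technique from the library's Semaphore approach: a fixed-size thread pool caps concurrency, and `ExecutorService.invokeAll` with a timeout cancels whatever has not finished. `TimedMapSketch` is a hypothetical name, `IllegalStateException` is a stand-in exception type, and returning results in input order is an assumption.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper, not part of opencode-base-parallel.
final class TimedMapSketch {

    /** Maps items on at most 'parallelism' threads; fails if the timeout elapses first. */
    static <T, R> List<R> parallelMap(List<T> items, Function<T, R> mapper,
                                      int parallelism, Duration timeout) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Callable<R>> calls = new ArrayList<>();
            for (T item : items) {
                calls.add(() -> mapper.apply(item));
            }
            // invokeAll returns futures in the same order as 'calls' and cancels
            // any task that has not completed when the timeout expires.
            List<Future<R>> futures =
                    pool.invokeAll(calls, timeout.toMillis(), TimeUnit.MILLISECONDS);
            List<R> results = new ArrayList<>(futures.size());
            for (Future<R> future : futures) {
                results.add(future.get()); // throws CancellationException if timed out
            }
            return results;
        } catch (CancellationException e) {
            throw new IllegalStateException("timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Integer> squares =
                parallelMap(List.of(1, 2, 3, 4), n -> n * n, 2, Duration.ofSeconds(5));
        System.out.println(squares); // prints [1, 4, 9, 16]
    }
}
```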
-
parallelMapSettled
public static <T,R> ParallelResult<R> parallelMapSettled(List<T> items, Function<T, R> mapper, int parallelism)
Maps items in parallel, collecting both successes and failures instead of throwing.
Unlike parallelMap(List, Function, int), this method never throws on individual task failure. Instead, all outcomes are collected into a ParallelResult containing both successful results and failure exceptions.
The order of results in the returned list may differ from the input order.
- Type Parameters:
T - the input type
R - the result type
- Parameters:
items - the items to map
mapper - the mapper function
parallelism - the maximum concurrency level
- Returns:
- a ParallelResult containing successes and failures
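The "settled" behaviour described above can be illustrated with a small sketch that catches each task's failure instead of rethrowing it. `SettledMapSketch` and its nested `Settled` holder are hypothetical; the real `ParallelResult` type and its accessors are not shown in this page, so nothing here should be read as its API. This sketch happens to keep successes in input order, which the library does not promise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper, not part of opencode-base-parallel.
final class SettledMapSketch {

    /** Hypothetical stand-in for ParallelResult: successes and failures side by side. */
    static final class Settled<R> {
        final List<R> successes = new ArrayList<>();
        final List<Throwable> failures = new ArrayList<>();
    }

    /** Maps every item and records each outcome; one failing item never aborts the rest. */
    static <T, R> Settled<R> mapSettled(List<T> items, Function<T, R> mapper, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (T item : items) {
                Callable<R> call = () -> mapper.apply(item);
                futures.add(pool.submit(call));
            }
            Settled<R> outcome = new Settled<>();
            for (Future<R> future : futures) {
                try {
                    outcome.successes.add(future.get());
                } catch (ExecutionException e) {
                    outcome.failures.add(e.getCause()); // record the failure and keep going
                }
            }
            return outcome;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Settled<Integer> outcome = mapSettled(List.of(1, 2, 3), n -> 10 / (n - 2), 2);
        System.out.println(outcome.successes + " failures=" + outcome.failures.size());
    }
}
```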
-
forEachAsCompleted
Executes suppliers in parallel and processes results in completion order.
Unlike invokeAll(Collection), which returns results in submission order, this method invokes the action as each task completes, so results from faster tasks are processed earlier.
- Type Parameters:
T - the result type
- Parameters:
suppliers - the suppliers to execute
action - the action to apply to each result
-
forEachAsCompleted
public static <T> void forEachAsCompleted(List<Supplier<T>> suppliers, int parallelism, Consumer<T> action)
Executes suppliers in parallel with bounded concurrency and processes results in completion order.
Combines concurrency control via a Semaphore with completion-order processing via a blocking queue.
- Type Parameters:
T - the result type
- Parameters:
suppliers - the suppliers to execute
parallelism - the maximum concurrency level
action - the action to apply to each result
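Completion-order processing through a blocking queue can be sketched as below: each future pushes itself onto the queue when it finishes, and the caller drains the queue. `AsCompletedSketch` and `after` are hypothetical names, the concurrency limit is left out to keep the example short, and the default `CompletableFuture` executor stands in for virtual threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of opencode-base-parallel.
final class AsCompletedSketch {

    /** Runs all suppliers and hands each result to the action as soon as it is ready. */
    static <T> void forEachAsCompleted(List<Supplier<T>> suppliers, Consumer<T> action) {
        BlockingQueue<CompletableFuture<T>> finished = new LinkedBlockingQueue<>();
        for (Supplier<T> supplier : suppliers) {
            CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
            future.whenComplete((value, error) -> finished.add(future)); // enqueue on completion
        }
        try {
            for (int i = 0; i < suppliers.size(); i++) {
                // take() blocks until the next task finishes; join() rethrows its failure.
                action.accept(finished.take().join());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }

    /** Test helper: a supplier that sleeps for the given time, then returns the value. */
    static <T> Supplier<T> after(long millis, T value) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        List<Supplier<String>> suppliers = List.of(after(200, "slow"), after(0, "fast"));
        List<String> seen = new ArrayList<>();
        forEachAsCompleted(suppliers, seen::add); // the action runs on the calling thread
        System.out.println(seen);
    }
}
```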
-
processBatch
-
processBatchAndCollect
public static <T,R> List<R> processBatchAndCollect(List<T> items, int batchSize, Function<List<T>, List<R>> processor)
Processes items in batches and collects the results.
- Type Parameters:
T - the item type
R - the result type
- Parameters:
items - the items
batchSize - the batch size
processor - the batch processor
- Returns:
- the results
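To make the batching idea concrete, the sketch below splits a list into consecutive sublists of at most `batchSize` elements, processes each batch asynchronously, and concatenates the per-batch results in batch order. `BatchSketch` is a hypothetical name, and whether the library runs batches concurrently or sequentially is not stated above; concurrent execution is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper, not part of opencode-base-parallel.
final class BatchSketch {

    /** Splits items into batches, processes the batches concurrently, and merges the results. */
    static <T, R> List<R> processBatchAndCollect(List<T> items, int batchSize,
                                                 Function<List<T>, List<R>> processor) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            // subList is a view; the final batch may be shorter than batchSize.
            List<T> batch = items.subList(from, Math.min(from + batchSize, items.size()));
            futures.add(CompletableFuture.supplyAsync(() -> processor.apply(batch)));
        }
        List<R> collected = new ArrayList<>();
        for (CompletableFuture<List<R>> future : futures) {
            collected.addAll(future.join()); // join in batch order to keep results ordered
        }
        return collected;
    }

    public static void main(String[] args) {
        Function<List<Integer>, List<Integer>> batchSizes = batch -> List.of(batch.size());
        // Seven items in batches of three give batches of 3, 3 and 1.
        System.out.println(processBatchAndCollect(List.of(1, 2, 3, 4, 5, 6, 7), 3, batchSizes));
    }
}
```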
-
pipeline
Creates an async pipeline from an initial supplier.
- Type Parameters:
T - the result type
- Parameters:
initial - the initial supplier
- Returns:
- the pipeline
-
pipeline
Creates an async pipeline from an existing future.
- Type Parameters:
T - the result type
- Parameters:
future - the future
- Returns:
- the pipeline
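The class-level example chains `then`, `onError`, and `get` on a pipeline. For readers who want a mental model, the sketch below shows a comparable chain written directly against `CompletableFuture`; it is an analogy, not a description of how `AsyncPipeline` is implemented. `PipelineSketch` and `run` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, not part of opencode-base-parallel.
final class PipelineSketch {

    /** Fetches a value asynchronously, transforms it, and falls back if any stage fails. */
    static String run(Supplier<String> fetch) {
        return CompletableFuture.supplyAsync(fetch)   // initial stage
                .thenApply(String::toUpperCase)       // transformation stage
                .exceptionally(error -> "fallback")   // recovery stage for any earlier failure
                .join();                              // block for the final value
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "data")); // prints DATA
        System.out.println(run(() -> {
            throw new IllegalStateException("boom");
        })); // prints fallback
    }
}
```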
-
combine
public static <T1, T2, R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, BiFunction<T1, T2, R> combiner)
Combines two futures with a combiner function.
- Type Parameters:
T1 - the first type
T2 - the second type
R - the result type
- Parameters:
f1 - the first future
f2 - the second future
combiner - the combiner function
- Returns:
- the combined future
-
combine
public static <T1, T2, T3, R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, CompletableFuture<T3> f3, TriFunction<T1, T2, T3, R> combiner)
Combines three futures with a combiner function.
- Type Parameters:
T1 - the first type
T2 - the second type
T3 - the third type
R - the result type
- Parameters:
f1 - the first future
f2 - the second future
f3 - the third future
combiner - the combiner function
- Returns:
- the combined future
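Combining futures maps naturally onto standard `CompletableFuture` operations, as the sketch below suggests: `thenCombine` for two inputs and `allOf` followed by `join` for three. `CombineSketch` and the nested `TriFunction` interface are hypothetical stand-ins defined locally, since the JDK has no three-argument function type and the library's own `TriFunction` is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical helper, not part of opencode-base-parallel.
final class CombineSketch {

    /** Local stand-in for a three-argument function. */
    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    /** Completes with combiner(f1, f2) once both inputs have completed. */
    static <A, B, R> CompletableFuture<R> combine(CompletableFuture<A> f1,
                                                  CompletableFuture<B> f2,
                                                  BiFunction<A, B, R> combiner) {
        return f1.thenCombine(f2, combiner);
    }

    /** Completes with combiner(f1, f2, f3) once all three inputs have completed. */
    static <A, B, C, R> CompletableFuture<R> combine(CompletableFuture<A> f1,
                                                     CompletableFuture<B> f2,
                                                     CompletableFuture<C> f3,
                                                     TriFunction<A, B, C, R> combiner) {
        // allOf completes after every input; the join() calls then return immediately.
        return CompletableFuture.allOf(f1, f2, f3)
                .thenApply(ignored -> combiner.apply(f1.join(), f2.join(), f3.join()));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> three = CompletableFuture.supplyAsync(() -> 3);
        System.out.println(combine(two, three, (a, b) -> a + b).join()); // prints 5
    }
}
```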
-
async
Runs a supplier asynchronously and returns a future for its result.
- Type Parameters:
T - the result type
- Parameters:
supplier - the supplier
- Returns:
- the future
-
async
Runs a runnable asynchronously and returns a future that completes when it finishes.
- Parameters:
runnable - the runnable
- Returns:
- the future
-
delay
Delays execution.
- Type Parameters:
T - the result type
- Parameters:
delay - the delay
supplier - the supplier
- Returns:
- the future
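Delayed execution can be approximated with the JDK's `CompletableFuture.delayedExecutor` (available since Java 9), as sketched below. `DelaySketch` is a hypothetical name, and taking the delay as a `Duration` is an assumption, because the parameter types of `delay` did not survive in this page.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of opencode-base-parallel.
final class DelaySketch {

    /** Returns a future that runs the supplier only after the given delay has passed. */
    static <T> CompletableFuture<T> delay(Duration delay, Supplier<T> supplier) {
        // delayedExecutor hands the task to the default executor after the delay.
        Executor later = CompletableFuture.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(supplier, later);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String value = delay(Duration.ofMillis(200), () -> "ready").join();
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(value + " after about " + elapsedMillis + " ms");
    }
}
```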
-
getExecutor
Gets the shared virtual thread executor.
- Returns:
- the executor
-
rateLimited
Creates a rate-limited executor with the specified number of permits per second.
Example:
RateLimitedExecutor executor = OpenParallel.rateLimited(100);
executor.submit(() -> callApi());
- Parameters:
permitsPerSecond - the permits per second
- Returns:
- the rate-limited executor
-
rateLimited
Creates a rate-limited executor with the specified rate and burst capacity.
Example:
// 100 requests per second, burst of 20
RateLimitedExecutor executor = OpenParallel.rateLimited(100, 20);
- Parameters:
permitsPerSecond - the permits per second
burstCapacity - the burst capacity
- Returns:
- the rate-limited executor
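A rate paired with a burst capacity is commonly modelled as a token bucket, and the sketch below shows that model in isolation. It is an assumption that `RateLimitedExecutor` works this way; the page does not describe its algorithm. `TokenBucketSketch` is a hypothetical class, and the clock is passed in explicitly so the behaviour is easy to reason about.

```java
// Hypothetical helper, not part of opencode-base-parallel.
final class TokenBucketSketch {

    private final double permitsPerSecond;
    private final long burstCapacity;
    private double tokens;
    private long lastRefillNanos;

    TokenBucketSketch(double permitsPerSecond, long burstCapacity, long nowNanos) {
        this.permitsPerSecond = permitsPerSecond;
        this.burstCapacity = burstCapacity;
        this.tokens = burstCapacity; // start full so an initial burst is allowed
        this.lastRefillNanos = nowNanos;
    }

    /** Takes one token if available; 'nowNanos' is the caller's monotonic clock reading. */
    synchronized boolean tryAcquire(long nowNanos) {
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1_000_000_000.0;
        // Refill in proportion to elapsed time, never exceeding the burst capacity.
        tokens = Math.min(burstCapacity, tokens + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // 100 permits per second with a burst of 20, mirroring the example above.
        TokenBucketSketch bucket = new TokenBucketSketch(100.0, 20, System.nanoTime());
        int granted = 0;
        for (int i = 0; i < 50; i++) {
            if (bucket.tryAcquire(System.nanoTime())) {
                granted++;
            }
        }
        System.out.println("granted " + granted + " of 50 immediate requests");
    }
}
```

An executor built on such a bucket would typically wait for a token before running each submitted task, so the burst capacity bounds how many tasks may start back to back.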
-
invokeAllRateLimited
@SafeVarargs public static <T> List<T> invokeAllRateLimited(double permitsPerSecond, Supplier<T>... tasks)
Executes tasks with rate limiting.
- Type Parameters:
T - the result type
- Parameters:
permitsPerSecond - the permits per second
tasks - the tasks
- Returns:
- the results
-