Class OpenParallel
java.lang.Object
cloud.opencode.base.parallel.OpenParallel
Open Parallel - Parallel Computing Utility
A facade class that provides static methods for parallel task execution on JDK 25 virtual threads. Features include parallel execution, batch processing, and async pipeline composition.
Example:
    // Parallel execution
    OpenParallel.runAll(
        () -> sendEmail(),
        () -> sendSMS(),
        () -> pushNotification()
    );

    // Parallel with results
    List<String> results = OpenParallel.invokeAll(
        () -> fetchFromServiceA(),
        () -> fetchFromServiceB()
    );

    // Parallel map with concurrency limit
    List<Result> processed = OpenParallel.parallelMap(items, item -> process(item), 10);

    // Async pipeline
    String result = OpenParallel.pipeline(() -> fetchData())
        .then(this::transform)
        .onError(e -> "fallback")
        .get();
Security:
- Thread-safe: Yes (utility class, stateless)
Since:
JDK 25, opencode-base-parallel V1.0.0
Author:
Leon Soo www.LeonSoo.com
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| static CompletableFuture<Void> | async | Creates an async runnable. |
| static <T> CompletableFuture<T> | async | Creates an async supplier. |
| static <T1,T2,T3,R> CompletableFuture<R> | combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, CompletableFuture<T3> f3, TriFunction<T1, T2, T3, R> combiner) | Combines three futures with a combiner function. |
| static <T1,T2,R> CompletableFuture<R> | combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, BiFunction<T1, T2, R> combiner) | Combines two futures with a combiner function. |
| static <T> CompletableFuture<T> | delay | Delays execution. |
| static ExecutorService | getExecutor | Gets the shared virtual thread executor. |
| static <T> List<T> | invokeAll(Collection<Supplier<T>> suppliers) | Invokes all suppliers in parallel and collects results. |
| static <T> List<T> | invokeAll(Collection<Supplier<T>> suppliers, Duration timeout) | Invokes all suppliers in parallel with timeout. |
| static <T> List<T> | invokeAll | Invokes all suppliers in parallel and collects results. |
| static <T> List<T> | invokeAllRateLimited(double permitsPerSecond, Supplier<T>... tasks) | Executes tasks with rate limiting. |
| static <T> T | invokeAny(Collection<Supplier<T>> suppliers) | Invokes all and returns the first to complete. |
| static <T> T | invokeAny | Invokes all and returns the first to complete. |
| static <T,R> List<R> | parallelMap(List<T> items, Function<T, R> mapper) | Maps items in parallel using stream. |
| static <T,R> List<R> | parallelMap(List<T> items, Function<T, R> mapper, int parallelism) | Maps items in parallel with concurrency limit. |
| static <T,R> List<R> | parallelMap(List<T> items, Function<T, R> mapper, int parallelism, Duration timeout) | Maps items in parallel with concurrency limit and timeout. |
| static <T> AsyncPipeline<T> | pipeline(CompletableFuture<T> future) | Creates an async pipeline from existing future. |
| static <T> AsyncPipeline<T> | pipeline | Creates an async pipeline from initial supplier. |
| static <T> void | processBatch(List<T> items, int batchSize, Consumer<List<T>> processor) | Processes items in batches. |
| static <T,R> List<R> | processBatchAndCollect(List<T> items, int batchSize, Function<List<T>, List<R>> processor) | Processes items in batches with result collection. |
| static RateLimitedExecutor | rateLimited(double permitsPerSecond) | Creates a rate limited executor with specified permits per second. |
| static RateLimitedExecutor | rateLimited(double permitsPerSecond, long burstCapacity) | Creates a rate limited executor with specified rate and burst capacity. |
| static void | runAll | Runs all tasks in parallel, waiting for completion. |
| static void | runAll(Collection<Runnable> tasks) | Runs all tasks in parallel, waiting for completion. |
| static void | runAll(Collection<Runnable> tasks, Duration timeout) | Runs all tasks in parallel with timeout. |
-
Method Details
-
runAll
Runs all tasks in parallel, waiting for completion.
Parameters:
tasks - the tasks to run
-
runAll
Runs all tasks in parallel, waiting for completion.
Parameters:
tasks - the tasks to run
-
runAll
Runs all tasks in parallel with a timeout.
Parameters:
tasks - the tasks to run
timeout - the timeout
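The following is a minimal sketch of how a "run all, wait with timeout" call can be built from plain JDK classes. It is not the library's implementation: the class name `RunAllSketch`, the cached thread pool, and the choice to surface a timeout as a `CompletionException` are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAllSketch {

    // Hypothetical stand-in for runAll(tasks, timeout); not the library's code.
    static void runAll(List<Runnable> tasks, Duration timeout) {
        // Assumption: a cached pool stands in for the shared executor. On JDK 21+,
        // Executors.newVirtualThreadPerTaskExecutor() could be used instead.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletableFuture<?>[] futures = tasks.stream()
                    .map(task -> CompletableFuture.runAsync(task, executor))
                    .toArray(CompletableFuture[]::new);
            // join() throws CompletionException if a task fails or the timeout elapses.
            CompletableFuture.allOf(futures)
                    .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                    .join();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger sent = new AtomicInteger();
        List<Runnable> tasks = List.of(sent::incrementAndGet, sent::incrementAndGet, sent::incrementAndGet);
        runAll(tasks, Duration.ofSeconds(5));
        System.out.println("completed tasks: " + sent.get());
    }
}
```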
-
invokeAll
Invokes all suppliers in parallel and collects results.
Type Parameters:
T - the result type
Parameters:
suppliers - the suppliers
Returns:
the results
-
invokeAll
Invokes all suppliers in parallel and collects results.
Type Parameters:
T - the result type
Parameters:
suppliers - the suppliers
Returns:
the results
-
invokeAll
Invokes all suppliers in parallel with a timeout.
Type Parameters:
T - the result type
Parameters:
suppliers - the suppliers
timeout - the timeout
Returns:
the results
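As a rough illustration of the "invoke all and collect" pattern, the sketch below submits every supplier, waits for all of them under one deadline, and gathers the results. It uses only JDK classes. `InvokeAllSketch` is a hypothetical name, and returning results in input order is a property of this sketch; the page does not state what ordering the library guarantees.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class InvokeAllSketch {

    // Hypothetical stand-in for invokeAll(suppliers, timeout); not the library's code.
    static <T> List<T> invokeAll(Collection<Supplier<T>> suppliers, Duration timeout) {
        // Assumption: a cached pool replaces the shared virtual thread executor.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (Supplier<T> supplier : suppliers) {
                futures.add(CompletableFuture.supplyAsync(supplier, executor));
            }
            // One shared deadline for the whole group of suppliers.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                    .join();
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // already complete, so this does not block
            }
            return results;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Supplier<String>> suppliers = List.of(() -> "from A", () -> "from B");
        System.out.println(invokeAll(suppliers, Duration.ofSeconds(5)));
    }
}
```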
-
invokeAny
Invokes all suppliers and returns the result of the first to complete.
Type Parameters:
T - the result type
Parameters:
suppliers - the suppliers
Returns:
the first result
-
invokeAny
Invokes all suppliers and returns the result of the first to complete.
Type Parameters:
T - the result type
Parameters:
suppliers - the suppliers
Returns:
the first result
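A "first to complete" call can be sketched with `CompletableFuture.anyOf`, as below. This is an illustration only: `InvokeAnySketch` and the helper `after` are hypothetical names, and interrupting the remaining tasks once a result arrives is a design choice of this sketch, not documented library behavior.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class InvokeAnySketch {

    // Hypothetical stand-in for invokeAny(suppliers); not the library's code.
    static <T> T invokeAny(Collection<Supplier<T>> suppliers) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletableFuture<?>[] futures = suppliers.stream()
                    .map(supplier -> CompletableFuture.supplyAsync(supplier, executor))
                    .toArray(CompletableFuture[]::new);
            // anyOf completes as soon as any one future completes, with that
            // future's result (or its exception, if it failed first).
            @SuppressWarnings("unchecked")
            T first = (T) CompletableFuture.anyOf(futures).join();
            return first;
        } finally {
            executor.shutdownNow(); // interrupts suppliers that are still running
        }
    }

    // Helper for the demo: a supplier that yields its value after a pause.
    static Supplier<String> after(long millis, String value) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        List<Supplier<String>> suppliers = List.of(after(2000, "slow"), after(10, "fast"));
        System.out.println(invokeAny(suppliers));
    }
}
```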
-
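parallelMap
public static <T,R> List<R> parallelMap(List<T> items, Function<T, R> mapper)
Maps items in parallel using a stream.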
-
parallelMap
Maps items in parallel with a concurrency limit.
Type Parameters:
T - the input type
R - the result type
Parameters:
items - the items
mapper - the mapper function
parallelism - the max concurrency
Returns:
the results
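One common way to cap concurrency when every item gets its own thread is a `Semaphore` whose permit count equals the `parallelism` argument. The sketch below shows that technique with JDK classes only. `ParallelMapSketch` is a hypothetical name, and nothing on this page confirms that the library uses a semaphore internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

final class ParallelMapSketch {

    // Hypothetical stand-in for parallelMap(items, mapper, parallelism).
    static <T, R> List<R> parallelMap(List<T> items, Function<T, R> mapper, int parallelism) {
        // Assumption: one thread per item. With virtual threads (JDK 21+) parked
        // threads are cheap; with platform threads a fixed pool may be preferable.
        ExecutorService executor = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(parallelism);
        try {
            List<CompletableFuture<R>> futures = new ArrayList<>();
            for (T item : items) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    permits.acquireUninterruptibly(); // at most `parallelism` mappers run at once
                    try {
                        return mapper.apply(item);
                    } finally {
                        permits.release();
                    }
                }, executor));
            }
            List<R> results = new ArrayList<>();
            for (CompletableFuture<R> future : futures) {
                results.add(future.join()); // collected in input order
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parallelMap(List.of(1, 2, 3, 4), x -> x * x, 2));
    }
}
```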
-
parallelMap
public static <T,R> List<R> parallelMap(List<T> items, Function<T, R> mapper, int parallelism, Duration timeout)
Maps items in parallel with a concurrency limit and a timeout.
Type Parameters:
T - the input type
R - the result type
Parameters:
items - the items
mapper - the mapper function
parallelism - the max concurrency
timeout - the timeout
Returns:
the results
-
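processBatch
public static <T> void processBatch(List<T> items, int batchSize, Consumer<List<T>> processor)
Processes items in batches.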
-
processBatchAndCollect
public static <T,R> List<R> processBatchAndCollect(List<T> items, int batchSize, Function<List<T>, List<R>> processor)
Processes items in batches and collects the results.
Type Parameters:
T - the item type
R - the result type
Parameters:
items - the items
batchSize - the batch size
processor - the batch processor
Returns:
the results
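Batch processing with result collection amounts to slicing the list into chunks of `batchSize`, handing each chunk to the processor, and concatenating the outputs. The sketch below assumes the batches run concurrently and that results are concatenated in batch order; neither assumption is confirmed by this page, and `BatchSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

final class BatchSketch {

    // Hypothetical stand-in for processBatchAndCollect(items, batchSize, processor).
    static <T, R> List<R> processBatchAndCollect(List<T> items, int batchSize,
                                                 Function<List<T>, List<R>> processor) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<List<R>>> futures = new ArrayList<>();
            for (int start = 0; start < items.size(); start += batchSize) {
                // subList is a view; the last batch may be shorter than batchSize.
                List<T> batch = items.subList(start, Math.min(start + batchSize, items.size()));
                futures.add(CompletableFuture.supplyAsync(() -> processor.apply(batch), executor));
            }
            List<R> results = new ArrayList<>();
            for (CompletableFuture<List<R>> future : futures) {
                results.addAll(future.join()); // concatenate in batch order
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> doubled = processBatchAndCollect(List.of(1, 2, 3, 4, 5), 2,
                batch -> batch.stream().map(x -> x * 2).collect(Collectors.toList()));
        System.out.println(doubled);
    }
}
```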
-
pipeline
Creates an async pipeline from an initial supplier.
Type Parameters:
T - the result type
Parameters:
initial - the initial supplier
Returns:
the pipeline
-
pipeline
Creates an async pipeline from an existing future.
Type Parameters:
T - the result type
Parameters:
future - the future
Returns:
the pipeline
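To make the pipeline idea concrete, here is a small wrapper around `CompletableFuture` that offers the `then`, `onError`, and `get` steps seen in the class-level example. It is a sketch under assumptions: `PipelineSketch` is a hypothetical class, not the library's `AsyncPipeline`, and the mapping of each step to `thenApply`, `exceptionally`, and `join` is a guess at plausible semantics.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

final class PipelineSketch<T> {

    private final CompletableFuture<T> future;

    private PipelineSketch(CompletableFuture<T> future) {
        this.future = future;
    }

    // Starts a pipeline from an initial supplier (runs on the common pool here).
    static <T> PipelineSketch<T> pipeline(Supplier<T> initial) {
        return new PipelineSketch<>(CompletableFuture.supplyAsync(initial));
    }

    // Wraps an already running future.
    static <T> PipelineSketch<T> pipeline(CompletableFuture<T> existing) {
        return new PipelineSketch<>(existing);
    }

    // Appends a transformation stage.
    <R> PipelineSketch<R> then(Function<T, R> step) {
        CompletableFuture<R> next = future.thenApply(step);
        return new PipelineSketch<>(next);
    }

    // Supplies a fallback value if any earlier stage failed. The Throwable is
    // typically a CompletionException wrapping the original cause.
    PipelineSketch<T> onError(Function<Throwable, T> fallback) {
        return new PipelineSketch<>(future.exceptionally(fallback));
    }

    // Blocks for the final value.
    T get() {
        return future.join();
    }

    public static void main(String[] args) {
        String result = pipeline(() -> "data")
                .then(String::toUpperCase)
                .onError(e -> "fallback")
                .get();
        System.out.println(result);
    }
}
```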
-
combine
public static <T1,T2,R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, BiFunction<T1, T2, R> combiner)
Combines two futures with a combiner function.
Type Parameters:
T1 - the first type
T2 - the second type
R - the result type
Parameters:
f1 - the first future
f2 - the second future
combiner - the combiner function
Returns:
the combined future
-
combine
public static <T1,T2,T3,R> CompletableFuture<R> combine(CompletableFuture<T1> f1, CompletableFuture<T2> f2, CompletableFuture<T3> f3, TriFunction<T1, T2, T3, R> combiner)
Combines three futures with a combiner function.
Type Parameters:
T1 - the first type
T2 - the second type
T3 - the third type
R - the result type
Parameters:
f1 - the first future
f2 - the second future
f3 - the third future
combiner - the combiner function
Returns:
the combined future
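The two `combine` overloads can be approximated with `thenCombine` for two futures and `allOf` for three, as sketched below. The nested `TriFunction` interface is a hypothetical stand-in, since the JDK has no three-argument function type and the library's own `TriFunction` is not shown on this page.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class CombineSketch {

    // Hypothetical three-argument function; the library's TriFunction may differ.
    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // Two futures: thenCombine waits for both and applies the combiner.
    static <T1, T2, R> CompletableFuture<R> combine(CompletableFuture<T1> f1,
                                                    CompletableFuture<T2> f2,
                                                    BiFunction<T1, T2, R> combiner) {
        return f1.thenCombine(f2, combiner);
    }

    // Three futures: allOf waits for all three, after which join() cannot block.
    static <T1, T2, T3, R> CompletableFuture<R> combine(CompletableFuture<T1> f1,
                                                        CompletableFuture<T2> f2,
                                                        CompletableFuture<T3> f3,
                                                        TriFunction<T1, T2, T3, R> combiner) {
        return CompletableFuture.allOf(f1, f2, f3)
                .thenApply(ignored -> combiner.apply(f1.join(), f2.join(), f3.join()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> "alice");
        CompletableFuture<Integer> orders = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Boolean> active = CompletableFuture.supplyAsync(() -> true);
        System.out.println(combine(user, orders, (u, o) -> u + ":" + o).join());
        System.out.println(combine(user, orders, active, (u, o, a) -> u + ":" + o + ":" + a).join());
    }
}
```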
-
async
Runs a supplier asynchronously and returns a future for its result.
Type Parameters:
T - the result type
Parameters:
supplier - the supplier
Returns:
the future
-
async
Runs a runnable asynchronously and returns a future that completes when it finishes.
Parameters:
runnable - the runnable
Returns:
the future
-
delay
Runs a supplier after a delay.
Type Parameters:
T - the result type
Parameters:
delay - the delay
supplier - the supplier
Returns:
the future
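Delayed execution can be expressed in the JDK with `CompletableFuture.delayedExecutor`, which postpones the start of a task by a fixed amount. The sketch below assumes the `delay` parameter is a `Duration`, which this page does not state; `DelaySketch` is a hypothetical name.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class DelaySketch {

    // Hypothetical stand-in for delay(delay, supplier); the Duration type is an assumption.
    static <T> CompletableFuture<T> delay(Duration delay, Supplier<T> supplier) {
        // The returned executor submits each task to the default async pool
        // only after the given delay has elapsed.
        Executor delayed = CompletableFuture.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(supplier, delayed);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String value = delay(Duration.ofMillis(200), () -> "done").join();
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(value + " after about " + elapsedMillis + " ms");
    }
}
```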
-
getExecutor
Gets the shared virtual thread executor.
Returns:
the executor
-
rateLimited
Creates a rate limited executor with the specified permits per second.
Example:
    RateLimitedExecutor executor = OpenParallel.rateLimited(100);
    executor.submit(() -> callApi());
Parameters:
permitsPerSecond - the permits per second
Returns:
the rate limited executor
-
rateLimited
Creates a rate limited executor with the specified rate and burst capacity.
Example:
    // 100 requests per second, burst of 20
    RateLimitedExecutor executor = OpenParallel.rateLimited(100, 20);
Parameters:
permitsPerSecond - the permits per second
burstCapacity - the burst capacity
Returns:
the rate limited executor
-
invokeAllRateLimited
@SafeVarargs public static <T> List<T> invokeAllRateLimited(double permitsPerSecond, Supplier<T>... tasks)
Executes tasks with rate limiting.
Type Parameters:
T - the result type
Parameters:
permitsPerSecond - the permits per second
tasks - the tasks
Returns:
the results
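A very simple form of rate limiting is fixed-interval pacing: task number i is started no earlier than i / permitsPerSecond seconds after the call. The sketch below implements that pacing with JDK classes. It is not a token bucket, it has no burst capacity, and it should not be read as the algorithm behind `RateLimitedExecutor`; `RateLimitSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RateLimitSketch {

    // Hypothetical stand-in for invokeAllRateLimited(permitsPerSecond, tasks).
    @SafeVarargs
    static <T> List<T> invokeAllRateLimited(double permitsPerSecond, Supplier<T>... tasks) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        long intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (int i = 0; i < tasks.length; i++) {
            // Task i starts after i intervals, so starts are spaced evenly.
            Executor startAt = CompletableFuture.delayedExecutor(i * intervalNanos, TimeUnit.NANOSECONDS);
            futures.add(CompletableFuture.supplyAsync(tasks[i], startAt));
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            results.add(future.join()); // collected in submission order
        }
        return results;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = invokeAllRateLimited(10.0, () -> "a", () -> "b", () -> "c");
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results + " in about " + elapsedMillis + " ms");
    }
}
```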
-