Class VirtualTasks
Provides high-level concurrency primitives built on JDK 25 virtual threads. All methods create a short-lived virtual-thread-per-task executor, submit tasks, collect results, and shut down the executor before returning.
提供基于 JDK 25 虚拟线程的高级并发原语。 所有方法创建短生命周期的每任务虚拟线程执行器,提交任务,收集结果, 并在返回前关闭执行器。
Features | 主要功能:
invokeAll(List)- All-or-nothing execution: all succeed or throw on first failure 全部成功或在首次失败时抛出异常invokeAny(List)- First success wins, cancel rest 首个成功结果胜出,取消其余任务invokeAllSettled(List)- Collect all results asResult, never throws from task failures 收集所有结果为Result,不因任务失败而抛出异常parallelMap(List, Function)- Parallel mapping over a list 对列表进行并行映射runAll(List)- Run all tasks to completion 运行所有任务直至完成supplyAsync(Callable)- Bridge virtual thread execution with CompletableFuture 将虚拟线程执行与 CompletableFuture 桥接runAsync(Runnable)- Run a task on a virtual thread returning CompletableFuture 在虚拟线程上运行任务并返回 CompletableFutureparallelMap(List, Function, int)- Parallel mapping with concurrency limit 带并发限制的并行映射
Usage Examples | 使用示例:
// All-or-nothing
List<String> results = VirtualTasks.invokeAll(List.of(
() -> fetchFromServiceA(),
() -> fetchFromServiceB()
));
// First success wins
String fastest = VirtualTasks.invokeAny(List.of(
() -> queryMirror1(),
() -> queryMirror2()
));
// Collect all (success or failure)
List<Result<String>> settled = VirtualTasks.invokeAllSettled(List.of(
() -> riskyOperation1(),
() -> riskyOperation2()
));
// Parallel map with timeout
List<Integer> lengths = VirtualTasks.parallelMap(
urls, url -> download(url).length(), Duration.ofSeconds(30)
);
Security | 安全性:
- Thread-safe: Yes (stateless utility class) - 线程安全: 是 (无状态工具类)
- Resource-safe: Executors are always shut down in finally blocks - 资源安全: 执行器始终在 finally 块中关闭
- Since:
- JDK 25, opencode-base-core V1.0.3
- Author:
- Leon Soo www.LeonSoo.com
- See Also:
-
Method Summary
Modifier and TypeMethodDescriptionstatic <T> List<T> Execute all tasks and return their results.static <T> List<T> Execute all tasks with a timeout.invokeAllSettled(List<Callable<T>> tasks) Execute all tasks and collect every result asResult.success(Object)orResult.failure(Throwable).invokeAllSettled(List<Callable<T>> tasks, Duration timeout) Execute all tasks with a timeout and collect every result asResult.static <T> TExecute all tasks and return the result of the first one to succeed.static <T> TExecute all tasks with a timeout and return the first successful result.static <T,R> List <R> parallelMap(List<T> items, Function<T, R> mapper) Apply a mapping function to each item in parallel using virtual threads.static <T,R> List <R> parallelMap(List<T> items, Function<T, R> mapper, int maxConcurrency) Apply a mapping function to each item in parallel with a concurrency limit.static <T,R> List <R> parallelMap(List<T> items, Function<T, R> mapper, int maxConcurrency, Duration timeout) Apply a mapping function with concurrency limit and timeout.static <T,R> List <R> parallelMap(List<T> items, Function<T, R> mapper, Duration timeout) Apply a mapping function to each item in parallel with a timeout.static voidRun all tasks to completion.static voidRun all tasks to completion with a timeout.static CompletableFuture<Void> Executes a runnable on a virtual thread and returns a CompletableFuture.static <T> CompletableFuture<T> supplyAsync(Callable<T> task) Executes a callable on a virtual thread and returns a CompletableFuture.
-
Method Details
-
invokeAll
Execute all tasks and return their results. If any task fails, cancel all remaining and throw anOpenExceptionwrapping the first failure cause. 执行所有任务并返回结果。如果任何任务失败,取消所有剩余任务并抛出包装首个失败原因的OpenException。- Type Parameters:
T- result type - 结果类型- Parameters:
tasks- the tasks to execute - 要执行的任务列表- Returns:
- immutable list of results in submission order - 按提交顺序的不可变结果列表
- Throws:
OpenException- if any task fails - 如果任何任务失败
-
invokeAll
Execute all tasks with a timeout. If any task fails or the timeout expires, cancel all remaining and throw. 在超时限制内执行所有任务。如果任何任务失败或超时,取消所有剩余任务并抛出异常。- Type Parameters:
T- result type - 结果类型- Parameters:
tasks- the tasks to execute - 要执行的任务列表timeout- the maximum duration to wait - 最大等待时长- Returns:
- immutable list of results in submission order - 按提交顺序的不可变结果列表
- Throws:
OpenException- if any task fails - 如果任何任务失败OpenTimeoutException- if the timeout expires - 如果超时
-
invokeAny
Execute all tasks and return the result of the first one to succeed. All remaining tasks are cancelled. If all tasks fail, throwsOpenException. 执行所有任务并返回首个成功的结果。取消所有剩余任务。如果所有任务都失败,抛出OpenException。- Type Parameters:
T- result type - 结果类型- Parameters:
tasks- the tasks to execute - 要执行的任务列表- Returns:
- the first successful result - 首个成功的结果
- Throws:
OpenException- if all tasks fail - 如果所有任务都失败
-
invokeAny
Execute all tasks with a timeout and return the first successful result. 在超时限制内执行所有任务并返回首个成功的结果。- Type Parameters:
T- result type - 结果类型- Parameters:
tasks- the tasks to execute - 要执行的任务列表timeout- the maximum duration to wait - 最大等待时长- Returns:
- the first successful result - 首个成功的结果
- Throws:
OpenException- if all tasks fail - 如果所有任务都失败OpenTimeoutException- if the timeout expires before any task succeeds - 如果在任何任务成功前超时
-
invokeAllSettled
Execute all tasks and collect every result asResult.success(Object)orResult.failure(Throwable). Never throws from task failures. 执行所有任务并将每个结果收集为Result.success(Object)或Result.failure(Throwable)。 不因任务失败而抛出异常。- Type Parameters:
T- result type - 结果类型- Parameters:
tasks- the tasks to execute - 要执行的任务列表- Returns:
- immutable list of Results in submission order - 按提交顺序的不可变 Result 列表
-
invokeAllSettled
Execute all tasks with a timeout and collect every result asResult. Tasks that have not completed by the timeout deadline are recorded asResult.failure(Throwable)with anOpenTimeoutException. 在超时限制内执行所有任务并将每个结果收集为Result。 超时未完成的任务记录为包含OpenTimeoutException的Result.failure(Throwable)。- Type Parameters:
T- result type - 结果类型- Parameters:
tasks- the tasks to execute - 要执行的任务列表timeout- the maximum duration to wait - 最大等待时长- Returns:
- immutable list of Results in submission order - 按提交顺序的不可变 Result 列表
-
parallelMap
Apply a mapping function to each item in parallel using virtual threads. 使用虚拟线程对每个元素并行应用映射函数。- Type Parameters:
T- input type - 输入类型R- result type - 结果类型- Parameters:
items- the input items - 输入元素列表mapper- the mapping function - 映射函数- Returns:
- immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
- Throws:
OpenException- if any mapping fails - 如果任何映射失败
-
parallelMap
Apply a mapping function to each item in parallel with a timeout. 在超时限制内使用虚拟线程对每个元素并行应用映射函数。- Type Parameters:
T- input type - 输入类型R- result type - 结果类型- Parameters:
items- the input items - 输入元素列表mapper- the mapping function - 映射函数timeout- the maximum duration to wait - 最大等待时长- Returns:
- immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
- Throws:
OpenException- if any mapping fails - 如果任何映射失败OpenTimeoutException- if the timeout expires - 如果超时
-
runAll
Run all tasks to completion. If any task fails, cancel remaining and throw. 运行所有任务直至完成。如果任何任务失败,取消剩余任务并抛出异常。- Parameters:
tasks- the tasks to run - 要运行的任务列表- Throws:
OpenException- if any task fails - 如果任何任务失败
-
runAll
Run all tasks to completion with a timeout. 在超时限制内运行所有任务直至完成。- Parameters:
tasks- the tasks to run - 要运行的任务列表timeout- the maximum duration to wait - 最大等待时长- Throws:
OpenException- if any task fails - 如果任何任务失败OpenTimeoutException- if the timeout expires - 如果超时
-
supplyAsync
Executes a callable on a virtual thread and returns a CompletableFuture. 在虚拟线程上执行 Callable 并返回 CompletableFuture。Bridges virtual thread execution with the CompletableFuture API. The callable is executed on a new virtual thread, and the returned future completes when the callable finishes.
将虚拟线程执行与 CompletableFuture API 桥接。 Callable 在新虚拟线程上执行,返回的 future 在 Callable 完成时完成。
- Type Parameters:
T- result type - 结果类型- Parameters:
task- the task to execute - 要执行的任务- Returns:
- a CompletableFuture that completes with the task result - 完成后包含任务结果的 CompletableFuture
- Throws:
NullPointerException- if task is null - 如果 task 为 null
-
runAsync
Executes a runnable on a virtual thread and returns a CompletableFuture. 在虚拟线程上执行 Runnable 并返回 CompletableFuture。- Parameters:
task- the task to run - 要运行的任务- Returns:
- a CompletableFuture that completes when the task finishes - 任务完成时完成的 CompletableFuture
- Throws:
NullPointerException- if task is null - 如果 task 为 null
-
parallelMap
Apply a mapping function to each item in parallel with a concurrency limit. 使用并发限制对每个元素并行应用映射函数。Uses a
Semaphoreto limit the number of concurrently running virtual threads. This is useful when the mapper accesses a resource with limited capacity (e.g., a connection pool).使用
Semaphore限制并发运行的虚拟线程数。 当映射函数访问容量有限的资源(如连接池)时,此方法非常有用。- Type Parameters:
T- input type - 输入类型R- result type - 结果类型- Parameters:
items- the input items - 输入元素列表mapper- the mapping function - 映射函数maxConcurrency- maximum number of concurrent virtual threads (must be positive) - 最大并发虚拟线程数(必须为正数)- Returns:
- immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
- Throws:
OpenException- if any mapping fails - 如果任何映射失败IllegalArgumentException- if maxConcurrency is not positive - 如果 maxConcurrency 不为正数
-
parallelMap
public static <T,R> List<R> parallelMap(List<T> items, Function<T, R> mapper, int maxConcurrency, Duration timeout) Apply a mapping function with concurrency limit and timeout. 使用并发限制和超时对每个元素并行应用映射函数。- Type Parameters:
T- input type - 输入类型R- result type - 结果类型- Parameters:
items- the input items - 输入元素列表mapper- the mapping function - 映射函数maxConcurrency- maximum concurrent virtual threads - 最大并发虚拟线程数timeout- the maximum duration to wait - 最大等待时长- Returns:
- immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
- Throws:
OpenException- if any mapping fails - 如果任何映射失败OpenTimeoutException- if the timeout expires - 如果超时
-