Class VirtualTasks

java.lang.Object
cloud.opencode.base.core.concurrent.VirtualTasks

public final class VirtualTasks extends Object
VirtualTasks - Virtual thread concurrency utilities VirtualTasks - 虚拟线程并发工具类

Provides high-level concurrency primitives built on JDK 25 virtual threads. All methods create a short-lived virtual-thread-per-task executor, submit tasks, collect results, and shut down the executor before returning.

提供基于 JDK 25 虚拟线程的高级并发原语。 所有方法创建短生命周期的每任务虚拟线程执行器,提交任务,收集结果, 并在返回前关闭执行器。

Features | 主要功能:

  • invokeAll(List) - All-or-nothing execution: all succeed or throw on first failure 全部成功或在首次失败时抛出异常
  • invokeAny(List) - First success wins, cancel rest 首个成功结果胜出,取消其余任务
  • invokeAllSettled(List) - Collect all results as Result, never throws from task failures 收集所有结果为 Result,不因任务失败而抛出异常
  • parallelMap(List, Function) - Parallel mapping over a list 对列表进行并行映射
  • runAll(List) - Run all tasks to completion 运行所有任务直至完成
  • supplyAsync(Callable) - Bridge virtual thread execution with CompletableFuture 将虚拟线程执行与 CompletableFuture 桥接
  • runAsync(Runnable) - Run a task on a virtual thread returning CompletableFuture 在虚拟线程上运行任务并返回 CompletableFuture
  • parallelMap(List, Function, int) - Parallel mapping with concurrency limit 带并发限制的并行映射

Usage Examples | 使用示例:

// All-or-nothing
List<String> results = VirtualTasks.invokeAll(List.of(
    () -> fetchFromServiceA(),
    () -> fetchFromServiceB()
));

// First success wins
String fastest = VirtualTasks.invokeAny(List.of(
    () -> queryMirror1(),
    () -> queryMirror2()
));

// Collect all (success or failure)
List<Result<String>> settled = VirtualTasks.invokeAllSettled(List.of(
    () -> riskyOperation1(),
    () -> riskyOperation2()
));

// Parallel map with timeout
List<Integer> lengths = VirtualTasks.parallelMap(
    urls, url -> download(url).length(), Duration.ofSeconds(30)
);

Security | 安全性:

  • Thread-safe: Yes (stateless utility class) - 线程安全: 是 (无状态工具类)
  • Resource-safe: Executors are always shut down in finally blocks - 资源安全: 执行器始终在 finally 块中关闭
Since:
JDK 25, opencode-base-core V1.0.3
Author:
Leon Soo www.LeonSoo.com
See Also:
  • Method Details

    • invokeAll

      public static <T> List<T> invokeAll(List<Callable<T>> tasks)
      Execute all tasks and return their results. If any task fails, cancel all remaining and throw an OpenException wrapping the first failure cause. 执行所有任务并返回结果。如果任何任务失败,取消所有剩余任务并抛出包装首个失败原因的 OpenException
      Type Parameters:
      T - result type - 结果类型
      Parameters:
      tasks - the tasks to execute - 要执行的任务列表
      Returns:
      immutable list of results in submission order - 按提交顺序的不可变结果列表
      Throws:
      OpenException - if any task fails - 如果任何任务失败
    • invokeAll

      public static <T> List<T> invokeAll(List<Callable<T>> tasks, Duration timeout)
      Execute all tasks with a timeout. If any task fails or the timeout expires, cancel all remaining and throw. 在超时限制内执行所有任务。如果任何任务失败或超时,取消所有剩余任务并抛出异常。
      Type Parameters:
      T - result type - 结果类型
      Parameters:
      tasks - the tasks to execute - 要执行的任务列表
      timeout - the maximum duration to wait - 最大等待时长
      Returns:
      immutable list of results in submission order - 按提交顺序的不可变结果列表
      Throws:
      OpenException - if any task fails - 如果任何任务失败
      OpenTimeoutException - if the timeout expires - 如果超时
    • invokeAny

      public static <T> T invokeAny(List<Callable<T>> tasks)
      Execute all tasks and return the result of the first one to succeed. All remaining tasks are cancelled. If all tasks fail, throws OpenException. 执行所有任务并返回首个成功的结果。取消所有剩余任务。如果所有任务都失败,抛出 OpenException
      Type Parameters:
      T - result type - 结果类型
      Parameters:
      tasks - the tasks to execute - 要执行的任务列表
      Returns:
      the first successful result - 首个成功的结果
      Throws:
      OpenException - if all tasks fail - 如果所有任务都失败
    • invokeAny

      public static <T> T invokeAny(List<Callable<T>> tasks, Duration timeout)
      Execute all tasks with a timeout and return the first successful result. 在超时限制内执行所有任务并返回首个成功的结果。
      Type Parameters:
      T - result type - 结果类型
      Parameters:
      tasks - the tasks to execute - 要执行的任务列表
      timeout - the maximum duration to wait - 最大等待时长
      Returns:
      the first successful result - 首个成功的结果
      Throws:
      OpenException - if all tasks fail - 如果所有任务都失败
      OpenTimeoutException - if the timeout expires before any task succeeds - 如果在任何任务成功前超时
    • invokeAllSettled

      public static <T> List<Result<T>> invokeAllSettled(List<Callable<T>> tasks)
      Execute all tasks and collect every result as Result.success(Object) or Result.failure(Throwable). Never throws from task failures. 执行所有任务并将每个结果收集为 Result.success(Object)Result.failure(Throwable)。 不因任务失败而抛出异常。
      Type Parameters:
      T - result type - 结果类型
      Parameters:
      tasks - the tasks to execute - 要执行的任务列表
      Returns:
      immutable list of Results in submission order - 按提交顺序的不可变 Result 列表
    • invokeAllSettled

      public static <T> List<Result<T>> invokeAllSettled(List<Callable<T>> tasks, Duration timeout)
      Execute all tasks with a timeout and collect every result as Result. Tasks that have not completed by the timeout deadline are recorded as Result.failure(Throwable) with an OpenTimeoutException. 在超时限制内执行所有任务并将每个结果收集为 Result。 超时未完成的任务记录为包含 OpenTimeoutExceptionResult.failure(Throwable)
      Type Parameters:
      T - result type - 结果类型
      Parameters:
      tasks - the tasks to execute - 要执行的任务列表
      timeout - the maximum duration to wait - 最大等待时长
      Returns:
      immutable list of Results in submission order - 按提交顺序的不可变 Result 列表
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper)
      Apply a mapping function to each item in parallel using virtual threads. 使用虚拟线程对每个元素并行应用映射函数。
      Type Parameters:
      T - input type - 输入类型
      R - result type - 结果类型
      Parameters:
      items - the input items - 输入元素列表
      mapper - the mapping function - 映射函数
      Returns:
      immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
      Throws:
      OpenException - if any mapping fails - 如果任何映射失败
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper, Duration timeout)
      Apply a mapping function to each item in parallel with a timeout. 在超时限制内使用虚拟线程对每个元素并行应用映射函数。
      Type Parameters:
      T - input type - 输入类型
      R - result type - 结果类型
      Parameters:
      items - the input items - 输入元素列表
      mapper - the mapping function - 映射函数
      timeout - the maximum duration to wait - 最大等待时长
      Returns:
      immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
      Throws:
      OpenException - if any mapping fails - 如果任何映射失败
      OpenTimeoutException - if the timeout expires - 如果超时
    • runAll

      public static void runAll(List<Runnable> tasks)
      Run all tasks to completion. If any task fails, cancel remaining and throw. 运行所有任务直至完成。如果任何任务失败,取消剩余任务并抛出异常。
      Parameters:
      tasks - the tasks to run - 要运行的任务列表
      Throws:
      OpenException - if any task fails - 如果任何任务失败
    • runAll

      public static void runAll(List<Runnable> tasks, Duration timeout)
      Run all tasks to completion with a timeout. 在超时限制内运行所有任务直至完成。
      Parameters:
      tasks - the tasks to run - 要运行的任务列表
      timeout - the maximum duration to wait - 最大等待时长
      Throws:
      OpenException - if any task fails - 如果任何任务失败
      OpenTimeoutException - if the timeout expires - 如果超时
    • supplyAsync

      public static <T> CompletableFuture<T> supplyAsync(Callable<T> task)
      Executes a callable on a virtual thread and returns a CompletableFuture. 在虚拟线程上执行 Callable 并返回 CompletableFuture。

      Bridges virtual thread execution with the CompletableFuture API. The callable is executed on a new virtual thread, and the returned future completes when the callable finishes.

      将虚拟线程执行与 CompletableFuture API 桥接。 Callable 在新虚拟线程上执行,返回的 future 在 Callable 完成时完成。

      Type Parameters:
      T - result type - 结果类型
      Parameters:
      task - the task to execute - 要执行的任务
      Returns:
      a CompletableFuture that completes with the task result - 完成后包含任务结果的 CompletableFuture
      Throws:
      NullPointerException - if task is null - 如果 task 为 null
    • runAsync

      public static CompletableFuture<Void> runAsync(Runnable task)
      Executes a runnable on a virtual thread and returns a CompletableFuture. 在虚拟线程上执行 Runnable 并返回 CompletableFuture。
      Parameters:
      task - the task to run - 要运行的任务
      Returns:
      a CompletableFuture that completes when the task finishes - 任务完成时完成的 CompletableFuture
      Throws:
      NullPointerException - if task is null - 如果 task 为 null
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper, int maxConcurrency)
      Apply a mapping function to each item in parallel with a concurrency limit. 使用并发限制对每个元素并行应用映射函数。

      Uses a Semaphore to limit the number of concurrently running virtual threads. This is useful when the mapper accesses a resource with limited capacity (e.g., a connection pool).

      使用 Semaphore 限制并发运行的虚拟线程数。 当映射函数访问容量有限的资源(如连接池)时,此方法非常有用。

      Type Parameters:
      T - input type - 输入类型
      R - result type - 结果类型
      Parameters:
      items - the input items - 输入元素列表
      mapper - the mapping function - 映射函数
      maxConcurrency - maximum number of concurrent virtual threads (must be positive) - 最大并发虚拟线程数(必须为正数)
      Returns:
      immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
      Throws:
      OpenException - if any mapping fails - 如果任何映射失败
      IllegalArgumentException - if maxConcurrency is not positive - 如果 maxConcurrency 不为正数
    • parallelMap

      public static <T,R> List<R> parallelMap(List<T> items, Function<T,R> mapper, int maxConcurrency, Duration timeout)
      Apply a mapping function with concurrency limit and timeout. 使用并发限制和超时对每个元素并行应用映射函数。
      Type Parameters:
      T - input type - 输入类型
      R - result type - 结果类型
      Parameters:
      items - the input items - 输入元素列表
      mapper - the mapping function - 映射函数
      maxConcurrency - maximum concurrent virtual threads - 最大并发虚拟线程数
      timeout - the maximum duration to wait - 最大等待时长
      Returns:
      immutable list of mapped results in input order - 按输入顺序的不可变映射结果列表
      Throws:
      OpenException - if any mapping fails - 如果任何映射失败
      OpenTimeoutException - if the timeout expires - 如果超时