Class AsyncPipeline<T>

java.lang.Object
cloud.opencode.base.parallel.pipeline.AsyncPipeline<T>
Type Parameters:
T - the type of the pipeline value - 流水线值的类型

public final class AsyncPipeline<T> extends Object
Async Pipeline - Asynchronous Processing Pipeline 异步流水线 - 异步处理流水线

A fluent API for chaining asynchronous operations with error handling and timeout support.

用于链接异步操作的流式 API,支持错误处理和超时控制。

Example | 示例:

String result = AsyncPipeline.of(() -> fetchData())
    .then(this::transform)
    .then(this::validate)
    .peek(data -> log.info("Processed: {}", data))
    .onError(e -> "fallback")
    .get(Duration.ofSeconds(30));

Features | 主要功能:

  • Fluent API for async operation chaining - 异步操作链的流式API
  • Error handling and recovery - 错误处理和恢复
  • Timeout support - 超时支持
  • Peek for side effects - Peek用于副作用

Security | 安全性:

  • Thread-safe: Yes (backed by CompletableFuture) - 线程安全: 是(基于CompletableFuture)
Since:
JDK 25, opencode-base-parallel V1.0.0
Author:
Leon Soo www.LeonSoo.com
See Also:
  • Constructor Details

    • AsyncPipeline

      public AsyncPipeline(CompletableFuture<T> future)
      Constructs a new async pipeline. 构造新的异步流水线。
      Parameters:
      future - the underlying future - 底层 Future
  • Method Details

    • of

      public static <T> AsyncPipeline<T> of(CompletableFuture<T> future)
      Creates a pipeline from a CompletableFuture. 从 CompletableFuture 创建流水线。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      future - the future - Future
      Returns:
      the pipeline - 流水线
    • completed

      public static <T> AsyncPipeline<T> completed(T value)
      Creates a pipeline with a completed value. 使用已完成的值创建流水线。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      value - the value - 值
      Returns:
      the pipeline - 流水线
    • failed

      public static <T> AsyncPipeline<T> failed(Throwable error)
      Creates a pipeline with a failed value. 使用失败值创建流水线。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      error - the error - 错误
      Returns:
      the pipeline - 流水线
    • then

      public <R> AsyncPipeline<R> then(Function<T,R> fn)
      Applies a transformation function. 应用转换函数。
      Type Parameters:
      R - the result type - 结果类型
      Parameters:
      fn - the transformation function - 转换函数
      Returns:
      the new pipeline - 新流水线
    • thenAsync

      public <R> AsyncPipeline<R> thenAsync(Function<T, CompletableFuture<R>> fn)
      Applies an async transformation function. 应用异步转换函数。
      Type Parameters:
      R - the result type - 结果类型
      Parameters:
      fn - the async transformation function - 异步转换函数
      Returns:
      the new pipeline - 新流水线
    • peek

      public AsyncPipeline<T> peek(Consumer<T> action)
      Peeks at the value without transforming it. 查看值但不转换。
      Parameters:
      action - the action - 动作
      Returns:
      this pipeline - 此流水线
    • filter

      public AsyncPipeline<T> filter(Predicate<T> predicate)
      Filters the value. 过滤值。
      Parameters:
      predicate - the predicate - 谓词
      Returns:
      the new pipeline with Optional result - 带 Optional 结果的新流水线
    • onError

      public AsyncPipeline<T> onError(Function<Throwable, T> handler)
      Handles errors with a recovery function. 使用恢复函数处理错误。
      Parameters:
      handler - the error handler - 错误处理器
      Returns:
      the new pipeline - 新流水线
    • onErrorAsync

      public AsyncPipeline<T> onErrorAsync(Function<Throwable, CompletableFuture<T>> handler)
      Handles errors with an async recovery function. 使用异步恢复函数处理错误。
      Parameters:
      handler - the async error handler - 异步错误处理器
      Returns:
      the new pipeline - 新流水线
    • handle

      public <R> AsyncPipeline<R> handle(BiFunction<T, Throwable, R> handler)
      Handles both success and error. 同时处理成功和错误。
      Type Parameters:
      R - the result type - 结果类型
      Parameters:
      handler - the handler - 处理器
      Returns:
      the new pipeline - 新流水线
    • combine

      public <U,R> AsyncPipeline<R> combine(AsyncPipeline<U> other, BiFunction<T,U,R> combiner)
      Combines with another pipeline. 与另一个流水线组合。
      Type Parameters:
      U - the other type - 另一个类型
      R - the result type - 结果类型
      Parameters:
      other - the other pipeline - 另一个流水线
      combiner - the combiner function - 组合函数
      Returns:
      the new pipeline - 新流水线
    • runAfter

      public AsyncPipeline<T> runAfter(AsyncPipeline<?> other)
      Runs after another pipeline completes. 在另一个流水线完成后运行。
      Parameters:
      other - the other pipeline - 另一个流水线
      Returns:
      this pipeline - 此流水线
    • get

      public T get()
      Gets the result, blocking if necessary. 获取结果,如有必要则阻塞。
      Returns:
      the result - 结果
    • get

      public T get(Duration timeout) throws TimeoutException
      Gets the result with timeout. 带超时获取结果。
      Parameters:
      timeout - the timeout - 超时
      Returns:
      the result - 结果
      Throws:
      TimeoutException - if timeout - 如果超时
    • getOrDefault

      public T getOrDefault(T defaultValue)
      Gets the result or default on error. 获取结果或错误时返回默认值。
      Parameters:
      defaultValue - the default value - 默认值
      Returns:
      the result or default - 结果或默认值
    • getOptional

      public Optional<T> getOptional()
      Gets the result as Optional. 获取结果为 Optional。
      Returns:
      the optional result - Optional 结果
    • toFuture

      public CompletableFuture<T> toFuture()
      Gets the underlying CompletableFuture. 获取底层 CompletableFuture。
      Returns:
      the future - Future
    • isDone

      public boolean isDone()
      Checks if the pipeline is completed. 检查流水线是否完成。
      Returns:
      true if completed - 如果完成返回 true
    • isCompletedExceptionally

      public boolean isCompletedExceptionally()
      Checks if the pipeline completed exceptionally. 检查流水线是否异常完成。
      Returns:
      true if completed exceptionally - 如果异常完成返回 true
    • cancel

      public boolean cancel(boolean mayInterruptIfRunning)
      Cancels the pipeline. 取消流水线。
      Parameters:
      mayInterruptIfRunning - whether to interrupt - 是否中断
      Returns:
      true if cancelled - 如果取消返回 true