Class AsyncPipeline<T>
java.lang.Object
cloud.opencode.base.parallel.pipeline.AsyncPipeline<T>
- Type Parameters:
T - the type of the pipeline value
Async Pipeline - Asynchronous Processing Pipeline
A fluent API for chaining asynchronous operations with error handling and timeout support.
Example:
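// of(...) is documented as taking a CompletableFuture, so the supplier is wrapped with supplyAsync.
String result = AsyncPipeline.of(CompletableFuture.supplyAsync(() -> fetchData()))
    .then(this::transform)
    .then(this::validate)
    .peek(data -> log.info("Processed: {}", data))
    .onError(e -> "fallback")
    .get(Duration.ofSeconds(30));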
Features:
- Fluent API for chaining asynchronous operations
- Error handling and recovery
- Timeout support
- Peek for side effects
Security:
- Thread-safe: Yes (backed by CompletableFuture)
- Since:
- JDK 25, opencode-base-parallel V1.0.0
- Author:
- Leon Soo www.LeonSoo.com
Constructor Summary
Constructors -
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| boolean | cancel(boolean mayInterruptIfRunning) | Cancels the pipeline. |
| <U,R> AsyncPipeline<R> | combine(AsyncPipeline<U> other, BiFunction<T, U, R> combiner) | Combines with another pipeline. |
| static <T> AsyncPipeline<T> | completed(T value) | Creates a pipeline with a completed value. |
| static <T> AsyncPipeline<T> | failed | Creates a pipeline with a failed value. |
| | filter | Filters the value. |
| | get() | Gets the result, blocking if necessary. |
| | get | Gets the result with timeout. |
| | getOptional | Gets the result as Optional. |
| | getOrDefault(T defaultValue) | Gets the result or default on error. |
| <R> AsyncPipeline<R> | handle(BiFunction<T, Throwable, R> handler) | Handles both success and error. |
| boolean | isCompletedExceptionally() | Checks if the pipeline completed exceptionally. |
| boolean | isDone() | Checks if the pipeline is completed. |
| static <T> AsyncPipeline<T> | of(CompletableFuture<T> future) | Creates a pipeline from a CompletableFuture. |
| | onError | Handles errors with a recovery function. |
| | onErrorAsync(Function<Throwable, CompletableFuture<T>> handler) | Handles errors with an async recovery function. |
| | peek | Peeks at the value without transforming it. |
| | runAfter(AsyncPipeline<?> other) | Runs after another pipeline completes. |
| <R> AsyncPipeline<R> | then | Applies a transformation function. |
| <R> AsyncPipeline<R> | thenAsync(Function<T, CompletableFuture<R>> fn) | Applies an async transformation function. |
| | toFuture() | Gets the underlying CompletableFuture. |
-
Constructor Details
-
AsyncPipeline
Constructs a new async pipeline.
- Parameters:
future - the underlying future
-
-
Method Details
-
of
Creates a pipeline from a CompletableFuture.
- Type Parameters:
T - the result type
- Parameters:
future - the future
- Returns:
- the pipeline
-
completed
Creates a pipeline with a completed value.
- Type Parameters:
T - the result type
- Parameters:
value - the value
- Returns:
- the pipeline
-
failed
Creates a pipeline with a failed value.
- Type Parameters:
T - the result type
- Parameters:
error - the error
- Returns:
- the pipeline
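The three factory methods above are described only briefly. Since the class is documented as backed by CompletableFuture, the following sketch shows the JDK calls they presumably correspond to. It uses plain CompletableFuture rather than AsyncPipeline itself, and the class name FactorySketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class FactorySketch {
    // Assumed JDK counterpart of AsyncPipeline.completed(value).
    static CompletableFuture<String> completed() {
        return CompletableFuture.completedFuture("ready");
    }

    // Assumed JDK counterpart of AsyncPipeline.failed(error).
    static CompletableFuture<String> failed() {
        return CompletableFuture.failedFuture(new IllegalStateException("boom"));
    }

    public static void main(String[] args) {
        System.out.println(completed().join());
        System.out.println(failed().isCompletedExceptionally());
    }
}
```

Either future could then be handed to of(CompletableFuture<T> future) to start a pipeline.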
-
then
Applies a transformation function.
- Type Parameters:
R - the result type
- Parameters:
fn - the transformation function
- Returns:
- the new pipeline
-
thenAsync
Applies an async transformation function.
- Type Parameters:
R - the result type
- Parameters:
fn - the async transformation function
- Returns:
- the new pipeline
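The difference between then and thenAsync is easiest to see side by side. The sketch below uses plain CompletableFuture and assumes that then behaves like thenApply and thenAsync like thenCompose; that mapping is inferred from the signatures, not confirmed by the source. ThenSketch and lengthAsync are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class ThenSketch {
    // Hypothetical async step: returns a future rather than a plain value.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    static int run() {
        return CompletableFuture.supplyAsync(() -> " hello ")
                .thenApply(String::trim)               // like then(fn): value in, value out
                .thenCompose(ThenSketch::lengthAsync)  // like thenAsync(fn): value in, future out, flattened
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The async variant avoids ending up with a nested future when the transformation itself is asynchronous.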
-
peek
Peeks at the value without transforming it.
- Parameters:
action - the action
- Returns:
- this pipeline
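One way to get peek-like behaviour on a plain CompletableFuture is to run the action and pass the value through unchanged. This is a sketch under that assumption; how AsyncPipeline implements peek is not shown in the source, and the helper below is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class PeekSketch {
    // Hypothetical helper: runs the side effect, then forwards the same value.
    static <T> CompletableFuture<T> peek(CompletableFuture<T> future, Consumer<? super T> action) {
        return future.thenApply(value -> {
            action.accept(value);
            return value;
        });
    }

    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        String result = peek(CompletableFuture.completedFuture("data"), seen::add).join();
        seen.add("result=" + result);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```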
-
filter
Filters the value.
- Parameters:
predicate - the predicate
- Returns:
- the new pipeline with Optional result
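The return description suggests that a value failing the predicate becomes an empty Optional rather than an error. A minimal sketch of that reading on a plain CompletableFuture follows; the helper and class names are hypothetical, and the actual behaviour of AsyncPipeline.filter may differ.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

class FilterSketch {
    // Hypothetical helper: wraps the value in an Optional that is empty
    // when the predicate rejects it (or when the value is null).
    static <T> CompletableFuture<Optional<T>> filter(CompletableFuture<T> future,
                                                     Predicate<? super T> predicate) {
        return future.thenApply(value -> Optional.ofNullable(value).filter(predicate));
    }

    static Optional<Integer> even(int n) {
        return filter(CompletableFuture.completedFuture(n), x -> x % 2 == 0).join();
    }

    public static void main(String[] args) {
        System.out.println(even(4));
        System.out.println(even(3));
    }
}
```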
-
onError
Handles errors with a recovery function.
- Parameters:
handler - the error handler
- Returns:
- the new pipeline
-
onErrorAsync
Handles errors with an async recovery function.
- Parameters:
handler - the async error handler
- Returns:
- the new pipeline
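The two recovery methods differ only in whether the fallback is a value or another future. The sketch below shows the presumed JDK counterparts, exceptionally and exceptionallyCompose (available since Java 12), on a plain CompletableFuture. The mapping is an assumption, and RecoverySketch is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

class RecoverySketch {
    // Like onError(handler): the handler returns a replacement value directly.
    static String syncRecovery() {
        return CompletableFuture.<String>failedFuture(new IllegalStateException("primary down"))
                .exceptionally(error -> "fallback")
                .join();
    }

    // Like onErrorAsync(handler): the handler returns a future that supplies the replacement.
    static String asyncRecovery() {
        return CompletableFuture.<String>failedFuture(new IllegalStateException("primary down"))
                .exceptionallyCompose(error -> CompletableFuture.supplyAsync(() -> "from-backup"))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(syncRecovery());
        System.out.println(asyncRecovery());
    }
}
```

With CompletableFuture, the Throwable passed to a handler on a dependent stage is often a CompletionException wrapping the original cause, so handlers that inspect the error type usually check getCause() as well.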
-
handle
Handles both success and error.
- Type Parameters:
R - the result type
- Parameters:
handler - the handler
- Returns:
- the new pipeline
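A handler of type BiFunction<T, Throwable, R> receives either a value or an error, never both. The sketch below illustrates that contract with CompletableFuture.handle, which has the same shape; whether AsyncPipeline.handle delegates to it is an assumption, and HandleSketch is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

class HandleSketch {
    // On success the error is null; on failure the value is null.
    static String describe(CompletableFuture<Integer> future) {
        return future.handle((value, error) -> error == null ? "ok:" + value : "failed").join();
    }

    static String onSuccess() {
        return describe(CompletableFuture.completedFuture(7));
    }

    static String onFailure() {
        return describe(CompletableFuture.<Integer>failedFuture(new RuntimeException("x")));
    }

    public static void main(String[] args) {
        System.out.println(onSuccess());
        System.out.println(onFailure());
    }
}
```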
-
combine
Combines with another pipeline.
- Type Parameters:
U - the other type
R - the result type
- Parameters:
other - the other pipeline
combiner - the combiner function
- Returns:
- the new pipeline
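Combining two independent results is a common use of this kind of method. The sketch below shows the idea with CompletableFuture.thenCombine, which is assumed (not confirmed) to be what combine builds on. The data and the class name CombineSketch are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {
    static String run() {
        // Two independent async computations.
        CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> "alice");
        CompletableFuture<Integer> orders = CompletableFuture.supplyAsync(() -> 3);

        // The combiner runs once both have completed.
        return user.thenCombine(orders, (name, count) -> name + " has " + count + " orders").join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```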
-
runAfter
Runs after another pipeline completes.
- Parameters:
other - the other pipeline
- Returns:
- this pipeline
-
get
Gets the result, blocking if necessary.
-
get
Gets the result with timeout.
- Parameters:
timeout - the timeout
- Returns:
- the result
- Throws:
TimeoutException - if the timeout elapses before a result is available
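A timed get can be sketched on a plain CompletableFuture as follows. The class-level example passes a Duration, so the helper takes one too. How AsyncPipeline reports interruption or execution failures is not documented here, so wrapping them in IllegalStateException is purely an assumption of this sketch.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    // Hypothetical helper mirroring a get(timeout) that throws TimeoutException.
    static <T> T get(CompletableFuture<T> future, Duration timeout) throws TimeoutException {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static boolean timesOut() {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        try {
            get(neverCompleted, Duration.ofMillis(50));
            return false;
        } catch (TimeoutException expected) {
            return true;
        }
    }

    static String completesInTime() {
        try {
            return get(CompletableFuture.completedFuture("done"), Duration.ofSeconds(1));
        } catch (TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut());
        System.out.println(completesInTime());
    }
}
```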
-
getOrDefault
Gets the result or default on error.
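The Method Summary describes getOrDefault(T defaultValue) as returning the result or the default on error. A minimal sketch of that behaviour on a plain CompletableFuture follows. Treating cancellation as an error is an assumption here, and the helper is hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class DefaultSketch {
    // Hypothetical helper: blocks for the result and substitutes the default on failure.
    static <T> T getOrDefault(CompletableFuture<T> future, T defaultValue) {
        try {
            return future.join();
        } catch (CompletionException | CancellationException e) {
            return defaultValue;
        }
    }

    static String onSuccess() {
        return getOrDefault(CompletableFuture.completedFuture("value"), "default");
    }

    static String onFailure() {
        return getOrDefault(CompletableFuture.<String>failedFuture(new RuntimeException("x")), "default");
    }

    public static void main(String[] args) {
        System.out.println(onSuccess());
        System.out.println(onFailure());
    }
}
```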
-
getOptional
Gets the result as Optional.
-
toFuture
Gets the underlying CompletableFuture.
- Returns:
- the future
-
isDone
public boolean isDone()
Checks if the pipeline is completed.
- Returns:
- true if completed
-
isCompletedExceptionally
public boolean isCompletedExceptionally()
Checks if the pipeline completed exceptionally.
- Returns:
- true if completed exceptionally
-
cancel
public boolean cancel(boolean mayInterruptIfRunning)
Cancels the pipeline.
- Parameters:
mayInterruptIfRunning - whether to interrupt
- Returns:
- true if cancelled
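The status and cancellation methods interact in a way worth seeing once. The sketch below shows the state transitions on a plain CompletableFuture, which the pipeline is documented as being backed by. Whether AsyncPipeline forwards these calls unchanged is an assumption, and CancelSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

class CancelSketch {
    // Returns { doneBefore, cancelled, doneAfter, exceptionalAfter }.
    static boolean[] run() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        boolean doneBefore = pending.isDone();     // not yet completed
        boolean cancelled = pending.cancel(true);  // completes it with a CancellationException
        return new boolean[] {
            doneBefore, cancelled, pending.isDone(), pending.isCompletedExceptionally()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

For a plain CompletableFuture the mayInterruptIfRunning argument has no effect, because the future does not control the thread running the computation; a cancelled future counts as both done and completed exceptionally.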
-