Class BatchProcessor

java.lang.Object
cloud.opencode.base.parallel.batch.BatchProcessor
All Implemented Interfaces:
AutoCloseable

public final class BatchProcessor extends Object implements AutoCloseable
Batch Processor - Parallel Batch Processing Utility 批处理器 - 并行批处理工具

Provides configurable batch processing with parallel execution, concurrency control, and progress tracking.

提供可配置的批处理,支持并行执行、并发控制和进度跟踪。

Example | 示例:

BatchProcessor.builder()
    .batchSize(100)
    .parallelism(10)
    .build()
    .process(items, batch -> repository.saveAll(batch));

Features | 主要功能:

  • Configurable batch size and parallelism - 可配置的批次大小和并行度
  • Progress tracking - 进度跟踪
  • Error handling per batch - 每批次错误处理
  • Virtual thread execution - 虚拟线程执行

Security | 安全性:

  • Thread-safe: Yes - 线程安全: 是

Performance | 性能特性:

  • Time complexity: O(n/b * T) where b is batch size and T is per-batch processor time; parallel execution reduces wall time to O(n/b/p * T) with parallelism p - 时间复杂度: O(n/b * T),b 为批次大小,T 为每批处理时间;并行度 p 时实际耗时降至 O(n/b/p * T)
  • Space complexity: O(n/b) - CompletableFuture list proportional to batch count; batches themselves reference input subLists - 空间复杂度: O(n/b) - CompletableFuture 列表与批次数成正比;批次引用输入子列表
Since:
JDK 25, opencode-base-parallel V1.0.0
Author:
Leon Soo www.LeonSoo.com
See Also:
  • Method Details

    • builder

      public static BatchProcessor.Builder builder()
      Creates a new builder. 创建新的构建器。
      Returns:
      the builder - 构建器
    • defaultProcessor

      public static BatchProcessor defaultProcessor()
      Creates a default batch processor. 创建默认批处理器。
      Returns:
      the processor - 处理器
    • process

      public <T> void process(List<T> items, Consumer<List<T>> processor)
      Processes items in batches. 批量处理项目。
      Type Parameters:
      T - the item type - 项目类型
      Parameters:
      items - the items to process - 要处理的项目
      processor - the batch processor - 批处理器
    • processAndCollect

      public <T,R> List<R> processAndCollect(List<T> items, Function<List<T>,List<R>> processor)
      Processes items in batches with result collection. 批量处理项目并收集结果。
      Type Parameters:
      T - the item type - 项目类型
      R - the result type - 结果类型
      Parameters:
      items - the items to process - 要处理的项目
      processor - the batch processor - 批处理器
      Returns:
      the results - 结果
    • processWithProgress

      public <T> void processWithProgress(List<T> items, Consumer<List<T>> processor, Consumer<BatchProcessor.BatchProgress> progressCallback)
      Processes items in batches with progress callback. 批量处理项目并回调进度。
      Type Parameters:
      T - the item type - 项目类型
      Parameters:
      items - the items to process - 要处理的项目
      processor - the batch processor - 批处理器
      progressCallback - the progress callback - 进度回调
    • getBatchSize

      public int getBatchSize()
      Gets the batch size. 获取批大小。
      Returns:
      the batch size - 批大小
    • getParallelism

      public int getParallelism()
      Gets the parallelism. 获取并行度。
      Returns:
      the parallelism - 并行度
    • close

      public void close()
      Shuts down the internal executor if it is not user-provided. User-provided executors are not managed by this processor. 如果不是用户提供的执行器,则关闭内部执行器。 用户提供的执行器不由此处理器管理。
      Specified by:
      close in interface AutoCloseable