Class RateLimitedExecutor

java.lang.Object
cloud.opencode.base.parallel.executor.RateLimitedExecutor
All Implemented Interfaces:
AutoCloseable

public final class RateLimitedExecutor extends Object implements AutoCloseable
Rate Limited Executor - Token Bucket Rate Limiting Executor 限速执行器 - 令牌桶限速执行器

An executor that limits task submission rate using the token bucket algorithm. Different from concurrency limiting, this controls throughput over time.

使用令牌桶算法限制任务提交速率的执行器。 与并发限制不同,这是控制单位时间内的吞吐量。

Token Bucket Algorithm | 令牌桶算法:

  • Tokens are added at a fixed rate (permits per second) - 以固定速率添加令牌
  • Bucket has a maximum capacity for burst handling - 桶有最大容量用于处理突发
  • Each task consumes one token - 每个任务消耗一个令牌
  • If no token available, task waits or is rejected - 无令牌时等待或拒绝

Example | 示例:

// Allow 100 requests per second with burst of 10
RateLimitedExecutor executor = RateLimitedExecutor.create(100);

// With burst capacity
RateLimitedExecutor bursty = RateLimitedExecutor.create(100, 50);

// Submit tasks (will be rate limited)
executor.submit(() -> callApi());

// Try without waiting
executor.trySubmit(() -> callApi())
    .ifPresent(future -> future.join());

Features | 主要功能:

  • Token bucket rate limiting - 令牌桶限速
  • Configurable permits per second - 可配置的每秒许可数
  • Burst capacity support - 突发容量支持
  • Virtual thread execution - 虚拟线程执行

Security | 安全性:

  • Thread-safe: Yes - 线程安全: 是
Since:
JDK 25, opencode-base-parallel V1.0.0
Author:
Leon Soo www.LeonSoo.com
See Also:
  • Method Details

    • create

      public static RateLimitedExecutor create(double permitsPerSecond)
      Creates a rate limited executor with specified permits per second. 创建指定每秒许可数的限速执行器。

      Burst capacity defaults to permitsPerSecond / 10, minimum 1.

      突发容量默认为 permitsPerSecond / 10,最小为 1。

      Parameters:
      permitsPerSecond - the permits per second - 每秒许可数
      Returns:
      the executor - 执行器
    • create

      public static RateLimitedExecutor create(double permitsPerSecond, long burstCapacity)
      Creates a rate limited executor with specified rate and burst capacity. 创建指定速率和突发容量的限速执行器。
      Parameters:
      permitsPerSecond - the permits per second - 每秒许可数
      burstCapacity - the burst capacity - 突发容量
      Returns:
      the executor - 执行器
    • withExecutor

      public static RateLimitedExecutor withExecutor(ExecutorService delegate, double permitsPerSecond, long burstCapacity)
      Creates a rate limited executor with custom executor service. 使用自定义执行器服务创建限速执行器。
      Parameters:
      delegate - the underlying executor - 底层执行器
      permitsPerSecond - the permits per second - 每秒许可数
      burstCapacity - the burst capacity - 突发容量
      Returns:
      the executor - 执行器
    • builder

      public static RateLimitedExecutor.Builder builder()
      Creates a builder for more configuration options. 创建构建器以获取更多配置选项。
      Returns:
      the builder - 构建器
    • submit

      public CompletableFuture<Void> submit(Runnable task)
      Submits a runnable task, waiting for a permit if necessary. 提交 Runnable 任务,必要时等待许可。
      Parameters:
      task - the task - 任务
      Returns:
      the future - Future
    • submit

      public <T> CompletableFuture<T> submit(Callable<T> task)
      Submits a callable task, waiting for a permit if necessary. 提交 Callable 任务,必要时等待许可。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      task - the task - 任务
      Returns:
      the future - Future
    • submit

      public CompletableFuture<Void> submit(Runnable task, Duration timeout)
      Submits a runnable task with timeout for acquiring permit. 提交 Runnable 任务,带获取许可超时。
      Parameters:
      task - the task - 任务
      timeout - the timeout for acquiring permit - 获取许可的超时
      Returns:
      the future - Future
      Throws:
      OpenParallelException - if timeout waiting for permit - 等待许可超时时抛出
    • submit

      public <T> CompletableFuture<T> submit(Callable<T> task, Duration timeout)
      Submits a callable task with timeout for acquiring permit. 提交 Callable 任务,带获取许可超时。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      task - the task - 任务
      timeout - the timeout for acquiring permit - 获取许可的超时
      Returns:
      the future - Future
      Throws:
      OpenParallelException - if timeout waiting for permit - 等待许可超时时抛出
    • trySubmit

      public Optional<CompletableFuture<Void>> trySubmit(Runnable task)
      Tries to submit a runnable task without waiting. 尝试提交 Runnable 任务,不等待。
      Parameters:
      task - the task - 任务
      Returns:
      optional future, empty if rate limited - 可选的 Future,如果被限速则为空
    • trySubmit

      public <T> Optional<CompletableFuture<T>> trySubmit(Callable<T> task)
      Tries to submit a callable task without waiting. 尝试提交 Callable 任务,不等待。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      task - the task - 任务
      Returns:
      optional future, empty if rate limited - 可选的 Future,如果被限速则为空
    • invokeAll

      public <T> List<T> invokeAll(Collection<? extends Callable<T>> tasks)
      Submits multiple tasks and waits for all. 提交多个任务并等待全部完成。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      tasks - the tasks - 任务
      Returns:
      the results - 结果
    • invokeAll

      public <T> List<T> invokeAll(Collection<? extends Callable<T>> tasks, Duration timeout)
      Submits multiple tasks with timeout. 提交多个任务并设置超时。
      Type Parameters:
      T - the result type - 结果类型
      Parameters:
      tasks - the tasks - 任务
      timeout - the timeout - 超时
      Returns:
      the results - 结果
    • acquire

      public void acquire()
      Acquires a permit, blocking until available. 获取一个许可,阻塞直到可用。
    • tryAcquire

      public boolean tryAcquire()
      Tries to acquire a permit without waiting. 尝试获取许可,不等待。
      Returns:
      true if acquired - 如果获取成功返回 true
    • tryAcquire

      public boolean tryAcquire(Duration timeout)
      Tries to acquire a permit with timeout. 尝试获取许可,带超时。
      Parameters:
      timeout - the timeout - 超时
      Returns:
      true if acquired - 如果获取成功返回 true
    • getSubmittedCount

      public long getSubmittedCount()
      Gets the number of submitted tasks. 获取提交的任务数。
      Returns:
      the submitted count - 提交数
    • getCompletedCount

      public long getCompletedCount()
      Gets the number of completed tasks. 获取完成的任务数。
      Returns:
      the completed count - 完成数
    • getFailedCount

      public long getFailedCount()
      Gets the number of failed tasks. 获取失败的任务数。
      Returns:
      the failed count - 失败数
    • getRejectedCount

      public long getRejectedCount()
      Gets the number of rejected tasks (when using trySubmit). 获取被拒绝的任务数(使用 trySubmit 时)。
      Returns:
      the rejected count - 拒绝数
    • getWaitedCount

      public long getWaitedCount()
      Gets the number of tasks that waited for a permit. 获取等待许可的任务数。
      Returns:
      the waited count - 等待数
    • getTotalWaitNanos

      public long getTotalWaitNanos()
      Gets the total wait time in nanoseconds. 获取总等待时间(纳秒)。
      Returns:
      the total wait nanos - 总等待纳秒数
    • getAverageWaitMillis

      public double getAverageWaitMillis()
      Gets the average wait time in milliseconds. 获取平均等待时间(毫秒)。
      Returns:
      the average wait millis - 平均等待毫秒数
    • getPermitsPerSecond

      public double getPermitsPerSecond()
      Gets the configured permits per second. 获取配置的每秒许可数。
      Returns:
      the permits per second - 每秒许可数
    • getBurstCapacity

      public long getBurstCapacity()
      Gets the burst capacity. 获取突发容量。
      Returns:
      the burst capacity - 突发容量
    • getAvailablePermits

      public double getAvailablePermits()
      Gets the currently available permits. 获取当前可用的许可数。
      Returns:
      the available permits - 可用许可数
    • shutdown

      public void shutdown()
      Shuts down the executor. 关闭执行器。
    • shutdownAndAwait

      public boolean shutdownAndAwait(Duration timeout) throws InterruptedException
      Shuts down and waits for termination. 关闭并等待终止。
      Parameters:
      timeout - the timeout - 超时
      Returns:
      true if terminated - 如果终止返回 true
      Throws:
      InterruptedException - if interrupted - 如果中断
    • isShutdown

      public boolean isShutdown()
      Checks if the executor is shutdown. 检查执行器是否已关闭。
      Returns:
      true if shutdown - 如果已关闭返回 true
    • isTerminated

      public boolean isTerminated()
      Checks if the executor is terminated. 检查执行器是否已终止。
      Returns:
      true if terminated - 如果已终止返回 true
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable