Class RateLimitedExecutor
java.lang.Object
cloud.opencode.base.parallel.executor.RateLimitedExecutor
- All Implemented Interfaces:
AutoCloseable
Rate Limited Executor - Token Bucket Rate Limiting Executor
限速执行器 - 令牌桶限速执行器
An executor that limits task submission rate using the token bucket algorithm. Different from concurrency limiting, this controls throughput over time.
使用令牌桶算法限制任务提交速率的执行器。 与并发限制不同,这是控制单位时间内的吞吐量。
Token Bucket Algorithm | 令牌桶算法:
- Tokens are added at a fixed rate (permits per second) - 以固定速率添加令牌
- Bucket has a maximum capacity for burst handling - 桶有最大容量用于处理突发
- Each task consumes one token - 每个任务消耗一个令牌
- If no token available, task waits or is rejected - 无令牌时等待或拒绝
Example | 示例:
// Allow 100 requests per second with burst of 10
RateLimitedExecutor executor = RateLimitedExecutor.create(100);
// With burst capacity
RateLimitedExecutor bursty = RateLimitedExecutor.create(100, 50);
// Submit tasks (will be rate limited)
executor.submit(() -> callApi());
// Try without waiting
executor.trySubmit(() -> callApi())
.ifPresent(future -> future.join());
Features | 主要功能:
- Token bucket rate limiting - 令牌桶限速
- Configurable permits per second - 可配置的每秒许可数
- Burst capacity support - 突发容量支持
- Virtual thread execution - 虚拟线程执行
Security | 安全性:
- Thread-safe: Yes - 线程安全: 是
- Since:
- JDK 25, opencode-base-parallel V1.0.0
- Author:
- Leon Soo www.LeonSoo.com
- See Also:
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic final classBuilder for RateLimitedExecutor. -
Method Summary
Modifier and TypeMethodDescriptionvoidacquire()Acquires a permit, blocking until available.static RateLimitedExecutor.Builderbuilder()Creates a builder for more configuration options.voidclose()static RateLimitedExecutorcreate(double permitsPerSecond) Creates a rate limited executor with specified permits per second.static RateLimitedExecutorcreate(double permitsPerSecond, long burstCapacity) Creates a rate limited executor with specified rate and burst capacity.doubleGets the currently available permits.doubleGets the average wait time in milliseconds.longGets the burst capacity.longGets the number of completed tasks.longGets the number of failed tasks.doubleGets the configured permits per second.longGets the number of rejected tasks (when using trySubmit).longGets the number of submitted tasks.longGets the total wait time in nanoseconds.longGets the number of tasks that waited for a permit.<T> List<T> invokeAll(Collection<? extends Callable<T>> tasks) Submits multiple tasks and waits for all.<T> List<T> invokeAll(Collection<? extends Callable<T>> tasks, Duration timeout) Submits multiple tasks with timeout.booleanChecks if the executor is shutdown.booleanChecks if the executor is terminated.voidshutdown()Shuts down the executor.booleanshutdownAndAwait(Duration timeout) Shuts down and waits for termination.Submits a runnable task, waiting for a permit if necessary.Submits a runnable task with timeout for acquiring permit.<T> CompletableFuture<T> Submits a callable task, waiting for a permit if necessary.<T> CompletableFuture<T> Submits a callable task with timeout for acquiring permit.booleanTries to acquire a permit without waiting.booleantryAcquire(Duration timeout) Tries to acquire a permit with timeout.Tries to submit a runnable task without waiting.<T> Optional<CompletableFuture<T>> Tries to submit a callable task without waiting.static RateLimitedExecutorwithExecutor(ExecutorService delegate, double permitsPerSecond, long burstCapacity) Creates a rate limited executor with custom executor service.
-
Method Details
-
create
Creates a rate limited executor with specified permits per second. 创建指定每秒许可数的限速执行器。Burst capacity defaults to permitsPerSecond / 10, minimum 1.
突发容量默认为 permitsPerSecond / 10,最小为 1。
- Parameters:
permitsPerSecond- the permits per second - 每秒许可数- Returns:
- the executor - 执行器
-
create
Creates a rate limited executor with specified rate and burst capacity. 创建指定速率和突发容量的限速执行器。- Parameters:
permitsPerSecond- the permits per second - 每秒许可数burstCapacity- the burst capacity - 突发容量- Returns:
- the executor - 执行器
-
withExecutor
public static RateLimitedExecutor withExecutor(ExecutorService delegate, double permitsPerSecond, long burstCapacity) Creates a rate limited executor with custom executor service. 使用自定义执行器服务创建限速执行器。- Parameters:
delegate- the underlying executor - 底层执行器permitsPerSecond- the permits per second - 每秒许可数burstCapacity- the burst capacity - 突发容量- Returns:
- the executor - 执行器
-
builder
Creates a builder for more configuration options. 创建构建器以获取更多配置选项。- Returns:
- the builder - 构建器
-
submit
Submits a runnable task, waiting for a permit if necessary. 提交 Runnable 任务,必要时等待许可。- Parameters:
task- the task - 任务- Returns:
- the future - Future
-
submit
Submits a callable task, waiting for a permit if necessary. 提交 Callable 任务,必要时等待许可。- Type Parameters:
T- the result type - 结果类型- Parameters:
task- the task - 任务- Returns:
- the future - Future
-
submit
Submits a runnable task with timeout for acquiring permit. 提交 Runnable 任务,带获取许可超时。- Parameters:
task- the task - 任务timeout- the timeout for acquiring permit - 获取许可的超时- Returns:
- the future - Future
- Throws:
OpenParallelException- if timeout waiting for permit - 等待许可超时时抛出
-
submit
Submits a callable task with timeout for acquiring permit. 提交 Callable 任务,带获取许可超时。- Type Parameters:
T- the result type - 结果类型- Parameters:
task- the task - 任务timeout- the timeout for acquiring permit - 获取许可的超时- Returns:
- the future - Future
- Throws:
OpenParallelException- if timeout waiting for permit - 等待许可超时时抛出
-
trySubmit
Tries to submit a runnable task without waiting. 尝试提交 Runnable 任务,不等待。- Parameters:
task- the task - 任务- Returns:
- optional future, empty if rate limited - 可选的 Future,如果被限速则为空
-
trySubmit
Tries to submit a callable task without waiting. 尝试提交 Callable 任务,不等待。- Type Parameters:
T- the result type - 结果类型- Parameters:
task- the task - 任务- Returns:
- optional future, empty if rate limited - 可选的 Future,如果被限速则为空
-
invokeAll
Submits multiple tasks and waits for all. 提交多个任务并等待全部完成。- Type Parameters:
T- the result type - 结果类型- Parameters:
tasks- the tasks - 任务- Returns:
- the results - 结果
-
invokeAll
Submits multiple tasks with timeout. 提交多个任务并设置超时。- Type Parameters:
T- the result type - 结果类型- Parameters:
tasks- the tasks - 任务timeout- the timeout - 超时- Returns:
- the results - 结果
-
acquire
public void acquire()Acquires a permit, blocking until available. 获取一个许可,阻塞直到可用。 -
tryAcquire
public boolean tryAcquire()Tries to acquire a permit without waiting. 尝试获取许可,不等待。- Returns:
- true if acquired - 如果获取成功返回 true
-
tryAcquire
Tries to acquire a permit with timeout. 尝试获取许可,带超时。- Parameters:
timeout- the timeout - 超时- Returns:
- true if acquired - 如果获取成功返回 true
-
getSubmittedCount
public long getSubmittedCount()Gets the number of submitted tasks. 获取提交的任务数。- Returns:
- the submitted count - 提交数
-
getCompletedCount
public long getCompletedCount()Gets the number of completed tasks. 获取完成的任务数。- Returns:
- the completed count - 完成数
-
getFailedCount
public long getFailedCount()Gets the number of failed tasks. 获取失败的任务数。- Returns:
- the failed count - 失败数
-
getRejectedCount
public long getRejectedCount()Gets the number of rejected tasks (when using trySubmit). 获取被拒绝的任务数(使用 trySubmit 时)。- Returns:
- the rejected count - 拒绝数
-
getWaitedCount
public long getWaitedCount()Gets the number of tasks that waited for a permit. 获取等待许可的任务数。- Returns:
- the waited count - 等待数
-
getTotalWaitNanos
public long getTotalWaitNanos()Gets the total wait time in nanoseconds. 获取总等待时间(纳秒)。- Returns:
- the total wait nanos - 总等待纳秒数
-
getAverageWaitMillis
public double getAverageWaitMillis()Gets the average wait time in milliseconds. 获取平均等待时间(毫秒)。- Returns:
- the average wait millis - 平均等待毫秒数
-
getPermitsPerSecond
public double getPermitsPerSecond()Gets the configured permits per second. 获取配置的每秒许可数。- Returns:
- the permits per second - 每秒许可数
-
getBurstCapacity
public long getBurstCapacity()Gets the burst capacity. 获取突发容量。- Returns:
- the burst capacity - 突发容量
-
getAvailablePermits
public double getAvailablePermits()Gets the currently available permits. 获取当前可用的许可数。- Returns:
- the available permits - 可用许可数
-
shutdown
public void shutdown()Shuts down the executor. 关闭执行器。 -
shutdownAndAwait
Shuts down and waits for termination. 关闭并等待终止。- Parameters:
timeout- the timeout - 超时- Returns:
- true if terminated - 如果终止返回 true
- Throws:
InterruptedException- if interrupted - 如果中断
-
isShutdown
public boolean isShutdown()Checks if the executor is shutdown. 检查执行器是否已关闭。- Returns:
- true if shutdown - 如果已关闭返回 true
-
isTerminated
public boolean isTerminated()Checks if the executor is terminated. 检查执行器是否已终止。- Returns:
- true if terminated - 如果已终止返回 true
-
close
public void close()- Specified by:
closein interfaceAutoCloseable
-