Class ReactiveCache<K,V>

java.lang.Object
cloud.opencode.base.cache.reactive.ReactiveCache<K,V>
Type Parameters:
K - key type
V - value type

Features:

  • JDK Flow API integration
  • CompletableFuture-based async operations
  • Optional Project Reactor support
  • Reactive load support

Security:

  • Thread-safe: Yes
  • Null-safe: Yes

public final class ReactiveCache<K,V> extends Object
Reactive Cache - a cache with Reactive Streams support (JDK 9+ Flow API).

Provides reactive programming support for cache operations using the JDK's built-in Flow API. Also supports Project Reactor (Mono/Flux) via reflection when Reactor is available.

Usage Examples:

// Create reactive cache
Cache<String, User> cache = OpenCache.getOrCreate("users");
ReactiveCache<String, User> reactiveCache = ReactiveCache.wrap(cache);

// Using JDK Flow API
reactiveCache.getMono("user:1")
    .subscribe(new Flow.Subscriber<>() {
        public void onSubscribe(Flow.Subscription s) { s.request(1); }
        public void onNext(User user) { process(user); }
        public void onError(Throwable t) { handleError(t); }
        public void onComplete() { }
    });

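// If Project Reactor is available (asMono/asFlux return Object, so a cast is needed)
if (ReactiveCache.isReactorAvailable()) {
    @SuppressWarnings("unchecked")
    Mono<User> mono = (Mono<User>) reactiveCache.asMono("user:1");
    @SuppressWarnings("unchecked")
    Flux<User> flux = (Flux<User>) reactiveCache.asFlux();
}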

// Reactive load
reactiveCache.getOrLoad("user:1", key -> loadUser(key))
    .thenAccept(user -> process(user));
Since:
JDK 25, opencode-base-cache V1.9.0
Author:
Leon Soo www.LeonSoo.com
  • Method Details

    • wrap

      public static <K,V> ReactiveCache<K,V> wrap(Cache<K,V> cache)
      Wraps an existing cache with reactive support.
      Type Parameters:
      K - key type
      V - value type
      Parameters:
      cache - the cache to wrap
      Returns:
      a reactive cache wrapping the given cache
    • getMono

      public Flow.Publisher<V> getMono(K key)
      Gets the value for a key as a Publisher (JDK Flow API).
      Parameters:
      key - the key
      Returns:
      a publisher that emits the value, or is empty
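
The "value or empty" contract can be illustrated with plain JDK types. The following is a minimal sketch, not the library's implementation: `SingleValueSketch`, `ofNullable`, and `toFuture` are hypothetical names introduced here. It shows a publisher that emits at most one item and then completes, plus a small bridge that turns such a publisher into a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in (not part of ReactiveCache): a zero-or-one publisher.
final class SingleValueSketch {

    // Emits the value once if it is non-null, then signals completion.
    static <T> Flow.Publisher<T> ofNullable(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean done = new AtomicBoolean();

            @Override
            public void request(long n) {
                if (!done.compareAndSet(false, true)) {
                    return; // already emitted or cancelled
                }
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                if (value != null) {
                    subscriber.onNext(value);
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }

    // Bridges a zero-or-one publisher to a future holding the value, or null if empty.
    static <T> CompletableFuture<T> toFuture(Flow.Publisher<T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private T last;
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(T item) { last = item; }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onComplete() { future.complete(last); }
        });
        return future;
    }
}
```

A subscriber must call `request` before anything is delivered; a Flow.Publisher stays silent until demand is signalled.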
    • getOrLoadMono

      public Flow.Publisher<V> getOrLoadMono(K key, Function<? super K, ? extends V> loader)
      Gets the value for a key as a Publisher, loading it if necessary.
      Parameters:
      key - the key
      loader - the loader function
      Returns:
      a publisher that emits the loaded value
    • getAllFlux

      public Flow.Publisher<V> getAllFlux(Iterable<? extends K> keys)
      Gets the values for the given keys as a Publisher that emits each value.
      Parameters:
      keys - the keys
      Returns:
      a publisher that emits the values
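
As an illustration of a multi-value publisher, here is a minimal sketch built on the JDK's SubmissionPublisher. `GetAllSketch` and `collectAll` are hypothetical names, and skipping keys with no value is an assumption; the source does not say how missing keys are handled.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical stand-in (not part of ReactiveCache).
final class GetAllSketch {

    // Looks up each key and pushes the hits through a SubmissionPublisher.
    // ASSUMPTION: keys without a value are skipped.
    static <K, V> List<V> collectAll(Map<K, V> store, Iterable<? extends K> keys) {
        List<V> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<V> publisher = new SubmissionPublisher<>();
        // consume() subscribes with unbounded demand; the future completes on onComplete.
        CompletableFuture<Void> done = publisher.consume(received::add);
        for (K key : keys) {
            V value = store.get(key);
            if (value != null) {
                publisher.submit(value);
            }
        }
        publisher.close(); // signals onComplete to the subscriber
        done.join();       // wait until every submitted item has been delivered
        return received;
    }
}
```

The subscriber is attached before the first `submit` call because SubmissionPublisher only delivers items to subscribers that are already registered.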
    • keysFlux

      public Flow.Publisher<K> keysFlux()
      Gets all keys as a Publisher.
      Returns:
      a publisher that emits the keys
    • valuesFlux

      public Flow.Publisher<V> valuesFlux()
      Gets all values as a Publisher.
      Returns:
      a publisher that emits the values
    • entriesFlux

      public Flow.Publisher<Map.Entry<K,V>> entriesFlux()
      Gets all entries as a Publisher.
      Returns:
      a publisher that emits the entries
    • getAsync

      public CompletableFuture<V> getAsync(K key)
      Gets the value for a key asynchronously.
      Parameters:
      key - the key
      Returns:
      a future that completes with the value, or with null
    • getOrLoad

      public CompletableFuture<V> getOrLoad(K key, Function<? super K, ? extends V> loader)
      Gets the value for a key asynchronously, loading it if necessary.
      Parameters:
      key - the key
      loader - the loader function
      Returns:
      a future that completes with the value
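
The get-or-load pattern can be sketched with a ConcurrentHashMap and CompletableFuture. This is an illustration under stated assumptions, not the library's implementation: `GetOrLoadSketch` is a hypothetical class, and running the loader on the common pool via `supplyAsync` is an assumption about scheduling.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in (not part of ReactiveCache).
final class GetOrLoadSketch<K, V> {
    private final Map<K, V> store = new ConcurrentHashMap<>();

    // Returns the cached value, or invokes the loader once and caches its result.
    // ASSUMPTION: the work runs on ForkJoinPool.commonPool() via supplyAsync.
    CompletableFuture<V> getOrLoad(K key, Function<? super K, ? extends V> loader) {
        return CompletableFuture.supplyAsync(() -> store.computeIfAbsent(key, loader));
    }
}
```

In this sketch a loader that throws completes the future exceptionally, so callers can attach `exceptionally` or `handle` to the returned future to recover.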
    • putAsync

      public CompletableFuture<Void> putAsync(K key, V value)
      Puts a value asynchronously.
      Parameters:
      key - the key
      value - the value
      Returns:
      a future that completes when the operation is done
    • putWithTtlAsync

      public CompletableFuture<Void> putWithTtlAsync(K key, V value, Duration ttl)
      Puts a value with a time-to-live (TTL) asynchronously.
      Parameters:
      key - the key
      value - the value
      ttl - the time-to-live
      Returns:
      a future that completes when the operation is done
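
To make the TTL semantics concrete, here is a minimal sketch that stores a deadline next to each value and treats expired entries as absent on read. `TtlPutSketch` and its `Expiring` record are hypothetical, and lazy expiry on read is an assumption; the source does not describe how expiry is enforced.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in (not part of ReactiveCache).
final class TtlPutSketch<K, V> {
    private record Expiring<V>(V value, long deadlineNanos) {}

    private final Map<K, Expiring<V>> store = new ConcurrentHashMap<>();

    // Stores the value together with the instant at which it expires.
    CompletableFuture<Void> putWithTtlAsync(K key, V value, Duration ttl) {
        return CompletableFuture.runAsync(() ->
                store.put(key, new Expiring<>(value, System.nanoTime() + ttl.toNanos())));
    }

    // ASSUMPTION: expiry is checked lazily on read; expired entries yield null.
    CompletableFuture<V> getAsync(K key) {
        return CompletableFuture.supplyAsync(() -> {
            Expiring<V> entry = store.get(key);
            if (entry == null) {
                return null;
            }
            if (System.nanoTime() - entry.deadlineNanos() >= 0) {
                store.remove(key, entry); // drop only if it is still the same entry
                return null;
            }
            return entry.value();
        });
    }
}
```

The sketch uses `System.nanoTime()` rather than wall-clock time so that system clock adjustments do not affect expiry.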
    • invalidateAsync

      public CompletableFuture<Void> invalidateAsync(K key)
      Invalidates a key asynchronously.
      Parameters:
      key - the key
      Returns:
      a future that completes when the operation is done
    • invalidateByPatternAsync

      public CompletableFuture<Long> invalidateByPatternAsync(String pattern)
      Invalidates all keys matching a pattern asynchronously.
      Parameters:
      pattern - the pattern
      Returns:
      a future that completes with the number of invalidated entries
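
The pattern syntax is not specified here. Purely as an illustration, the sketch below assumes a glob in which `*` matches any run of characters and everything else is literal; the library's actual syntax may differ. `PatternInvalidationSketch`, `globToRegex`, and `invalidateByPattern` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;

// Hypothetical stand-in (not part of ReactiveCache).
final class PatternInvalidationSketch {

    // ASSUMPTION: '*' is the only wildcard; all other characters match literally.
    static Pattern globToRegex(String glob) {
        String[] parts = glob.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(regex.toString());
    }

    // Removes every matching key and completes with the number of removed entries.
    static CompletableFuture<Long> invalidateByPattern(ConcurrentMap<String, ?> store, String glob) {
        Pattern pattern = globToRegex(glob);
        return CompletableFuture.supplyAsync(() -> {
            long removed = 0;
            // ConcurrentMap key-set iteration tolerates removal during traversal.
            for (String key : store.keySet()) {
                if (pattern.matcher(key).matches() && store.remove(key) != null) {
                    removed++;
                }
            }
            return removed;
        });
    }
}
```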
    • statsAsync

      public CompletableFuture<CacheStats> statsAsync()
      Gets the cache statistics asynchronously.
      Returns:
      a future that completes with the statistics
    • asMono

      public Object asMono(K key)
      Gets the value for a key as a Reactor Mono, if Reactor is available.
      Parameters:
      key - the key
      Returns:
      a Mono object; throws if Reactor is not available
    • asFlux

      public Object asFlux()
      Gets all values as a Reactor Flux, if Reactor is available.
      Returns:
      a Flux object; throws if Reactor is not available
    • isReactorAvailable

      public static boolean isReactorAvailable()
      Checks whether Project Reactor is available.
      Returns:
      true if Reactor is available
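
One common way to detect an optional dependency by reflection is to probe for one of its classes. The following is a sketch of that technique, not necessarily how this method is implemented; `ReactorProbeSketch` and its methods are hypothetical names, while `reactor.core.publisher.Mono` is Reactor's real class name.

```java
// Hypothetical stand-in (not part of ReactiveCache).
final class ReactorProbeSketch {

    // Returns true if the named class can be found, without initializing it.
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, ReactorProbeSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Probes for Reactor's Mono type on the classpath.
    static boolean reactorOnClasspath() {
        return isClassPresent("reactor.core.publisher.Mono");
    }
}
```

Passing `false` for the `initialize` argument avoids running static initializers just to answer an availability check.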
    • getDelegate

      public Cache<K,V> getDelegate()
      Gets the underlying cache.
      Returns:
      the delegate cache