Class KnowledgeStoreOutboxRelay

java.lang.Object
at.aimon.memory.postgres.KnowledgeStoreOutboxRelay
All Implemented Interfaces:
AutoCloseable

public final class KnowledgeStoreOutboxRelay extends Object implements AutoCloseable
Polling worker that drains mem_outbox and dispatches each claimed row to a KnowledgeStore — the read side of the outbox pattern from design doc §5.2.

Writes happen elsewhere: PostgresObservationStore.save, delete and merge each insert into mem_outbox inside the same JDBC transaction as the metadata mutation, so an outbox row is durably published only if the metadata change commits. This relay then moves those rows into the embedding/vector index asynchronously, decoupling the hot path from KnowledgeStore latency and availability.
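The write side is not part of this class, but the transactional guarantee it relies on can be illustrated with plain JDBC. This is a minimal, hypothetical sketch: the metadata table mem_observation, its id/body columns, and the mem_outbox columns observation_id and op are assumptions, not the schema PostgresObservationStore actually uses.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

// Hypothetical sketch of the transactional outbox write; every table/column name
// other than mem_outbox and next_attempt_at is an assumption.
final class OutboxWriteSketch {
    static final String UPDATE_METADATA_SQL =
        "UPDATE mem_observation SET body = ? WHERE id = ?";            // hypothetical metadata table
    static final String INSERT_OUTBOX_SQL =
        "INSERT INTO mem_outbox (observation_id, op, next_attempt_at) "
      + "VALUES (?, ?, now())";                                        // hypothetical outbox columns

    static void saveWithOutbox(DataSource ds, long observationId, String body) throws SQLException {
        try (Connection c = ds.getConnection()) {
            boolean previousAutoCommit = c.getAutoCommit();
            c.setAutoCommit(false);                  // both statements share one transaction
            try (PreparedStatement meta = c.prepareStatement(UPDATE_METADATA_SQL);
                 PreparedStatement outbox = c.prepareStatement(INSERT_OUTBOX_SQL)) {
                meta.setString(1, body);
                meta.setLong(2, observationId);
                meta.executeUpdate();

                outbox.setLong(1, observationId);
                outbox.setString(2, "SAVE");         // hypothetical operation tag
                outbox.executeUpdate();

                c.commit();                          // mutation and outbox row become visible together
            } catch (SQLException | RuntimeException e) {
                c.rollback();                        // neither the mutation nor the outbox row survives
                throw e;
            } finally {
                c.setAutoCommit(previousAutoCommit); // restore state before returning a pooled connection
            }
        }
    }
}
```

Because the outbox insert rides on the same commit, the relay can never observe an outbox row for a mutation that was rolled back.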

At-least-once semantics

A claim is held via claimed_by / claimed_until. A successful dispatch deletes the row; a failed one resets the claim and bumps next_attempt_at. If a worker crashes between dispatch and delete, the row is re-claimed once its claim expires and is dispatched again. The KnowledgeStore.reindex(at.aimon.core.knowledge.KnowledgeScope, at.aimon.core.knowledge.KnowledgeSource, at.aimon.core.knowledge.IndexOptions) contract is idempotent (it clears the scope and re-emits all staged documents), so duplicate dispatches converge to the same end state.
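To show why an idempotent reindex makes redelivery safe, here is a hypothetical in-memory stand-in. It is not the KnowledgeStore API: scopes and documents are plain strings. A clear-and-re-emit reindex converges under duplicate dispatch, while a naive append does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory index used only to contrast idempotent and non-idempotent dispatch.
final class IdempotentReindexSketch {
    private final Map<String, List<String>> indexByScope = new HashMap<>();

    // Non-idempotent alternative, shown for contrast: a redelivered dispatch duplicates documents.
    void append(String scope, List<String> staged) {
        indexByScope.computeIfAbsent(scope, k -> new ArrayList<>()).addAll(staged);
    }

    // Mirrors the documented reindex contract: clear the scope, then re-emit every staged document.
    void reindex(String scope, List<String> staged) {
        List<String> docs = indexByScope.computeIfAbsent(scope, k -> new ArrayList<>());
        docs.clear();
        docs.addAll(staged);
    }

    List<String> documents(String scope) {
        return List.copyOf(indexByScope.getOrDefault(scope, List.of()));
    }
}
```

Calling reindex twice with the same staged documents leaves the scope exactly as one call would, which is the property the at-least-once relay depends on.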

SKIP LOCKED isolation

Claims use SELECT ... FOR UPDATE SKIP LOCKED so multiple relay processes scaled out on the same database never block each other and never double-claim a row.
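The claim step might look roughly like this with JDBC against PostgreSQL. This is a sketch under assumptions: the id primary-key column, the lease length parameter, and the single UPDATE ... RETURNING form are illustrative choices, not necessarily the relay's actual SQL. Rows whose claimed_until lies in the future (including poisoned rows) are excluded, and FOR UPDATE SKIP LOCKED in the subquery lets concurrent workers pass over rows another transaction is already claiming.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

// Hypothetical claim query; the "id" column and the lease handling are assumptions.
final class OutboxClaimSketch {
    static final String CLAIM_SQL =
        "UPDATE mem_outbox"
      + "   SET claimed_by = ?, claimed_until = now() + make_interval(secs => ?)"
      + " WHERE id IN (SELECT id FROM mem_outbox"
      + "               WHERE (claimed_until IS NULL OR claimed_until < now())"
      + "                 AND next_attempt_at <= now()"
      + "               ORDER BY next_attempt_at, id"
      + "               LIMIT ?"
      + "               FOR UPDATE SKIP LOCKED)"
      + " RETURNING id";

    // Runs in autocommit mode: row locks are held only for this statement, and the
    // committed claimed_until lease protects each row while it is being dispatched.
    static List<Long> claimBatch(DataSource ds, String workerId, int batchSize, double leaseSeconds)
            throws SQLException {
        try (Connection c = ds.getConnection();
             PreparedStatement ps = c.prepareStatement(CLAIM_SQL)) {
            ps.setString(1, workerId);
            ps.setDouble(2, leaseSeconds);
            ps.setInt(3, batchSize);
            List<Long> ids = new ArrayList<>();
            try (ResultSet rs = ps.executeQuery()) {   // PgJDBC returns the RETURNING rows here
                while (rs.next()) {
                    ids.add(rs.getLong(1));
                }
            }
            return ids;
        }
    }
}
```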

Backoff and poison pill

Failed rows are retried with capped exponential backoff: the row's next_attempt_at is set to now() + min(60, 2^attempt) seconds. After RelayOptions.getMaxAttempts() consecutive failures the row is marked with claimed_by = 'POISON' and claimed_until pinned far into the future so it stays in the table for forensics but is excluded from drain scans.
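The backoff schedule and poison decision are pure functions and can be sketched as follows. This helper is hypothetical: it assumes attempt counts the failures so far (so the first failure waits 2 seconds) and uses 9999-12-31 as the "far future" sentinel; the source confirms neither choice.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical backoff/poison helper; attempt numbering and the sentinel instant are assumptions.
final class BackoffPolicySketch {
    static final String POISON = "POISON";
    static final long MAX_DELAY_SECONDS = 60;
    static final Instant FAR_FUTURE = Instant.parse("9999-12-31T00:00:00Z");

    // min(60, 2^attempt) seconds, guarded against shift overflow for large attempt counts.
    static long delaySeconds(int attempt) {
        if (attempt < 0) throw new IllegalArgumentException("attempt must be >= 0");
        if (attempt >= 6) return MAX_DELAY_SECONDS;      // 2^6 = 64 already exceeds the cap
        return Math.min(MAX_DELAY_SECONDS, 1L << attempt);
    }

    // New claim/scheduling state for a row after a failed dispatch.
    record Settlement(boolean poisoned, String claimedBy, Instant claimedUntil, Instant nextAttemptAt) {}

    static Settlement settleFailure(int failedAttempts, int maxAttempts, Instant now) {
        if (failedAttempts >= maxAttempts) {
            // Keep the row for forensics, but pin the claim so drain scans never pick it up.
            return new Settlement(true, POISON, FAR_FUTURE, null);
        }
        // Release the claim and schedule the next attempt with capped exponential backoff.
        Instant next = now.plus(Duration.ofSeconds(delaySeconds(failedAttempts)));
        return new Settlement(false, null, null, next);
    }
}
```

With this numbering the delays run 2, 4, 8, 16 and 32 seconds, then 60 seconds for every later attempt until the cap from RelayOptions.getMaxAttempts() is reached.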

Lifecycle

The relay is intended to be application-scoped and long-lived. start() spawns a single daemon thread that repeatedly calls drainOnce() and then sleeps for RelayOptions.getPollIntervalMillis() milliseconds. stop() interrupts the thread and joins it. Calling drainOnce() directly from a test or from a custom scheduler is also supported.
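A minimal sketch of this lifecycle using only the JDK. DaemonPollerSketch, its constructor parameters and the join timeout are hypothetical, and close() delegating to stop() is an assumption, since the source does not document what close() does.

```java
// Hypothetical poller mirroring the documented start/stop lifecycle; not the relay's code.
final class DaemonPollerSketch implements AutoCloseable {
    private final Runnable drainOnce;
    private final long pollIntervalMillis;
    private final long joinTimeoutMillis;
    private Thread worker;                                  // guarded by this

    DaemonPollerSketch(Runnable drainOnce, long pollIntervalMillis, long joinTimeoutMillis) {
        if (drainOnce == null) throw new IllegalArgumentException("drainOnce must not be null");
        if (pollIntervalMillis < 0 || joinTimeoutMillis < 0) throw new IllegalArgumentException("negative duration");
        this.drainOnce = drainOnce;
        this.pollIntervalMillis = pollIntervalMillis;
        this.joinTimeoutMillis = joinTimeoutMillis;
    }

    synchronized void start() {
        if (worker != null && worker.isAlive()) return;     // idempotent while running
        Thread t = new Thread(this::loop, "outbox-relay");
        t.setDaemon(true);                                  // never keeps the JVM alive on its own
        worker = t;
        t.start();
    }

    private void loop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                drainOnce.run();
            } catch (RuntimeException e) {
                // A failed pass must not kill the poller; a real relay would log here.
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();         // restore the flag so the loop exits
            }
        }
    }

    void stop() {
        Thread t;
        synchronized (this) {
            t = worker;
            worker = null;
        }
        if (t == null) return;                              // idempotent
        t.interrupt();
        try {
            t.join(joinTimeoutMillis);                      // best-effort: don't hang on a stuck dispatch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        stop();
    }
}
```

Implementing AutoCloseable this way lets a caller scope the poller with try-with-resources, so the daemon is stopped even if the surrounding code throws.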

  • Constructor Details

    • KnowledgeStoreOutboxRelay

      public KnowledgeStoreOutboxRelay(DataSource dataSource, at.aimon.core.knowledge.KnowledgeStore knowledgeStore)
      Convenience constructor — uses KnowledgeStoreObservationIndex.DEFAULT_AGENT_NAME as the agent name segment of the KnowledgeScope and RelayOptions.defaults().
      Parameters:
      dataSource - datasource pointing at the mem_outbox-bearing schema (must not be null)
      knowledgeStore - destination knowledge store (must not be null)
    • KnowledgeStoreOutboxRelay

      public KnowledgeStoreOutboxRelay(DataSource dataSource, at.aimon.core.knowledge.KnowledgeStore knowledgeStore, String agentName, RelayOptions options)
      Full constructor.
      Parameters:
      dataSource - datasource pointing at the mem_outbox-bearing schema (must not be null)
      knowledgeStore - destination knowledge store (must not be null)
      agentName - value used as KnowledgeScope.getAgentName() (must not be null or blank)
      options - relay options (must not be null)
  • Method Details

    • start

      public void start()
      Spawns the daemon poller. Idempotent: a second call while running is a no-op.
    • stop

      public void stop()
      Signals the daemon to stop and joins it (best-effort, with a short timeout so callers don't hang on a stuck dispatch). Idempotent.
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
    • drainOnce

      public DrainResult drainOnce()
      Runs a single drain pass: claims a batch, dispatches each row, then settles them (success → delete, failure → backoff or poison).
      Returns:
      summary counts for the batch
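The claim, dispatch and settle sequence of a drain pass can be sketched independently of JDBC. Everything here is hypothetical: the Row and DrainResult records, the Outbox callback interface and the Predicate-based dispatcher stand in for the relay's real types, and the attempt cap is passed in directly rather than read from RelayOptions.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical drain pass; Row, DrainResult and Outbox are stand-ins, not the relay's real types.
final class DrainPassSketch {
    record Row(long id, int attempts) {}
    record DrainResult(int claimed, int dispatched, int retried, int poisoned) {}

    interface Outbox {
        List<Row> claimBatch(int batchSize);
        void delete(long id);                        // success: the row is done
        void scheduleRetry(long id, int attempts);   // failure: release the claim and back off
        void markPoison(long id);                    // failure at or past the attempt cap
    }

    static DrainResult drainOnce(Outbox outbox, Predicate<Row> dispatch, int batchSize, int maxAttempts) {
        List<Row> batch = outbox.claimBatch(batchSize);
        int ok = 0, retried = 0, poisoned = 0;
        for (Row row : batch) {
            boolean success;
            try {
                success = dispatch.test(row);
            } catch (RuntimeException e) {
                success = false;                     // a throwing dispatch counts as a failure
            }
            int failures = row.attempts() + 1;       // failures including this one, if it failed
            if (success) {
                outbox.delete(row.id());
                ok++;
            } else if (failures >= maxAttempts) {
                outbox.markPoison(row.id());
                poisoned++;
            } else {
                outbox.scheduleRetry(row.id(), failures);
                retried++;
            }
        }
        return new DrainResult(batch.size(), ok, retried, poisoned);
    }
}
```

Settling each row individually means one poison row cannot block the rest of its batch.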