Class KnowledgeStoreOutboxRelay
- All Implemented Interfaces:
AutoCloseable
mem_outbox and dispatches each claimed
row to a KnowledgeStore — the read side of the outbox pattern from
design doc §5.2.
Writes happen elsewhere: PostgresObservationStore.save / delete /
merge insert into mem_outbox inside the same JDBC transaction
as the metadata mutation, so the outbox row is durably published only if the
metadata change commits. This relay then asynchronously moves those rows
into the embedding/vector index, decoupling the hot path from KnowledgeStore
latency or availability.
At-least-once semantics
A claim is held via claimed_by / claimed_until; a successful
dispatch deletes the row, a failed one resets the claim and bumps
next_attempt_at. If a worker crashes between dispatch and delete the
row will be re-claimed when its claim expires and re-dispatched. The
KnowledgeStore.reindex(at.aimon.core.knowledge.KnowledgeScope, at.aimon.core.knowledge.KnowledgeSource, at.aimon.core.knowledge.IndexOptions) contract is idempotent (clears the scope and
re-emits all staged documents) so duplicate dispatches converge to the same
end state.
SKIP LOCKED isolation
Claims use SELECT ... FOR UPDATE SKIP LOCKED so multiple relay
processes scaled out on the same database never block each other and never
double-claim a row.
Backoff and poison pill
Failed rows are retried with capped exponential backoff: the row's
next_attempt_at is set to now() + min(60, 2^attempt) seconds.
After RelayOptions.getMaxAttempts() consecutive failures the row is
marked with claimed_by = 'POISON' and claimed_until pinned
far into the future so it stays in the table for forensics but is excluded
from drain scans.
Lifecycle
The relay is intended to be application-scoped and long-lived. start()
spawns a single daemon thread that loops drainOnce() then sleeps
RelayOptions.getPollIntervalMillis(). stop() interrupts the
thread and joins it. Calling drainOnce() directly from a test or
from a custom scheduler is also supported.
- See Also:
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final StringSentinel value written toclaimed_byfor rows that have exceededRelayOptions.getMaxAttempts()so they are visibly distinct from a regular in-flight claim. -
Constructor Summary
ConstructorsConstructorDescriptionKnowledgeStoreOutboxRelay(DataSource dataSource, at.aimon.core.knowledge.KnowledgeStore knowledgeStore) Convenience constructor — usesKnowledgeStoreObservationIndex.DEFAULT_AGENT_NAMEas the agent name segment of theKnowledgeScopeandRelayOptions.defaults().KnowledgeStoreOutboxRelay(DataSource dataSource, at.aimon.core.knowledge.KnowledgeStore knowledgeStore, String agentName, RelayOptions options) Full constructor. -
Method Summary
Modifier and TypeMethodDescriptionvoidclose()Runs a single drain pass: claims a batch, dispatches each row, then settles them (success → delete, failure → backoff or poison).voidstart()Spawns the daemon poller.voidstop()Signals the daemon to stop and joins it (best-effort, with a short timeout so callers don't hang on a stuck dispatch).
-
Field Details
-
POISON_CLAIM
Sentinel value written toclaimed_byfor rows that have exceededRelayOptions.getMaxAttempts()so they are visibly distinct from a regular in-flight claim.- See Also:
-
-
Constructor Details
-
KnowledgeStoreOutboxRelay
public KnowledgeStoreOutboxRelay(DataSource dataSource, at.aimon.core.knowledge.KnowledgeStore knowledgeStore) Convenience constructor — usesKnowledgeStoreObservationIndex.DEFAULT_AGENT_NAMEas the agent name segment of theKnowledgeScopeandRelayOptions.defaults().- Parameters:
dataSource- datasource pointing at themem_outbox-bearing schema (must not be null)knowledgeStore- destination knowledge store (must not be null)
-
KnowledgeStoreOutboxRelay
public KnowledgeStoreOutboxRelay(DataSource dataSource, at.aimon.core.knowledge.KnowledgeStore knowledgeStore, String agentName, RelayOptions options) Full constructor.- Parameters:
dataSource- datasource pointing at themem_outbox-bearing schema (must not be null)knowledgeStore- destination knowledge store (must not be null)agentName- value used asKnowledgeScope.getAgentName()(must not be null or blank)options- relay options (must not be null)
-
-
Method Details
-
start
public void start()Spawns the daemon poller. Idempotent: a second call while running is a no-op. -
stop
public void stop()Signals the daemon to stop and joins it (best-effort, with a short timeout so callers don't hang on a stuck dispatch). Idempotent. -
close
public void close()- Specified by:
closein interfaceAutoCloseable
-
drainOnce
Runs a single drain pass: claims a batch, dispatches each row, then settles them (success → delete, failure → backoff or poison).- Returns:
- summary counts for the batch
-