Package at.aimon.session.postgres
Class PostgresConversationInbox
java.lang.Object
at.aimon.session.postgres.PostgresConversationInbox
All Implemented Interfaces:
at.aimon.session.base.spi.ConversationInbox
public final class PostgresConversationInbox
extends Object
implements at.aimon.session.base.spi.ConversationInbox
Postgres-backed ConversationInbox per design §4.3.
One row per delivered message is stored in conversation_inbox. Drain is a single SQL statement,
DELETE ... USING (SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1024) RETURNING, that preserves priority-then-FIFO
ordering via ORDER BY priority, id on the inner select.
SKIP LOCKED ensures concurrent drains never block each other. This is defense in depth: the manager invariant is
that only the lock holder calls collect, but the SPI shouldn't deadlock on a misbehaving caller.
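As an illustration of the drain statement described above, the following sketch builds a query of that shape. It is a reconstruction under assumptions, not the class's actual SQL: the column names conversation_id and payload, the aliases ci and picked, and the DrainSqlSketch helper itself are hypothetical. Only the table name, the ORDER BY priority, id ordering, and the DELETE ... USING ... SKIP LOCKED ... RETURNING structure come from the description. The maxPriority filter is left out because its comparison semantics are not stated here.

```java
/** Hypothetical helper that assembles a drain statement of the documented shape. */
final class DrainSqlSketch {

    private DrainSqlSketch() {}

    /**
     * Builds a single-statement drain query.
     * Assumed columns: conversation_id, payload (not confirmed by the source).
     * The one '?' placeholder is meant to be bound to the conversation id.
     */
    static String drainSql(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        return String.join("\n",
            "DELETE FROM conversation_inbox AS ci",
            "USING (",
            "    SELECT id",
            "    FROM conversation_inbox",
            "    WHERE conversation_id = ?",
            // Priority first, then insertion order (FIFO) within a priority.
            "    ORDER BY priority, id",
            "    LIMIT " + limit,
            // Rows locked by another drain are skipped instead of waited on.
            "    FOR UPDATE SKIP LOCKED",
            ") AS picked",
            "WHERE ci.id = picked.id",
            "RETURNING ci.id, ci.priority, ci.payload");
    }
}
```

Passing 1024 reproduces the limit quoted in the description. The placeholder would typically be bound through a java.sql.PreparedStatement before the statement is executed.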
Field Summary
Fields
Modifier and Type | Field | Description
static final int | DEFAULT_COLLECT_LIMIT | Maximum messages a single collect call removes; matches design §4.3.
Constructor Summary
Constructors
Constructor | Description
PostgresConversationInbox(DataSource dataSource) |
PostgresConversationInbox(DataSource dataSource, com.fasterxml.jackson.databind.ObjectMapper mapper, int collectLimit) |
Method Summary
Modifier and Type | Method | Description
List<at.aimon.session.base.inbox.InboundMessage> | collect(at.aimon.core.agent.conversation.ConversationId id, at.aimon.core.agent.queue.QueuedInputPriority maxPriority) |
at.aimon.session.base.inbox.InboundMessageId | deliver(at.aimon.session.base.inbox.InboundMessage message) |
boolean | isEmpty(at.aimon.core.agent.conversation.ConversationId id) |
void | purge(at.aimon.core.agent.conversation.ConversationId id) |
Field Details
DEFAULT_COLLECT_LIMIT
public static final int DEFAULT_COLLECT_LIMIT
Maximum messages a single collect call removes; matches design §4.3.
Constructor Details
PostgresConversationInbox
public PostgresConversationInbox(DataSource dataSource)
PostgresConversationInbox
public PostgresConversationInbox(DataSource dataSource, com.fasterxml.jackson.databind.ObjectMapper mapper, int collectLimit)
Method Details
deliver
public at.aimon.session.base.inbox.InboundMessageId deliver(at.aimon.session.base.inbox.InboundMessage message)
Specified by: deliver in interface at.aimon.session.base.spi.ConversationInbox
collect
public List<at.aimon.session.base.inbox.InboundMessage> collect(at.aimon.core.agent.conversation.ConversationId id, at.aimon.core.agent.queue.QueuedInputPriority maxPriority)
Specified by: collect in interface at.aimon.session.base.spi.ConversationInbox
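The ordering contract behind collect can be modelled without a database. The sketch below is a hypothetical in-memory stand-in: InMemoryInboxSketch, Row, and the String and int parameter types are invented for illustration and are not part of this package. It assumes, in line with ORDER BY priority, id, that lower priority values drain first, that ties drain in delivery order, and that each call removes at most the configured limit. The maxPriority argument is omitted because its semantics are not stated here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical in-memory model of a priority-then-FIFO inbox; not the Postgres implementation. */
final class InMemoryInboxSketch {

    /** Simplified row; id stands in for a monotonically increasing primary key. */
    static final class Row {
        final long id;
        final String conversationId;
        final int priority;
        final String body;

        Row(long id, String conversationId, int priority, String body) {
            this.id = id;
            this.conversationId = conversationId;
            this.priority = priority;
            this.body = body;
        }
    }

    private final List<Row> rows = new ArrayList<>();
    private final int collectLimit;
    private long nextId = 1;

    InMemoryInboxSketch(int collectLimit) {
        if (collectLimit <= 0) {
            throw new IllegalArgumentException("collectLimit must be positive");
        }
        this.collectLimit = collectLimit;
    }

    /** Stores one message and returns its assigned id. */
    synchronized long deliver(String conversationId, int priority, String body) {
        long id = nextId++;
        rows.add(new Row(id, conversationId, priority, body));
        return id;
    }

    /** Removes and returns up to collectLimit rows, ordered by priority and then id. */
    synchronized List<Row> collect(String conversationId) {
        List<Row> picked = new ArrayList<>();
        for (Row r : rows) {
            if (r.conversationId.equals(conversationId)) {
                picked.add(r);
            }
        }
        picked.sort(Comparator.comparingInt((Row r) -> r.priority)
                              .thenComparingLong(r -> r.id));
        if (picked.size() > collectLimit) {
            picked = new ArrayList<>(picked.subList(0, collectLimit));
        }
        rows.removeAll(picked);
        return picked;
    }

    /** True when no rows remain for the conversation. */
    synchronized boolean isEmpty(String conversationId) {
        for (Row r : rows) {
            if (r.conversationId.equals(conversationId)) {
                return false;
            }
        }
        return true;
    }

    /** Drops every row for the conversation. */
    synchronized void purge(String conversationId) {
        rows.removeIf(r -> r.conversationId.equals(conversationId));
    }
}
```

With a limit of 2, delivering a priority-5 message, then a priority-1 message, then another priority-5 message and calling collect once would return the priority-1 message followed by the first priority-5 message; the remaining message stays queued for the next call.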
isEmpty
public boolean isEmpty(at.aimon.core.agent.conversation.ConversationId id)
Specified by: isEmpty in interface at.aimon.session.base.spi.ConversationInbox
purge
public void purge(at.aimon.core.agent.conversation.ConversationId id)
Specified by: purge in interface at.aimon.session.base.spi.ConversationInbox