Class PostgresConversationInbox

java.lang.Object
at.aimon.session.postgres.PostgresConversationInbox
All Implemented Interfaces:
at.aimon.session.web.spi.ConversationInbox

public final class PostgresConversationInbox extends Object implements at.aimon.session.web.spi.ConversationInbox
Postgres-backed ConversationInbox per design §4.3.

One row per delivered message in conversation_inbox. Drain is a single SQL statement of the form DELETE ... USING (SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1024) RETURNING, which preserves priority-then-FIFO ordering via ORDER BY priority, id on the inner select.

SKIP LOCKED ensures that concurrent drains never block each other. This is defense in depth: the manager invariant is that only the lock holder calls collect, but the SPI should not deadlock if a caller misbehaves.
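The drain statement described above might look roughly like the following JDBC sketch. The column names (conversation_id, priority, payload), the String and int parameter types, and the DrainSketch class are assumptions made for illustration; they are not taken from the actual schema or implementation.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

/** Hypothetical sketch of the single-statement drain; not the real implementation. */
final class DrainSketch {

    // Assumed columns: id (monotonic key), conversation_id, priority, payload.
    // The inner SELECT picks up to N unlocked rows in priority-then-FIFO order;
    // the outer DELETE removes exactly those rows and returns them.
    static final String DRAIN_SQL =
        "DELETE FROM conversation_inbox AS i "
      + "USING (SELECT id FROM conversation_inbox "
      + "WHERE conversation_id = ? AND priority <= ? "
      + "ORDER BY priority, id "
      + "LIMIT ? "
      + "FOR UPDATE SKIP LOCKED) AS picked "
      + "WHERE i.id = picked.id "
      + "RETURNING i.id, i.priority, i.payload";

    /** Runs the drain and returns the payloads in result-set order. */
    static List<String> drain(DataSource ds, String conversationId,
                              int maxPriority, int limit) throws SQLException {
        try (Connection c = ds.getConnection();
             PreparedStatement ps = c.prepareStatement(DRAIN_SQL)) {
            ps.setString(1, conversationId);
            ps.setInt(2, maxPriority);
            ps.setInt(3, limit);
            List<String> payloads = new ArrayList<>();
            // DELETE ... RETURNING yields a result set, so executeQuery applies.
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    payloads.add(rs.getString("payload"));
                }
            }
            return payloads;
        }
    }
}
```

Rows locked by another in-flight drain are skipped rather than waited on, so a second caller simply receives fewer rows, possibly none. The sketch also returns id and priority so that a caller could re-apply the ordering in Java if the order of the returned rows ever needs to be verified.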

  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    static final int
    DEFAULT_COLLECT_LIMIT
    Maximum messages a single collect call removes; matches design §4.3.
  • Constructor Summary

    Constructors
    Constructor
    Description
    PostgresConversationInbox(DataSource dataSource)
     
    PostgresConversationInbox(DataSource dataSource, com.fasterxml.jackson.databind.ObjectMapper mapper, int collectLimit)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    List<at.aimon.session.web.inbox.InboundMessage>
    collect(at.aimon.core.agent.conversation.ConversationId id, at.aimon.core.agent.queue.QueuedInputPriority maxPriority)
     
    at.aimon.session.web.inbox.InboundMessageId
    deliver(at.aimon.session.web.inbox.InboundMessage message)
     
    boolean
    isEmpty(at.aimon.core.agent.conversation.ConversationId id)
     
    void
    purge(at.aimon.core.agent.conversation.ConversationId id)
     

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Field Details

    • DEFAULT_COLLECT_LIMIT

      public static final int DEFAULT_COLLECT_LIMIT
      Maximum messages a single collect call removes; matches design §4.3.
  • Constructor Details

    • PostgresConversationInbox

      public PostgresConversationInbox(DataSource dataSource)
    • PostgresConversationInbox

      public PostgresConversationInbox(DataSource dataSource, com.fasterxml.jackson.databind.ObjectMapper mapper, int collectLimit)
  • Method Details

    • deliver

      public at.aimon.session.web.inbox.InboundMessageId deliver(at.aimon.session.web.inbox.InboundMessage message)
      Specified by:
      deliver in interface at.aimon.session.web.spi.ConversationInbox
    • collect

      public List<at.aimon.session.web.inbox.InboundMessage> collect(at.aimon.core.agent.conversation.ConversationId id, at.aimon.core.agent.queue.QueuedInputPriority maxPriority)
      Specified by:
      collect in interface at.aimon.session.web.spi.ConversationInbox
    • isEmpty

      public boolean isEmpty(at.aimon.core.agent.conversation.ConversationId id)
      Specified by:
      isEmpty in interface at.aimon.session.web.spi.ConversationInbox
    • purge

      public void purge(at.aimon.core.agent.conversation.ConversationId id)
      Specified by:
      purge in interface at.aimon.session.web.spi.ConversationInbox
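
The ordering contract behind deliver, collect, isEmpty and purge can be illustrated with a small in-memory model. This is only a sketch: the InboxOrderingSketch class, its int priorities, and the reading of maxPriority as an inclusive upper bound (with lower values draining first, as ORDER BY priority suggests) are assumptions, not the real types or semantics of ConversationInbox.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory model of one conversation's inbox. */
final class InboxOrderingSketch {

    /** Stand-in for one conversation_inbox row. */
    static final class Row {
        final long id;
        final int priority;
        final String body;

        Row(long id, int priority, String body) {
            this.id = id;
            this.priority = priority;
            this.body = body;
        }
    }

    private final List<Row> rows = new ArrayList<>();
    private long nextId = 1;

    /** Appends a message and returns its monotonically increasing id. */
    long deliver(int priority, String body) {
        long id = nextId++;
        rows.add(new Row(id, priority, body));
        return id;
    }

    /** Removes and returns up to limit rows, ordered by priority and then id. */
    List<Row> collect(int maxPriority, int limit) {
        List<Row> picked = rows.stream()
            .filter(r -> r.priority <= maxPriority)
            .sorted(Comparator.comparingInt((Row r) -> r.priority)
                              .thenComparingLong(r -> r.id))
            .limit(limit)
            .collect(Collectors.toList());
        rows.removeAll(picked);
        return picked;
    }

    boolean isEmpty() {
        return rows.isEmpty();
    }

    /** Drops every pending message. */
    void purge() {
        rows.clear();
    }
}
```

Messages with equal priority come back in delivery order because ids increase monotonically, which is the same role the id column plays in the inner select of the SQL drain.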