Package at.aimon.session.postgres
Class PostgresConversationSignalBus
java.lang.Object
at.aimon.session.postgres.PostgresConversationSignalBus
- All Implemented Interfaces:
at.aimon.session.web.spi.ConversationSignalBus,AutoCloseable
public final class PostgresConversationSignalBus
extends Object
implements at.aimon.session.web.spi.ConversationSignalBus, AutoCloseable
Postgres-backed
ConversationSignalBus per design §4.2.
Hybrid model:
publish(at.aimon.session.web.spi.ConversationSignal)inserts the signal envelope intoconversation_signal(jsonb payload) inside a transaction, then issuespg_notify(conversation_signal_doorbell, '<id>'). The NOTIFY payload is just the row id — never approaches the 8 KB Postgres limit no matter how large the actual signal payload is.subscribe(at.aimon.core.agent.conversation.ConversationId, java.util.function.Consumer<at.aimon.session.web.spi.ConversationSignal>)registers an in-memory handler and asks theListenDispatcherto track this conversation. The dispatcher owns one dedicated long-lived connection runningLISTEN conversation_signal_doorbell; on each notification (or on a periodic 5 s self-poll backstop) it fetches new rows fromconversation_signalvia the suppliedfetchDataSource, decodes them, and dispatches to registered handlers.
Connection topology (design §6):
publishDataSource— Hikari main pool (short transactions for INSERT + NOTIFY).fetchDataSource— Hikari signal pool (recommendedmin=1, max=2); used by the dispatcher.- The dispatcher's LISTEN connection — opened directly via
DriverManager, outside Hikari.
dropSelfBroadcast (default true) skips delivery when a signal's originNodeId equals this
bus's nodeId, matching the §5.3 fan-out diagram.
-
Nested Class Summary
Nested classes/interfaces inherited from interface at.aimon.session.web.spi.ConversationSignalBus
at.aimon.session.web.spi.ConversationSignalBus.Subscription -
Constructor Summary
ConstructorsConstructorDescriptionPostgresConversationSignalBus(DataSource publishDataSource, DataSource fetchDataSource, String jdbcUrl, Properties listenConnectionProps, String nodeId) PostgresConversationSignalBus(DataSource publishDataSource, DataSource fetchDataSource, String jdbcUrl, Properties listenConnectionProps, String nodeId, com.fasterxml.jackson.databind.ObjectMapper mapper, boolean dropSelfBroadcast) -
Method Summary
Modifier and TypeMethodDescriptionvoidclose()voidpublish(at.aimon.session.web.spi.ConversationSignal signal) at.aimon.session.web.spi.ConversationSignalBus.Subscriptionsubscribe(at.aimon.core.agent.conversation.ConversationId id, Consumer<at.aimon.session.web.spi.ConversationSignal> handler) intsweepOlderThan(Instant cutoff) Reapsconversation_signalrows older thancutoff.
-
Constructor Details
-
PostgresConversationSignalBus
public PostgresConversationSignalBus(DataSource publishDataSource, DataSource fetchDataSource, String jdbcUrl, Properties listenConnectionProps, String nodeId) -
PostgresConversationSignalBus
public PostgresConversationSignalBus(DataSource publishDataSource, DataSource fetchDataSource, String jdbcUrl, Properties listenConnectionProps, String nodeId, com.fasterxml.jackson.databind.ObjectMapper mapper, boolean dropSelfBroadcast)
-
-
Method Details
-
subscribe
public at.aimon.session.web.spi.ConversationSignalBus.Subscription subscribe(at.aimon.core.agent.conversation.ConversationId id, Consumer<at.aimon.session.web.spi.ConversationSignal> handler) - Specified by:
subscribein interfaceat.aimon.session.web.spi.ConversationSignalBus
-
publish
public void publish(at.aimon.session.web.spi.ConversationSignal signal) - Specified by:
publishin interfaceat.aimon.session.web.spi.ConversationSignalBus
-
sweepOlderThan
Reapsconversation_signalrows older thancutoff. Called by the manager's scheduled cleanup (design §4.2).- Parameters:
cutoff- rows withcreated_at < cutoffare deleted- Returns:
- number of rows deleted
-
close
public void close()- Specified by:
closein interfaceAutoCloseable
-