Class PostgresDerivationQueueManager
- All Implemented Interfaces:
at.aimon.core.memory.deriver.DerivationQueueManager
DerivationQueueManager.
Enforces the same two invariants as
InMemoryDerivationQueueManager:
redaction at enqueue, per-work-unit serialization. The difference is the
serialization gate: instead of an in-process monitor the gate is the
mem_active_work_unit table from V1__init.sql (design §6.1.1).
The (workspace_id, session_id, observer_principal_type,
observer_principal_id) primary key plus an INSERT ... ON CONFLICT DO
UPDATE WHERE expires_at < ... probe gives us "first writer wins" across
every queue manager instance pointed at the same database.
Tasks themselves live in an in-process LinkedBlockingQueue. JVM crash
loses pending tasks for now — full task-row persistence is deferred to a
later stage. Each instance generates a UUID holderId at construction
so a stale claim left behind by a dead instance can be stolen once
expires_at passes (lease length: DEFAULT_CLAIM_LEASE).
Threading: workers are daemon threads named postgres-derivation-worker-N.
If a worker dequeues a task whose work unit is currently claimed by another
worker, it puts the task back on the tail of the ready queue and backs off so it
does not spin — a short backoff when the holder is another worker in this JVM
(tracked by an in-process busy set, so no DB probe is wasted) and a full
pollInterval when the holder is another instance. While a worker holds a
claim, a single daemon heartbeat thread periodically renews expires_at so
a derivation that runs longer than the lease cannot have its unit stolen.
-
Constructor Summary
ConstructorsConstructorDescriptionPostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties) Creates a new Postgres-backed queue manager.PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties, Duration claimLease) Test-friendly constructor that lets callers shorten the claim lease so concurrent-claim tests don't have to wait minutes for an expired lease to be stealable. -
Method Summary
-
Constructor Details
-
PostgresDerivationQueueManager
public PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties) Creates a new Postgres-backed queue manager. The default claim lease (DEFAULT_CLAIM_LEASE) is sufficient for typical deriver invocations; tests use the explicit-lease constructor. -
PostgresDerivationQueueManager
public PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties, Duration claimLease) Test-friendly constructor that lets callers shorten the claim lease so concurrent-claim tests don't have to wait minutes for an expired lease to be stealable.
-
-
Method Details
-
getHolderId
Returns the per-instance holder id used for claim ownership inmem_active_work_unit. -
enqueue
public void enqueue(at.aimon.core.memory.deriver.DerivationTask task) - Specified by:
enqueuein interfaceat.aimon.core.memory.deriver.DerivationQueueManager
-
start
public void start()- Specified by:
startin interfaceat.aimon.core.memory.deriver.DerivationQueueManager
-
stop
public void stop()- Specified by:
stopin interfaceat.aimon.core.memory.deriver.DerivationQueueManager
-
stats
public at.aimon.core.memory.deriver.QueueStats stats()- Specified by:
statsin interfaceat.aimon.core.memory.deriver.DerivationQueueManager
-