Class PostgresDerivationQueueManager

java.lang.Object
at.aimon.memory.postgres.PostgresDerivationQueueManager
All Implemented Interfaces:
at.aimon.core.memory.deriver.DerivationQueueManager

public final class PostgresDerivationQueueManager extends Object implements at.aimon.core.memory.deriver.DerivationQueueManager
Postgres-coordinated DerivationQueueManager.

Enforces the same two invariants as InMemoryDerivationQueueManager: redaction at enqueue, per-work-unit serialization. The difference is the serialization gate: instead of an in-process monitor the gate is the mem_active_work_unit table from V1__init.sql (design §6.1.1). The (workspace_id, session_id, observer_principal_type, observer_principal_id) primary key plus an INSERT ... ON CONFLICT DO UPDATE WHERE expires_at < ... probe gives us "first writer wins" across every queue manager instance pointed at the same database.

Tasks themselves live in an in-process LinkedBlockingQueue. JVM crash loses pending tasks for now — full task-row persistence is deferred to a later stage. Each instance generates a UUID holderId at construction so a stale claim left behind by a dead instance can be stolen once expires_at passes (lease length: DEFAULT_CLAIM_LEASE).

Threading: workers are daemon threads named postgres-derivation-worker-N. If a worker dequeues a task whose work unit is currently claimed by another worker, it puts the task back on the tail of the ready queue and backs off so it does not spin — a short backoff when the holder is another worker in this JVM (tracked by an in-process busy set, so no DB probe is wasted) and a full pollInterval when the holder is another instance. While a worker holds a claim, a single daemon heartbeat thread periodically renews expires_at so a derivation that runs longer than the lease cannot have its unit stolen.

  • Constructor Summary

    Constructors
    Constructor
    Description
    PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties)
    Creates a new Postgres-backed queue manager.
    PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties, Duration claimLease)
    Test-friendly constructor that lets callers shorten the claim lease so concurrent-claim tests don't have to wait minutes for an expired lease to be stealable.
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    enqueue(at.aimon.core.memory.deriver.DerivationTask task)
     
    Returns the per-instance holder id used for claim ownership in mem_active_work_unit.
    void
     
    at.aimon.core.memory.deriver.QueueStats
     
    void
     

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • PostgresDerivationQueueManager

      public PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties)
      Creates a new Postgres-backed queue manager. The default claim lease (DEFAULT_CLAIM_LEASE) is sufficient for typical deriver invocations; tests use the explicit-lease constructor.
    • PostgresDerivationQueueManager

      public PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties, Duration claimLease)
      Test-friendly constructor that lets callers shorten the claim lease so concurrent-claim tests don't have to wait minutes for an expired lease to be stealable.
  • Method Details

    • getHolderId

      public String getHolderId()
      Returns the per-instance holder id used for claim ownership in mem_active_work_unit.
    • enqueue

      public void enqueue(at.aimon.core.memory.deriver.DerivationTask task)
      Specified by:
      enqueue in interface at.aimon.core.memory.deriver.DerivationQueueManager
    • start

      public void start()
      Specified by:
      start in interface at.aimon.core.memory.deriver.DerivationQueueManager
    • stop

      public void stop()
      Specified by:
      stop in interface at.aimon.core.memory.deriver.DerivationQueueManager
    • stats

      public at.aimon.core.memory.deriver.QueueStats stats()
      Specified by:
      stats in interface at.aimon.core.memory.deriver.DerivationQueueManager