Class PostgresDerivationQueueManager

java.lang.Object
at.aimon.memory.postgres.PostgresDerivationQueueManager
All Implemented Interfaces:
at.aimon.core.memory.deriver.DerivationQueueManager

public final class PostgresDerivationQueueManager extends Object implements at.aimon.core.memory.deriver.DerivationQueueManager
Postgres-coordinated DerivationQueueManager.

Enforces the same two invariants as InMemoryDerivationQueueManager: redaction at enqueue and per-work-unit serialization. The difference is the serialization gate: instead of an in-process monitor, the gate is the mem_active_work_unit table from V1__init.sql (design §6.1.1). The (workspace_id, session_id, observer_principal_type, observer_principal_id) primary key plus an INSERT ... ON CONFLICT DO UPDATE WHERE expires_at < ... probe gives "first writer wins" semantics across every queue manager instance pointed at the same database.
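
The claim probe described above might look roughly like the following JDBC sketch. It is an illustration under stated assumptions, not the class's actual statement: the holder_id column name, the use of text columns for the key, and the ClaimProbeSketch class and tryClaim helper are all hypothetical. Only the table name, the four key columns, and expires_at come from the description above.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of a "first writer wins" claim probe.
final class ClaimProbeSketch {

    // Assumed column names: holder_id is a guess; the key columns and
    // expires_at are taken from the class description.
    static final String CLAIM_SQL =
        "INSERT INTO mem_active_work_unit "
      + "(workspace_id, session_id, observer_principal_type, observer_principal_id, "
      + "holder_id, expires_at) "
      + "VALUES (?, ?, ?, ?, ?, ?) "
      + "ON CONFLICT (workspace_id, session_id, observer_principal_type, observer_principal_id) "
      + "DO UPDATE SET holder_id = EXCLUDED.holder_id, expires_at = EXCLUDED.expires_at "
      + "WHERE mem_active_work_unit.expires_at < ?";

    // Returns true if this holder now owns the work unit: either no row
    // existed (insert) or the existing row's lease had expired (update).
    // A live claim fails the WHERE clause, so zero rows are affected.
    static boolean tryClaim(Connection conn,
                            String workspaceId, String sessionId,
                            String principalType, String principalId,
                            String holderId, Instant now, Duration lease)
            throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(CLAIM_SQL)) {
            ps.setString(1, workspaceId);
            ps.setString(2, sessionId);
            ps.setString(3, principalType);
            ps.setString(4, principalId);
            ps.setString(5, holderId);
            ps.setTimestamp(6, Timestamp.from(now.plus(lease))); // new expiry
            ps.setTimestamp(7, Timestamp.from(now));             // steal cutoff
            return ps.executeUpdate() == 1;
        }
    }
}
```

Because the conflict target is the primary key, two instances racing on the same work unit are serialized by the database: one statement affects a row, the other affects none.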

Tasks themselves live in an in-process LinkedBlockingQueue. For now, a JVM crash loses any pending tasks; full task-row persistence is deferred to a later stage. Each instance generates a UUID holderId at construction, so a stale claim left behind by a dead instance can be stolen once its expires_at passes (lease length: DEFAULT_CLAIM_LEASE).
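
To make the lease-stealing rule concrete, here is a small in-memory model of the semantics described above. It is not how the class stores claims (those live in Postgres); the LeaseModelSketch class and its method names are hypothetical, and the strict "expires_at < now" comparison is an assumption carried over from the probe's WHERE clause.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of claim leases, for illustration only.
final class LeaseModelSketch {

    private static final class Claim {
        final String holderId;
        final Instant expiresAt;
        Claim(String holderId, Instant expiresAt) {
            this.holderId = holderId;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Claim> claims = new HashMap<>();

    // Succeeds when the work unit is unclaimed or its lease has expired;
    // a live claim held by anyone blocks the attempt.
    synchronized boolean tryClaim(String workUnit, String holderId,
                                  Instant now, Duration lease) {
        Claim existing = claims.get(workUnit);
        if (existing != null && !existing.expiresAt.isBefore(now)) {
            return false; // lease still live: first writer keeps it
        }
        claims.put(workUnit, new Claim(holderId, now.plus(lease)));
        return true;
    }

    // Returns the current holder id, or null if the work unit was never claimed.
    synchronized String holderOf(String workUnit) {
        Claim c = claims.get(workUnit);
        return c == null ? null : c.holderId;
    }
}
```

With a 30-second lease, holder A's claim at time t blocks holder B at t+10s, but B can take over at t+31s if A never released it, which is the recovery path for a crashed instance.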

Threading: workers are daemon threads named postgres-derivation-worker-N. If a worker dequeues a task whose work unit is currently claimed by another worker (in this JVM or another), it puts the task back on the tail of the ready queue and sleeps for pollInterval so that it does not spin.
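
The requeue-and-sleep behaviour can be sketched as a single worker step. This is a simplified illustration, not the class's worker loop: tasks are plain strings, the claim check is passed in as a predicate, and the RequeueSketch class and step method are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of one iteration of a worker loop.
final class RequeueSketch {

    // Returns true if a task was run, false if the queue was empty,
    // the task's work unit was claimed elsewhere, or the thread was interrupted.
    static boolean step(BlockingQueue<String> ready,
                        Predicate<String> tryClaim,
                        Consumer<String> run,
                        Duration pollInterval) {
        try {
            String task = ready.poll(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
            if (task == null) {
                return false; // nothing to do this round
            }
            if (!tryClaim.test(task)) {
                ready.put(task);                        // back on the tail
                Thread.sleep(pollInterval.toMillis());  // back off, do not spin
                return false;
            }
            run.accept(task); // a real worker would release the claim afterwards
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }
}
```

Putting the task on the tail rather than retrying in place lets tasks for other work units make progress while the contended one waits.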

  • Constructor Summary

    Constructors
    Constructor
    Description
    PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties)
    Creates a new Postgres-backed queue manager.
    PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties, Duration claimLease)
    Test-friendly constructor that lets callers shorten the claim lease so concurrent-claim tests don't have to wait minutes for an expired lease to be stealable.
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    enqueue(at.aimon.core.memory.deriver.DerivationTask task)
    String
    getHolderId()
    Returns the per-instance holder id used for claim ownership in mem_active_work_unit.
    void
    start()
    at.aimon.core.memory.deriver.QueueStats
    stats()
    void
    stop()

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • PostgresDerivationQueueManager

      public PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties)
      Creates a new Postgres-backed queue manager. The default claim lease (DEFAULT_CLAIM_LEASE) is sufficient for typical deriver invocations; tests use the explicit-lease constructor.
    • PostgresDerivationQueueManager

      public PostgresDerivationQueueManager(DataSource dataSource, at.aimon.core.memory.deriver.Deriver deriver, at.aimon.core.memory.redaction.RedactionPolicy redactionPolicy, at.aimon.core.memory.MemoryProperties.DeriverProperties properties, Duration claimLease)
      Test-friendly constructor that lets callers shorten the claim lease so concurrent-claim tests don't have to wait minutes for an expired lease to be stealable.
  • Method Details

    • getHolderId

      public String getHolderId()
      Returns the per-instance holder id used for claim ownership in mem_active_work_unit.
    • enqueue

      public void enqueue(at.aimon.core.memory.deriver.DerivationTask task)
      Specified by:
      enqueue in interface at.aimon.core.memory.deriver.DerivationQueueManager
    • start

      public void start()
      Specified by:
      start in interface at.aimon.core.memory.deriver.DerivationQueueManager
    • stop

      public void stop()
      Specified by:
      stop in interface at.aimon.core.memory.deriver.DerivationQueueManager
    • stats

      public at.aimon.core.memory.deriver.QueueStats stats()
      Specified by:
      stats in interface at.aimon.core.memory.deriver.DerivationQueueManager