Class PostgresObservationStore

java.lang.Object
at.aimon.memory.postgres.PostgresObservationStore
All Implemented Interfaces:
at.aimon.core.memory.ObservationStore

public final class PostgresObservationStore extends Object implements at.aimon.core.memory.ObservationStore
Postgres-backed ObservationStore implementing the metadata side of the C3 split (design doc §5.2).

Semantic search (semanticSearch) is intentionally not implemented here: vector search is delegated to the KnowledgeStore via an ObservationIndex so that the memory layer does not build a parallel RAG stack. Callers that need semantic search should compose this store with another ObservationStore that owns indexing. Calling semanticSearch(at.aimon.core.memory.PeerView, java.lang.String, int) directly on this class throws UnsupportedOperationException.
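
The composition described above can be sketched as a decorator. This is a minimal illustration under stated assumptions, not the project's implementation: SimpleStore, SimpleIndex, MetadataOnlyStore and IndexedStore are hypothetical stand-ins that use plain strings in place of PeerView, ObservationId and Observation, and the index is assumed to return ranked ids.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for ObservationStore. */
interface SimpleStore {
    Optional<String> findById(String id);
    List<String> semanticSearch(String query, int topK);
}

/** Hypothetical stand-in for ObservationIndex: returns ranked observation ids. */
interface SimpleIndex {
    List<String> search(String query, int topK);
}

/** Mirrors the metadata-only behaviour: lookups work, search is refused. */
final class MetadataOnlyStore implements SimpleStore {
    final Map<String, String> rows = new LinkedHashMap<>();

    @Override
    public Optional<String> findById(String id) {
        return Optional.ofNullable(rows.get(id));
    }

    @Override
    public List<String> semanticSearch(String query, int topK) {
        throw new UnsupportedOperationException("compose with a store that owns indexing");
    }
}

/** Decorator that owns the index and resolves hits through the metadata store. */
final class IndexedStore implements SimpleStore {
    private final SimpleStore metadata;
    private final SimpleIndex index;

    IndexedStore(SimpleStore metadata, SimpleIndex index) {
        this.metadata = metadata;
        this.index = index;
    }

    @Override
    public Optional<String> findById(String id) {
        return metadata.findById(id);
    }

    @Override
    public List<String> semanticSearch(String query, int topK) {
        List<String> hits = new ArrayList<>();
        // Ids that the index knows but the metadata store no longer has are skipped.
        for (String id : index.search(query, topK)) {
            metadata.findById(id).ifPresent(hits::add);
        }
        return hits;
    }
}
```

In this sketch the wrapper never touches the metadata store's own semanticSearch, so the metadata side stays free of indexing concerns.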

Every write to mem_observation is paired with an outbox row in mem_outbox written inside the same JDBC transaction (design §5.2 outbox). A separate worker drains the outbox and pushes embeddings to the KnowledgeStore, giving us at-least-once delivery without a 2PC across heterogeneous backends.
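
The paired write can be illustrated with plain JDBC. This is a sketch under assumptions rather than the class's actual code: the table names mem_observation and mem_outbox come from the description above, but the column names, the "UPSERT" operation tag, the OutboxWriteSketch class and the use of IllegalStateException (a stand-in for the project's exception type) are all hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical sketch of writing an observation and its outbox row atomically. */
final class OutboxWriteSketch {

    // Assumed column layout; the real schema is defined in the project's migration script.
    static final String INSERT_OBSERVATION =
        "INSERT INTO mem_observation (id, content) VALUES (?, ?)";
    static final String INSERT_OUTBOX =
        "INSERT INTO mem_outbox (observation_id, op) VALUES (?, ?)";

    static void saveWithOutbox(Connection conn, String id, String content) {
        try {
            // Both inserts share one transaction: either both rows commit or neither does.
            conn.setAutoCommit(false);
            try {
                try (PreparedStatement obs = conn.prepareStatement(INSERT_OBSERVATION)) {
                    obs.setString(1, id);
                    obs.setString(2, content);
                    obs.executeUpdate();
                }
                try (PreparedStatement outbox = conn.prepareStatement(INSERT_OUTBOX)) {
                    outbox.setString(1, id);
                    outbox.setString(2, "UPSERT");
                    outbox.executeUpdate();
                }
                conn.commit();
            } catch (SQLException e) {
                // Undo the partial write so no observation exists without an outbox row.
                conn.rollback();
                throw e;
            }
        } catch (SQLException e) {
            // Stand-in for the project-specific wrapper exception.
            throw new IllegalStateException("observation write failed", e);
        }
    }
}
```

Because the outbox row commits together with the observation, a worker that retries until the push succeeds can deliver each change at least once; the consumer therefore has to tolerate duplicates.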

merge(ObservationId, ObservationId, Observation) performs a soft-delete on the loser (sets soft_deleted_at = now()), keeping it for the 30-day audit window described in §5.2. delete(ObservationId) is a hard delete because the audit retention belongs to merge(), not to arbitrary deletes.
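
The difference between the two delete paths, and how a purge cutoff relates to the 30-day window, can be modelled in memory. This is a hypothetical sketch: RetentionSketch is not part of the project, ids are plain strings, and it assumes that purgeSoftDeletedBefore removes rows whose soft_deleted_at is strictly earlier than the cutoff.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of soft delete, hard delete, and purge. */
final class RetentionSketch {
    /** Audit window from the description above. */
    static final Duration AUDIT_WINDOW = Duration.ofDays(30);

    // id -> soft_deleted_at; a null value means the row is live.
    private final Map<String, Instant> softDeletedAt = new HashMap<>();

    void save(String id) {
        softDeletedAt.put(id, null);
    }

    /** The loser stays in the table but is marked as soft-deleted. */
    void merge(String winner, String loser, Instant now) {
        if (!softDeletedAt.containsKey(winner) || !softDeletedAt.containsKey(loser)) {
            throw new IllegalArgumentException("both observations must exist");
        }
        softDeletedAt.put(loser, now);
    }

    /** Hard delete: the row is gone immediately, with no audit retention. */
    void delete(String id) {
        softDeletedAt.remove(id);
    }

    /** Removes rows soft-deleted strictly before the cutoff; returns how many were removed. */
    int purgeSoftDeletedBefore(Instant cutoff) {
        int before = softDeletedAt.size();
        softDeletedAt.values().removeIf(ts -> ts != null && ts.isBefore(cutoff));
        return before - softDeletedAt.size();
    }

    boolean exists(String id) {
        return softDeletedAt.containsKey(id);
    }
}
```

Under these assumptions a periodic job would pass now minus AUDIT_WINDOW as the cutoff, so a merge loser survives for 30 days and is removed by the first purge after that.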

Every SQLException is wrapped in an AimonException (the project-wide pattern; see at.aimon.session.base.exception.IdempotencyStoreException).
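
One common way to apply such a wrapping rule uniformly is a small helper around each JDBC call. The sketch below is an assumption about the general pattern, not this class's code: StoreException is a hypothetical unchecked stand-in for AimonException (whose constructors are not shown on this page), and SqlCall and SqlWrapSketch are illustrative names.

```java
import java.sql.SQLException;

/** Hypothetical unchecked stand-in for the project's AimonException. */
final class StoreException extends RuntimeException {
    StoreException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** A unit of JDBC work that may fail with a checked SQLException. */
@FunctionalInterface
interface SqlCall<T> {
    T run() throws SQLException;
}

final class SqlWrapSketch {
    /** Runs the call and rethrows any SQLException as an unchecked StoreException. */
    static <T> T wrapping(String operation, SqlCall<T> call) {
        try {
            return call.run();
        } catch (SQLException e) {
            // Keep the original exception as the cause so SQL state and stack trace survive.
            throw new StoreException(operation + " failed: " + e.getMessage(), e);
        }
    }
}
```

With a helper like this, interface methods that declare no checked exceptions can still surface database failures with their original cause attached.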

  • Constructor Summary

    Constructors
    Constructor
    Description
    PostgresObservationStore(DataSource dataSource, com.fasterxml.jackson.databind.ObjectMapper mapper)
    Creates a new store.
  • Method Summary

    Modifier and Type
    Method
    Description
    long
    count(at.aimon.core.memory.PeerView subject)
     
    void
    delete(at.aimon.core.memory.ObservationId id)
     
    List<at.aimon.core.memory.Observation>
    findByConfidenceBelow(at.aimon.core.memory.PeerView subject, double threshold, int limit)
     
    Optional<at.aimon.core.memory.Observation>
    findById(at.aimon.core.memory.ObservationId id)
     
    List<at.aimon.core.memory.Observation>
    findBySubject(at.aimon.core.memory.PeerView subject, int limit)
     
    List<at.aimon.core.memory.PeerView>
    findSubjects(at.aimon.core.memory.Workspace workspace, int limit)
     
    at.aimon.core.memory.Observation
    merge(at.aimon.core.memory.ObservationId winner, at.aimon.core.memory.ObservationId loser, at.aimon.core.memory.Observation merged)
     
    int
    purgeSoftDeletedBefore(at.aimon.core.memory.Workspace workspace, Instant cutoff)
     
    at.aimon.core.memory.Observation
    save(at.aimon.core.memory.Observation observation)
     
    List<at.aimon.core.memory.Observation>
    semanticSearch(at.aimon.core.memory.PeerView subject, String query, int topK)
    Always throws UnsupportedOperationException.
    void
    softDelete(at.aimon.core.memory.ObservationId id)
     

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • PostgresObservationStore

      public PostgresObservationStore(DataSource dataSource, com.fasterxml.jackson.databind.ObjectMapper mapper)
      Creates a new store.
      Parameters:
      dataSource - JDBC pool against the Postgres schema in db/postgres/V1__init.sql; must not be null
      mapper - Jackson mapper used for source_message_ids and metadata jsonb columns; injected so multi-instance configs can control Jackson features (e.g., FAIL_ON_UNKNOWN_PROPERTIES)
      Throws:
      NullPointerException - if any argument is null
  • Method Details

    • save

      public at.aimon.core.memory.Observation save(at.aimon.core.memory.Observation observation)
      Specified by:
      save in interface at.aimon.core.memory.ObservationStore
    • findById

      public Optional<at.aimon.core.memory.Observation> findById(at.aimon.core.memory.ObservationId id)
      Specified by:
      findById in interface at.aimon.core.memory.ObservationStore
    • findBySubject

      public List<at.aimon.core.memory.Observation> findBySubject(at.aimon.core.memory.PeerView subject, int limit)
      Specified by:
      findBySubject in interface at.aimon.core.memory.ObservationStore
    • count

      public long count(at.aimon.core.memory.PeerView subject)
      Specified by:
      count in interface at.aimon.core.memory.ObservationStore
    • semanticSearch

      public List<at.aimon.core.memory.Observation> semanticSearch(at.aimon.core.memory.PeerView subject, String query, int topK)
      Always throws UnsupportedOperationException.

      Per design §5.2 (review C3) the metadata store and the search index are split: vector search lives in the KnowledgeStore via an ObservationIndex. To get semantic search, wrap this store in another ObservationStore that owns indexing (the in-memory reference does this with InMemoryObservationIndex).

      Specified by:
      semanticSearch in interface at.aimon.core.memory.ObservationStore
    • findByConfidenceBelow

      public List<at.aimon.core.memory.Observation> findByConfidenceBelow(at.aimon.core.memory.PeerView subject, double threshold, int limit)
      Specified by:
      findByConfidenceBelow in interface at.aimon.core.memory.ObservationStore
    • findSubjects

      public List<at.aimon.core.memory.PeerView> findSubjects(at.aimon.core.memory.Workspace workspace, int limit)
      Specified by:
      findSubjects in interface at.aimon.core.memory.ObservationStore
    • delete

      public void delete(at.aimon.core.memory.ObservationId id)
      Specified by:
      delete in interface at.aimon.core.memory.ObservationStore
    • merge

      public at.aimon.core.memory.Observation merge(at.aimon.core.memory.ObservationId winner, at.aimon.core.memory.ObservationId loser, at.aimon.core.memory.Observation merged)
      Specified by:
      merge in interface at.aimon.core.memory.ObservationStore
    • softDelete

      public void softDelete(at.aimon.core.memory.ObservationId id)
      Specified by:
      softDelete in interface at.aimon.core.memory.ObservationStore
    • purgeSoftDeletedBefore

      public int purgeSoftDeletedBefore(at.aimon.core.memory.Workspace workspace, Instant cutoff)
      Specified by:
      purgeSoftDeletedBefore in interface at.aimon.core.memory.ObservationStore