Documentation
¶
Index ¶
- Constants
- Variables
- func CleanDB(t *testing.T, db *bun.DB)
- func NewPostgresConn(appState *models.AppState) *bun.DB
- type DocumentCollectionDAO
- func (dc *DocumentCollectionDAO) Create(ctx context.Context) error
- func (dc *DocumentCollectionDAO) CreateDocuments(ctx context.Context, documents []models.Document) ([]uuid.UUID, error)
- func (dc *DocumentCollectionDAO) Delete(ctx context.Context) error
- func (dc *DocumentCollectionDAO) DeleteDocumentsByUUID(ctx context.Context, documentUUIDs []uuid.UUID) error
- func (dc *DocumentCollectionDAO) GetAll(ctx context.Context) ([]models.DocumentCollection, error)
- func (dc *DocumentCollectionDAO) GetByName(ctx context.Context) error
- func (dc *DocumentCollectionDAO) GetDocuments(ctx context.Context, limit int, uuids []uuid.UUID, documentIDs []string) ([]models.Document, error)
- func (dc *DocumentCollectionDAO) SearchDocuments(ctx context.Context, query *models.DocumentSearchPayload, limit int, ...) (*models.DocumentSearchResultPage, error)
- func (dc *DocumentCollectionDAO) Update(ctx context.Context) error
- func (dc *DocumentCollectionDAO) UpdateDocuments(ctx context.Context, documents []models.Document) error
- type DocumentCollectionSchema
- type DocumentSchemaTemplate
- type DocumentStore
- func (ds *DocumentStore) CreateCollection(ctx context.Context, collection models.DocumentCollection) error
- func (ds *DocumentStore) CreateCollectionIndex(ctx context.Context, collectionName string, force bool) error
- func (ds *DocumentStore) CreateDocuments(ctx context.Context, collectionName string, documents []models.Document) ([]uuid.UUID, error)
- func (ds *DocumentStore) DeleteCollection(ctx context.Context, collectionName string) error
- func (ds *DocumentStore) DeleteDocuments(ctx context.Context, collectionName string, documentUUID []uuid.UUID) error
- func (ds *DocumentStore) GetClient() any
- func (ds *DocumentStore) GetCollection(ctx context.Context, collectionName string) (models.DocumentCollection, error)
- func (ds *DocumentStore) GetCollectionList(ctx context.Context) ([]models.DocumentCollection, error)
- func (ds *DocumentStore) GetDocuments(ctx context.Context, collectionName string, uuids []uuid.UUID, ...) ([]models.Document, error)
- func (ds *DocumentStore) OnStart(ctx context.Context) error
- func (ds *DocumentStore) SearchCollection(ctx context.Context, query *models.DocumentSearchPayload, limit int, ...) (*models.DocumentSearchResultPage, error)
- func (ds *DocumentStore) Shutdown(_ context.Context) error
- func (ds *DocumentStore) UpdateCollection(ctx context.Context, collection models.DocumentCollection) error
- func (ds *DocumentStore) UpdateDocuments(ctx context.Context, collectionName string, documents []models.Document) error
- type JSONQuery
- type MessageStoreSchema
- type MessageVectorStoreSchema
- type PostgresMemoryStore
- func (pms *PostgresMemoryStore) Close() error
- func (pms *PostgresMemoryStore) DeleteSession(ctx context.Context, sessionID string) error
- func (pms *PostgresMemoryStore) GetClient() *bun.DB
- func (pms *PostgresMemoryStore) GetMemory(ctx context.Context, appState *models.AppState, sessionID string, ...) (*models.Memory, error)
- func (pms *PostgresMemoryStore) GetMessageVectors(ctx context.Context, _ *models.AppState, sessionID string) ([]models.MessageEmbedding, error)
- func (pms *PostgresMemoryStore) GetSession(ctx context.Context, _ *models.AppState, sessionID string) (*models.Session, error)
- func (pms *PostgresMemoryStore) GetSummary(ctx context.Context, _ *models.AppState, sessionID string) (*models.Summary, error)
- func (pms *PostgresMemoryStore) OnStart(_ context.Context, appState *models.AppState) error
- func (pms *PostgresMemoryStore) PurgeDeleted(ctx context.Context) error
- func (pms *PostgresMemoryStore) PutMemory(ctx context.Context, appState *models.AppState, sessionID string, ...) error
- func (pms *PostgresMemoryStore) PutMessageMetadata(ctx context.Context, _ *models.AppState, sessionID string, ...) error
- func (pms *PostgresMemoryStore) PutMessageVectors(ctx context.Context, _ *models.AppState, sessionID string, ...) error
- func (pms *PostgresMemoryStore) PutSession(ctx context.Context, _ *models.AppState, session *models.Session) error
- func (pms *PostgresMemoryStore) PutSummary(ctx context.Context, _ *models.AppState, sessionID string, ...) error
- func (pms *PostgresMemoryStore) SearchMemory(ctx context.Context, appState *models.AppState, sessionID string, ...) ([]models.MemorySearchResult, error)
- type SessionSchema
- type SummaryStoreSchema
- type VectorColIndex
Constants ¶
const DefaultDistanceFunction = "cosine"
DefaultDistanceFunction is the default distance function to use for indexing. The cosine distance function is used by default in order to support both normalized and non-normalized embeddings. A future improvement would be to use the inner product distance function for normalized embeddings.
const DefaultDocumentSearchLimit = 20
const DefaultMemorySearchLimit = 10
const EmbeddingColName = "embedding"
const MaxParallelWorkersPerGather = 4
const MinRowsForIndex = 10000
MinRowsForIndex is the minimum number of rows required to create an index. The pgvector docs recommend creating the index after a representative sample of data is loaded. This is a guesstimate.
Variables ¶
var IndexMutexMap = make(map[string]*sync.Mutex)
IndexMutexMap stores a mutex for each collection.
Functions ¶
Types ¶
type DocumentCollectionDAO ¶
type DocumentCollectionDAO struct { models.DocumentCollection // contains filtered or unexported fields }
func NewDocumentCollectionDAO ¶
func NewDocumentCollectionDAO( appState *models.AppState, db *bun.DB, collection models.DocumentCollection, ) *DocumentCollectionDAO
func (*DocumentCollectionDAO) Create ¶
func (dc *DocumentCollectionDAO) Create( ctx context.Context, ) error
Create inserts a collection into the collections table and creates a table for the collection's documents.
func (*DocumentCollectionDAO) CreateDocuments ¶
func (dc *DocumentCollectionDAO) CreateDocuments( ctx context.Context, documents []models.Document, ) ([]uuid.UUID, error)
CreateDocuments inserts the given documents into the given collection.
func (*DocumentCollectionDAO) Delete ¶
func (dc *DocumentCollectionDAO) Delete(ctx context.Context) error
Delete deletes a collection from the collections table and drops the collection's document table.
func (*DocumentCollectionDAO) DeleteDocumentsByUUID ¶
func (dc *DocumentCollectionDAO) DeleteDocumentsByUUID( ctx context.Context, documentUUIDs []uuid.UUID, ) error
DeleteDocumentsByUUID deletes documents from a collection in the SqlDB, identified by their UUIDs.
func (*DocumentCollectionDAO) GetAll ¶
func (dc *DocumentCollectionDAO) GetAll( ctx context.Context, ) ([]models.DocumentCollection, error)
GetAll returns a list of all collections from the collections table.
func (*DocumentCollectionDAO) GetByName ¶
func (dc *DocumentCollectionDAO) GetByName( ctx context.Context, ) error
GetByName returns a collection from the collections table by name.
func (*DocumentCollectionDAO) GetDocuments ¶
func (dc *DocumentCollectionDAO) GetDocuments( ctx context.Context, limit int, uuids []uuid.UUID, documentIDs []string, ) ([]models.Document, error)
GetDocuments retrieves documents. If `uuids` is non-nil, it will use the document UUIDs to retrieve these documents. Otherwise, it will retrieve all documents. If limit is greater than 0, it will retrieve at most `limit` documents.
func (*DocumentCollectionDAO) SearchDocuments ¶
func (dc *DocumentCollectionDAO) SearchDocuments(ctx context.Context, query *models.DocumentSearchPayload, limit int, withMMR bool, pageNumber int, pageSize int) (*models.DocumentSearchResultPage, error)
SearchDocuments searches for documents in a collection. Currently pagination is not supported.
func (*DocumentCollectionDAO) Update ¶
func (dc *DocumentCollectionDAO) Update( ctx context.Context, ) error
Update updates a collection in the collections table.
func (*DocumentCollectionDAO) UpdateDocuments ¶
func (dc *DocumentCollectionDAO) UpdateDocuments( ctx context.Context, documents []models.Document, ) error
UpdateDocuments updates the document_id, metadata, and embedding columns of the given documents in the given collection. The documents must have non-nil uuids. **IMPORTANT:** We determine which columns to update based on the fields that are non-zero in the given documents. This means that all documents must have data for the same fields. If a document is missing data for a field, there could be data loss.
type DocumentCollectionSchema ¶
type DocumentCollectionSchema struct { bun.BaseModel `bun:"table:document_collection,alias:dc"` models.DocumentCollection }
DocumentCollectionSchema represents the schema for the DocumentCollectionDAO table.
func (*DocumentCollectionSchema) AfterCreateTable ¶
func (*DocumentCollectionSchema) AfterCreateTable( ctx context.Context, query *bun.CreateTableQuery, ) error
func (*DocumentCollectionSchema) BeforeCreateTable ¶
func (s *DocumentCollectionSchema) BeforeCreateTable( _ context.Context, _ *bun.CreateTableQuery, ) error
type DocumentSchemaTemplate ¶
type DocumentSchemaTemplate struct { bun.BaseModel `bun:"table:document,alias:d"` models.DocumentBase }
DocumentSchemaTemplate represents the schema template for Document tables. MessageEmbedding is manually added when createDocumentTable is run in order to set the correct dimensions. This means the embedding is not returned when querying using bun.
type DocumentStore ¶
type DocumentStore struct { store.BaseDocumentStore[*bun.DB] DocEmbeddingUpdateTaskCh chan []models.DocEmbeddingUpdate DocEmbeddingTaskCh chan<- []models.DocEmbeddingTask // contains filtered or unexported fields }
func NewDocumentStore ¶
func NewDocumentStore( appState *models.AppState, client *bun.DB, docEmbeddingUpdateTaskCh chan []models.DocEmbeddingUpdate, docEmbeddingTaskCh chan<- []models.DocEmbeddingTask, ) (*DocumentStore, error)
NewDocumentStore returns a new DocumentStore. Use this to correctly initialize the store.
func (*DocumentStore) CreateCollection ¶
func (ds *DocumentStore) CreateCollection( ctx context.Context, collection models.DocumentCollection, ) error
func (*DocumentStore) CreateCollectionIndex ¶
func (*DocumentStore) CreateDocuments ¶
func (*DocumentStore) DeleteCollection ¶
func (ds *DocumentStore) DeleteCollection( ctx context.Context, collectionName string, ) error
func (*DocumentStore) DeleteDocuments ¶
func (*DocumentStore) GetClient ¶
func (ds *DocumentStore) GetClient() any
func (*DocumentStore) GetCollection ¶
func (ds *DocumentStore) GetCollection( ctx context.Context, collectionName string, ) (models.DocumentCollection, error)
func (*DocumentStore) GetCollectionList ¶
func (ds *DocumentStore) GetCollectionList( ctx context.Context, ) ([]models.DocumentCollection, error)
func (*DocumentStore) GetDocuments ¶
func (*DocumentStore) SearchCollection ¶
func (ds *DocumentStore) SearchCollection( ctx context.Context, query *models.DocumentSearchPayload, limit int, withMMR bool, pageNumber int, pageSize int, ) (*models.DocumentSearchResultPage, error)
func (*DocumentStore) UpdateCollection ¶
func (ds *DocumentStore) UpdateCollection( ctx context.Context, collection models.DocumentCollection, ) error
func (*DocumentStore) UpdateDocuments ¶
type MessageStoreSchema ¶
type MessageStoreSchema struct { bun.BaseModel `bun:"table:message,alias:m"` // TODO: replace UUIDs with sortable ULIDs or UUIDv7s to avoid having to have both a UUID and an ID. // see https://blog.daveallie.com/ulid-primary-keys UUID uuid.UUID `bun:",pk,type:uuid,default:gen_random_uuid()"` // ID is used only for sorting / slicing purposes as we can't sort by CreatedAt for messages created simultaneously ID int64 `bun:",autoincrement"` CreatedAt time.Time `bun:"type:timestamptz,notnull,default:current_timestamp"` UpdatedAt time.Time `bun:"type:timestamptz,nullzero,default:current_timestamp"` DeletedAt time.Time `bun:"type:timestamptz,soft_delete,nullzero"` SessionID string `bun:",notnull"` Role string `bun:",notnull"` Content string `bun:",notnull"` TokenCount int `bun:",notnull"` Metadata map[string]interface{} `bun:"type:jsonb,nullzero,json_use_number"` Session *SessionSchema `bun:"rel:belongs-to,join:session_id=session_id,on_delete:cascade"` }
func (*MessageStoreSchema) AfterCreateTable ¶
func (*MessageStoreSchema) AfterCreateTable( ctx context.Context, query *bun.CreateTableQuery, ) error
func (*MessageStoreSchema) BeforeCreateTable ¶
func (s *MessageStoreSchema) BeforeCreateTable( _ context.Context, _ *bun.CreateTableQuery, ) error
type MessageVectorStoreSchema ¶
type MessageVectorStoreSchema struct { bun.BaseModel `bun:"table:message_embedding,alias:me"` UUID uuid.UUID `bun:",pk,type:uuid,default:gen_random_uuid()"` CreatedAt time.Time `bun:"type:timestamptz,notnull,default:current_timestamp"` UpdatedAt time.Time `bun:"type:timestamptz,nullzero,default:current_timestamp"` DeletedAt time.Time `bun:"type:timestamptz,soft_delete,nullzero"` SessionID string `bun:",notnull"` MessageUUID uuid.UUID `bun:"type:uuid,notnull,unique"` Embedding pgvector.Vector `bun:"type:vector(1536)"` IsEmbedded bool `bun:"type:bool,notnull,default:false"` Session *SessionSchema `bun:"rel:belongs-to,join:session_id=session_id,on_delete:cascade"` Message *MessageStoreSchema `bun:"rel:belongs-to,join:message_uuid=uuid,on_delete:cascade"` }
MessageVectorStoreSchema stores the embeddings for a message. TODO: Vector dims from config
func (*MessageVectorStoreSchema) AfterCreateTable ¶
func (*MessageVectorStoreSchema) AfterCreateTable( ctx context.Context, query *bun.CreateTableQuery, ) error
func (*MessageVectorStoreSchema) BeforeCreateTable ¶
func (s *MessageVectorStoreSchema) BeforeCreateTable( _ context.Context, _ *bun.CreateTableQuery, ) error
type PostgresMemoryStore ¶
type PostgresMemoryStore struct { store.BaseMemoryStore[*bun.DB] }
func NewPostgresMemoryStore ¶
func NewPostgresMemoryStore( appState *models.AppState, client *bun.DB, ) (*PostgresMemoryStore, error)
NewPostgresMemoryStore returns a new PostgresMemoryStore. Use this to correctly initialize the store.
func (*PostgresMemoryStore) Close ¶
func (pms *PostgresMemoryStore) Close() error
func (*PostgresMemoryStore) DeleteSession ¶
func (pms *PostgresMemoryStore) DeleteSession(ctx context.Context, sessionID string) error
DeleteSession deletes a session from the memory store. This is a soft delete. TODO: A hard delete will be implemented as an out-of-band process or left to the implementer.
func (*PostgresMemoryStore) GetClient ¶
func (pms *PostgresMemoryStore) GetClient() *bun.DB
func (*PostgresMemoryStore) GetMemory ¶
func (pms *PostgresMemoryStore) GetMemory( ctx context.Context, appState *models.AppState, sessionID string, lastNMessages int, ) (*models.Memory, error)
GetMemory returns the most recent Summary and a list of messages for a given sessionID. GetMemory returns:
- the most recent Summary, if one exists
- the lastNMessages messages, if lastNMessages > 0
- all messages since the last SummaryPoint, if lastNMessages == 0
- if no Summary (and no SummaryPoint) exists and lastNMessages == 0, returns all undeleted messages
func (*PostgresMemoryStore) GetMessageVectors ¶
func (pms *PostgresMemoryStore) GetMessageVectors(ctx context.Context, _ *models.AppState, sessionID string, ) ([]models.MessageEmbedding, error)
func (*PostgresMemoryStore) GetSession ¶
func (pms *PostgresMemoryStore) GetSession( ctx context.Context, _ *models.AppState, sessionID string, ) (*models.Session, error)
GetSession retrieves a Session for a given sessionID.
func (*PostgresMemoryStore) GetSummary ¶
func (*PostgresMemoryStore) PurgeDeleted ¶
func (pms *PostgresMemoryStore) PurgeDeleted(ctx context.Context) error
func (*PostgresMemoryStore) PutMessageMetadata ¶
func (*PostgresMemoryStore) PutMessageVectors ¶
func (pms *PostgresMemoryStore) PutMessageVectors(ctx context.Context, _ *models.AppState, sessionID string, embeddings []models.MessageEmbedding, ) error
func (*PostgresMemoryStore) PutSession ¶
func (pms *PostgresMemoryStore) PutSession( ctx context.Context, _ *models.AppState, session *models.Session, ) error
PutSession creates or updates a Session for a given sessionID.
func (*PostgresMemoryStore) PutSummary ¶
func (*PostgresMemoryStore) SearchMemory ¶
func (pms *PostgresMemoryStore) SearchMemory( ctx context.Context, appState *models.AppState, sessionID string, query *models.MemorySearchPayload, limit int, ) ([]models.MemorySearchResult, error)
type SessionSchema ¶
type SessionSchema struct { bun.BaseModel `bun:"table:session,alias:s"` UUID uuid.UUID `bun:",pk,type:uuid,default:gen_random_uuid()"` SessionID string `bun:",unique,notnull"` CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` DeletedAt time.Time `bun:"type:timestamptz,soft_delete,nullzero"` Metadata map[string]interface{} `bun:"type:jsonb,nullzero,json_use_number"` }
func (*SessionSchema) AfterCreateTable ¶
func (*SessionSchema) AfterCreateTable( ctx context.Context, query *bun.CreateTableQuery, ) error
func (*SessionSchema) BeforeCreateTable ¶
func (s *SessionSchema) BeforeCreateTable( _ context.Context, _ *bun.CreateTableQuery, ) error
BeforeCreateTable is a marker method to ensure a uniform interface across all table models; it is used in the table creation iterator.
type SummaryStoreSchema ¶
type SummaryStoreSchema struct { bun.BaseModel `bun:"table:summary,alias:su"` UUID uuid.UUID `bun:",pk,type:uuid,default:gen_random_uuid()"` CreatedAt time.Time `bun:"type:timestamptz,notnull,default:current_timestamp"` UpdatedAt time.Time `bun:"type:timestamptz,nullzero,default:current_timestamp"` DeletedAt time.Time `bun:"type:timestamptz,soft_delete,nullzero"` SessionID string `bun:",notnull"` Content string `bun:",nullzero"` // allow null as we might want to use Metadata without a summary Metadata map[string]interface{} `bun:"type:jsonb,nullzero,json_use_number"` TokenCount int `bun:",notnull"` SummaryPointUUID uuid.UUID `bun:"type:uuid,notnull,unique"` // the UUID of the most recent message that was used to create the summary Session *SessionSchema `bun:"rel:belongs-to,join:session_id=session_id,on_delete:cascade"` Message *MessageStoreSchema `bun:"rel:belongs-to,join:summary_point_uuid=uuid,on_delete:cascade"` }
func (*SummaryStoreSchema) AfterCreateTable ¶
func (*SummaryStoreSchema) AfterCreateTable( ctx context.Context, query *bun.CreateTableQuery, ) error
func (*SummaryStoreSchema) BeforeCreateTable ¶
func (s *SummaryStoreSchema) BeforeCreateTable( _ context.Context, _ *bun.CreateTableQuery, ) error
type VectorColIndex ¶
type VectorColIndex struct { Collection models.DocumentCollection ColName string RowCount int ListCount int ProbeCount int // contains filtered or unexported fields }
func NewVectorColIndex ¶
func NewVectorColIndex( ctx context.Context, appState *models.AppState, collection models.DocumentCollection, ) (*VectorColIndex, error)
func (*VectorColIndex) CalculateListCount ¶
func (vci *VectorColIndex) CalculateListCount() error
CalculateListCount calculates the number of lists to use for the index.
func (*VectorColIndex) CalculateProbes ¶
func (vci *VectorColIndex) CalculateProbes() error
func (*VectorColIndex) CreateIndex ¶
func (vci *VectorColIndex) CreateIndex(ctx context.Context, force bool) error