postgres

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultDistanceFunction = "cosine"

DefaultDistanceFunction is the default distance function to use for indexing. Cosine distance is used by default in order to support both normalized and non-normalized embeddings. A future improvement would be to use the inner product distance function for normalized embeddings.

View Source
const DefaultDocumentSearchLimit = 20
View Source
const DefaultMemorySearchLimit = 10
View Source
const EmbeddingColName = "embedding"
View Source
const MaxParallelWorkersPerGather = 4
View Source
const MinRowsForIndex = 10000

MinRowsForIndex is the minimum number of rows required to create an index. The pgvector docs recommend creating the index after a representative sample of data is loaded. This is a guesstimate.

Variables

View Source
var IndexMutexMap = make(map[string]*sync.Mutex)

IndexMutexMap stores a mutex for each collection.

Functions

func CleanDB

func CleanDB(t *testing.T, db *bun.DB)

func NewPostgresConn

func NewPostgresConn(appState *models.AppState) *bun.DB

NewPostgresConn creates a new bun.DB connection to a postgres database using the provided DSN. The connection is configured to pool connections based on the number of PROCs available.

Types

type DocumentCollectionDAO

type DocumentCollectionDAO struct {
	models.DocumentCollection
	// contains filtered or unexported fields
}

func NewDocumentCollectionDAO

func NewDocumentCollectionDAO(
	appState *models.AppState,
	db *bun.DB,
	collection models.DocumentCollection,
) *DocumentCollectionDAO

func (*DocumentCollectionDAO) Create

func (dc *DocumentCollectionDAO) Create(
	ctx context.Context,
) error

Create inserts a collection into the collections table and creates a table for the collection's documents.

func (*DocumentCollectionDAO) CreateDocuments

func (dc *DocumentCollectionDAO) CreateDocuments(
	ctx context.Context,
	documents []models.Document,
) ([]uuid.UUID, error)

CreateDocuments inserts the given documents into the given collection.

func (*DocumentCollectionDAO) Delete

func (dc *DocumentCollectionDAO) Delete(ctx context.Context) error

Delete deletes a collection from the collections table and drops the collection's document table.

func (*DocumentCollectionDAO) DeleteDocumentsByUUID

func (dc *DocumentCollectionDAO) DeleteDocumentsByUUID(
	ctx context.Context,
	documentUUIDs []uuid.UUID,
) error

DeleteDocumentsByUUID deletes documents from a collection in the SQL database, identified by their UUIDs.

func (*DocumentCollectionDAO) GetAll

GetAll returns a list of all collections from the collections table.

func (*DocumentCollectionDAO) GetByName

func (dc *DocumentCollectionDAO) GetByName(
	ctx context.Context,
) error

GetByName returns a collection from the collections table by name.

func (*DocumentCollectionDAO) GetDocuments

func (dc *DocumentCollectionDAO) GetDocuments(
	ctx context.Context,
	limit int,
	uuids []uuid.UUID,
	documentIDs []string,
) ([]models.Document, error)

GetDocuments retrieves documents. If `uuids` is non-nil, it will use those UUIDs to retrieve the matching documents. Otherwise, it will retrieve all documents. If limit is greater than 0, it will retrieve at most limit documents.

func (*DocumentCollectionDAO) SearchDocuments

func (dc *DocumentCollectionDAO) SearchDocuments(ctx context.Context,
	query *models.DocumentSearchPayload,
	limit int,
	withMMR bool,
	pageNumber int,
	pageSize int) (*models.DocumentSearchResultPage, error)

SearchDocuments searches for documents in a collection. Currently pagination is not supported.

func (*DocumentCollectionDAO) Update

func (dc *DocumentCollectionDAO) Update(
	ctx context.Context,
) error

Update updates a collection in the collections table.

func (*DocumentCollectionDAO) UpdateDocuments

func (dc *DocumentCollectionDAO) UpdateDocuments(
	ctx context.Context,
	documents []models.Document,
) error

UpdateDocuments updates the document_id, metadata, and embedding columns of the given documents in the given collection. The documents must have non-nil uuids. **IMPORTANT:** We determine which columns to update based on the fields that are non-zero in the given documents. This means that all documents must have data for the same fields. If a document is missing data for a field, there could be data loss.

type DocumentCollectionSchema

type DocumentCollectionSchema struct {
	bun.BaseModel `bun:"table:document_collection,alias:dc"`
	models.DocumentCollection
}

DocumentCollectionSchema represents the schema for the DocumentCollectionDAO table.

func (*DocumentCollectionSchema) AfterCreateTable

func (*DocumentCollectionSchema) AfterCreateTable(
	ctx context.Context,
	query *bun.CreateTableQuery,
) error

func (*DocumentCollectionSchema) BeforeCreateTable

func (s *DocumentCollectionSchema) BeforeCreateTable(
	_ context.Context,
	_ *bun.CreateTableQuery,
) error

type DocumentSchemaTemplate

type DocumentSchemaTemplate struct {
	bun.BaseModel `bun:"table:document,alias:d"`
	models.DocumentBase
}

DocumentSchemaTemplate represents the schema template for Document tables. The embedding column is added manually when createDocumentTable runs, so that its vector dimensions can be set correctly. This means the embedding is not returned when querying using bun.

type DocumentStore

type DocumentStore struct {
	store.BaseDocumentStore[*bun.DB]

	DocEmbeddingUpdateTaskCh chan []models.DocEmbeddingUpdate
	DocEmbeddingTaskCh       chan<- []models.DocEmbeddingTask
	// contains filtered or unexported fields
}

func NewDocumentStore

func NewDocumentStore(
	appState *models.AppState,
	client *bun.DB,
	docEmbeddingUpdateTaskCh chan []models.DocEmbeddingUpdate,
	docEmbeddingTaskCh chan<- []models.DocEmbeddingTask,
) (*DocumentStore, error)

NewDocumentStore returns a new DocumentStore. Use this to correctly initialize the store.

func (*DocumentStore) CreateCollection

func (ds *DocumentStore) CreateCollection(
	ctx context.Context,
	collection models.DocumentCollection,
) error

func (*DocumentStore) CreateCollectionIndex

func (ds *DocumentStore) CreateCollectionIndex(
	ctx context.Context,
	collectionName string,
	force bool,
) error

func (*DocumentStore) CreateDocuments

func (ds *DocumentStore) CreateDocuments(
	ctx context.Context,
	collectionName string,
	documents []models.Document,
) ([]uuid.UUID, error)

func (*DocumentStore) DeleteCollection

func (ds *DocumentStore) DeleteCollection(
	ctx context.Context,
	collectionName string,
) error

func (*DocumentStore) DeleteDocuments

func (ds *DocumentStore) DeleteDocuments(
	ctx context.Context,
	collectionName string,
	documentUUID []uuid.UUID,
) error

func (*DocumentStore) GetClient

func (ds *DocumentStore) GetClient() any

func (*DocumentStore) GetCollection

func (ds *DocumentStore) GetCollection(
	ctx context.Context,
	collectionName string,
) (models.DocumentCollection, error)

func (*DocumentStore) GetCollectionList

func (ds *DocumentStore) GetCollectionList(
	ctx context.Context,
) ([]models.DocumentCollection, error)

func (*DocumentStore) GetDocuments

func (ds *DocumentStore) GetDocuments(
	ctx context.Context,
	collectionName string,
	uuids []uuid.UUID,
	documentIDs []string,
) ([]models.Document, error)

func (*DocumentStore) OnStart

func (ds *DocumentStore) OnStart(
	ctx context.Context,
) error

func (*DocumentStore) SearchCollection

func (ds *DocumentStore) SearchCollection(
	ctx context.Context,
	query *models.DocumentSearchPayload,
	limit int,
	withMMR bool,
	pageNumber int,
	pageSize int,
) (*models.DocumentSearchResultPage, error)

func (*DocumentStore) Shutdown

func (ds *DocumentStore) Shutdown(_ context.Context) error

func (*DocumentStore) UpdateCollection

func (ds *DocumentStore) UpdateCollection(
	ctx context.Context,
	collection models.DocumentCollection,
) error

func (*DocumentStore) UpdateDocuments

func (ds *DocumentStore) UpdateDocuments(
	ctx context.Context,
	collectionName string,
	documents []models.Document,
) error

type JSONQuery

type JSONQuery struct {
	JSONPath string       `json:"jsonpath"`
	And      []*JSONQuery `json:"and,omitempty"`
	Or       []*JSONQuery `json:"or,omitempty"`
}

type MessageStoreSchema

type MessageStoreSchema struct {
	bun.BaseModel `bun:"table:message,alias:m"`

	// TODO: replace UUIDs with sortable ULIDs or UUIDv7s to avoid having to have both a UUID and an ID.
	// see https://blog.daveallie.com/ulid-primary-keys
	UUID uuid.UUID `bun:",pk,type:uuid,default:gen_random_uuid()"`
	// ID is used only for sorting / slicing purposes as we can't sort by CreatedAt for messages created simultaneously
	ID         int64                  `bun:",autoincrement"`
	CreatedAt  time.Time              `bun:"type:timestamptz,notnull,default:current_timestamp"`
	UpdatedAt  time.Time              `bun:"type:timestamptz,nullzero,default:current_timestamp"`
	DeletedAt  time.Time              `bun:"type:timestamptz,soft_delete,nullzero"`
	SessionID  string                 `bun:",notnull"`
	Role       string                 `bun:",notnull"`
	Content    string                 `bun:",notnull"`
	TokenCount int                    `bun:",notnull"`
	Metadata   map[string]interface{} `bun:"type:jsonb,nullzero,json_use_number"`
	Session    *SessionSchema         `bun:"rel:belongs-to,join:session_id=session_id,on_delete:cascade"`
}

func (*MessageStoreSchema) AfterCreateTable

func (*MessageStoreSchema) AfterCreateTable(
	ctx context.Context,
	query *bun.CreateTableQuery,
) error

func (*MessageStoreSchema) BeforeCreateTable

func (s *MessageStoreSchema) BeforeCreateTable(
	_ context.Context,
	_ *bun.CreateTableQuery,
) error

type MessageVectorStoreSchema

type MessageVectorStoreSchema struct {
	bun.BaseModel `bun:"table:message_embedding,alias:me"`

	UUID        uuid.UUID           `bun:",pk,type:uuid,default:gen_random_uuid()"`
	CreatedAt   time.Time           `bun:"type:timestamptz,notnull,default:current_timestamp"`
	UpdatedAt   time.Time           `bun:"type:timestamptz,nullzero,default:current_timestamp"`
	DeletedAt   time.Time           `bun:"type:timestamptz,soft_delete,nullzero"`
	SessionID   string              `bun:",notnull"`
	MessageUUID uuid.UUID           `bun:"type:uuid,notnull,unique"`
	Embedding   pgvector.Vector     `bun:"type:vector(1536)"`
	IsEmbedded  bool                `bun:"type:bool,notnull,default:false"`
	Session     *SessionSchema      `bun:"rel:belongs-to,join:session_id=session_id,on_delete:cascade"`
	Message     *MessageStoreSchema `bun:"rel:belongs-to,join:message_uuid=uuid,on_delete:cascade"`
}

MessageVectorStoreSchema stores the embeddings for a message. TODO: read the vector dimensions from config instead of hard-coding vector(1536).

func (*MessageVectorStoreSchema) AfterCreateTable

func (*MessageVectorStoreSchema) AfterCreateTable(
	ctx context.Context,
	query *bun.CreateTableQuery,
) error

func (*MessageVectorStoreSchema) BeforeCreateTable

func (s *MessageVectorStoreSchema) BeforeCreateTable(
	_ context.Context,
	_ *bun.CreateTableQuery,
) error

type PostgresMemoryStore

type PostgresMemoryStore struct {
	store.BaseMemoryStore[*bun.DB]
}

func NewPostgresMemoryStore

func NewPostgresMemoryStore(
	appState *models.AppState,
	client *bun.DB,
) (*PostgresMemoryStore, error)

NewPostgresMemoryStore returns a new PostgresMemoryStore. Use this to correctly initialize the store.

func (*PostgresMemoryStore) Close

func (pms *PostgresMemoryStore) Close() error

func (*PostgresMemoryStore) DeleteSession

func (pms *PostgresMemoryStore) DeleteSession(ctx context.Context, sessionID string) error

DeleteSession deletes a session from the memory store. This is a soft delete. TODO: A hard delete will be implemented as an out-of-band process or left to the implementer.

func (*PostgresMemoryStore) GetClient

func (pms *PostgresMemoryStore) GetClient() *bun.DB

func (*PostgresMemoryStore) GetMemory

func (pms *PostgresMemoryStore) GetMemory(
	ctx context.Context,
	appState *models.AppState,
	sessionID string,
	lastNMessages int,
) (*models.Memory, error)

GetMemory returns the most recent Summary and a list of messages for a given sessionID. GetMemory returns:

  • the most recent Summary, if one exists
  • the lastNMessages messages, if lastNMessages > 0
  • all messages since the last SummaryPoint, if lastNMessages == 0
  • if no Summary (and no SummaryPoint) exists and lastNMessages == 0, returns all undeleted messages

func (*PostgresMemoryStore) GetMessageVectors

func (pms *PostgresMemoryStore) GetMessageVectors(ctx context.Context,
	_ *models.AppState,
	sessionID string,
) ([]models.MessageEmbedding, error)

func (*PostgresMemoryStore) GetSession

func (pms *PostgresMemoryStore) GetSession(
	ctx context.Context,
	_ *models.AppState,
	sessionID string,
) (*models.Session, error)

GetSession retrieves a Session for a given sessionID.

func (*PostgresMemoryStore) GetSummary

func (pms *PostgresMemoryStore) GetSummary(
	ctx context.Context,
	_ *models.AppState,
	sessionID string,
) (*models.Summary, error)

func (*PostgresMemoryStore) OnStart

func (pms *PostgresMemoryStore) OnStart(
	_ context.Context,
	appState *models.AppState,
) error

func (*PostgresMemoryStore) PurgeDeleted

func (pms *PostgresMemoryStore) PurgeDeleted(ctx context.Context) error

func (*PostgresMemoryStore) PutMemory

func (pms *PostgresMemoryStore) PutMemory(
	ctx context.Context,
	appState *models.AppState,
	sessionID string,
	memoryMessages *models.Memory,
	skipNotify bool,
) error

func (*PostgresMemoryStore) PutMessageMetadata

func (pms *PostgresMemoryStore) PutMessageMetadata(
	ctx context.Context,
	_ *models.AppState,
	sessionID string,
	messages []models.Message,
	isPrivileged bool,
) error

func (*PostgresMemoryStore) PutMessageVectors

func (pms *PostgresMemoryStore) PutMessageVectors(ctx context.Context,
	_ *models.AppState,
	sessionID string,
	embeddings []models.MessageEmbedding,
) error

func (*PostgresMemoryStore) PutSession

func (pms *PostgresMemoryStore) PutSession(
	ctx context.Context,
	_ *models.AppState,
	session *models.Session,
) error

PutSession creates or updates a Session for a given sessionID.

func (*PostgresMemoryStore) PutSummary

func (pms *PostgresMemoryStore) PutSummary(
	ctx context.Context,
	_ *models.AppState,
	sessionID string,
	summary *models.Summary,
) error

func (*PostgresMemoryStore) SearchMemory

func (pms *PostgresMemoryStore) SearchMemory(
	ctx context.Context,
	appState *models.AppState,
	sessionID string,
	query *models.MemorySearchPayload,
	limit int,
) ([]models.MemorySearchResult, error)

type SessionSchema

type SessionSchema struct {
	bun.BaseModel `bun:"table:session,alias:s"`

	UUID      uuid.UUID              `bun:",pk,type:uuid,default:gen_random_uuid()"`
	SessionID string                 `bun:",unique,notnull"`
	CreatedAt time.Time              `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`
	UpdatedAt time.Time              `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`
	DeletedAt time.Time              `bun:"type:timestamptz,soft_delete,nullzero"`
	Metadata  map[string]interface{} `bun:"type:jsonb,nullzero,json_use_number"`
}

func (*SessionSchema) AfterCreateTable

func (*SessionSchema) AfterCreateTable(
	ctx context.Context,
	query *bun.CreateTableQuery,
) error

func (*SessionSchema) BeforeCreateTable

func (s *SessionSchema) BeforeCreateTable(
	_ context.Context,
	_ *bun.CreateTableQuery,
) error

BeforeCreateTable is a marker method that ensures a uniform interface across all table models; it is used by the table-creation iterator.

type SummaryStoreSchema

type SummaryStoreSchema struct {
	bun.BaseModel `bun:"table:summary,alias:su"`

	UUID             uuid.UUID              `bun:",pk,type:uuid,default:gen_random_uuid()"`
	CreatedAt        time.Time              `bun:"type:timestamptz,notnull,default:current_timestamp"`
	UpdatedAt        time.Time              `bun:"type:timestamptz,nullzero,default:current_timestamp"`
	DeletedAt        time.Time              `bun:"type:timestamptz,soft_delete,nullzero"`
	SessionID        string                 `bun:",notnull"`
	Content          string                 `bun:",nullzero"` // allow null as we might want to use Metadata without a summary
	Metadata         map[string]interface{} `bun:"type:jsonb,nullzero,json_use_number"`
	TokenCount       int                    `bun:",notnull"`
	SummaryPointUUID uuid.UUID              `bun:"type:uuid,notnull,unique"` // the UUID of the most recent message that was used to create the summary
	Session          *SessionSchema         `bun:"rel:belongs-to,join:session_id=session_id,on_delete:cascade"`
	Message          *MessageStoreSchema    `bun:"rel:belongs-to,join:summary_point_uuid=uuid,on_delete:cascade"`
}

func (*SummaryStoreSchema) AfterCreateTable

func (*SummaryStoreSchema) AfterCreateTable(
	ctx context.Context,
	query *bun.CreateTableQuery,
) error

func (*SummaryStoreSchema) BeforeCreateTable

func (s *SummaryStoreSchema) BeforeCreateTable(
	_ context.Context,
	_ *bun.CreateTableQuery,
) error

type VectorColIndex

type VectorColIndex struct {
	Collection models.DocumentCollection
	ColName    string
	RowCount   int
	ListCount  int
	ProbeCount int
	// contains filtered or unexported fields
}

func NewVectorColIndex

func NewVectorColIndex(
	ctx context.Context,
	appState *models.AppState,
	collection models.DocumentCollection,
) (*VectorColIndex, error)

func (*VectorColIndex) CalculateListCount

func (vci *VectorColIndex) CalculateListCount() error

CalculateListCount calculates the number of lists to use for the index.

func (*VectorColIndex) CalculateProbes

func (vci *VectorColIndex) CalculateProbes() error

func (*VectorColIndex) CountRows

func (vci *VectorColIndex) CountRows(ctx context.Context) error

func (*VectorColIndex) CreateIndex

func (vci *VectorColIndex) CreateIndex(ctx context.Context, force bool) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL