search

package
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRetriable = errors.New("retriable error")
)

Functions

This section is empty.

Types

type BatchIndexer

type BatchIndexer struct {
	// contains filtered or unexported fields
}

BatchIndexer is the environment for ingesting WAL logical replication events into a search store using the pgstream flow.

func NewBatchIndexer

func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsnParser replication.LSNParser, opts ...Option) *BatchIndexer

NewBatchIndexer returns a processor of wal events that indexes data into the search store provided on input.

func (*BatchIndexer) Close

func (i *BatchIndexer) Close() error

func (*BatchIndexer) Name

func (i *BatchIndexer) Name() string

func (*BatchIndexer) ProcessWALEvent

func (i *BatchIndexer) ProcessWALEvent(ctx context.Context, event *wal.Event) (err error)

ProcessWALEvent is responsible for sending the wal event to the search store and committing the event position. It can be called concurrently.

func (*BatchIndexer) Send

func (i *BatchIndexer) Send(ctx context.Context) error

type Document

type Document struct {
	ID      string
	Schema  string
	Data    map[string]any
	Version int
	Delete  bool
}

type DocumentError

type DocumentError struct {
	Document Document
	Severity Severity
	Error    string
}

type ErrSchemaAlreadyExists

type ErrSchemaAlreadyExists struct {
	SchemaName string
}

func (ErrSchemaAlreadyExists) Error

func (e ErrSchemaAlreadyExists) Error() string

type ErrSchemaNotFound

type ErrSchemaNotFound struct {
	SchemaName string
}

func (ErrSchemaNotFound) Error

func (e ErrSchemaNotFound) Error() string

type ErrSchemaUpdateOutOfOrder

type ErrSchemaUpdateOutOfOrder struct {
	SchemaName       string
	SchemaID         string
	NewVersion       int
	CurrentVersion   int
	CurrentCreatedAt time.Time
	NewCreatedAt     time.Time
}

func (ErrSchemaUpdateOutOfOrder) Error

func (e ErrSchemaUpdateOutOfOrder) Error() string

type ErrTypeInvalid

type ErrTypeInvalid struct {
	Input string
}

func (ErrTypeInvalid) Error

func (e ErrTypeInvalid) Error() string

type IndexerConfig

type IndexerConfig struct {
	// BatchSize is the max number of wal events accumulated before triggering a
	// send to the search store. Defaults to 100.
	BatchSize int
	// BatchTime is the max time interval after which a batch send to the search
	// store is triggered. Defaults to 1s.
	BatchTime time.Duration
	// MaxQueueBytes is the max memory used by the batch indexer for inflight
	// batches. Defaults to 100MiB.
	MaxQueueBytes int64
}

type Mapper

type Mapper interface {
	ColumnToSearchMapping(column schemalog.Column) (map[string]any, error)
	MapColumnValue(column schemalog.Column, value any) (any, error)
}

type Option

type Option func(*BatchIndexer)

func WithCheckpoint

func WithCheckpoint(c checkpointer.Checkpoint) Option

func WithLogger

func WithLogger(l loglib.Logger) Option

type Severity

type Severity uint
const (
	SeverityNone Severity = iota
	SeverityDataLoss
	SeverityIgnored
	SeverityRetriable
)

func (*Severity) String

func (s *Severity) String() string

type Store

type Store interface {
	GetMapper() Mapper
	// schema operations
	ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
	DeleteSchema(ctx context.Context, schemaName string) error
	// data operations
	DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
	SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
}

type StoreOption

type StoreOption func(*StoreRetrier)

func WithStoreLogger

func WithStoreLogger(logger loglib.Logger) StoreOption

type StoreRetrier

type StoreRetrier struct {
	// contains filtered or unexported fields
}

StoreRetrier applies a retry strategy to failed search store operations.

func NewStoreRetrier

func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreRetrier

func (*StoreRetrier) ApplySchemaChange

func (s *StoreRetrier) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error

func (*StoreRetrier) DeleteSchema

func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error

func (*StoreRetrier) DeleteTableDocuments

func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error

func (*StoreRetrier) GetMapper

func (s *StoreRetrier) GetMapper() Mapper

func (*StoreRetrier) SendDocuments

func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)

SendDocuments will go over failed documents, identifying any with retriable errors and retrying them with the configured backoff policy.

type StoreRetryConfig

type StoreRetryConfig struct {
	// Backoff is the backoff policy used when retrying failed search store
	// operations. If not provided, it defaults to exponential backoff with an
	// initial interval of 1s, a max interval of 1min, and 0 max retries.
	Backoff backoff.Config
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL