search

package
v0.0.0-...-61eb987 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRetriable = errors.New("retriable error")
)

Functions

This section is empty.

Types

type BatchIndexer

type BatchIndexer struct {
	// contains filtered or unexported fields
}

BatchIndexer is the environment for ingesting the WAL logical replication events into a search store using the pgstream flow

func NewBatchIndexer

func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsnParser replication.LSNParser, opts ...Option) *BatchIndexer

NewBatchIndexer returns a processor of wal events that indexes data into the search store provided on input.

func (*BatchIndexer) Close

func (i *BatchIndexer) Close() error

func (*BatchIndexer) Name

func (i *BatchIndexer) Name() string

func (*BatchIndexer) ProcessWALEvent

func (i *BatchIndexer) ProcessWALEvent(ctx context.Context, event *wal.Event) (err error)

ProcessWALEvent is called on every new message from the WAL logical replication The function is responsible for sending the data to the search store and committing the event position.

func (*BatchIndexer) Send

func (i *BatchIndexer) Send(ctx context.Context) error

type Document

type Document struct {
	ID      string
	Schema  string
	Data    map[string]any
	Version int
	Delete  bool
}

type DocumentError

type DocumentError struct {
	Document Document
	Severity Severity
	Error    string
}

type ErrSchemaAlreadyExists

type ErrSchemaAlreadyExists struct {
	SchemaName string
}

func (ErrSchemaAlreadyExists) Error

func (e ErrSchemaAlreadyExists) Error() string

type ErrSchemaNotFound

type ErrSchemaNotFound struct {
	SchemaName string
}

func (ErrSchemaNotFound) Error

func (e ErrSchemaNotFound) Error() string

type ErrSchemaUpdateOutOfOrder

type ErrSchemaUpdateOutOfOrder struct {
	SchemaName       string
	SchemaID         string
	NewVersion       int
	CurrentVersion   int
	CurrentCreatedAt time.Time
	NewCreatedAt     time.Time
}

func (ErrSchemaUpdateOutOfOrder) Error

type ErrTypeInvalid

type ErrTypeInvalid struct {
	Input string
}

func (ErrTypeInvalid) Error

func (e ErrTypeInvalid) Error() string

type IndexerConfig

type IndexerConfig struct {
	// BatchSize is the max number of wal events accumulated before triggering a
	// send to the search store. Defaults to 100
	BatchSize int
	// BatchTime is the max time interval at which the batch sending to the search
	// store is triggered. Defaults to 1s
	BatchTime time.Duration
	// MaxQueueBytes is the max memory used by the batch indexer for inflight
	// batches. Defaults to 100MiB
	MaxQueueBytes int64
	// CleanupBackoff is the retry policy to follow for the async index
	// deletion. If no config is provided, no retry policy is applied.
	CleanupBackoff backoff.Config
}

type Mapper

type Mapper interface {
	ColumnToSearchMapping(column schemalog.Column) (map[string]any, error)
	MapColumnValue(column schemalog.Column, value any) (any, error)
}

type Option

type Option func(*BatchIndexer)

func WithCheckpoint

func WithCheckpoint(c checkpointer.Checkpoint) Option

func WithLogger

func WithLogger(l loglib.Logger) Option

type Severity

type Severity uint
const (
	SeverityNone Severity = iota
	SeverityDataLoss
	SeverityIgnored
	SeverityRetriable
)

func (*Severity) String

func (s *Severity) String() string

type Store

type Store interface {
	GetMapper() Mapper
	// schema operations
	ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
	DeleteSchema(ctx context.Context, schemaName string) error
	// data operations
	DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
	SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
}

type StoreOption

type StoreOption func(*StoreRetrier)

func WithStoreLogger

func WithStoreLogger(logger loglib.Logger) StoreOption

type StoreRetrier

type StoreRetrier struct {
	// contains filtered or unexported fields
}

StoreRetrier applies a retry strategy to failed search store operations.

func NewStoreRetrier

func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreRetrier

func (*StoreRetrier) ApplySchemaChange

func (s *StoreRetrier) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error

func (*StoreRetrier) DeleteSchema

func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error

func (*StoreRetrier) DeleteTableDocuments

func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error

func (*StoreRetrier) GetMapper

func (s *StoreRetrier) GetMapper() Mapper

func (*StoreRetrier) SendDocuments

func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)

SendDocuments will go over failed documents, identifying any with retriable errors and retrying them with the configured backoff policy.

type StoreRetryConfig

type StoreRetryConfig struct {
	// If not provided it defaults to using exponential backoff with initial
	// interval of 1s, max interval of 1min, and 0 max retries.
	Backoff backoff.Config
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL