Documentation ¶
Index ¶
- Variables
- type BatchIndexer
- type Document
- type DocumentError
- type ErrSchemaAlreadyExists
- type ErrSchemaNotFound
- type ErrSchemaUpdateOutOfOrder
- type ErrTypeInvalid
- type IndexerConfig
- type Mapper
- type Option
- type Severity
- type Store
- type StoreOption
- type StoreRetrier
- func (s *StoreRetrier) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
- func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error
- func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
- func (s *StoreRetrier) GetMapper() Mapper
- func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
- type StoreRetryConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrRetriable = errors.New("retriable error")
)
Functions ¶
This section is empty.
Types ¶
type BatchIndexer ¶
type BatchIndexer struct {
// contains filtered or unexported fields
}
BatchIndexer is the environment for ingesting the WAL logical replication events into a search store using the pgstream flow
func NewBatchIndexer ¶
func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsnParser replication.LSNParser, opts ...Option) *BatchIndexer
NewBatchIndexer returns a processor of wal events that indexes data into the search store provided on input.
func (*BatchIndexer) Close ¶
func (i *BatchIndexer) Close() error
func (*BatchIndexer) Name ¶
func (i *BatchIndexer) Name() string
func (*BatchIndexer) ProcessWALEvent ¶
ProcessWALEvent is called on every new message from the WAL logical replication The function is responsible for sending the data to the search store and committing the event position.
type DocumentError ¶
type ErrSchemaAlreadyExists ¶
type ErrSchemaAlreadyExists struct {
SchemaName string
}
func (ErrSchemaAlreadyExists) Error ¶
func (e ErrSchemaAlreadyExists) Error() string
type ErrSchemaNotFound ¶
type ErrSchemaNotFound struct {
SchemaName string
}
func (ErrSchemaNotFound) Error ¶
func (e ErrSchemaNotFound) Error() string
type ErrSchemaUpdateOutOfOrder ¶
type ErrSchemaUpdateOutOfOrder struct { SchemaName string SchemaID string NewVersion int CurrentVersion int CurrentCreatedAt time.Time NewCreatedAt time.Time }
func (ErrSchemaUpdateOutOfOrder) Error ¶
func (e ErrSchemaUpdateOutOfOrder) Error() string
type ErrTypeInvalid ¶
type ErrTypeInvalid struct {
Input string
}
func (ErrTypeInvalid) Error ¶
func (e ErrTypeInvalid) Error() string
type IndexerConfig ¶
type IndexerConfig struct { // BatchSize is the max number of wal events accumulated before triggering a // send to the search store. Defaults to 100 BatchSize int // BatchTime is the max time interval at which the batch sending to the search // store is triggered. Defaults to 1s BatchTime time.Duration // MaxQueueBytes is the max memory used by the batch indexer for inflight // batches. Defaults to 100MiB MaxQueueBytes int64 // CleanupBackoff is the retry policy to follow for the async index // deletion. If no config is provided, no retry policy is applied. CleanupBackoff backoff.Config }
type Option ¶
type Option func(*BatchIndexer)
func WithCheckpoint ¶
func WithCheckpoint(c checkpointer.Checkpoint) Option
func WithLogger ¶
type Store ¶
type Store interface { GetMapper() Mapper // schema operations ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error DeleteSchema(ctx context.Context, schemaName string) error // data operations DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error) }
type StoreOption ¶
type StoreOption func(*StoreRetrier)
func WithStoreLogger ¶
func WithStoreLogger(logger loglib.Logger) StoreOption
type StoreRetrier ¶
type StoreRetrier struct {
// contains filtered or unexported fields
}
StoreRetrier applies a retry strategy to failed search store operations.
func NewStoreRetrier ¶
func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreRetrier
func (*StoreRetrier) ApplySchemaChange ¶
func (*StoreRetrier) DeleteSchema ¶
func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error
func (*StoreRetrier) DeleteTableDocuments ¶
func (*StoreRetrier) GetMapper ¶
func (s *StoreRetrier) GetMapper() Mapper
func (*StoreRetrier) SendDocuments ¶
func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
SendDocuments will go over failed documents, identifying any with retriable errors and retrying them with the configured backoff policy.
type StoreRetryConfig ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.