Documentation
¶
Index ¶
- Variables
- type BatchIndexer
- type Document
- type DocumentError
- type ErrSchemaAlreadyExists
- type ErrSchemaNotFound
- type ErrSchemaUpdateOutOfOrder
- type ErrTypeInvalid
- type IndexerConfig
- type Mapper
- type Option
- type Severity
- type Store
- type StoreOption
- type StoreRetrier
- func (s *StoreRetrier) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
- func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error
- func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
- func (s *StoreRetrier) GetMapper() Mapper
- func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
- type StoreRetryConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrRetriable = errors.New("retriable error")
)
Functions ¶
This section is empty.
Types ¶
type BatchIndexer ¶
type BatchIndexer struct {
// contains filtered or unexported fields
}
BatchIndexer is the environment for ingesting WAL logical replication events into a search store using the pgstream flow.
func NewBatchIndexer ¶
func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsnParser replication.LSNParser, opts ...Option) *BatchIndexer
NewBatchIndexer returns a processor of WAL events that indexes data into the search store provided on input.
func (*BatchIndexer) Close ¶
func (i *BatchIndexer) Close() error
func (*BatchIndexer) Name ¶
func (i *BatchIndexer) Name() string
func (*BatchIndexer) ProcessWALEvent ¶
ProcessWALEvent is responsible for sending the WAL event to the search store and committing the event position. It can be called concurrently.
type DocumentError ¶
type ErrSchemaAlreadyExists ¶
type ErrSchemaAlreadyExists struct {
SchemaName string
}
func (ErrSchemaAlreadyExists) Error ¶
func (e ErrSchemaAlreadyExists) Error() string
type ErrSchemaNotFound ¶
type ErrSchemaNotFound struct {
SchemaName string
}
func (ErrSchemaNotFound) Error ¶
func (e ErrSchemaNotFound) Error() string
type ErrSchemaUpdateOutOfOrder ¶
type ErrSchemaUpdateOutOfOrder struct {
	SchemaName       string
	SchemaID         string
	NewVersion       int
	CurrentVersion   int
	CurrentCreatedAt time.Time
	NewCreatedAt     time.Time
}
func (ErrSchemaUpdateOutOfOrder) Error ¶
func (e ErrSchemaUpdateOutOfOrder) Error() string
type ErrTypeInvalid ¶
type ErrTypeInvalid struct {
Input string
}
func (ErrTypeInvalid) Error ¶
func (e ErrTypeInvalid) Error() string
type IndexerConfig ¶
type IndexerConfig struct {
	// BatchSize is the max number of wal events accumulated before triggering a
	// send to the search store. Defaults to 100
	BatchSize int
	// BatchTime is the max time interval at which the batch sending to the search
	// store is triggered. Defaults to 1s
	BatchTime time.Duration
	// MaxQueueBytes is the max memory used by the batch indexer for inflight
	// batches. Defaults to 100MiB
	MaxQueueBytes int64
}
type Option ¶
type Option func(*BatchIndexer)
func WithCheckpoint ¶
func WithCheckpoint(c checkpointer.Checkpoint) Option
func WithLogger ¶
type Store ¶
type Store interface {
	GetMapper() Mapper
	// schema operations
	ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error
	DeleteSchema(ctx context.Context, schemaName string) error
	// data operations
	DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error
	SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
}
type StoreOption ¶
type StoreOption func(*StoreRetrier)
func WithStoreLogger ¶
func WithStoreLogger(logger loglib.Logger) StoreOption
type StoreRetrier ¶
type StoreRetrier struct {
// contains filtered or unexported fields
}
StoreRetrier applies a retry strategy to failed search store operations.
func NewStoreRetrier ¶
func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreRetrier
func (*StoreRetrier) ApplySchemaChange ¶
func (*StoreRetrier) DeleteSchema ¶
func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error
func (*StoreRetrier) DeleteTableDocuments ¶
func (*StoreRetrier) GetMapper ¶
func (s *StoreRetrier) GetMapper() Mapper
func (*StoreRetrier) SendDocuments ¶
func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error)
SendDocuments will go over failed documents, identifying any with retriable errors and retrying them with the configured backoff policy.
type StoreRetryConfig ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.