Documentation
Overview
Package item implements the generic item-based analyzer.
The item-based analyzer uses an ItemProcessor to process work items. It handles the common logic for fetching work items and processing them in parallel.
Variables
var ErrEmptyBatch = errors.New("no items in batch")
ErrEmptyBatch is returned by the item-based analyzer when there are no work items in the batch.
Functions
func NewAnalyzer
func NewAnalyzer[Item any](
	name string,
	cfg config.ItemBasedAnalyzerConfig,
	processor ItemProcessor[Item],
	target storage.TargetStorage,
	logger *log.Logger,
) (analyzer.Analyzer, error)
NewAnalyzer returns a new item-based analyzer that uses the provided item processor.
If stopIfQueueEmptyFor is a non-zero duration, the analyzer processes batches of items until its work queue has been empty for stopIfQueueEmptyFor, at which point it terminates and returns. This mode is likely to be used mainly in regression tests.
If fixedInterval is provided, the analyzer processes one batch every fixedInterval. Otherwise, it uses a backoff mechanism that runs batches as fast as possible until it encounters an error.
Types
type ItemProcessor
type ItemProcessor[Item any] interface {
	// GetItems fetches the next batch of work items.
	GetItems(ctx context.Context, limit uint64) ([]Item, error)

	// ProcessItem processes a single item, retrieving all required information
	// from source storage or out of band and committing the resulting batch
	// of queries to target storage.
	ProcessItem(ctx context.Context, batch *storage.QueryBatch, item Item) error

	// QueueLength returns the number of total items in the work queue. This
	// is currently used for observability metrics.
	QueueLength(ctx context.Context) (int, error)
}