indexer

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2021 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	IndexerConcurrency       dynamicconfig.IntPropertyFn
	ESProcessorNumOfWorkers  dynamicconfig.IntPropertyFn
	ESProcessorBulkActions   dynamicconfig.IntPropertyFn // max number of requests in bulk
	ESProcessorBulkSize      dynamicconfig.IntPropertyFn // max total size of bytes in bulk
	ESProcessorFlushInterval dynamicconfig.DurationPropertyFn
	ValidSearchAttributes    dynamicconfig.MapPropertyFn
}

Config contains all configs for indexer

type ESProcessor

type ESProcessor interface {
	// Stop processor and clean up
	Stop()
	// Add request to bulk, and record kafka message in map with provided key
	// This call will be blocked when downstream has issues
	Add(request elastic.BulkableRequest, key string, kafkaMsg messaging.Message)
}

ESProcessor is interface for elastic search bulk processor

func NewESProcessorAndStart

func NewESProcessorAndStart(config *Config, client es.Client, processorName string,
	logger log.Logger, metricsClient metrics.Client, msgEncoder *codec.JSONPBEncoder) (ESProcessor, error)

NewESProcessorAndStart create new ESProcessor and start

type ElasticBulkProcessor

type ElasticBulkProcessor interface {
	Start(ctx context.Context) error
	Stop() error
	Close() error
	Stats() elastic.BulkProcessorStats
	Add(request elastic.BulkableRequest)
	Flush() error
}

ElasticBulkProcessor is interface for elastic.BulkProcessor (elastic package doesn't provide such interface that tests can mock)

type Indexer

type Indexer struct {
	// contains filtered or unexported fields
}

Indexer used to consumer data from kafka then send to ElasticSearch

func NewIndexer

func NewIndexer(config *Config, client messaging.Client, esClient es.Client, esConfig *es.Config,
	logger log.Logger, metricsClient metrics.Client) *Indexer

NewIndexer create a new Indexer

func (Indexer) Start

func (x Indexer) Start() error

Start indexer

func (Indexer) Stop

func (x Indexer) Stop()

Stop indexer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL