Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { IndexerConcurrency dynamicconfig.IntPropertyFn ESProcessorNumOfWorkers dynamicconfig.IntPropertyFn ESProcessorBulkActions dynamicconfig.IntPropertyFn // max number of requests in bulk ESProcessorBulkSize dynamicconfig.IntPropertyFn // max total size of bytes in bulk ESProcessorFlushInterval dynamicconfig.DurationPropertyFn ValidSearchAttributes dynamicconfig.MapPropertyFn }
Config contains all configs for indexer
type ESProcessor ¶
type ESProcessor interface { // Stop processor and clean up Stop() // Add request to bulk, and record kafka message in map with provided key // This call will be blocked when downstream has issues Add(request elastic.BulkableRequest, key string, kafkaMsg messaging.Message) }
ESProcessor is interface for elastic search bulk processor
func NewESProcessorAndStart ¶
func NewESProcessorAndStart(config *Config, client es.Client, processorName string, logger log.Logger, metricsClient metrics.Client, msgEncoder *codec.JSONPBEncoder) (ESProcessor, error)
NewESProcessorAndStart create new ESProcessor and start
type ElasticBulkProcessor ¶
type ElasticBulkProcessor interface { Start(ctx context.Context) error Stop() error Close() error Stats() elastic.BulkProcessorStats Add(request elastic.BulkableRequest) Flush() error }
ElasticBulkProcessor is interface for elastic.BulkProcessor (elastic package doesn't provide such interface that tests can mock)
type Indexer ¶
type Indexer struct {
// contains filtered or unexported fields
}
Indexer used to consumer data from kafka then send to ElasticSearch
Click to show internal directories.
Click to hide internal directories.