Documentation ¶
Index ¶
- func New(c *config.Config) (pipeline.Processor, error)
- type BulkIndexingProcessor
- func (processor *BulkIndexingProcessor) HandleQueueConfig(v *queue.QueueConfig, parentContext *pipeline.Context)
- func (processor *BulkIndexingProcessor) Name() string
- func (processor *BulkIndexingProcessor) NewBulkWorker(parentContext *pipeline.Context, qConfig *queue.QueueConfig, ...)
- func (processor *BulkIndexingProcessor) NewSlicedBulkWorker(ctx *pipeline.Context, key, workerID string, sliceID, maxSlices int, ...)
- func (processor *BulkIndexingProcessor) Process(c *pipeline.Context) error
- func (processor *BulkIndexingProcessor) Release() error
- type Config
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BulkIndexingProcessor ¶
处理 bulk 格式的数据索引。
func (*BulkIndexingProcessor) HandleQueueConfig ¶
func (processor *BulkIndexingProcessor) HandleQueueConfig(v *queue.QueueConfig, parentContext *pipeline.Context)
func (*BulkIndexingProcessor) Name ¶
func (processor *BulkIndexingProcessor) Name() string
func (*BulkIndexingProcessor) NewBulkWorker ¶
func (processor *BulkIndexingProcessor) NewBulkWorker(parentContext *pipeline.Context, qConfig *queue.QueueConfig, preferedHost string)
func (*BulkIndexingProcessor) NewSlicedBulkWorker ¶
func (processor *BulkIndexingProcessor) NewSlicedBulkWorker(ctx *pipeline.Context, key, workerID string, sliceID, maxSlices int, tag string, bulkSizeInByte int, qConfig *queue.QueueConfig, host string)
func (*BulkIndexingProcessor) Process ¶
func (processor *BulkIndexingProcessor) Process(c *pipeline.Context) error
func (*BulkIndexingProcessor) Release ¶
func (processor *BulkIndexingProcessor) Release() error
type Config ¶
type Config struct { NumOfSlices int `config:"num_of_slices"` Slices []int `config:"slices"` DocumentLevelSlicing bool `config:"document_level_slicing"` IdleTimeoutInSecond int `config:"idle_timeout_in_seconds"` MaxConnectionPerHost int `config:"max_connection_per_node"` QueueLabels map[string]interface{} `config:"queues,omitempty"` Selector queue.QueueSelector `config:"queue_selector"` Consumer queue.ConsumerConfig `config:"consumer"` MaxWorkers int `config:"max_worker_size"` DetectActiveQueue bool `config:"detect_active_queue"` VerboseBulkResult bool `config:"verbose_bulk_result"` DetectIntervalInMs int `config:"detect_interval"` ValidateRequest bool `config:"valid_request"` SkipEmptyQueue bool `config:"skip_empty_queue"` SkipOnMissingInfo bool `config:"skip_info_missing"` LogBulkError bool `config:"log_bulk_error"` BulkConfig elastic.BulkProcessorConfig `config:"bulk"` Elasticsearch string `config:"elasticsearch,omitempty"` ElasticsearchConfig *elastic.ElasticsearchConfig `config:"elasticsearch_config"` WaitingAfter []string `config:"waiting_after"` RetryDelayIntervalInMs int `config:"retry_delay_interval"` // contains filtered or unexported fields }
Click to show internal directories.
Click to hide internal directories.