Documentation ¶
Index ¶
- func New(c *config.Config) (pipeline.Processor, error)
- type Config
- type MessageHandlerAPI
- type QueueConsumerProcessor
- func (processor *QueueConsumerProcessor) HandleQueueConfig(qConfig *queue.QueueConfig, ctx *pipeline.Context) error
- func (processor *QueueConsumerProcessor) Name() string
- func (processor *QueueConsumerProcessor) NewSlicedWorker(ctx *pipeline.Context, v ...interface{})
- func (processor *QueueConsumerProcessor) Process(c *pipeline.Context) error
- func (processor *QueueConsumerProcessor) Release() error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct {
	NumOfSlices             int                    `config:"num_of_slices"`
	Slices                  []int                  `config:"slices"`
	IdleTimeoutInSecond     int                    `config:"idle_timeout_in_seconds"`
	MaxConnectionPerHost    int                    `config:"max_connection_per_node"`
	QueueLabels             map[string]interface{} `config:"queues,omitempty"`
	Selector                queue.QueueSelector    `config:"queue_selector"`
	Consumer                *queue.ConsumerConfig  `config:"consumer"`
	MaxWorkers              int                    `config:"max_worker_size"`
	DetectActiveQueue       bool                   `config:"detect_active_queue"`
	DetectIntervalInMs      int                    `config:"detect_interval"`
	QuitDetectAfterIdleInMs int                    `config:"quite_detect_after_idle_in_ms"`
	MessageProcessors       []*config.Config       `config:"processor"`
	SkipEmptyQueue          bool                   `config:"skip_empty_queue"`
	QuitOnEOFQueue          bool                   `config:"quit_on_eof_queue"`
	QuitNeedTag             bool                   `config:"quit_need_tag"`      //need tag to quit, or wait for timeout
	QuitNeedTagName         string                 `config:"quit_need_tag_name"` //need tag to quit, or wait for timeout
	QueueField              string                 `config:"queue_name_field"`
	MessageField            string                 `config:"message_field"`
	WaitingAfter            []string               `config:"waiting_after"`
	RetryDelayIntervalInMs  int                    `config:"retry_delay_interval"`
	AutoCommitOffset        bool                   `config:"auto_commit_offset"`
	// contains filtered or unexported fields
}
type MessageHandlerAPI ¶
type QueueConsumerProcessor ¶
func (*QueueConsumerProcessor) HandleQueueConfig ¶
func (processor *QueueConsumerProcessor) HandleQueueConfig(qConfig *queue.QueueConfig, ctx *pipeline.Context) error
func (*QueueConsumerProcessor) Name ¶
func (processor *QueueConsumerProcessor) Name() string
func (*QueueConsumerProcessor) NewSlicedWorker ¶
func (processor *QueueConsumerProcessor) NewSlicedWorker(ctx *pipeline.Context, v ...interface{})
func (*QueueConsumerProcessor) Process ¶
func (processor *QueueConsumerProcessor) Process(c *pipeline.Context) error
func (*QueueConsumerProcessor) Release ¶
func (processor *QueueConsumerProcessor) Release() error
Click to show internal directories.
Click to hide internal directories.