consumer

package
v0.0.0-...-92398f1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

Types

type Config

type Config struct {
	NumOfSlices int   `config:"num_of_slices"`
	Slices      []int `config:"slices"`

	IdleTimeoutInSecond     int                    `config:"idle_timeout_in_seconds"`
	MaxConnectionPerHost    int                    `config:"max_connection_per_node"`
	QueueLabels             map[string]interface{} `config:"queues,omitempty"`
	Selector                queue.QueueSelector    `config:"queue_selector"`
	Consumer                *queue.ConsumerConfig  `config:"consumer"`
	MaxWorkers              int                    `config:"max_worker_size"`
	DetectActiveQueue       bool                   `config:"detect_active_queue"`
	DetectIntervalInMs      int                    `config:"detect_interval"`
	QuitDetectAfterIdleInMs int                    `config:"quite_detect_after_idle_in_ms"`

	MessageProcessors []*config.Config `config:"processor"`

	SkipEmptyQueue bool `config:"skip_empty_queue"`
	QuitOnEOFQueue bool `config:"quit_on_eof_queue"`

	QuitNeedTag     bool   `config:"quit_need_tag"`      //need tag to quit, or wait for timeout
	QuitNeedTagName string `config:"quit_need_tag_name"` //need tag to quit, or wait for timeout

	QueueField             string   `config:"queue_name_field"`
	MessageField           string   `config:"message_field"`
	WaitingAfter           []string `config:"waiting_after"`
	RetryDelayIntervalInMs int      `config:"retry_delay_interval"`
	AutoCommitOffset       bool     `config:"auto_commit_offset"`
	// contains filtered or unexported fields
}

type MessageHandlerAPI

type MessageHandlerAPI interface {
	HandleMessages(c *pipeline.Context, msgs []queue.Message) (bool, error)
}

type QueueConsumerProcessor

type QueueConsumerProcessor struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*QueueConsumerProcessor) HandleQueueConfig

func (processor *QueueConsumerProcessor) HandleQueueConfig(qConfig *queue.QueueConfig, ctx *pipeline.Context) error

func (*QueueConsumerProcessor) Name

func (processor *QueueConsumerProcessor) Name() string

func (*QueueConsumerProcessor) NewSlicedWorker

func (processor *QueueConsumerProcessor) NewSlicedWorker(ctx *pipeline.Context, v ...interface{})

func (*QueueConsumerProcessor) Process

func (processor *QueueConsumerProcessor) Process(c *pipeline.Context) error

func (*QueueConsumerProcessor) Release

func (processor *QueueConsumerProcessor) Release() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL