Documentation ¶
Index ¶
- Constants
- Variables
- type BackoffStrategyInterface
- type Config
- type ConsumeFn
- type ConsumerConfig
- type Cronsumer
- type ExponentialBackoffStrategy
- type FixedBackoffStrategy
- type Header
- type Headers
- type LinearBackoffStrategy
- type Message
- type MessageBuilder
- func (mb *MessageBuilder) Build() Message
- func (mb *MessageBuilder) WithHeaders(headers []Header) *MessageBuilder
- func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder
- func (mb *MessageBuilder) WithKey(key []byte) *MessageBuilder
- func (mb *MessageBuilder) WithPartition(partition int) *MessageBuilder
- func (mb *MessageBuilder) WithTopic(topic string) *MessageBuilder
- func (mb *MessageBuilder) WithValue(value []byte) *MessageBuilder
- type Offset
- type ProducerConfig
- type SASLConfig
- type SkipMessageByHeaderFn
Constants ¶
View Source
const ( OffsetEarliest = "earliest" OffsetLatest = "latest" ExponentialBackOffStrategy = "exponential" LinearBackOffStrategy = "linear" FixedBackOffStrategy = "fixed" )
Variables ¶
View Source
var NonStopWork time.Duration = 0
Functions ¶
This section is empty.
Types ¶
type BackoffStrategyInterface ¶ added in v1.4.5
type BackoffStrategyInterface interface { ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool String() string }
func GetBackoffStrategy ¶ added in v1.4.5
func GetBackoffStrategy(strategyName string) BackoffStrategyInterface
type Config ¶
type Config struct {
	Brokers  []string         `yaml:"brokers"`
	Consumer ConsumerConfig   `yaml:"consumer"`
	Producer ProducerConfig   `yaml:"producer"`
	SASL     SASLConfig       `yaml:"sasl"`
	LogLevel logger.Level     `yaml:"logLevel"`
	Logger   logger.Interface `yaml:"-"`
	ClientID string           `yaml:"clientId"`

	// MetricPrefix is used as the prefix of the Prometheus fully-qualified metric name.
	// If not provided, the default metric prefix is `kafka_cronsumer`.
	// Currently, two Prometheus metrics are exposed: `retried_messages_total_current`
	// and `discarded_messages_total_current`.
	// So, if the default metric prefix is used, the metric names are
	// `kafka_cronsumer_retried_messages_total_current` and
	// `kafka_cronsumer_discarded_messages_total_current`.
	MetricPrefix string `yaml:"metricPrefix"`
}
func (*Config) GetBrokerAddr ¶ added in v1.5.2
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
type ConsumerConfig ¶
type ConsumerConfig struct { ClientID string `yaml:"clientId"` GroupID string `yaml:"groupId"` Topic string `yaml:"topic"` DeadLetterTopic string `yaml:"deadLetterTopic"` MinBytes int `yaml:"minBytes"` MaxBytes int `yaml:"maxBytes"` MaxRetry int `yaml:"maxRetry"` MaxWait time.Duration `yaml:"maxWait"` CommitInterval time.Duration `yaml:"commitInterval"` HeartbeatInterval time.Duration `yaml:"heartbeatInterval"` SessionTimeout time.Duration `yaml:"sessionTimeout"` RebalanceTimeout time.Duration `yaml:"rebalanceTimeout"` StartOffset Offset `yaml:"startOffset"` RetentionTime time.Duration `yaml:"retentionTime"` Concurrency int `yaml:"concurrency"` Duration time.Duration `yaml:"duration"` Cron string `yaml:"cron"` BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"` SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"skipMessageByHeaderFn"` VerifyTopicOnStartup bool `yaml:"verifyTopicOnStartup"` QueueCapacity int `yaml:"queueCapacity"` }
type Cronsumer ¶
type Cronsumer interface {
	// Start starts the kafka consumer KafkaCronsumer in a new goroutine, so it is an asynchronous (non-blocking) operation.
	Start()

	// Run runs the kafka consumer KafkaCronsumer in the caller's goroutine, so it is a synchronous (blocking) operation.
	Run()

	// Stop stops the cron and the kafka KafkaCronsumer consumer.
	Stop()

	// WithLogger injects a custom logger implementation.
	WithLogger(logger logger.Interface)

	// Produce produces the message to the kafka KafkaCronsumer producer. The Offset and Time fields of the message are ignored.
	Produce(message Message) error

	// ProduceBatch produces the list of messages to the kafka KafkaCronsumer producer.
	ProduceBatch(messages []Message) error

	// GetMetricCollectors returns the metric collectors so that they can be made available to other libraries.
	GetMetricCollectors() []prometheus.Collector
}
type ExponentialBackoffStrategy ¶ added in v1.4.5
type ExponentialBackoffStrategy struct{}
func (*ExponentialBackoffStrategy) ShouldIncreaseRetryAttemptCount ¶ added in v1.4.5
func (s *ExponentialBackoffStrategy) ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool
func (*ExponentialBackoffStrategy) String ¶ added in v1.4.5
func (s *ExponentialBackoffStrategy) String() string
type FixedBackoffStrategy ¶ added in v1.4.5
type FixedBackoffStrategy struct{}
func (*FixedBackoffStrategy) ShouldIncreaseRetryAttemptCount ¶ added in v1.4.5
func (s *FixedBackoffStrategy) ShouldIncreaseRetryAttemptCount(_ int, _ int) bool
func (*FixedBackoffStrategy) String ¶ added in v1.4.5
func (s *FixedBackoffStrategy) String() string
type LinearBackoffStrategy ¶ added in v1.4.5
type LinearBackoffStrategy struct{}
func (*LinearBackoffStrategy) ShouldIncreaseRetryAttemptCount ¶ added in v1.4.5
func (s *LinearBackoffStrategy) ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool
func (*LinearBackoffStrategy) String ¶ added in v1.4.5
func (s *LinearBackoffStrategy) String() string
type Message ¶
type MessageBuilder ¶ added in v0.6.2
type MessageBuilder struct {
// contains filtered or unexported fields
}
func NewMessageBuilder ¶ added in v0.6.2
func NewMessageBuilder() *MessageBuilder
func (*MessageBuilder) Build ¶ added in v0.6.2
func (mb *MessageBuilder) Build() Message
func (*MessageBuilder) WithHeaders ¶ added in v0.6.2
func (mb *MessageBuilder) WithHeaders(headers []Header) *MessageBuilder
func (*MessageBuilder) WithHighWatermark ¶ added in v0.6.2
func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder
func (*MessageBuilder) WithKey ¶ added in v0.6.2
func (mb *MessageBuilder) WithKey(key []byte) *MessageBuilder
func (*MessageBuilder) WithPartition ¶ added in v0.6.2
func (mb *MessageBuilder) WithPartition(partition int) *MessageBuilder
func (*MessageBuilder) WithTopic ¶ added in v0.6.2
func (mb *MessageBuilder) WithTopic(topic string) *MessageBuilder
func (*MessageBuilder) WithValue ¶ added in v0.6.2
func (mb *MessageBuilder) WithValue(value []byte) *MessageBuilder
type ProducerConfig ¶
type SASLConfig ¶
type SkipMessageByHeaderFn ¶ added in v1.4.7
Click to show internal directories.
Click to hide internal directories.