kafka

package
v1.5.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2024 License: MIT Imports: 9 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// OffsetEarliest and OffsetLatest name the two symbolic start positions
	// a consumer may use when no committed offset exists (see Offset / StartOffset).
	OffsetEarliest             = "earliest"
	OffsetLatest               = "latest"
	// Strategy names accepted by GetBackoffStrategy; they select the
	// BackoffStrategyInterface implementation used to pace retries.
	ExponentialBackOffStrategy = "exponential"
	LinearBackOffStrategy      = "linear"
	FixedBackOffStrategy       = "fixed"
)

Variables

View Source
// NonStopWork is the zero Duration sentinel for ConsumerConfig.Duration;
// presumably it means each scheduled run is not time-limited — confirm against the consumer loop.
var NonStopWork time.Duration = 0

Functions

This section is empty.

Types

type BackoffStrategyInterface added in v1.4.5

// BackoffStrategyInterface decides how a message's retry-attempt counter
// advances across retry cycles (exponential, linear, or fixed pacing).
type BackoffStrategyInterface interface {
	// ShouldIncreaseRetryAttemptCount reports whether retryAttemptCount
	// should be incremented for a message whose total retries so far is retryCount.
	ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool
	// String returns the strategy's name (one of the *BackOffStrategy constants).
	String() string
}

func GetBackoffStrategy added in v1.4.5

func GetBackoffStrategy(strategyName string) BackoffStrategyInterface

type Config

// Config is the top-level configuration: broker addresses plus consumer,
// producer, SASL, and logging settings. See SetDefaults and Validate.
type Config struct {
	Brokers  []string         `yaml:"brokers"`
	Consumer ConsumerConfig   `yaml:"consumer"`
	Producer ProducerConfig   `yaml:"producer"`
	SASL     SASLConfig       `yaml:"sasl"`
	LogLevel logger.Level     `yaml:"logLevel"`
	// Logger is excluded from YAML (tag "-"); inject a custom implementation in code.
	Logger   logger.Interface `yaml:"-"`
	ClientID string           `yaml:"clientId"`

	// MetricPrefix is used for prometheus fq name prefix.
	// If not provided, default metric prefix value is `kafka_cronsumer`.
	// Currently, there are two exposed prometheus metrics. `retried_messages_total_current` and `discarded_messages_total_current`.
	// So, if default metric prefix used, metrics names are `kafka_cronsumer_retried_messages_total_current` and
	// `kafka_cronsumer_discarded_messages_total_current`.
	MetricPrefix string `yaml:"metricPrefix"`
}

func (*Config) GetBrokerAddr added in v1.5.2

func (c *Config) GetBrokerAddr() net.Addr

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate()

type ConsumeFn

// ConsumeFn describes how to consume messages from the specified topic;
// a non-nil error marks the message for retry handling.
type ConsumeFn func(message Message) error

ConsumeFn describes how to consume messages from the specified topic

type ConsumerConfig

// ConsumerConfig configures the retry consumer: topic/group identity,
// fetch sizing, timing intervals, the cron schedule, and back-off behavior.
type ConsumerConfig struct {
	ClientID              string                   `yaml:"clientId"`
	GroupID               string                   `yaml:"groupId"`
	// Topic is the (retry) topic this consumer reads from.
	Topic                 string                   `yaml:"topic"`
	// DeadLetterTopic presumably receives messages that exhaust MaxRetry — TODO confirm against consumer implementation.
	DeadLetterTopic       string                   `yaml:"deadLetterTopic"`
	MinBytes              int                      `yaml:"minBytes"`
	MaxBytes              int                      `yaml:"maxBytes"`
	// MaxRetry bounds how many times a message is retried before it is discarded (or dead-lettered).
	MaxRetry              int                      `yaml:"maxRetry"`
	MaxWait               time.Duration            `yaml:"maxWait"`
	CommitInterval        time.Duration            `yaml:"commitInterval"`
	HeartbeatInterval     time.Duration            `yaml:"heartbeatInterval"`
	SessionTimeout        time.Duration            `yaml:"sessionTimeout"`
	RebalanceTimeout      time.Duration            `yaml:"rebalanceTimeout"`
	// StartOffset is "earliest"/"latest" (see the Offset constants) or a numeric offset string.
	StartOffset           Offset                   `yaml:"startOffset"`
	RetentionTime         time.Duration            `yaml:"retentionTime"`
	// Concurrency is the number of parallel message-processing workers.
	Concurrency           int                      `yaml:"concurrency"`
	// Duration limits how long each scheduled run consumes; NonStopWork (0) presumably removes the limit — verify.
	Duration              time.Duration            `yaml:"duration"`
	// Cron is the schedule expression that triggers each consumption run.
	Cron                  string                   `yaml:"cron"`
	// BackOffStrategy paces retry-attempt counting; obtain one via GetBackoffStrategy.
	BackOffStrategy       BackoffStrategyInterface `yaml:"backOffStrategy"`
	// SkipMessageByHeaderFn, when set, filters out messages based on their headers.
	SkipMessageByHeaderFn SkipMessageByHeaderFn    `yaml:"skipMessageByHeaderFn"`
	VerifyTopicOnStartup  bool                     `yaml:"verifyTopicOnStartup"`
	QueueCapacity         int                      `yaml:"queueCapacity"`
}

type Cronsumer

// Cronsumer is the public API of the cron-scheduled Kafka retry consumer.
type Cronsumer interface {
	// Start starts the kafka consumer KafkaCronsumer in a new goroutine, so it's an asynchronous (non-blocking) operation.
	Start()

	// Run runs the kafka consumer KafkaCronsumer on the caller's goroutine, so it's a synchronous (blocking) operation.
	Run()

	// Stop stops both the cron scheduler and the kafka KafkaCronsumer consumer.
	Stop()

	// WithLogger injects a custom log implementation.
	WithLogger(logger logger.Interface)

	// Produce produces the message to the kafka KafkaCronsumer producer. Offset and Time fields will be ignored in the message.
	Produce(message Message) error

	// ProduceBatch produces the list of messages to the kafka KafkaCronsumer producer.
	ProduceBatch(messages []Message) error

	// GetMetricCollectors exposes the metric collectors so other libraries can register them.
	GetMetricCollectors() []prometheus.Collector
}

type ExponentialBackoffStrategy added in v1.4.5

// ExponentialBackoffStrategy implements BackoffStrategyInterface with exponentially spaced retries.
type ExponentialBackoffStrategy struct{}

func (*ExponentialBackoffStrategy) ShouldIncreaseRetryAttemptCount added in v1.4.5

func (s *ExponentialBackoffStrategy) ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool

func (*ExponentialBackoffStrategy) String added in v1.4.5

func (s *ExponentialBackoffStrategy) String() string

type FixedBackoffStrategy added in v1.4.5

// FixedBackoffStrategy implements BackoffStrategyInterface with a constant retry cadence
// (its ShouldIncreaseRetryAttemptCount ignores both arguments).
type FixedBackoffStrategy struct{}

func (*FixedBackoffStrategy) ShouldIncreaseRetryAttemptCount added in v1.4.5

func (s *FixedBackoffStrategy) ShouldIncreaseRetryAttemptCount(_ int, _ int) bool

func (*FixedBackoffStrategy) String added in v1.4.5

func (s *FixedBackoffStrategy) String() string
// Header is a single Kafka message header: a string key with a raw byte value.
type Header struct {
	Key   string
	Value []byte
}

type Headers added in v1.5.4

// Headers is the ordered list of headers attached to a Message.
type Headers []Header

func (Headers) Pretty added in v1.5.4

func (hs Headers) Pretty() string

Pretty writes every header key and value; it is useful for debugging purposes

type LinearBackoffStrategy added in v1.4.5

// LinearBackoffStrategy implements BackoffStrategyInterface with linearly spaced retries.
type LinearBackoffStrategy struct{}

func (*LinearBackoffStrategy) ShouldIncreaseRetryAttemptCount added in v1.4.5

func (s *LinearBackoffStrategy) ShouldIncreaseRetryAttemptCount(retryCount int, retryAttemptCount int) bool

func (*LinearBackoffStrategy) String added in v1.4.5

func (s *LinearBackoffStrategy) String() string

type Message

// Message is the library's representation of a Kafka record.
// NOTE(review): the field set mirrors segmentio/kafka-go's Message — confirm.
type Message struct {
	Topic         string
	Partition     int
	Offset        int64
	HighWaterMark int64
	Key           []byte
	Value         []byte
	Headers       Headers
	Time          time.Time
}

func (*Message) AddHeader added in v1.4.6

func (m *Message) AddHeader(header Header)

AddHeader works as an idempotent function

type MessageBuilder added in v0.6.2

// MessageBuilder constructs Message values fluently; create one with
// NewMessageBuilder, chain the With* setters, then call Build.
type MessageBuilder struct {
	// contains filtered or unexported fields
}

func NewMessageBuilder added in v0.6.2

func NewMessageBuilder() *MessageBuilder

func (*MessageBuilder) Build added in v0.6.2

func (mb *MessageBuilder) Build() Message

func (*MessageBuilder) WithHeaders added in v0.6.2

func (mb *MessageBuilder) WithHeaders(headers []Header) *MessageBuilder

func (*MessageBuilder) WithHighWatermark added in v0.6.2

func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder

func (*MessageBuilder) WithKey added in v0.6.2

func (mb *MessageBuilder) WithKey(key []byte) *MessageBuilder

func (*MessageBuilder) WithPartition added in v0.6.2

func (mb *MessageBuilder) WithPartition(partition int) *MessageBuilder

func (*MessageBuilder) WithTopic added in v0.6.2

func (mb *MessageBuilder) WithTopic(topic string) *MessageBuilder

func (*MessageBuilder) WithValue added in v0.6.2

func (mb *MessageBuilder) WithValue(value []byte) *MessageBuilder

type Offset

// Offset is a start-offset setting: "earliest", "latest", or presumably a
// numeric offset rendered as a string (see ToStringOffset and Value).
type Offset string

func ToStringOffset added in v0.6.4

func ToStringOffset(offset int64) Offset

func (Offset) Value

func (o Offset) Value() int64

type ProducerConfig

// ProducerConfig configures the retry/dead-letter producer: target brokers,
// write batching, and the partition balancer (a segmentio/kafka-go Balancer).
type ProducerConfig struct {
	Brokers      []string           `yaml:"brokers"`
	BatchSize    int                `yaml:"batchSize"`
	BatchTimeout time.Duration      `yaml:"batchTimeout"`
	Balancer     segmentio.Balancer `yaml:"balancer"`
}

type SASLConfig

// SASLConfig holds SASL authentication and TLS trust settings for the broker connection.
type SASLConfig struct {
	Enabled            bool   `yaml:"enabled"`
	AuthType           string `yaml:"authType"` // plain or scram
	Username           string `yaml:"username"`
	Password           string `yaml:"password"`
	// RootCAPath / IntermediateCAPath point to CA certificate files used to verify the broker — TODO confirm TLS wiring.
	RootCAPath         string `yaml:"rootCAPath"`
	IntermediateCAPath string `yaml:"intermediateCAPath"`
	Rack               string `yaml:"rack"`
}

type SkipMessageByHeaderFn added in v1.4.7

// SkipMessageByHeaderFn reports whether a message should be skipped, decided from its headers.
type SkipMessageByHeaderFn func(headers []Header) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL