internal

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RetryHeaderKey              = "x-retry-count"
	RetryAttemptHeaderKey       = "x-retry-attempt-count"
	MessageProduceTimeHeaderKey = "x-produce-time"
	MessageErrHeaderKey         = "x-error-message"
)
View Source
const Name = "kafka_cronsumer"

Variables

This section is empty.

Functions

func FromHeaders added in v0.6.2

func FromHeaders(sh []segmentio.Header) []kafka.Header

func Mechanism

func Mechanism(sasl kafka.SASLConfig) sasl.Mechanism

TODO: we can support `plain` authentication type link: https://github.com/segmentio/kafka-go#plain

func NewCronsumer

func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer

func NewKafkaClient added in v1.5.3

func NewKafkaClient(cfg *kafka.Config) (kafkaClient, error)

func NewTLSConfig

func NewTLSConfig(sasl kafka.SASLConfig) *tls.Config

func ToHeaders added in v0.6.2

func ToHeaders(h []kafka.Header) []segmentio.Header

func VerifyTopics added in v1.5.3

func VerifyTopics(client kafkaClient, topics ...string) (bool, error)

Types

type Collector added in v1.2.2

type Collector struct {
	// contains filtered or unexported fields
}

func NewCollector added in v1.2.2

func NewCollector(metricPrefix string, cronsumerMetric *CronsumerMetric) *Collector

func (*Collector) Collect added in v1.2.2

func (s *Collector) Collect(ch chan<- prometheus.Metric)

func (*Collector) Describe added in v1.2.2

func (s *Collector) Describe(ch chan<- *prometheus.Desc)

type Consumer

type Consumer interface {
	ReadMessage(ctx context.Context) (*segmentio.Message, error)
	Stop()
}

type CronsumerMetric added in v1.2.2

type CronsumerMetric struct {
	TotalRetriedMessagesCounter   int64
	TotalDiscardedMessagesCounter int64
}

type MessageWrapper

type MessageWrapper struct {
	kafka.Message
	RetryCount        int
	ProduceTime       int64 // Nano time
	RetryAttemptCount int
}

func NewMessageWrapper

func NewMessageWrapper(msg segmentio.Message, strategyName string) *MessageWrapper

func (*MessageWrapper) GetHeaders

func (m *MessageWrapper) GetHeaders() map[string][]byte

func (*MessageWrapper) IncreaseRetryAttemptCount added in v1.4.5

func (m *MessageWrapper) IncreaseRetryAttemptCount()

func (*MessageWrapper) IncreaseRetryCount

func (m *MessageWrapper) IncreaseRetryCount()

func (*MessageWrapper) IsGteMaxRetryCount added in v1.2.1

func (m *MessageWrapper) IsGteMaxRetryCount(maxRetry int) bool

func (*MessageWrapper) NewProduceTime

func (m *MessageWrapper) NewProduceTime()

func (*MessageWrapper) ResetRetryAttempt added in v1.4.5

func (m *MessageWrapper) ResetRetryAttempt()

func (*MessageWrapper) RouteMessageToTopic

func (m *MessageWrapper) RouteMessageToTopic(topic string)

func (*MessageWrapper) To

func (m *MessageWrapper) To(increaseRetry bool, increaseRetryAttempt bool) segmentio.Message

type Producer

type Producer interface {
	ProduceWithRetryOption(message MessageWrapper, increaseRetry bool, increaseRetryAttempt bool) error
	Produce(message kafka.Message) error
	ProduceBatch(messages []kafka.Message) error
	Close()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL