Documentation ¶
Index ¶
- Constants
- func FromHeaders(sh []segmentio.Header) []kafka.Header
- func Mechanism(sasl kafka.SASLConfig) sasl.Mechanism
- func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer
- func NewKafkaClient(cfg *kafka.Config) (kafkaClient, error)
- func NewTLSConfig(sasl kafka.SASLConfig) *tls.Config
- func ToHeaders(h []kafka.Header) []segmentio.Header
- func VerifyTopics(client kafkaClient, topics ...string) (bool, error)
- type Collector
- type Consumer
- type CronsumerMetric
- type MessageWrapper
- func (m *MessageWrapper) GetHeaders() map[string][]byte
- func (m *MessageWrapper) IncreaseRetryAttemptCount()
- func (m *MessageWrapper) IncreaseRetryCount()
- func (m *MessageWrapper) IsGteMaxRetryCount(maxRetry int) bool
- func (m *MessageWrapper) NewProduceTime()
- func (m *MessageWrapper) ResetRetryAttempt()
- func (m *MessageWrapper) RouteMessageToTopic(topic string)
- func (m *MessageWrapper) To(increaseRetry bool, increaseRetryAttempt bool) segmentio.Message
- type Producer
Constants ¶
View Source
const (
	RetryHeaderKey              = "x-retry-count"
	RetryAttemptHeaderKey       = "x-retry-attempt-count"
	MessageProduceTimeHeaderKey = "x-produce-time"
	MessageErrHeaderKey         = "x-error-message"
)
View Source
const Name = "kafka_cronsumer"
Variables ¶
This section is empty.
Functions ¶
func Mechanism ¶
func Mechanism(sasl kafka.SASLConfig) sasl.Mechanism
TODO: support for the `plain` authentication type could be added. See: https://github.com/segmentio/kafka-go#plain
func NewKafkaClient ¶ added in v1.5.3
func NewTLSConfig ¶
func NewTLSConfig(sasl kafka.SASLConfig) *tls.Config
func VerifyTopics ¶ added in v1.5.3
Types ¶
type Collector ¶ added in v1.2.2
type Collector struct {
// contains filtered or unexported fields
}
func NewCollector ¶ added in v1.2.2
func NewCollector(metricPrefix string, cronsumerMetric *CronsumerMetric) *Collector
func (*Collector) Collect ¶ added in v1.2.2
func (s *Collector) Collect(ch chan<- prometheus.Metric)
func (*Collector) Describe ¶ added in v1.2.2
func (s *Collector) Describe(ch chan<- *prometheus.Desc)
type CronsumerMetric ¶ added in v1.2.2
type MessageWrapper ¶
type MessageWrapper struct {
	kafka.Message
	RetryCount        int
	ProduceTime       int64 // Nano time
	RetryAttemptCount int
}
func NewMessageWrapper ¶
func NewMessageWrapper(msg segmentio.Message, strategyName string) *MessageWrapper
func (*MessageWrapper) GetHeaders ¶
func (m *MessageWrapper) GetHeaders() map[string][]byte
func (*MessageWrapper) IncreaseRetryAttemptCount ¶ added in v1.4.5
func (m *MessageWrapper) IncreaseRetryAttemptCount()
func (*MessageWrapper) IncreaseRetryCount ¶
func (m *MessageWrapper) IncreaseRetryCount()
func (*MessageWrapper) IsGteMaxRetryCount ¶ added in v1.2.1
func (m *MessageWrapper) IsGteMaxRetryCount(maxRetry int) bool
func (*MessageWrapper) NewProduceTime ¶
func (m *MessageWrapper) NewProduceTime()
func (*MessageWrapper) ResetRetryAttempt ¶ added in v1.4.5
func (m *MessageWrapper) ResetRetryAttempt()
func (*MessageWrapper) RouteMessageToTopic ¶
func (m *MessageWrapper) RouteMessageToTopic(topic string)
Click to show internal directories.
Click to hide internal directories.