Documentation ¶
Overview ¶
Package kafka contains a partitioner copied from https://github.com/burdiyan/kafkautil/blob/master/partitioner.go; it is copied here to ensure it stays available.
Index ¶
- Variables
- func DefaultTracing(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
- func DeserializeContextFromKafkaHeaders(ctx context.Context, kafkaheaders string) (context.Context, error)
- func GetContextFromKafkaMessage(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
- func GetKafkaHeadersFromContext(ctx context.Context) []sarama.RecordHeader
- func MurmurHasher() hash.Hash32
- func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner
- func SerializeKafkaHeadersFromContext(ctx context.Context) (string, error)
- type ConsumerMetricsService
- type Handler
- type Handlers
- type Listener
- type ListenerOption
- type StdLogger
- type TracingFunc
Constants ¶
This section is empty.
Variables ¶
var Config = sarama.NewConfig()
Config is the sarama (cluster) config used for the consumer and producer.
var ConsumerMaxRetries = 3
ConsumerMaxRetries is the maximum number of times processing of an event is retried before the error is returned. Defaults to 3.
var DurationBeforeRetry = 2 * time.Second
DurationBeforeRetry is the time to wait between processing retries. Defaults to 2 seconds.
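The interaction between ConsumerMaxRetries and DurationBeforeRetry can be illustrated with a small retry loop. This is only a sketch, not the package's implementation: processWithRetry and failNTimes are hypothetical helpers, and it assumes that "retries" means additional attempts after a first failed one.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// processWithRetry is a hypothetical helper: it runs process once, then
// retries up to maxRetries more times, sleeping wait between attempts.
// It returns the number of attempts made and the last error, if any.
func processWithRetry(process func() error, maxRetries int, wait time.Duration) (attempts int, err error) {
	for attempts = 1; ; attempts++ {
		if err = process(); err == nil {
			return attempts, nil
		}
		if attempts > maxRetries {
			return attempts, err
		}
		time.Sleep(wait)
	}
}

// failNTimes returns a function that fails on its first n calls and
// succeeds afterwards. It exists only to drive the demo.
func failNTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("transient failure")
		}
		return nil
	}
}

func main() {
	// Values mirror the documented defaults, with a shorter wait for the demo.
	maxRetries := 3
	wait := 10 * time.Millisecond

	attempts, err := processWithRetry(failNTimes(2), maxRetries, wait)
	fmt.Println(attempts, err)
}
```

Under that assumption, a handler that always fails is called four times in total with the default of 3 retries.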
var ErrorTopicPattern = "$$CG$$-$$T$$-error"
ErrorTopicPattern is the error topic name pattern. By default "consumergroup-topicname-error". Use $$CG$$ as the consumer group placeholder and $$T$$ as the original topic name placeholder.
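To make the placeholder convention concrete, here is a minimal sketch of how such a pattern could be expanded. expandErrorTopic is a hypothetical helper written for illustration; the package's own substitution code is not shown on this page and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// expandErrorTopic is a hypothetical helper that substitutes the consumer
// group for $$CG$$ and the original topic name for $$T$$ in pattern.
func expandErrorTopic(pattern, consumerGroup, topic string) string {
	return strings.NewReplacer(
		"$$CG$$", consumerGroup,
		"$$T$$", topic,
	).Replace(pattern)
}

func main() {
	// "billing" and "orders" are example names, not values from the package.
	fmt.Println(expandErrorTopic("$$CG$$-$$T$$-error", "billing", "orders"))
	// prints "billing-orders-error"
}
```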
var PushConsumerErrorsToTopic = true
PushConsumerErrorsToTopic defines whether messages that fail processing are pushed to an error topic.
Functions ¶
func DefaultTracing ¶
func DefaultTracing(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
DefaultTracing implements TracingFunc. It fetches opentracing headers from the Kafka message headers, then creates a span using opentracing.GlobalTracer(). Usage: `listener, err = kafka.NewListener(brokers, appName, handlers, kafka.WithTracing(kafka.DefaultTracing))`
func DeserializeContextFromKafkaHeaders ¶
func DeserializeContextFromKafkaHeaders(ctx context.Context, kafkaheaders string) (context.Context, error)
DeserializeContextFromKafkaHeaders reads tracing headers from a JSON-encoded carrier and returns the resulting context.
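The idea of a JSON-encoded carrier can be sketched with the standard library alone. The carrier type and both helpers below are assumptions made for illustration: they stand in for a text-map style carrier and do not reproduce the package's functions, which also involve an opentracing tracer and a context. The "trace-id" key is likewise only an example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// carrier is a hypothetical stand-in for a text-map tracing carrier:
// a flat set of string keys and values.
type carrier map[string]string

// serializeCarrier encodes the carrier as a JSON string.
func serializeCarrier(c carrier) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// deserializeCarrier decodes a JSON string back into a carrier.
func deserializeCarrier(s string) (carrier, error) {
	c := carrier{}
	if err := json.Unmarshal([]byte(s), &c); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	encoded, err := serializeCarrier(carrier{"trace-id": "abc123"})
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)

	decoded, err := deserializeCarrier(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded["trace-id"])
}
```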
func GetContextFromKafkaMessage ¶
func GetContextFromKafkaMessage(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
GetContextFromKafkaMessage fetches tracing headers from the kafka message
func GetKafkaHeadersFromContext ¶
func GetKafkaHeadersFromContext(ctx context.Context) []sarama.RecordHeader
GetKafkaHeadersFromContext fetches tracing metadata from the context and returns it as []sarama.RecordHeader.
func MurmurHasher ¶
func MurmurHasher() hash.Hash32
MurmurHasher creates a murmur2 hasher implementing the hash.Hash32 interface. The implementation is incomplete and does not support streaming: it implements the interface only to comply with the sarama.NewCustomHashPartitioner signature. Sarama calls Write only once, when writing the keys and values of the message, so streaming support is not necessary.
func NewJVMCompatiblePartitioner ¶
func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner
NewJVMCompatiblePartitioner creates a Sarama partitioner that uses the same hashing algorithm as JVM Kafka clients.
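For readers unfamiliar with the hashing used by JVM Kafka clients, the following sketch shows a murmur2 hash and the partition selection derived from it, as done by the Java client's default partitioner for keyed messages. It is written from that algorithm rather than taken from this package, and murmur2 and partitionFor are hypothetical names.

```go
package main

import "fmt"

// murmur2 computes the 32-bit murmur2 hash with the seed used by the
// Java Kafka client. Unsigned arithmetic in Go yields the same bits as
// Java's wrapping int arithmetic with unsigned shifts.
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	length := len(data)
	h := seed ^ uint32(length)

	// Mix four bytes at a time, little-endian.
	for i := 0; i+4 <= length; i += 4 {
		k := uint32(data[i]) | uint32(data[i+1])<<8 |
			uint32(data[i+2])<<16 | uint32(data[i+3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}

	// Mix the remaining one to three bytes, if any.
	tail := length &^ 3
	switch length % 4 {
	case 3:
		h ^= uint32(data[tail+2]) << 16
		fallthrough
	case 2:
		h ^= uint32(data[tail+1]) << 8
		fallthrough
	case 1:
		h ^= uint32(data[tail])
		h *= m
	}

	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// partitionFor clears the sign bit of the hash and takes it modulo the
// number of partitions, which must be positive.
func partitionFor(key []byte, numPartitions int32) int32 {
	return int32(murmur2(key)&0x7fffffff) % numPartitions
}

func main() {
	// "customer-42" and 12 partitions are example values.
	fmt.Println(partitionFor([]byte("customer-42"), 12))
}
```

Clearing the sign bit before the modulo keeps the result non-negative, which is why the same key maps to the same partition regardless of which client produced it.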
Types ¶
type ConsumerMetricsService ¶
type ConsumerMetricsService struct {
// contains filtered or unexported fields
}
ConsumerMetricsService object represents consumer metrics
func NewConsumerMetricsService ¶
func NewConsumerMetricsService(groupID string) *ConsumerMetricsService
NewConsumerMetricsService creates a service layer that adds metrics capability.
func (*ConsumerMetricsService) Instrumentation ¶
func (c *ConsumerMetricsService) Instrumentation(next Handler) Handler
Instrumentation is a middleware that adds metrics to a Handler.
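The shape `func(next Handler) Handler` is the usual middleware pattern: wrap a handler, do some work around the call, and return a new handler. The sketch below shows that pattern with plain counters. Every type in it is a local stand-in (message replaces *sarama.ConsumerMessage so the example builds without dependencies), and it does not reflect which metrics the package actually records.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Topic string
	Value []byte
}

// handler mirrors the shape of the package's Handler type.
type handler func(ctx context.Context, msg *message) error

// counters is a hypothetical metrics holder. It is not safe for
// concurrent use; a real implementation would need synchronization.
type counters struct {
	processed int
	failed    int
}

// instrumentation wraps next and counts calls and failures.
func (c *counters) instrumentation(next handler) handler {
	return func(ctx context.Context, msg *message) error {
		err := next(ctx, msg)
		c.processed++
		if err != nil {
			c.failed++
		}
		return err
	}
}

// failOnEmpty is a demo handler that rejects empty payloads.
func failOnEmpty(ctx context.Context, msg *message) error {
	if len(msg.Value) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

// run invokes h with a background context and a message carrying value.
func run(h handler, value string) error {
	return h(context.Background(), &message{Topic: "demo", Value: []byte(value)})
}

func main() {
	c := &counters{}
	h := c.instrumentation(failOnEmpty)
	_ = run(h, "hello")
	_ = run(h, "")
	fmt.Println(c.processed, c.failed) // prints "2 1"
}
```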
type Handler ¶
type Handler func(ctx context.Context, msg *sarama.ConsumerMessage) error
Handler handles received Kafka messages.
type Listener ¶
Listener can listen to multiple topics, with one handler per topic.
func NewListener ¶
func NewListener(brokers []string, groupID string, handlers Handlers, options ...ListenerOption) (Listener, error)
NewListener creates a new instance of Listener
type ListenerOption ¶
type ListenerOption func(l *listener)
ListenerOption sets an option on a listener.
func WithInstrumenting ¶
func WithInstrumenting() ListenerOption
WithInstrumenting adds an instance of Prometheus metrics
func WithTracing ¶
func WithTracing(tracer TracingFunc) ListenerOption
WithTracing accepts a TracingFunc that is executed before each message is handled.
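ListenerOption, WithInstrumenting, and WithTracing follow Go's functional options pattern: each option is a function that mutates the listener during construction. The sketch below reproduces the pattern with local, simplified types. The fields of listener and the signature of tracingFunc are assumptions made for the example, since the real struct is unexported and not shown here.

```go
package main

import "fmt"

// tracingFunc is a simplified, hypothetical stand-in for TracingFunc.
type tracingFunc func(topic string) string

// listener is a hypothetical struct; the package's fields are not shown
// in this documentation.
type listener struct {
	instrumented bool
	tracer       tracingFunc
}

// listenerOption mirrors the shape of ListenerOption.
type listenerOption func(l *listener)

// withInstrumenting turns metrics on.
func withInstrumenting() listenerOption {
	return func(l *listener) { l.instrumented = true }
}

// withTracing stores the tracer to run before each message.
func withTracing(t tracingFunc) listenerOption {
	return func(l *listener) { l.tracer = t }
}

// newListener builds a listener with defaults, then applies each option
// in the order given.
func newListener(options ...listenerOption) *listener {
	l := &listener{}
	for _, option := range options {
		option(l)
	}
	return l
}

// spanName is a demo tracer that derives a span name from the topic.
func spanName(topic string) string { return "consume " + topic }

func main() {
	l := newListener(withInstrumenting(), withTracing(spanName))
	fmt.Println(l.instrumented, l.tracer("orders"))
	// prints "true consume orders"
}
```

The variadic options parameter keeps the constructor signature stable as new options are added, which is the main reason to choose this pattern over a growing list of positional arguments.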
type StdLogger ¶
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
StdLogger is used to log messages.
ErrorLogger is an instance of the StdLogger interface. By default it writes all log messages to stderr, but it can be set to redirect them anywhere else.
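Because StdLogger lists only Print, Printf, and Println, the standard library's *log.Logger satisfies it, so redirecting output can be as simple as building a logger over another writer. The sketch below redeclares the interface locally to stay self-contained; captureLog and the "[kafka] " prefix are hypothetical and exist only for the demonstration.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// StdLogger is redeclared here with the same method set as the
// package's interface, so the example builds on its own.
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

// Compile-time check that *log.Logger satisfies StdLogger.
var _ StdLogger = (*log.Logger)(nil)

// captureLog is a hypothetical helper: it logs msg through a StdLogger
// backed by an in-memory buffer and returns what was written.
func captureLog(msg string) string {
	var buf bytes.Buffer
	var logger StdLogger = log.New(&buf, "[kafka] ", 0)
	logger.Println(msg)
	return buf.String()
}

func main() {
	fmt.Print(captureLog("consumer error"))
	// prints "[kafka] consumer error"
}
```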
type TracingFunc ¶
type TracingFunc func(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
TracingFunc is used to create tracing spans and/or propagate the tracing context from each message to the Go context.