Documentation ¶
Overview ¶
Package kafka includes a partitioner copied from https://github.com/burdiyan/kafkautil/blob/master/partitioner.go. The code is copied here to ensure it stays available.
Index ¶
- Constants
- Variables
- func DefaultTracing(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
- func DeserializeContextFromKafkaHeaders(ctx context.Context, kafkaheaders string) (context.Context, error)
- func GetContextFromKafkaMessage(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
- func GetKafkaHeadersFromContext(ctx context.Context) []sarama.RecordHeader
- func MurmurHasher() hash.Hash32
- func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner
- func SerializeKafkaHeadersFromContext(ctx context.Context) (string, error)
- type ConsumerMetricsService
- type Handler
- type Handlers
- type Listener
- type ListenerOption
- type Producer
- type ProducerHandler
- type ProducerMetricsService
- type ProducerOption
- type StdLogger
- type TracingFunc
Constants ¶
const (
	TimestampTypeLogAppendTime = "LogAppendTime"
	TimestampTypeCreateTime    = "CreateTime"
)
const InfiniteRetries = -1
InfiniteRetries is a constant that defines infinite retries. Setting ConsumerMaxRetries to InfiniteRetries makes the retry process blocking.
Variables ¶
var (
	ErrEventUnretriable = errors.New("the event will not be retried")
	ErrEventOmitted     = errors.New("the event will be omitted")
)
var Brokers []string
Brokers is the list of Kafka brokers to connect to.
var Config = sarama.NewConfig()
Config is the sarama (cluster) config used for the consumer and producer.
var ConsumerMaxRetries = 3
ConsumerMaxRetries is the maximum number of times processing of an event is retried before giving up and reporting the error. The default is 3.
var DurationBeforeRetry = 2 * time.Second
DurationBeforeRetry is the time to wait between processing retries. The default is 2 seconds.
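The interaction between ConsumerMaxRetries, DurationBeforeRetry and InfiniteRetries can be pictured with a small retry loop. This is only a sketch under stated assumptions, not the package's implementation: `processWithRetry`, `failTimes`, `errTransient` and the lowercase stand-ins are hypothetical names, the first attempt is assumed not to count as a retry, and stopping early on the "will not be retried" error is inferred from its message.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// infiniteRetries is a local stand-in for the documented InfiniteRetries.
const infiniteRetries = -1

var (
	// errUnretriable is a local stand-in for ErrEventUnretriable.
	errUnretriable = errors.New("the event will not be retried")
	// errTransient is a hypothetical error used only by this sketch.
	errTransient = errors.New("transient failure")
)

// processWithRetry calls process until it succeeds, returns errUnretriable,
// or has been retried maxRetries times (assumption: the first attempt is not
// a retry). With maxRetries == infiniteRetries it blocks until process
// succeeds or reports errUnretriable.
func processWithRetry(process func() error, maxRetries int, wait time.Duration) (attempts int, err error) {
	for {
		attempts++
		err = process()
		if err == nil || errors.Is(err, errUnretriable) {
			return attempts, err
		}
		if maxRetries != infiniteRetries && attempts > maxRetries {
			return attempts, err
		}
		time.Sleep(wait)
	}
}

// failTimes returns a process function that fails n times, then succeeds.
func failTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errTransient
		}
		return nil
	}
}

func main() {
	attempts, err := processWithRetry(failTimes(2), 3, 10*time.Millisecond)
	fmt.Println(attempts, err)
}
```

With a finite limit the loop gives up and returns the last error; with the infinite setting a permanently failing handler would block its caller, which is why the sketch also checks for the unretriable error.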
var ErrorTopicPattern = "$$CG$$-$$T$$-error"
ErrorTopicPattern is the name pattern for error topics. The default expands to "consumergroup-topicname-error". Use $$CG$$ as the consumer group placeholder and $$T$$ as the original topic name placeholder.
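The placeholder expansion described above can be sketched with the standard library. `errorTopicName` is a hypothetical helper written for illustration; how the package itself performs the substitution is not shown in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// errorTopicName expands the $$CG$$ and $$T$$ placeholders of an error topic
// pattern. It is a hypothetical helper, not part of the package API.
func errorTopicName(pattern, consumerGroup, topic string) string {
	return strings.NewReplacer("$$CG$$", consumerGroup, "$$T$$", topic).Replace(pattern)
}

func main() {
	// prints "billing-orders-error"
	fmt.Println(errorTopicName("$$CG$$-$$T$$-error", "billing", "orders"))
}
```

A pattern that omits a placeholder, for example "dlq.$$T$$", would give a topic name that is shared by all consumer groups.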
var PushConsumerErrorsToTopic = true
PushConsumerErrorsToTopic defines whether messages that fail processing are pushed to an error topic.
Functions ¶
func DefaultTracing ¶
func DefaultTracing(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
DefaultTracing implements TracingFunc. It fetches OpenTracing headers from the Kafka message headers, then creates a span using opentracing.GlobalTracer(). Usage: `listener, err = kafka.NewListener(appName, handlers, kafka.WithTracing(kafka.DefaultTracing))`
func DeserializeContextFromKafkaHeaders ¶
func DeserializeContextFromKafkaHeaders(ctx context.Context, kafkaheaders string) (context.Context, error)
DeserializeContextFromKafkaHeaders fetches tracing headers from a JSON-encoded carrier and returns the resulting context.
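To illustrate what a JSON-encoded carrier might look like, the sketch below round-trips a plain key/value map through encoding/json. It is an assumption-heavy illustration: the `carrier` type, the helper names and the "trace-id" key are hypothetical, and the real functions work with a context.Context and a tracer rather than a bare map.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// carrier is a hypothetical text-map carrier: tracing keys mapped to values.
type carrier map[string]string

// serializeCarrier encodes the carrier as a JSON object string.
func serializeCarrier(c carrier) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// deserializeCarrier decodes a JSON object string back into a carrier.
func deserializeCarrier(s string) (carrier, error) {
	var c carrier
	if err := json.Unmarshal([]byte(s), &c); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	encoded, err := serializeCarrier(carrier{"trace-id": "abc123"})
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)

	decoded, err := deserializeCarrier(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded["trace-id"])
}
```

A string form like this can travel through any channel that carries text, which is presumably why the package offers it alongside the []sarama.RecordHeader form.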
func GetContextFromKafkaMessage ¶
func GetContextFromKafkaMessage(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
GetContextFromKafkaMessage fetches tracing headers from the Kafka message.
func GetKafkaHeadersFromContext ¶
func GetKafkaHeadersFromContext(ctx context.Context) []sarama.RecordHeader
GetKafkaHeadersFromContext fetches tracing metadata from the context and returns it as []sarama.RecordHeader.
func MurmurHasher ¶
func MurmurHasher() hash.Hash32
MurmurHasher creates a murmur2 hasher implementing the hash.Hash32 interface. The implementation is incomplete and does not support streaming; it implements the interface only to comply with the sarama.NewCustomHashPartitioner signature. Sarama calls Write only once, when writing the keys and values of the message, so streaming support is not necessary.
func NewJVMCompatiblePartitioner ¶
func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner
NewJVMCompatiblePartitioner creates a Sarama partitioner that uses the same hashing algorithm as JVM Kafka clients.
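The hashing scheme can be sketched as follows. This is a stand-alone illustration rather than the package's code: it implements 32-bit MurmurHash2 with the seed and constants used by the Java Kafka client, wraps it in a non-streaming hash.Hash32, and picks a partition by masking the sign bit and taking the modulo, as the Java default partitioner does for keyed messages. The names `murmur2`, `murmurHasher` and `partitionFor` are chosen for this sketch.

```go
package main

import (
	"fmt"
	"hash"
)

// murmur2 is 32-bit MurmurHash2 with the Java Kafka client's seed.
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	length := len(data)
	h := seed ^ uint32(length)

	// Mix four bytes at a time, read as little-endian words.
	for i := 0; i+4 <= length; i += 4 {
		k := uint32(data[i]) | uint32(data[i+1])<<8 | uint32(data[i+2])<<16 | uint32(data[i+3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}

	// Mix the remaining one to three bytes.
	tail := data[length&^3:]
	switch len(tail) {
	case 3:
		h ^= uint32(tail[2]) << 16
		fallthrough
	case 2:
		h ^= uint32(tail[1]) << 8
		fallthrough
	case 1:
		h ^= uint32(tail[0])
		h *= m
	}

	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// murmurHasher adapts murmur2 to hash.Hash32 without streaming support:
// each Write replaces the stored value with the hash of that single write.
type murmurHasher struct{ sum uint32 }

func (h *murmurHasher) Write(p []byte) (int, error) { h.sum = murmur2(p); return len(p), nil }
func (h *murmurHasher) Reset()                      { h.sum = 0 }
func (h *murmurHasher) Size() int                   { return 4 }
func (h *murmurHasher) BlockSize() int              { return 4 }
func (h *murmurHasher) Sum32() uint32               { return h.sum }
func (h *murmurHasher) Sum(b []byte) []byte {
	s := h.sum
	return append(b, byte(s>>24), byte(s>>16), byte(s>>8), byte(s))
}

// Compile-time check that the adapter satisfies hash.Hash32.
var _ hash.Hash32 = (*murmurHasher)(nil)

// partitionFor clears the sign bit of the hash, then takes the modulo.
func partitionFor(key []byte, numPartitions int32) int32 {
	h := &murmurHasher{}
	h.Write(key)
	return int32(h.Sum32()&0x7fffffff) % numPartitions
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor([]byte(key), 12))
	}
}
```

Because the hash depends only on the key bytes, a Go producer and a JVM producer using the same scheme send a given key to the same partition, provided both see the same partition count.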
Types ¶
type ConsumerMetricsService ¶
type ConsumerMetricsService struct {
// contains filtered or unexported fields
}
ConsumerMetricsService object represents consumer metrics
func NewConsumerMetricsService ¶
func NewConsumerMetricsService(groupID string) *ConsumerMetricsService
NewConsumerMetricsService creates a service layer that adds metrics capability.
func (*ConsumerMetricsService) Instrumentation ¶
func (c *ConsumerMetricsService) Instrumentation(next Handler) Handler
Instrumentation is a middleware that adds metrics.
type Handler ¶
type Handler func(ctx context.Context, msg *sarama.ConsumerMessage) error
Handler handles received Kafka messages.
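Because Instrumentation takes a Handler and returns a Handler, it follows the usual middleware shape. The sketch below shows that shape with stand-in types so it needs no external dependency: `message` replaces *sarama.ConsumerMessage, and `counters`, `rejectEmpty` and `runAll` are hypothetical names. It does not reproduce the metrics the package actually records.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// message is a stand-in for *sarama.ConsumerMessage.
type message struct {
	Topic string
	Value []byte
}

// handler mirrors the shape of Handler with the stand-in message type.
type handler func(ctx context.Context, msg *message) error

// counters is a hypothetical metrics sink.
type counters struct{ processed, failed int }

// instrument wraps next and counts successes and failures without changing
// the result seen by the caller.
func (c *counters) instrument(next handler) handler {
	return func(ctx context.Context, msg *message) error {
		err := next(ctx, msg)
		if err != nil {
			c.failed++
		} else {
			c.processed++
		}
		return err
	}
}

// rejectEmpty is an example handler that fails on empty payloads.
func rejectEmpty(ctx context.Context, msg *message) error {
	if len(msg.Value) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

// runAll feeds each message to h and returns how many calls failed.
func runAll(h handler, msgs ...*message) int {
	failures := 0
	for _, m := range msgs {
		if err := h(context.Background(), m); err != nil {
			failures++
		}
	}
	return failures
}

func main() {
	c := &counters{}
	h := c.instrument(rejectEmpty)
	runAll(h, &message{Topic: "orders", Value: []byte("ok")}, &message{Topic: "orders"})
	fmt.Printf("processed=%d failed=%d\n", c.processed, c.failed)
}
```

Wrappers of this shape compose: the result of one can be passed as `next` to another, so tracing, metrics and logging can be layered around the same business handler.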
type Listener ¶
Listener can listen to multiple topics, with one handler per topic.
func NewListener ¶
func NewListener(groupID string, handlers Handlers, options ...ListenerOption) (Listener, error)
NewListener creates a new instance of Listener
type ListenerOption ¶
type ListenerOption func(l *listener)
ListenerOption configures a listener.
func WithInstrumenting ¶
func WithInstrumenting() ListenerOption
WithInstrumenting adds the instrumenting layer on a listener.
func WithTracing ¶
func WithTracing(tracer TracingFunc) ListenerOption
WithTracing accepts a TracingFunc to execute before each message
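ListenerOption and its constructors follow the functional options pattern: each option is a function that mutates the listener while it is being built. A minimal sketch of the pattern follows, with a hypothetical `listener` struct and hypothetical option names, since the real unexported type and its fields are not documented here.

```go
package main

import "fmt"

// listener is a hypothetical stand-in for the package's unexported type.
type listener struct {
	groupID      string
	instrumented bool
	tracerName   string
}

// listenerOption mirrors the shape of ListenerOption.
type listenerOption func(l *listener)

// withInstrumenting is a hypothetical option that turns on a flag.
func withInstrumenting() listenerOption {
	return func(l *listener) { l.instrumented = true }
}

// withTracerName is a hypothetical option that carries a parameter.
func withTracerName(name string) listenerOption {
	return func(l *listener) { l.tracerName = name }
}

// newListener applies the options in order on top of the defaults.
func newListener(groupID string, options ...listenerOption) *listener {
	l := &listener{groupID: groupID}
	for _, opt := range options {
		opt(l)
	}
	return l
}

func main() {
	l := newListener("billing", withInstrumenting(), withTracerName("default"))
	fmt.Printf("%+v\n", *l)
}
```

The pattern keeps the constructor signature stable: new capabilities arrive as new option functions rather than as extra parameters.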
type Producer ¶
type Producer interface {
Produce(msg *sarama.ProducerMessage) error
}
func NewProducer ¶
func NewProducer(options ...ProducerOption) (Producer, error)
NewProducer creates a new producer that uses the default sarama client.
type ProducerHandler ¶
type ProducerHandler func(p *producer, msg *sarama.ProducerMessage) error
ProducerHandler is a function that handles the production of a message. It is exposed to allow for easy middleware building.
type ProducerMetricsService ¶
type ProducerMetricsService struct {
// contains filtered or unexported fields
}
ProducerMetricsService is a service that provides metrics for the producer.
func NewDeadletterProducerMetricsService ¶
func NewDeadletterProducerMetricsService() *ProducerMetricsService
func NewProducerMetricsService ¶
func NewProducerMetricsService() *ProducerMetricsService
func (*ProducerMetricsService) DeadletterInstrumentation ¶
func (p *ProducerMetricsService) DeadletterInstrumentation(next ProducerHandler) ProducerHandler
DeadletterInstrumentation is a middleware that provides metrics for a deadletter producer.
func (*ProducerMetricsService) Instrumentation ¶
func (p *ProducerMetricsService) Instrumentation(next ProducerHandler) ProducerHandler
Instrumentation is a middleware that provides metrics for the producer.
type ProducerOption ¶
type ProducerOption func(p *producer)
ProducerOption is a function that is passed to the producer constructor to configure it.
func WithDeadletterProducerInstrumenting ¶
func WithDeadletterProducerInstrumenting() ProducerOption
WithDeadletterProducerInstrumenting adds the instrumenting layer on a deadletter producer.
func WithProducerInstrumenting ¶
func WithProducerInstrumenting() ProducerOption
WithProducerInstrumenting adds the instrumenting layer on a producer.
type StdLogger ¶
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
StdLogger is used to log messages.
ErrorLogger is an instance of the StdLogger interface. By default it writes all log messages to stderr, but it can be set to redirect them elsewhere.
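Any type with these three methods satisfies StdLogger, including *log.Logger from the standard library. The sketch below redeclares the interface locally and writes to a buffer instead of stderr; `newBufferLogger`, `capture` and the "[kafka] " prefix are hypothetical, and the way ErrorLogger is declared and assigned in the package is not shown in this documentation.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// StdLogger is redeclared from the declaration above so the sketch compiles
// on its own.
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

// *log.Logger provides all three methods, so it satisfies StdLogger.
var _ StdLogger = (*log.Logger)(nil)

// newBufferLogger returns a StdLogger that writes into buf instead of stderr.
func newBufferLogger(buf *bytes.Buffer) StdLogger {
	return log.New(buf, "[kafka] ", 0)
}

// capture runs fn with a buffer-backed logger and returns what was logged.
func capture(fn func(StdLogger)) string {
	var buf bytes.Buffer
	fn(newBufferLogger(&buf))
	return buf.String()
}

func main() {
	out := capture(func(l StdLogger) {
		l.Printf("failed to process message at offset %d", 42)
	})
	fmt.Print(out)
}
```

The same approach works for adapters around structured loggers, as long as the adapter exposes Print, Printf and Println.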
type TracingFunc ¶
type TracingFunc func(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
TracingFunc is used to create tracing and/or propagate the tracing context from each message to the Go context.