Versions in this module

v1

v1.0.2 (May 10, 2023)

Changes in this version:

+ type Consumer struct
    + func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)
    + func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer
    + func (c *Consumer) Close() error
    + func (c *Consumer) Events() chan kafka.Event
    + func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)
    + func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)
+ type MessageCarrier struct
    + func NewMessageCarrier(msg *kafka.Message) MessageCarrier
    + func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error
    + func (c MessageCarrier) Set(key, val string)
+ type Option func(cfg *config)
    + func WithAnalytics(on bool) Option
    + func WithAnalyticsRate(rate float64) Option
    + func WithContext(ctx context.Context) Option
    + func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option
    + func WithServiceName(serviceName string) Option
+ type Producer struct
    + func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)
    + func WrapProducer(p *kafka.Producer, opts ...Option) *Producer
    + func (p *Producer) Close()
    + func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
    + func (p *Producer) ProduceChannel() chan *kafka.Message