Versions in this module

v1

v1.4.0 (Jan 10, 2024)

Changes in this version:

+ func ConsumeMessage(handler HandlerFunc, msg *sarama.ConsumerMessage) error
+ func ConsumeMessageContext(handler HandlerContextFunc, ctx context.Context, msg *sarama.ConsumerMessage) error
+ func NewAsyncProducer(addrs []string, config *sarama.Config) (sarama.AsyncProducer, error)
+ func NewContext(ctx context.Context, addrs []string) context.Context
+ func WithContext(ctx context.Context, producer interface{})
+ type Consumer struct
    + func NewConsumer(addrs []string, config *sarama.Config) (*Consumer, error)
    + func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (*PartitionConsumer, error)
+ type ConsumerMessage struct
    + func WrapConsumerMessage(msg *sarama.ConsumerMessage) *ConsumerMessage
    + func (c *ConsumerMessage) SpanTracer() pinpoint.Tracer
    + func (c *ConsumerMessage) Tracer() pinpoint.Tracer
+ type HandlerContextFunc func(context.Context, *sarama.ConsumerMessage) error
+ type HandlerFunc func(msg *ConsumerMessage) error
+ type PartitionConsumer struct
    + func WrapPartitionConsumer(pc sarama.PartitionConsumer) *PartitionConsumer
    + func (pc *PartitionConsumer) Messages() <-chan *ConsumerMessage
+ type SyncProducer interface
    + SendMessageContext func(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
    + SendMessagesContext func(ctx context.Context, msgs []*sarama.ProducerMessage) error
    + func NewSyncProducer(addrs []string, config *sarama.Config) (SyncProducer, error)