Documentation ¶
Index ¶
- type ConsumeFunc
- type Consumer
- func (c *Consumer) Poll(timeoutMs int) kafka.Event
- func (c *Consumer) PollWithHandler(timeoutMs int, handler ConsumeFunc) kafka.Event
- func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)
- func (c *Consumer) ReadMessageWithHandler(timeout time.Duration, handler ConsumeFunc) (*kafka.Message, error)
- type MessageCarrier
- type Option
- type Producer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
func NewConsumerWithTracing ¶
func (*Consumer) Poll ¶
Poll retrieves an event from the current consumer and creates a new span if the event is a kafka.Message.
func (*Consumer) PollWithHandler ¶
func (c *Consumer) PollWithHandler(timeoutMs int, handler ConsumeFunc) kafka.Event
PollWithHandler retrieves an event from the current consumer, creates a new span if the event is a kafka.Message, and runs the given handler.
func (*Consumer) ReadMessage ¶
ReadMessage creates a new span and reads a Kafka message from the current consumer.
func (*Consumer) ReadMessageWithHandler ¶
func (c *Consumer) ReadMessageWithHandler(timeout time.Duration, handler ConsumeFunc) (*kafka.Message, error)
ReadMessageWithHandler reads a message and runs the given handler, tracing the handler's execution.
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
MessageCarrier injects trace context into, and extracts it from, the headers of a kafka.Message.
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafka.Message) MessageCarrier
NewMessageCarrier creates a new MessageCarrier.
func (MessageCarrier) Get ¶
func (c MessageCarrier) Get(key string) string
Get retrieves a single value for a given key from Kafka message headers.
func (MessageCarrier) Keys ¶
func (c MessageCarrier) Keys() []string
Keys returns all header keys present on the Kafka message.
func (MessageCarrier) Set ¶
func (c MessageCarrier) Set(key, value string)
Set sets a header on the Kafka message.
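The Get, Keys, and Set methods above follow the usual text-map carrier shape: a thin adapter over message headers. The following is a minimal sketch of that idea, not the package's implementation. Message, Header, HeaderCarrier, and NewHeaderCarrier are local stand-ins invented for illustration so the example runs without a Kafka client, and the replace-on-Set behaviour is an assumption.

```go
package main

import "fmt"

// Header and Message are hypothetical stand-ins for the Kafka client's
// header and message types, defined here so the sketch is self-contained.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Headers []Header
}

// HeaderCarrier is a hypothetical carrier with the same method set as
// MessageCarrier (Get, Keys, Set).
type HeaderCarrier struct {
	msg *Message
}

// NewHeaderCarrier wraps msg so its headers can be read and written by key.
func NewHeaderCarrier(msg *Message) HeaderCarrier {
	return HeaderCarrier{msg: msg}
}

// Get returns the value of the first header named key, or "" if none exists.
func (c HeaderCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Keys lists the header keys in the order they appear on the message.
func (c HeaderCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

// Set removes any existing headers named key and appends a single new one
// (an assumed policy; it avoids duplicate trace headers).
func (c HeaderCarrier) Set(key, value string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if h.Key != key {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, Header{Key: key, Value: []byte(value)})
}

func main() {
	msg := &Message{}
	carrier := NewHeaderCarrier(msg)
	carrier.Set("traceparent", "example-value")
	fmt.Println(carrier.Keys(), carrier.Get("traceparent"))
}
```

A type with these three methods satisfies the carrier interface that a TextMapPropagator expects, which is what lets a propagator write trace headers on the producer side and read them back on the consumer side.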
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is used to configure the client.
func WithConsumerGroupID ¶
WithConsumerGroupID specifies the consumer group ID that is used for creating a consumer.
func WithPropagator ¶
func WithPropagator(propagator propagation.TextMapPropagator) Option
WithPropagator specifies a custom TextMapPropagator.
func WithTracerName ¶
WithTracerName sets the name of the current tracer.
func WithTracerProvider ¶
func WithTracerProvider(provider oteltrace.TracerProvider) Option
WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.
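An Option interface with only unexported methods is commonly paired with the functional-options pattern. The sketch below shows how such options might be applied to a configuration struct. It is an illustration under assumptions, not this package's code: the config struct, its fields, the optionFunc adapter, the newConfig helper, the default tracer name, and the string parameter types of the two With... functions are all hypothetical.

```go
package main

import "fmt"

// config holds the settings that options modify. The field names are
// hypothetical and chosen only for this sketch.
type config struct {
	tracerName      string
	consumerGroupID string
}

// Option has an unexported method, so only this package can define options.
type Option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// WithTracerName overrides the tracer name (parameter type is an assumption).
func WithTracerName(name string) Option {
	return optionFunc(func(c *config) { c.tracerName = name })
}

// WithConsumerGroupID records the consumer group ID (parameter type is an
// assumption).
func WithConsumerGroupID(id string) Option {
	return optionFunc(func(c *config) { c.consumerGroupID = id })
}

// newConfig starts from a placeholder default and applies options in order,
// so later options win over earlier ones.
func newConfig(opts ...Option) config {
	cfg := config{tracerName: "default-tracer"}
	for _, o := range opts {
		o.apply(&cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(WithTracerName("orders"), WithConsumerGroupID("billing"))
	fmt.Println(cfg.tracerName, cfg.consumerGroupID)
}
```

Keeping the interface method unexported means callers can pass options but cannot implement their own, which leaves the package free to change the configuration struct later.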
type Producer ¶
func NewProducerWithTracing ¶
func (*Producer) Produce ¶
Produce creates a new span and produces the given Kafka message synchronously using the original producer.
func (*Producer) ProduceChannel ¶
ProduceChannel creates a new span for every message sent into the channel and forwards each message to the original producer channel.
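The wrap-and-forward behaviour described for ProduceChannel can be pictured as a goroutine sitting between the caller and the original channel. This is a rough sketch of that pattern only: Message, wrapChannel, and the onSend hook are hypothetical names, and the hook merely stands in for the per-message span creation that the real wrapper performs.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a Kafka message.
type Message struct {
	Value   string
	Headers map[string]string
}

// wrapChannel returns a send-only channel. Each message sent into it is
// passed to onSend (standing in for "start a span and inject headers")
// and then forwarded to out. The goroutine stops when the returned
// channel is closed; out is left open for its owner to manage.
func wrapChannel(out chan<- *Message, onSend func(*Message)) chan<- *Message {
	in := make(chan *Message)
	go func() {
		for msg := range in {
			onSend(msg)
			out <- msg
		}
	}()
	return in
}

func main() {
	// out plays the role of the original producer channel.
	out := make(chan *Message, 1)
	in := wrapChannel(out, func(m *Message) {
		m.Headers["traced"] = "true"
	})

	in <- &Message{Value: "hello", Headers: map[string]string{}}
	got := <-out
	fmt.Println(got.Value, got.Headers["traced"])

	close(in) // lets the forwarding goroutine exit
}
```

Because the hook runs before the forward, any headers it adds are already on the message by the time the original channel receives it.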