otelkafka

package module
v0.0.0-...-b0047c3 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 25, 2024 License: Apache-2.0 Imports: 13 Imported by: 2

README

otelkafka

OpenTelemetry instrumentation for confluent-kafka-go.

Documentation

Overview

Package otelkafka instruments the github.com/confluentinc/confluent-kafka-go/v2 package and supports the function-based Producer and Consumer.

The consumer's span will be created as a child of the producer's span.

Context propagation works only with Kafka 0.11.0.0 or later, the first release to support record headers. (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html)
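The parent-child link described above depends on the producer writing its span identity into a record header and the consumer reading it back. The following is a minimal sketch of that round trip, assuming the W3C Trace Context propagator (the `traceparent` header); the propagator is configurable, so this is only one possibility. The `header` type and both function names are hypothetical stand-ins, not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// header mirrors the Key/Value shape of a Kafka record header.
// It is a local stand-in so the sketch runs without the Kafka client.
type header struct {
	Key   string
	Value []byte
}

// injectTraceparent appends a W3C "traceparent" header built from the
// producer span's IDs (version 00, flags 01 meaning "sampled").
func injectTraceparent(headers []header, traceID, spanID string) []header {
	value := fmt.Sprintf("00-%s-%s-01", traceID, spanID)
	return append(headers, header{Key: "traceparent", Value: []byte(value)})
}

// extractParent returns the trace ID and parent span ID found in the
// "traceparent" header, or ok=false if the header is absent or malformed.
func extractParent(headers []header) (traceID, parentSpanID string, ok bool) {
	for _, h := range headers {
		if h.Key != "traceparent" {
			continue
		}
		parts := strings.Split(string(h.Value), "-")
		if len(parts) != 4 || len(parts[1]) != 32 || len(parts[2]) != 16 {
			return "", "", false
		}
		return parts[1], parts[2], true
	}
	return "", "", false
}
```

A consumer that starts its span with the extracted trace ID and parent span ID ends up in the same trace as the producer. A broker older than 0.11.0.0 has no headers to carry the value, so the consumer span would start a new trace.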

Based on: https://github.com/DataDog/dd-trace-go/tree/main/contrib/confluentinc/confluent-kafka-go/kafka.v2

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Version

func Version() string

Types

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and, if polling is enabled, finishes any remaining span.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) (event kafka.Event)

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a message. The returned message is traced.
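The note on Close about finishing "any remaining span" suggests that a span opened for one polled message stays open until the next poll or until the consumer is closed. The package's internals are not shown here, so the following is only one plausible reading of that lifecycle, written against hypothetical local types rather than the real Consumer.

```go
package main

// pollingConsumer is a hypothetical stand-in for a traced consumer.
// events simulates the broker; open is the span for the last polled event.
type pollingConsumer struct {
	events   []string
	open     string   // name of the span still in progress, "" if none
	finished []string // names of spans that have been ended, in order
}

// finishOpen ends the span left over from the previous poll, if any.
func (c *pollingConsumer) finishOpen() {
	if c.open != "" {
		c.finished = append(c.finished, c.open)
		c.open = ""
	}
}

// Poll ends the previous span, then opens a new one for the next event.
func (c *pollingConsumer) Poll() (event string, ok bool) {
	c.finishOpen()
	if len(c.events) == 0 {
		return "", false
	}
	event = c.events[0]
	c.events = c.events[1:]
	c.open = "process " + event
	return event, true
}

// Close ends whatever span is still open so it is not lost.
func (c *pollingConsumer) Close() {
	c.finishOpen()
}
```

Under this reading, the span for a message covers the time the caller spends handling it, and skipping Close would leave the last span unfinished.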

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) *MessageCarrier

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

func (MessageCarrier) Set

func (c MessageCarrier) Set(key string, value string)
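The Get, Keys, and Set methods match the shape of a text-map carrier: a propagator reads and writes string pairs through them, and the carrier stores those pairs in the message's headers. The sketch below shows how such a carrier could work. It uses hypothetical local `header` and `message` types in place of the Kafka client's, and the choice to overwrite an existing header in Set is an assumption, not documented behaviour of MessageCarrier.

```go
package main

// header and message are local stand-ins for the Kafka client's
// header and message types, reduced to the fields the carrier needs.
type header struct {
	Key   string
	Value []byte
}

type message struct {
	Headers []header
}

// carrier adapts a message's headers to Get/Keys/Set access.
// It holds a pointer, so value-receiver methods can still mutate the message.
type carrier struct {
	msg *message
}

// Get returns the value of the first header with the given key, or "".
func (c carrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Keys lists the header keys in their current order.
func (c carrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

// Set removes any existing header with the same key and appends the new
// one, so a re-injected context does not leave a stale duplicate behind.
func (c carrier) Set(key, value string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if h.Key != key {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, header{Key: key, Value: []byte(value)})
}
```

Because the carrier wraps a pointer to the message, headers written through Set are visible on the message that is later produced.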

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is an interface for setting optional configuration properties.

func WithCustomAttributeInjector

func WithCustomAttributeInjector(fn func(msg *kafka.Message) []attribute.KeyValue) Option

func WithPropagators

func WithPropagators(propagators propagation.TextMapPropagator) Option

WithPropagators specifies the propagators to use for injecting and extracting trace context in Kafka message headers. If none are specified, the global ones are used.

func WithTracerProvider

func WithTracerProvider(provider trace.TracerProvider) Option

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.
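An Option interface with only unexported methods, together with With... constructors and a variadic opts parameter, is the usual Go functional-options pattern. The sketch below illustrates that pattern with the "fall back to the global value" behaviour described above. All names are hypothetical, and plain strings stand in for the real TracerProvider and TextMapPropagator types.

```go
package main

// config holds the settings that options can change. Strings are
// placeholders for the real provider and propagator types.
type config struct {
	tracerProvider string
	propagators    string
}

// option has an unexported method, so only this package can define options.
type option interface {
	apply(*config)
}

// optionFunc lets an ordinary function satisfy the option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// withTracerProvider overrides the default tracer provider.
func withTracerProvider(name string) option {
	return optionFunc(func(c *config) { c.tracerProvider = name })
}

// withPropagators overrides the default propagators.
func withPropagators(name string) option {
	return optionFunc(func(c *config) { c.propagators = name })
}

// newConfig starts from the global defaults and applies options in order,
// so anything the caller does not set keeps its global value.
func newConfig(opts ...option) config {
	c := config{tracerProvider: "global", propagators: "global"}
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}
```

With this shape, a constructor such as NewConsumer or NewProducer can accept zero options and still end up with a usable configuration.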

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

Producer traces only the Produce method; the deprecated ProduceChannel method is not traced.

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close()

Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.
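The Producer struct embeds *kafka.Producer and defines its own Produce, which is how a Go wrapper can trace one method while leaving the rest of the embedded type's methods available unchanged. The sketch below shows that embedding-and-shadowing technique with hypothetical local types; the span is reduced to a recorded string, and nothing here reflects the package's actual span names or attributes.

```go
package main

import "errors"

// baseProducer is a stand-in for the wrapped client.
type baseProducer struct {
	sent []string
}

// Produce records the topic, rejecting an empty one.
func (p *baseProducer) Produce(topic string) error {
	if topic == "" {
		return errors.New("empty topic")
	}
	p.sent = append(p.sent, topic)
	return nil
}

// Len is an example of a method the wrapper does not override.
func (p *baseProducer) Len() int { return len(p.sent) }

// tracedProducer embeds the base type, so Len is promoted as-is,
// while its own Produce shadows the embedded one.
type tracedProducer struct {
	*baseProducer
	spans []string // one entry per traced Produce call
}

// Produce delegates to the embedded producer and records the outcome,
// which is where a real wrapper would start and end a span.
func (p *tracedProducer) Produce(topic string) error {
	err := p.baseProducer.Produce(topic)
	status := "ok"
	if err != nil {
		status = "error"
	}
	p.spans = append(p.spans, topic+" publish: "+status)
	return err
}
```

A consequence of this design is that any method reached only through the embedded type bypasses the wrapper, which is consistent with only Produce being traced.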

