Documentation ¶
Overview ¶
Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).
Example ¶
This example shows how a span context can be passed from a producer to a consumer.
// Unless explicitly stated otherwise all files in this repository are licensed // under the Apache License Version 2.0. // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016 Datadog, Inc. package main import ( "fmt" kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( testGroupID = "gotest" testTopic = "gotest" ) // This example shows how a span context can be passed from a producer to a consumer. func main() { tracer.Start() defer tracer.Stop() c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{ "go.events.channel.enable": true, // required for the events channel to be turned on "group.id": testGroupID, "socket.timeout.ms": 10, "session.timeout.ms": 10, "enable.auto.offset.store": false, }) err = c.Subscribe(testTopic, nil) if err != nil { panic(err) } // Create the span to be passed parentSpan := tracer.StartSpan("test_parent_span") // Produce a message with a span go func() { msg := &kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &testTopic, Partition: 1, Offset: 1, }, Key: []byte("key1"), Value: []byte("value1"), } // Inject the span context in the message to be produced carrier := kafkatrace.NewMessageCarrier(msg) tracer.Inject(parentSpan.Context(), carrier) c.Consumer.Events() <- msg }() msg := (<-c.Events()).(*kafka.Message) // Extract the context from the message carrier := kafkatrace.NewMessageCarrier(msg) spanContext, err := tracer.Extract(carrier) if err != nil { panic(err) } parentContext := parentSpan.Context() // Validate that the context passed is the context sent via the message if spanContext.TraceID() == parentContext.TraceID() { fmt.Println("Span context passed sucessfully from producer to consumer") } else { fmt.Println("Span context not passed") } c.Close() // wait for the events channel to be closed <-c.Events() }
Output: Span context passed sucessfully from producer to consumer
Index ¶
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) Commit() ([]kafka.TopicPartition, error)
- func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error)
- func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
- func (c *Consumer) Events() chan kafka.Event
- func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)
- func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)
- type MessageCarrier
- type Option
- func WithAnalytics(on bool) Option
- func WithAnalyticsRate(rate float64) Option
- func WithConfig(cg *kafka.ConfigMap) Option
- func WithContext(ctx context.Context) Option
- func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option
- func WithDataStreams() Option
- func WithServiceName(serviceName string) Option
- type Producer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
A Consumer wraps a kafka.Consumer.
func NewConsumer ¶
NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
func WrapConsumer ¶
WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func (*Consumer) Close ¶
Close calls the underlying Consumer.Close and if polling is enabled, finishes any remaining span.
func (*Consumer) Commit ¶ added in v1.55.0
func (c *Consumer) Commit() ([]kafka.TopicPartition, error)
Commit commits current offsets and tracks the commit offsets if data streams is enabled.
func (*Consumer) CommitMessage ¶ added in v1.55.0
CommitMessage commits a message and tracks the commit offsets if data streams is enabled.
func (*Consumer) CommitOffsets ¶ added in v1.55.0
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.
func (*Consumer) Events ¶
Events returns the kafka Events channel (if enabled). Message events will be traced.
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
A MessageCarrier injects and extracts traces from a sarama.ProducerMessage.
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafka.Message) MessageCarrier
NewMessageCarrier creates a new MessageCarrier.
func (MessageCarrier) ForeachKey ¶
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey iterates over every header.
type Option ¶
type Option func(cfg *config)
An Option customizes the config.
func WithAnalytics ¶ added in v1.11.0
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶ added in v1.11.0
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithConfig ¶ added in v1.52.0
WithConfig extracts the config information for the client to be tagged
func WithContext ¶
WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers
func WithCustomTag ¶ added in v1.42.0
WithCustomTag will cause the given tagFn to be evaluated after executing a query and attach the result to the span tagged by the key.
func WithDataStreams ¶ added in v1.55.0
func WithDataStreams() Option
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithServiceName ¶
WithServiceName sets the config service name to serviceName.
type Producer ¶
A Producer wraps a kafka.Producer.
func NewProducer ¶
NewProducer calls kafka.NewProducer and wraps the resulting Producer.
func WrapProducer ¶
WrapProducer wraps a kafka.Producer so requests are traced.
func (*Producer) Close ¶
func (p *Producer) Close()
Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.
func (*Producer) Events ¶ added in v1.55.0
Events returns the kafka Events channel (if enabled). Message events will be monitored with data streams monitoring (if enabled)
func (*Producer) ProduceChannel ¶
ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.