Documentation ¶
Overview ¶
Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).
Example ¶
This example shows how a span context can be passed from a producer to a consumer.
// Unless explicitly stated otherwise all files in this repository are licensed // under the Apache License Version 2.0. // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016 Datadog, Inc. package main import ( "fmt" kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( testGroupID = "gotest" testTopic = "gotest" ) // This example shows how a span context can be passed from a producer to a consumer. func main() { tracer.Start() defer tracer.Stop() c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{ "go.events.channel.enable": true, // required for the events channel to be turned on "group.id": testGroupID, "socket.timeout.ms": 10, "session.timeout.ms": 10, "enable.auto.offset.store": false, }) err = c.Subscribe(testTopic, nil) if err != nil { panic(err) } // Create the span to be passed parentSpan := tracer.StartSpan("test_parent_span") // Produce a message with a span go func() { msg := &kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &testTopic, Partition: 1, Offset: 1, }, Key: []byte("key1"), Value: []byte("value1"), } // Inject the span context in the message to be produced carrier := kafkatrace.NewMessageCarrier(msg) tracer.Inject(parentSpan.Context(), carrier) c.Consumer.Events() <- msg }() msg := (<-c.Events()).(*kafka.Message) // Extract the context from the message carrier := kafkatrace.NewMessageCarrier(msg) spanContext, err := tracer.Extract(carrier) if err != nil { panic(err) } parentContext := parentSpan.Context() // Validate that the context passed is the context sent via the message if spanContext.TraceID() == parentContext.TraceID() { fmt.Println("Span context passed sucessfully from producer to consumer") } else { fmt.Println("Span context not passed") } c.Close() // wait for the events channel to be closed <-c.Events() }
Output: Span context passed sucessfully from producer to consumer
Index ¶
- type Consumer
- type MessageCarrier
- type Option
- func WithAnalytics(on bool) Option
- func WithAnalyticsRate(rate float64) Option
- func WithConfig(cg *kafka.ConfigMap) Option
- func WithContext(ctx context.Context) Option
- func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option
- func WithDataStreams() Option
- func WithServiceName(serviceName string) Option
- type Producer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
A Consumer wraps a kafka.Consumer.
func NewConsumer ¶
NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
type MessageCarrier ¶
type MessageCarrier = v2.MessageCarrier
A MessageCarrier injects and extracts traces from a sarama.ProducerMessage.
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafka.Message) MessageCarrier
NewMessageCarrier creates a new MessageCarrier.
type Option ¶
An Option customizes the config.
func WithAnalytics ¶ added in v1.11.0
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶ added in v1.11.0
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithConfig ¶ added in v1.52.0
WithConfig extracts the config information for the client to be tagged
func WithContext ¶
WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers
func WithCustomTag ¶ added in v1.42.0
WithCustomTag will cause the given tagFn to be evaluated after executing a query and attach the result to the span tagged by the key.
func WithDataStreams ¶ added in v1.55.0
func WithDataStreams() Option
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithServiceName ¶
WithServiceName sets the config service name to serviceName.
type Producer ¶
A Producer wraps a kafka.Producer.
func NewProducer ¶
NewProducer calls kafka.NewProducer and wraps the resulting Producer.