Documentation ¶
Overview ¶
Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).
Example ¶
This example shows how a span context can be passed from a producer to a consumer.
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package main

import (
	"fmt"

	kafkatrace "github.com/DataDog/dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka.v2/v2"
	"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

var (
	testGroupID = "gotest"
	testTopic   = "gotest"
)

// This example shows how a span context can be passed from a producer to a consumer.
func main() {
	tracer.Start()
	defer tracer.Stop()

	c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{
		"go.events.channel.enable": true, // required for the events channel to be turned on
		"group.id":                 testGroupID,
		"socket.timeout.ms":        10,
		"session.timeout.ms":       10,
		"enable.auto.offset.store": false,
	})
	// Check the constructor error before it is overwritten by Subscribe.
	if err != nil {
		panic(err)
	}

	err = c.Subscribe(testTopic, nil)
	if err != nil {
		panic(err)
	}

	// Create the span to be passed
	parentSpan := tracer.StartSpan("test_parent_span")

	// Produce a message with a span
	go func() {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &testTopic,
				Partition: 1,
				Offset:    1,
			},
			Key:   []byte("key1"),
			Value: []byte("value1"),
		}

		// Inject the span context in the message to be produced
		carrier := kafkatrace.NewMessageCarrier(msg)
		tracer.Inject(parentSpan.Context(), carrier)

		c.Consumer.Events() <- msg
	}()

	msg := (<-c.Events()).(*kafka.Message)

	// Extract the context from the message
	carrier := kafkatrace.NewMessageCarrier(msg)
	spanContext, err := tracer.Extract(carrier)
	if err != nil {
		panic(err)
	}

	parentContext := parentSpan.Context()

	// Validate that the context passed is the context sent via the message
	if spanContext.TraceID() == parentContext.TraceID() {
		fmt.Println("Span context passed successfully from producer to consumer")
	} else {
		fmt.Println("Span context not passed")
	}

	c.Close()
	// wait for the events channel to be closed
	<-c.Events()
}

Output:
Span context passed successfully from producer to consumer
Index ¶
- Variables
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) Commit() ([]kafka.TopicPartition, error)
- func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error)
- func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
- func (c *Consumer) Events() chan kafka.Event
- func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)
- func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)
- type MessageCarrier
- type Option
- type OptionFn
- type Producer
Constants ¶
This section is empty.
Variables ¶
var WithAnalytics = tracing.WithAnalytics
WithAnalytics enables Trace Analytics for all started spans.
var WithAnalyticsRate = tracing.WithAnalyticsRate
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
var WithContext = tracing.WithContext
WithContext sets the config context to ctx.
Deprecated: This is deprecated in favor of passing the context via the message headers.
var WithDataStreams = tracing.WithDataStreams
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
var WithService = tracing.WithService
WithService sets the config service name to serviceName.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
A Consumer wraps a kafka.Consumer.
func NewConsumer ¶
NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
func WrapConsumer ¶
WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func (*Consumer) Close ¶
Close calls the underlying Consumer.Close and, if polling is enabled, finishes any remaining span.
func (*Consumer) Commit ¶
func (c *Consumer) Commit() ([]kafka.TopicPartition, error)
Commit commits the current offsets and, if Data Streams Monitoring is enabled, tracks the committed offsets.
func (*Consumer) CommitMessage ¶
CommitMessage commits a message and, if Data Streams Monitoring is enabled, tracks the committed offsets.
func (*Consumer) CommitOffsets ¶
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
CommitOffsets commits the provided offsets and, if Data Streams Monitoring is enabled, tracks the committed offsets.
func (*Consumer) Events ¶
Events returns the Kafka events channel (if enabled). Message events are traced.
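To illustrate what tracing only the message events on an events channel can look like, here is a minimal sketch that uses the standard library only. The types event, message and otherEvent and the function traceEvents are hypothetical stand-ins, not the kafka.Event types or this package's implementation, and setting the Traced field is a placeholder for starting a consume span.

```go
package main

import "fmt"

// event, message and otherEvent are hypothetical stand-ins for
// kafka.Event and its concrete event types.
type event interface{}

type message struct {
	Value  string
	Traced bool
}

type otherEvent struct {
	Name string
}

// traceEvents forwards every event from in to the returned channel.
// Only *message events are marked as traced; all other events pass
// through untouched. The returned channel is closed once in is closed.
func traceEvents(in <-chan event) <-chan event {
	out := make(chan event, 1)
	go func() {
		defer close(out)
		for evt := range in {
			if msg, ok := evt.(*message); ok {
				msg.Traced = true // placeholder for starting a consume span
			}
			out <- evt
		}
	}()
	return out
}

func main() {
	in := make(chan event, 2)
	in <- &message{Value: "value1"}
	in <- otherEvent{Name: "stats"}
	close(in)

	for evt := range traceEvents(in) {
		switch e := evt.(type) {
		case *message:
			fmt.Println("message:", e.Value, "traced:", e.Traced)
		default:
			fmt.Printf("other event: %T\n", e)
		}
	}
}
```

Because the forwarding goroutine closes its output when the input closes, a caller can range over the wrapped channel, or receive from it once more after closing, to wait for shutdown.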
type MessageCarrier ¶
type MessageCarrier = tracing.MessageCarrier
A MessageCarrier is used to inject span context into, and extract it from, a kafka.Message.
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafka.Message) MessageCarrier
NewMessageCarrier creates a new MessageCarrier.
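As a rough model of how a carrier can move span context through message headers, the sketch below uses the standard library only. The header, message and carrier types, and the "trace-id" and "parent-id" keys, are hypothetical; they are not the kafka.Message type, the real propagation header names, or this package's implementation. The Set and ForeachKey method shapes are assumed to match what a tracer expects of a text-map carrier.

```go
package main

import "fmt"

// header and message are hypothetical stand-ins for kafka.Header and
// kafka.Message.
type header struct {
	Key   string
	Value []byte
}

type message struct {
	Headers []header
}

// carrier is a hypothetical header-backed carrier.
type carrier struct {
	msg *message
}

// Set stores a key/value pair as a header, replacing the value of an
// existing header with the same key.
func (c carrier) Set(key, val string) {
	for i := range c.msg.Headers {
		if c.msg.Headers[i].Key == key {
			c.msg.Headers[i].Value = []byte(val)
			return
		}
	}
	c.msg.Headers = append(c.msg.Headers, header{Key: key, Value: []byte(val)})
}

// ForeachKey calls handler for every header and stops at the first error.
func (c carrier) ForeachKey(handler func(key, val string) error) error {
	for _, h := range c.msg.Headers {
		if err := handler(h.Key, string(h.Value)); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	msg := &message{}

	// Producer side: write hypothetical propagation keys into the headers.
	out := carrier{msg: msg}
	out.Set("trace-id", "1234")
	out.Set("parent-id", "5678")

	// Consumer side: read the keys back from the same message.
	got := map[string]string{}
	in := carrier{msg: msg}
	if err := in.ForeachKey(func(k, v string) error {
		got[k] = v
		return nil
	}); err != nil {
		panic(err)
	}
	fmt.Println(got["trace-id"], got["parent-id"]) // prints "1234 5678"
}
```

Keeping the context in the message itself means the producer and consumer need no shared state beyond the message, which is the same idea the package example relies on.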
type Option ¶
Option describes an option for the Kafka integration.
func WithConfig ¶
WithConfig extracts the client's config information so that it can be tagged.
type OptionFn ¶
OptionFn represents options applicable to NewConsumer, NewProducer, WrapConsumer and WrapProducer.
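For readers unfamiliar with this style of configuration, the following is a minimal sketch of the functional options pattern, using the standard library only. It assumes an option interface that a function type satisfies; the config struct, its fields, the "kafka" default and the lowercase names are all hypothetical and are not this package's actual definitions.

```go
package main

import "fmt"

// config is a hypothetical settings struct.
type config struct {
	serviceName   string
	analyticsRate float64
}

// option is a hypothetical interface that every option implements.
type option interface {
	apply(*config)
}

// optionFn adapts a plain function to the option interface.
type optionFn func(*config)

func (fn optionFn) apply(cfg *config) { fn(cfg) }

// withService sets the service name.
func withService(name string) optionFn {
	return func(cfg *config) { cfg.serviceName = name }
}

// withAnalyticsRate sets the analytics sampling rate.
func withAnalyticsRate(rate float64) optionFn {
	return func(cfg *config) { cfg.analyticsRate = rate }
}

// newConfig starts from defaults and applies each option in order,
// so later options override earlier ones.
func newConfig(opts ...option) *config {
	cfg := &config{serviceName: "kafka"} // hypothetical default
	for _, opt := range opts {
		opt.apply(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withService("orders-consumer"), withAnalyticsRate(0.5))
	fmt.Println(cfg.serviceName, cfg.analyticsRate) // prints "orders-consumer 0.5"
}
```

With this pattern a constructor can accept any number of options without changing its signature, which is how the same options can apply to several constructors and wrappers.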
type Producer ¶
A Producer wraps a kafka.Producer.
func NewProducer ¶
NewProducer calls kafka.NewProducer and wraps the resulting Producer.
func WrapProducer ¶
WrapProducer wraps a kafka.Producer so requests are traced.
func (*Producer) Close ¶
func (p *Producer) Close()
Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.
func (*Producer) Events ¶
Events returns the Kafka events channel (if enabled). Message events are monitored with Data Streams Monitoring (if enabled).
func (*Producer) ProduceChannel ¶
ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.
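The forwarding behaviour described for ProduceChannel and Close can be pictured with the standard-library sketch below. The message and wrappedProducer types and the "traced" header are hypothetical stand-ins rather than this package's implementation; writing the header is a placeholder for starting a span and injecting its context.

```go
package main

import "fmt"

// message is a hypothetical stand-in for kafka.Message.
type message struct {
	Value   string
	Headers map[string]string
}

// wrappedProducer is a hypothetical wrapper around a producer channel.
type wrappedProducer struct {
	inner chan *message // stands in for the underlying producer channel
	outer chan *message // the channel handed to callers
	done  chan struct{} // closed when the forwarding goroutine exits
}

// wrapProducer starts a goroutine that instruments each message sent
// to the wrapping channel and forwards it to the inner channel.
func wrapProducer(inner chan *message) *wrappedProducer {
	p := &wrappedProducer{
		inner: inner,
		outer: make(chan *message),
		done:  make(chan struct{}),
	}
	go func() {
		defer close(p.done)
		for msg := range p.outer {
			if msg.Headers == nil {
				msg.Headers = map[string]string{}
			}
			msg.Headers["traced"] = "true" // placeholder for span injection
			p.inner <- msg
		}
	}()
	return p
}

// ProduceChannel returns the wrapping channel that callers send to.
func (p *wrappedProducer) ProduceChannel() chan<- *message { return p.outer }

// Close closes the wrapping channel and waits for the forwarder to stop.
func (p *wrappedProducer) Close() {
	close(p.outer)
	<-p.done
}

func main() {
	inner := make(chan *message, 1)
	p := wrapProducer(inner)

	p.ProduceChannel() <- &message{Value: "value1"}
	got := <-inner
	fmt.Println(got.Value, got.Headers["traced"]) // prints "value1 true"

	p.Close()
}
```

Closing the wrapping channel in Close lets the forwarding goroutine exit cleanly, so no goroutine is left blocked after the producer is shut down.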