Documentation ¶
Overview ¶
Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).
Example ¶
This example shows how a span context can be passed from a producer to a consumer.
// Unless explicitly stated otherwise all files in this repository are licensed // under the Apache License Version 2.0. // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016 Datadog, Inc. package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" kafkatrace "github.com/lsgndln/dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka" "github.com/lsgndln/dd-trace-go/ddtrace/tracer" ) var ( testGroupID = "gotest" testTopic = "gotest" ) // This example shows how a span context can be passed from a producer to a consumer. func main() { tracer.Start() defer tracer.Stop() c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{ "go.events.channel.enable": true, // required for the events channel to be turned on "group.id": testGroupID, "socket.timeout.ms": 10, "session.timeout.ms": 10, "enable.auto.offset.store": false, }) err = c.Subscribe(testTopic, nil) if err != nil { panic(err) } // Create the span to be passed parentSpan := tracer.StartSpan("test_parent_span") // Produce a message with a span go func() { msg := &kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &testTopic, Partition: 1, Offset: 1, }, Key: []byte("key1"), Value: []byte("value1"), } // Inject the span context in the message to be produced carrier := kafkatrace.NewMessageCarrier(msg) tracer.Inject(parentSpan.Context(), carrier) c.Consumer.Events() <- msg }() msg := (<-c.Events()).(*kafka.Message) // Extract the context from the message carrier := kafkatrace.NewMessageCarrier(msg) spanContext, err := tracer.Extract(carrier) if err != nil { panic(err) } parentContext := parentSpan.Context() // Validate that the context passed is the context sent via the message if spanContext.TraceID() == parentContext.TraceID() { fmt.Println("Span context passed sucessfully from producer to consumer") } else { fmt.Println("Span context not passed") } c.Close() // wait for the events channel to be closed <-c.Events() }
Output: Span context passed sucessfully from producer to consumer
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
A Consumer wraps a kafka.Consumer.
func NewConsumer ¶
NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
func WrapConsumer ¶
WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func (*Consumer) Close ¶
Close calls the underlying Consumer.Close and if polling is enabled, finishes any remaining span.
func (*Consumer) Events ¶
Events returns the kafka Events channel (if enabled). Message events will be traced.
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
A MessageCarrier injects and extracts traces from a sarama.ProducerMessage.
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafka.Message) MessageCarrier
NewMessageCarrier creates a new MessageCarrier.
func (MessageCarrier) ForeachKey ¶
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey iterates over every header.
type Option ¶
type Option func(cfg *config)
An Option customizes the config.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithContext ¶
WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers
func WithCustomTag ¶
WithCustomTag will cause the given tagFn to be evaluated after executing a query and attach the result to the span tagged by the key.
func WithServiceName ¶
WithServiceName sets the config service name to serviceName.
type Producer ¶
A Producer wraps a kafka.Producer.
func NewProducer ¶
NewProducer calls kafka.NewProducer and wraps the resulting Producer.
func WrapProducer ¶
WrapProducer wraps a kafka.Producer so requests are traced.
func (*Producer) Close ¶
func (p *Producer) Close()
Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.
func (*Producer) ProduceChannel ¶
ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.