Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var WithAnalytics = tracing.WithAnalytics
WithAnalytics enables Trace Analytics for all started spans.
var WithAnalyticsRate = tracing.WithAnalyticsRate
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
var WithDataStreams = tracing.WithDataStreams
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
var WithServiceName = tracing.WithServiceName
WithServiceName sets the config service name to serviceName.
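The variables above are option constructors that are passed when a reader or writer is created. As a rough illustration of how such options compose, here is a minimal, self-contained sketch of the functional-options pattern. The `config` type, the `newConfig` helper, the default service name `"kafka"`, and the rate validation are all assumptions made for this sketch; they do not reproduce the integration's internals.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the integration's settings.
type config struct {
	serviceName   string
	analyticsRate float64 // 0 means analytics events are not sampled in this sketch
	dataStreams   bool
}

// Option mutates a config. Each With* helper returns one.
type Option func(*config)

// WithServiceName sets the service name reported on spans.
func WithServiceName(name string) Option {
	return func(c *config) { c.serviceName = name }
}

// WithAnalyticsRate sets the sampling rate; values outside [0, 1] are ignored
// (an assumption of this sketch).
func WithAnalyticsRate(rate float64) Option {
	return func(c *config) {
		if rate >= 0 && rate <= 1 {
			c.analyticsRate = rate
		}
	}
}

// WithDataStreams switches on the data-streams flag.
func WithDataStreams() Option {
	return func(c *config) { c.dataStreams = true }
}

// newConfig starts from defaults and applies the options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...Option) *config {
	c := &config{serviceName: "kafka"} // assumed default, for illustration only
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(WithServiceName("orders-consumer"), WithAnalyticsRate(0.5))
	fmt.Printf("%s %.1f %t\n", c.serviceName, c.analyticsRate, c.dataStreams)
}
```

Because options are applied in order, callers can layer a shared set of defaults with per-reader overrides without changing any function signatures.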
Functions ¶
func ExtractSpanContext ¶
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error)
ExtractSpanContext retrieves the SpanContext from a kafka.Message.
Types ¶
type Reader ¶
type Reader struct {
	*kafka.Reader
	// contains filtered or unexported fields
}
A Reader wraps a kafka.Reader.
Example ¶
r := kafkatrace.NewReader(kafka.ReaderConfig{
	Brokers:        []string{"localhost:9092"},
	Topic:          "some-topic",
	GroupID:        "group-id",
	SessionTimeout: 30 * time.Second,
})
msg, err := r.ReadMessage(context.Background())
if err != nil {
	log.Fatal("Failed to read message", err)
}

// create a child span using span id and trace id in message header
spanContext, err := kafkatrace.ExtractSpanContext(msg)
if err != nil {
	log.Fatal("Failed to extract span context from carrier", err)
}
operationName := "child-span"
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
defer s.Finish()
func WrapReader ¶
WrapReader wraps a kafka.Reader so that any consumed events are traced.
func (*Reader) Close ¶
Close calls the underlying Reader.Close and, if polling is enabled, finishes any remaining span.
func (*Reader) FetchMessage ¶ added in v1.40.0
FetchMessage reads and returns the next message from the reader. The message is traced.
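Since Reader embeds *kafka.Reader, the wrapper can override selected methods such as FetchMessage while every other method is promoted unchanged. The sketch below illustrates that embedding technique with hypothetical stand-in types (`baseReader`, `tracedReader`) and records span names as plain strings rather than real spans; the actual span lifecycle in the integration may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// baseReader is a hypothetical stand-in for kafka.Reader.
type baseReader struct{ queue []string }

// FetchMessage pops the next queued message or reports that none is left.
func (r *baseReader) FetchMessage() (string, error) {
	if len(r.queue) == 0 {
		return "", errors.New("no messages")
	}
	m := r.queue[0]
	r.queue = r.queue[1:]
	return m, nil
}

// Pending reports how many messages are still queued.
func (r *baseReader) Pending() int { return len(r.queue) }

// tracedReader embeds *baseReader, so Pending is promoted as-is,
// while FetchMessage is shadowed by the traced version below.
type tracedReader struct {
	*baseReader
	spans []string // stand-in for started spans
}

// FetchMessage delegates to the embedded reader and records a "span"
// only when a message was actually returned.
func (r *tracedReader) FetchMessage() (string, error) {
	m, err := r.baseReader.FetchMessage()
	if err != nil {
		return "", err
	}
	r.spans = append(r.spans, "consume:"+m)
	return m, nil
}

func main() {
	r := &tracedReader{baseReader: &baseReader{queue: []string{"a", "b"}}}
	m, _ := r.FetchMessage()
	fmt.Println(m, r.Pending(), r.spans)
}
```

One consequence of this design is that code calling a method through the embedded value directly (for example `r.baseReader.FetchMessage()`) bypasses the tracing override.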
type Writer ¶
type Writer struct {
	*kafka.Writer
	// contains filtered or unexported fields
}
Writer wraps a kafka.Writer with tracing config data.
Example ¶
w := kafkatrace.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "some-topic",
})

// use slice as it passes the value by reference if you want message headers updated in kafkatrace
msgs := []kafka.Message{
	{
		Key:   []byte("key1"),
		Value: []byte("value1"),
	},
}
if err := w.WriteMessages(context.Background(), msgs...); err != nil {
	log.Fatal("Failed to write message", err)
}
func WrapWriter ¶
WrapWriter wraps a kafka.Writer so requests are traced.