Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExtractSpanContext ¶
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error)
ExtractSpanContext retrieves the SpanContext from a kafka.Message
Types ¶
type Option ¶
type Option func(cfg *config)
An Option customizes the config.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithDataStreams ¶ added in v1.63.0
func WithDataStreams() Option
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithServiceName ¶
WithServiceName sets the config service name to serviceName.
type Reader ¶
type Reader struct { *kafka.Reader // contains filtered or unexported fields }
A Reader wraps a kafka.Reader.
Example ¶
package main import ( "context" "log" "time" kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" kafka "github.com/segmentio/kafka-go" ) func main() { r := kafkatrace.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "some-topic", GroupID: "group-id", SessionTimeout: 30 * time.Second, }) msg, err := r.ReadMessage(context.Background()) if err != nil { log.Fatal("Failed to read message", err) } // create a child span using span id and trace id in message header spanContext, err := kafkatrace.ExtractSpanContext(msg) if err != nil { log.Fatal("Failed to extract span context from carrier", err) } operationName := "child-span" s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext)) defer s.Finish() }
Output:
func WrapReader ¶
WrapReader wraps a kafka.Reader so that any consumed events are traced.
func (*Reader) Close ¶
Close calls the underlying Reader.Close and if polling is enabled, finishes any remaining span.
func (*Reader) FetchMessage ¶ added in v1.40.0
FetchMessage reads and returns the next message from the reader. Message will be traced.
type Writer ¶
type Writer struct { *kafka.Writer // contains filtered or unexported fields }
Writer wraps a kafka.Writer with tracing config data
Example ¶
package main import ( "context" "log" kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0" kafka "github.com/segmentio/kafka-go" ) func main() { w := kafkatrace.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092"}, Topic: "some-topic", }) // use slice as it passes the value by reference if you want message headers updated in kafkatrace msgs := []kafka.Message{ { Key: []byte("key1"), Value: []byte("value1"), }, } if err := w.WriteMessages(context.Background(), msgs...); err != nil { log.Fatal("Failed to write message", err) } }
Output:
func WrapWriter ¶
WrapWriter wraps a kafka.Writer so requests are traced.