Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExtractSpanContext ¶
func ExtractSpanContext(msg kafka.Message) (*tracer.SpanContext, error)
ExtractSpanContext retrieves the SpanContext from a kafka.Message
Types ¶
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option describes options for the Kafka integration.
type OptionFn ¶
type OptionFn func(*config)
OptionFn represents options applicable to NewReader, NewWriter, WrapReader and WrapWriter.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithService ¶
WithService sets the config service name to serviceName.
type Reader ¶
type Reader struct { *kafka.Reader // contains filtered or unexported fields }
A Reader wraps a kafka.Reader.
Example ¶
package main import ( "context" "log" "time" kafkatrace "github.com/DataDog/dd-trace-go/v2/contrib/segmentio/kafka.go.v0" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" kafka "github.com/segmentio/kafka-go" ) func main() { r := kafkatrace.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "some-topic", GroupID: "group-id", SessionTimeout: 30 * time.Second, }) msg, err := r.ReadMessage(context.Background()) if err != nil { log.Fatal("Failed to read message", err) } // create a child span using span id and trace id in message header spanContext, err := kafkatrace.ExtractSpanContext(msg) if err != nil { log.Fatal("Failed to extract span context from carrier", err) } operationName := "child-span" s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext)) defer s.Finish() }
Output:
func WrapReader ¶
WrapReader wraps a kafka.Reader so that any consumed events are traced.
func (*Reader) Close ¶
Close calls the underlying Reader.Close and if polling is enabled, finishes any remaining span.
func (*Reader) FetchMessage ¶
FetchMessage reads and returns the next message from the reader. Message will be traced.
type Writer ¶
type Writer struct { *kafka.Writer // contains filtered or unexported fields }
Writer wraps a kafka.Writer with tracing config data
Example ¶
package main import ( "context" "log" kafkatrace "github.com/DataDog/dd-trace-go/v2/contrib/segmentio/kafka.go.v0" kafka "github.com/segmentio/kafka-go" ) func main() { w := kafkatrace.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092"}, Topic: "some-topic", }) // use slice as it passes the value by reference if you want message headers updated in kafkatrace msgs := []kafka.Message{ { Key: []byte("key1"), Value: []byte("value1"), }, } if err := w.WriteMessages(context.Background(), msgs...); err != nil { log.Fatal("Failed to write message", err) } }
Output:
func WrapWriter ¶
WrapWriter wraps a kafka.Writer so requests are traced.