Documentation ¶
Index ¶
- func ExtractSpanContext(msg kafka.Message) (*tracer.SpanContext, error)
- type Header
- type KafkaConfig
- type KafkaHeader
- type KafkaWriter
- type Message
- type MessageCarrier
- type Option
- type OptionFn
- type Reader
- type Tracer
- func (*Tracer) FinishProduceSpan(span *tracer.Span, partition int, offset int64, err error)
- func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message)
- func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer)
- func (tr *Tracer) StartConsumeSpan(ctx context.Context, msg Message) *tracer.Span
- func (tr *Tracer) StartProduceSpan(ctx context.Context, writer Writer, msg Message, ...) *tracer.Span
- type Writer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExtractSpanContext ¶
func ExtractSpanContext(msg kafka.Message) (*tracer.SpanContext, error)
ExtractSpanContext retrieves the SpanContext from a kafka.Message
Types ¶
type KafkaConfig ¶
KafkaConfig holds information from the kafka config for span tags.
type KafkaHeader ¶
func (KafkaHeader) GetKey ¶
func (h KafkaHeader) GetKey() string
func (KafkaHeader) GetValue ¶
func (h KafkaHeader) GetValue() []byte
type KafkaWriter ¶
type KafkaWriter struct { *kafka.Writer // contains filtered or unexported fields }
Writer wraps a kafka.Writer with tracing config data
func NewWriter ¶
func NewWriter(conf kafka.WriterConfig, opts ...Option) *KafkaWriter
NewWriter calls kafka.NewWriter and wraps the resulting Producer.
func WrapWriter ¶
func WrapWriter(w *kafka.Writer, opts ...Option) *KafkaWriter
WrapWriter wraps a kafka.Writer so requests are traced.
func (*KafkaWriter) WriteMessages ¶
func (w *KafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error
WriteMessages calls kafka-go.Writer.WriteMessages and traces the requests.
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
func NewMessageCarrier ¶
func NewMessageCarrier(msg Message) MessageCarrier
func (MessageCarrier) ForeachKey ¶
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey conforms to the TextMapReader interface.
func (MessageCarrier) Set ¶
func (c MessageCarrier) Set(key, val string)
Set implements TextMapWriter
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option describes options for the Kafka integration.
type OptionFn ¶
type OptionFn func(*config)
OptionFn represents options applicable to NewReader, NewWriter, WrapReader and WrapWriter.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithDataStreams ¶
func WithDataStreams() OptionFn
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithService ¶
WithService sets the config service name to serviceName.
type Reader ¶
type Reader struct { *kafka.Reader // contains filtered or unexported fields }
A Reader wraps a kafka.Reader.
Example ¶
r := kafkatrace.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "some-topic", GroupID: "group-id", SessionTimeout: 30 * time.Second, }) msg, err := r.ReadMessage(context.Background()) if err != nil { log.Fatal("Failed to read message", err) } // create a child span using span id and trace id in message header spanContext, err := kafkatrace.ExtractSpanContext(msg) if err != nil { log.Fatal("Failed to extract span context from carrier", err) } operationName := "child-span" s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext)) defer s.Finish()
Output:
func WrapReader ¶
WrapReader wraps a kafka.Reader so that any consumed events are traced.
func (*Reader) Close ¶
Close calls the underlying Reader.Close and if polling is enabled, finishes any remaining span.
func (*Reader) FetchMessage ¶
FetchMessage reads and returns the next message from the reader. Message will be traced.
type Tracer ¶
type Tracer struct {
// contains filtered or unexported fields
}
func NewTracer ¶
func NewTracer(kafkaCfg KafkaConfig, opts ...Option) *Tracer
func (*Tracer) FinishProduceSpan ¶
func (*Tracer) SetConsumeDSMCheckpoint ¶
func (*Tracer) SetProduceDSMCheckpoint ¶
func (*Tracer) StartConsumeSpan ¶
type Writer ¶
type Writer interface {
GetTopic() string
}
Example ¶
w := kafkatrace.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092"}, Topic: "some-topic", }) // use slice as it passes the value by reference if you want message headers updated in kafkatrace msgs := []kafka.Message{ { Key: []byte("key1"), Value: []byte("value1"), }, } if err := w.WriteMessages(context.Background(), msgs...); err != nil { log.Fatal("Failed to write message", err) }
Output: