kafka

package module
v2.0.0-rc.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0, BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExtractSpanContext

func ExtractSpanContext(msg kafka.Message) (*tracer.SpanContext, error)

ExtractSpanContext retrieves the SpanContext from a kafka.Message.

Types

type Header interface {
	GetKey() string
	GetValue() []byte
}

type KafkaConfig

type KafkaConfig struct {
	BootstrapServers string
	ConsumerGroupID  string
}

KafkaConfig holds information from the kafka config for span tags.

type KafkaHeader

type KafkaHeader struct {
	Key   string
	Value []byte
}

func (KafkaHeader) GetKey

func (h KafkaHeader) GetKey() string

func (KafkaHeader) GetValue

func (h KafkaHeader) GetValue() []byte

type KafkaWriter

type KafkaWriter struct {
	*kafka.Writer
	// contains filtered or unexported fields
}

KafkaWriter wraps a kafka.Writer with tracing config data.

func NewWriter

func NewWriter(conf kafka.WriterConfig, opts ...Option) *KafkaWriter

NewWriter calls kafka.NewWriter and wraps the resulting Writer.

func WrapWriter

func WrapWriter(w *kafka.Writer, opts ...Option) *KafkaWriter

WrapWriter wraps a kafka.Writer so requests are traced.

func (*KafkaWriter) WriteMessages

func (w *KafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error

WriteMessages calls kafka-go.Writer.WriteMessages and traces the requests.

type Message

type Message interface {
	GetValue() []byte
	GetKey() []byte
	GetHeaders() []Header
	SetHeaders([]Header)
	GetTopic() string
	GetPartition() int
	GetOffset() int64
}

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message.

func NewMessageCarrier

func NewMessageCarrier(msg Message) MessageCarrier

func (MessageCarrier) ForeachKey

func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey conforms to the TextMapReader interface.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set implements TextMapWriter.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option describes options for the Kafka integration.

type OptionFn

type OptionFn func(*config)

OptionFn represents options applicable to NewReader, NewWriter, WrapReader and WrapWriter.

func WithAnalytics

func WithAnalytics(on bool) OptionFn

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) OptionFn

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithDataStreams

func WithDataStreams() OptionFn

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

func WithService

func WithService(serviceName string) OptionFn

WithService sets the config service name to serviceName.

type Reader

type Reader struct {
	*kafka.Reader
	// contains filtered or unexported fields
}

A Reader wraps a kafka.Reader.

Example
r := kafkatrace.NewReader(kafka.ReaderConfig{
	Brokers:        []string{"localhost:9092"},
	Topic:          "some-topic",
	GroupID:        "group-id",
	SessionTimeout: 30 * time.Second,
})
msg, err := r.ReadMessage(context.Background())
if err != nil {
	log.Fatal("Failed to read message", err)
}

// create a child span using span id and trace id in message header
spanContext, err := kafkatrace.ExtractSpanContext(msg)
if err != nil {
	log.Fatal("Failed to extract span context from carrier", err)
}
operationName := "child-span"
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
defer s.Finish()
Output:

func NewReader

func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader

NewReader calls kafka.NewReader and wraps the resulting Reader.

func WrapReader

func WrapReader(c *kafka.Reader, opts ...Option) *Reader

WrapReader wraps a kafka.Reader so that any consumed events are traced.

func (*Reader) Close

func (r *Reader) Close() error

Close calls the underlying Reader.Close and, if polling is enabled, finishes any remaining span.

func (*Reader) FetchMessage

func (r *Reader) FetchMessage(ctx context.Context) (kafka.Message, error)

FetchMessage reads and returns the next message from the reader. The message will be traced.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error)

ReadMessage polls the reader for a message. The message will be traced.

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

func NewTracer

func NewTracer(kafkaCfg KafkaConfig, opts ...Option) *Tracer

func (*Tracer) FinishProduceSpan

func (*Tracer) FinishProduceSpan(span *tracer.Span, partition int, offset int64, err error)

func (*Tracer) SetConsumeDSMCheckpoint

func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message)

func (*Tracer) SetProduceDSMCheckpoint

func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer)

func (*Tracer) StartConsumeSpan

func (tr *Tracer) StartConsumeSpan(ctx context.Context, msg Message) *tracer.Span

func (*Tracer) StartProduceSpan

func (tr *Tracer) StartProduceSpan(ctx context.Context, writer Writer, msg Message, spanOpts ...tracer.StartSpanOption) *tracer.Span

type Writer

type Writer interface {
	GetTopic() string
}
Example
w := kafkatrace.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "some-topic",
})

// Pass the messages as a slice: its elements are shared with the callee, so kafkatrace can update the message headers in place.
msgs := []kafka.Message{
	{
		Key:   []byte("key1"),
		Value: []byte("value1"),
	},
}
if err := w.WriteMessages(context.Background(), msgs...); err != nil {
	log.Fatal("Failed to write message", err)
}
Output:

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL