kafka

package
v1.71.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 7 Imported by: 2

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var WithAnalytics = tracing.WithAnalytics

WithAnalytics enables Trace Analytics for all started spans.

View Source
var WithAnalyticsRate = tracing.WithAnalyticsRate

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

View Source
var WithDataStreams = tracing.WithDataStreams

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

View Source
var WithServiceName = tracing.WithServiceName

WithServiceName sets the config service name to serviceName.

Functions

func ExtractSpanContext

func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error)

ExtractSpanContext retrieves the SpanContext from a kafka.Message

Types

type Option

type Option = tracing.Option

An Option customizes the config.

type Reader

type Reader struct {
	*kafka.Reader
	// contains filtered or unexported fields
}

A Reader wraps a kafka.Reader.

Example
r := kafkatrace.NewReader(kafka.ReaderConfig{
	Brokers:        []string{"localhost:9092"},
	Topic:          "some-topic",
	GroupID:        "group-id",
	SessionTimeout: 30 * time.Second,
})
msg, err := r.ReadMessage(context.Background())
if err != nil {
	log.Fatal("Failed to read message", err)
}

// create a child span using span id and trace id in message header
spanContext, err := kafkatrace.ExtractSpanContext(msg)
if err != nil {
	log.Fatal("Failed to extract span context from carrier", err)
}
operationName := "child-span"
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
defer s.Finish()
Output:

func NewReader

func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader

NewReader calls kafka.NewReader and wraps the resulting Consumer.

func WrapReader

func WrapReader(c *kafka.Reader, opts ...Option) *Reader

WrapReader wraps a kafka.Reader so that any consumed events are traced.

func (*Reader) Close

func (r *Reader) Close() error

Close calls the underlying Reader.Close and if polling is enabled, finishes any remaining span.

func (*Reader) FetchMessage added in v1.40.0

func (r *Reader) FetchMessage(ctx context.Context) (kafka.Message, error)

FetchMessage reads and returns the next message from the reader. Message will be traced.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error)

ReadMessage polls the consumer for a message. Message will be traced.

type Writer

type Writer struct {
	*kafka.Writer
	// contains filtered or unexported fields
}

Writer wraps a kafka.Writer with tracing config data

Example
w := kafkatrace.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "some-topic",
})

// use slice as it passes the value by reference if you want message headers updated in kafkatrace
msgs := []kafka.Message{
	{
		Key:   []byte("key1"),
		Value: []byte("value1"),
	},
}
if err := w.WriteMessages(context.Background(), msgs...); err != nil {
	log.Fatal("Failed to write message", err)
}
Output:

func NewWriter

func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer

NewWriter calls kafka.NewWriter and wraps the resulting Producer.

func WrapWriter

func WrapWriter(w *kafka.Writer, opts ...Option) *Writer

WrapWriter wraps a kafka.Writer so requests are traced.

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error

WriteMessages calls kafka.go.v0.Writer.WriteMessages and traces the requests.

Directories

Path Synopsis
internal
tracing
Package tracing contains tracing logic for the segmentio/kafka-go.v0 instrumentation.
Package tracing contains tracing logic for the segmentio/kafka-go.v0 instrumentation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL