kafka

package
v1.999.0-beta.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2024 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 4 Imported by: 2

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExtractSpanContext

func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error)

ExtractSpanContext retrieves the SpanContext from a kafka.Message.

Types

type Option

type Option = v2.Option

An Option customizes the config.

func WithAnalytics

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithDataStreams added in v1.63.0

func WithDataStreams() Option

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

func WithServiceName

func WithServiceName(serviceName string) Option

WithServiceName sets the config service name to serviceName.

type Reader

type Reader = v2.Reader

A Reader wraps a kafka.Reader.

Example
package main

import (
	"context"
	"log"
	"time"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

	kafka "github.com/segmentio/kafka-go"
)

// main reads one message through a traced reader, extracts the span context
// from that message, and starts a span that is a child of it.
func main() {
	r := kafkatrace.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"localhost:9092"},
		Topic:          "some-topic",
		GroupID:        "group-id",
		SessionTimeout: 30 * time.Second,
	})
	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		// Fatalf with an explicit separator: log.Fatal formats like fmt.Print,
		// which adds no space between operands when one of them is a string,
		// so the message and the error text would run together.
		log.Fatalf("Failed to read message: %v", err)
	}

	// create a child span using span id and trace id in message header
	spanContext, err := kafkatrace.ExtractSpanContext(msg)
	if err != nil {
		log.Fatalf("Failed to extract span context from carrier: %v", err)
	}
	operationName := "child-span"
	s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
	defer s.Finish()
}
Output:

func NewReader

func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader

NewReader calls kafka.NewReader and wraps the resulting Reader.

func WrapReader

func WrapReader(c *kafka.Reader, opts ...Option) *Reader

WrapReader wraps a kafka.Reader so that any consumed events are traced.

type Writer

type Writer = v2.Writer

A Writer wraps a kafka.Writer with tracing config data.

Example
package main

import (
	"context"
	"log"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"

	kafka "github.com/segmentio/kafka-go"
)

// main creates a traced writer for a single topic and writes one message
// through it.
func main() {
	w := kafkatrace.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "some-topic",
	})

	// use slice as it passes the value by reference if you want message headers updated in kafkatrace
	msgs := []kafka.Message{
		{
			Key:   []byte("key1"),
			Value: []byte("value1"),
		},
	}
	if err := w.WriteMessages(context.Background(), msgs...); err != nil {
		// Fatalf with an explicit separator: log.Fatal formats like fmt.Print,
		// which adds no space between operands when one of them is a string,
		// so the message and the error text would run together.
		log.Fatalf("Failed to write message: %v", err)
	}
}
Output:

func NewWriter

func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer

NewWriter calls kafka.NewWriter and wraps the resulting Writer.

func WrapWriter

func WrapWriter(w *kafka.Writer, opts ...Option) *Writer

WrapWriter wraps a kafka.Writer so requests are traced.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL