kafka

package
v1.71.0-rc.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).

Example

This example shows how a span context can be passed from a producer to a consumer.

// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package main

import (
	"fmt"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
	testGroupID = "gotest"
	testTopic   = "gotest"
)

// This example shows how a span context can be passed from a producer to a consumer.
func main() {

	tracer.Start()
	defer tracer.Stop()

	c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{
		"go.events.channel.enable": true, // required for the events channel to be turned on
		"group.id":                 testGroupID,
		"socket.timeout.ms":        10,
		"session.timeout.ms":       10,
		"enable.auto.offset.store": false,
	})

	err = c.Subscribe(testTopic, nil)
	if err != nil {
		panic(err)
	}

	// Create the span to be passed
	parentSpan := tracer.StartSpan("test_parent_span")

	// Produce a message with a span
	go func() {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &testTopic,
				Partition: 1,
				Offset:    1,
			},
			Key:   []byte("key1"),
			Value: []byte("value1"),
		}

		// Inject the span context in the message to be produced
		carrier := kafkatrace.NewMessageCarrier(msg)
		tracer.Inject(parentSpan.Context(), carrier)

		c.Consumer.Events() <- msg

	}()

	msg := (<-c.Events()).(*kafka.Message)

	// Extract the context from the message
	carrier := kafkatrace.NewMessageCarrier(msg)
	spanContext, err := tracer.Extract(carrier)
	if err != nil {
		panic(err)
	}

	parentContext := parentSpan.Context()

	// Validate that the context passed is the context sent via the message
	if spanContext.TraceID() == parentContext.TraceID() {
		fmt.Println("Span context passed sucessfully from producer to consumer")
	} else {
		fmt.Println("Span context not passed")
	}

	c.Close()
	// wait for the events channel to be closed
	<-c.Events()
}
Output:

Span context passed sucessfully from producer to consumer

Index

Examples

Constants

This section is empty.

Variables

View Source
var WithAnalytics = tracing.WithAnalytics

WithAnalytics enables Trace Analytics for all started spans.

View Source
var WithAnalyticsRate = tracing.WithAnalyticsRate

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

View Source
var WithContext = tracing.WithContext

WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers

View Source
var WithDataStreams = tracing.WithDataStreams

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

View Source
var WithServiceName = tracing.WithServiceName

WithServiceName sets the config service name to serviceName.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

A Consumer wraps a kafka.Consumer.

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and if polling is enabled, finishes any remaining span.

func (*Consumer) Commit added in v1.55.0

func (c *Consumer) Commit() ([]kafka.TopicPartition, error)

Commit commits current offsets and tracks the commit offsets if data streams is enabled.

func (*Consumer) CommitMessage added in v1.55.0

func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error)

CommitMessage commits a message and tracks the commit offsets if data streams is enabled.

func (*Consumer) CommitOffsets added in v1.55.0

func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)

CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.

func (*Consumer) Events

func (c *Consumer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). msg events will be traced.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)

Poll polls the consumer for messages or events. msg will be traced.

func (*Consumer) ReadMessage added in v1.30.0

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a message. msg will be traced.

type MessageCarrier

type MessageCarrier = tracing.MessageCarrier

A MessageCarrier injects and extracts traces from a kafka.Message.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

type Option

type Option = tracing.Option

An Option customizes the config.

func WithConfig added in v1.52.0

func WithConfig(cm *kafka.ConfigMap) Option

WithConfig extracts the config information for the client to be tagged

func WithCustomTag added in v1.42.0

func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option

WithCustomTag will cause the given tagFn to be evaluated after executing a query and attach the result to the span tagged by the key.

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

A Producer wraps a kafka.Producer.

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

NewProducer calls kafka.NewProducer and wraps the resulting Producer.

func WrapProducer

func WrapProducer(p *kafka.Producer, opts ...Option) *Producer

WrapProducer wraps a kafka.Producer so requests are traced.

func (*Producer) Close

func (p *Producer) Close()

Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.

func (*Producer) Events added in v1.55.0

func (p *Producer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). msg events will be monitored with data streams monitoring (if enabled)

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL