kafka

package
v2.0.1-alpha
Published: Nov 6, 2023 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).

Example

This example shows how a span context can be passed from a producer to a consumer.

// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package main

import (
	"fmt"

	kafkatrace "github.com/DataDog/dd-trace-go/v2/contrib/confluentinc/confluent-kafka-go/kafka"
	"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
	testGroupID = "gotest"
	testTopic   = "gotest"
)

// This example shows how a span context can be passed from a producer to a consumer.
func main() {

	tracer.Start()
	defer tracer.Stop()

	c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{
		"go.events.channel.enable": true, // required for the events channel to be turned on
		"group.id":                 testGroupID,
		"socket.timeout.ms":        10,
		"session.timeout.ms":       10,
		"enable.auto.offset.store": false,
	})
	if err != nil {
		panic(err)
	}

	err = c.Subscribe(testTopic, nil)
	if err != nil {
		panic(err)
	}

	// Create the span to be passed
	parentSpan := tracer.StartSpan("test_parent_span")

	// Produce a message with a span
	go func() {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &testTopic,
				Partition: 1,
				Offset:    1,
			},
			Key:   []byte("key1"),
			Value: []byte("value1"),
		}

		// Inject the span context in the message to be produced
		carrier := kafkatrace.NewMessageCarrier(msg)
		tracer.Inject(parentSpan.Context(), carrier)

		c.Consumer.Events() <- msg

	}()

	msg := (<-c.Events()).(*kafka.Message)

	// Extract the context from the message
	carrier := kafkatrace.NewMessageCarrier(msg)
	spanContext, err := tracer.Extract(carrier)
	if err != nil {
		panic(err)
	}

	parentContext := parentSpan.Context()

	// Validate that the context passed is the context sent via the message
	if spanContext.TraceID() == parentContext.TraceID() {
		fmt.Println("Span context passed successfully from producer to consumer")
	} else {
		fmt.Println("Span context not passed")
	}

	c.Close()
	// wait for the events channel to be closed
	<-c.Events()
}
Output:

Span context passed successfully from producer to consumer

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

A Consumer wraps a kafka.Consumer.

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and, if polling is enabled, finishes any remaining span.

func (*Consumer) Commit

func (c *Consumer) Commit() ([]kafka.TopicPartition, error)

Commit commits current offsets and tracks the commit offsets if data streams is enabled.

func (*Consumer) CommitMessage

func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error)

CommitMessage commits a message and tracks the commit offsets if data streams is enabled.

func (*Consumer) CommitOffsets

func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)

CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.

func (*Consumer) Events

func (c *Consumer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). Message events will be traced.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)

Poll polls the consumer for messages or events. Messages will be traced.

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a message. The message will be traced.

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

A MessageCarrier injects and extracts traces from a kafka.Message.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

func (MessageCarrier) ForeachKey

func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set sets a header.

type Option

type Option func(cfg *config)

An Option customizes the config.

func WithAnalytics

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithConfig

func WithConfig(cg *kafka.ConfigMap) Option

WithConfig extracts the config information for the client to be tagged.

func WithContext

func WithContext(ctx context.Context) Option

WithContext sets the config context to ctx.

Deprecated: This is deprecated in favor of passing the context via the message headers.

func WithCustomTag

func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option

WithCustomTag causes the given tagFn to be evaluated for each traced message and attaches the result to the span under the given tag key.

func WithDataStreams

func WithDataStreams() Option

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

func WithServiceName

func WithServiceName(serviceName string) Option

WithServiceName sets the config service name to serviceName.
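
The options above follow Go's functional-options pattern: each constructor returns a function that mutates a config. The sketch below shows that pattern using only the standard library. The config fields, the lowercase option names, the message type, and the "kafka" default are all hypothetical, since the real config is unexported and not shown in this documentation.

```go
package main

import "fmt"

// message is a hypothetical stand-in for kafka.Message.
type message struct {
	Key []byte
}

// config holds illustrative settings; the package's real fields are
// unexported and may differ.
type config struct {
	serviceName string
	tagFns      map[string]func(msg *message) interface{}
}

// Option mirrors the documented shape: a function that customizes the config.
type Option func(cfg *config)

// withServiceName is a sketch of an option that sets a single field.
func withServiceName(name string) Option {
	return func(cfg *config) { cfg.serviceName = name }
}

// withCustomTag is a sketch of an option that registers a per-message tag function.
func withCustomTag(tag string, tagFn func(msg *message) interface{}) Option {
	return func(cfg *config) {
		if cfg.tagFns == nil {
			cfg.tagFns = make(map[string]func(msg *message) interface{})
		}
		cfg.tagFns[tag] = tagFn
	}
}

// newConfig starts from defaults and applies each option in order,
// so later options override earlier ones.
func newConfig(opts ...Option) *config {
	cfg := &config{serviceName: "kafka"} // placeholder default
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

// tagsFor evaluates every registered tag function against msg.
func tagsFor(cfg *config, msg *message) map[string]interface{} {
	tags := make(map[string]interface{}, len(cfg.tagFns))
	for tag, fn := range cfg.tagFns {
		tags[tag] = fn(msg)
	}
	return tags
}

func main() {
	cfg := newConfig(
		withServiceName("orders-consumer"),
		withCustomTag("message.key", func(m *message) interface{} { return string(m.Key) }),
	)
	fmt.Println(cfg.serviceName, tagsFor(cfg, &message{Key: []byte("key1")}))
}
```

Because options are applied in order, a caller can pass a shared set of defaults first and override individual settings afterwards.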

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

A Producer wraps a kafka.Producer.

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

NewProducer calls kafka.NewProducer and wraps the resulting Producer.

func WrapProducer

func WrapProducer(p *kafka.Producer, opts ...Option) *Producer

WrapProducer wraps a kafka.Producer so requests are traced.

func (*Producer) Close

func (p *Producer) Close()

Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.

func (*Producer) Events

func (p *Producer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). Message events will be monitored with data streams monitoring (if enabled).

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.
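
A wrapping channel of this kind can be sketched with a forwarding goroutine. The following standard-library-only example is an assumption about the general technique, not the package's implementation: message, wrapProduceChannel, and the onMessage hook (where a tracing wrapper might start a span) are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for kafka.Message.
type message struct {
	Value string
}

// wrapProduceChannel returns a channel whose messages are passed to onMessage
// and then forwarded to out. The WaitGroup completes once the returned
// channel has been closed and drained.
func wrapProduceChannel(out chan<- *message, onMessage func(*message)) (chan *message, *sync.WaitGroup) {
	in := make(chan *message, 1)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range in {
			onMessage(msg) // e.g. start a span or record a checkpoint
			out <- msg     // hand the message to the underlying channel
		}
	}()
	return in, &wg
}

func main() {
	underlying := make(chan *message, 4)
	var seen []string

	in, wg := wrapProduceChannel(underlying, func(m *message) {
		seen = append(seen, m.Value)
	})

	in <- &message{Value: "a"}
	in <- &message{Value: "b"}

	// Closing the wrapping channel stops the forwarding goroutine,
	// analogous to Close shutting down the internal wrapping channel.
	close(in)
	wg.Wait()
	close(underlying)

	for m := range underlying {
		fmt.Println("forwarded:", m.Value)
	}
	fmt.Println("observed:", seen)
}
```

The design keeps the caller's interface identical to a plain channel while giving the wrapper one place to observe every message before it reaches the underlying producer.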
