sarama

package
v1.71.0-rc.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 12 Imported by: 1

Documentation

Overview

Package sarama provides functions to trace the IBM/sarama package (https://github.com/IBM/sarama).

Example (AsyncProducer)
package main

import (
	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // minimum version that supports headers which are required for tracing

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	producer = saramatrace.WrapAsyncProducer(cfg, producer)

	msg := &sarama.ProducerMessage{
		Topic: "some-topic",
		Value: sarama.StringEncoder("Hello World"),
	}
	producer.Input() <- msg
}
Output:

Example (Consumer)
package main

import (
	"log"

	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

	"github.com/IBM/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	consumer = saramatrace.WrapConsumer(consumer)

	partitionConsumer, err := consumer.ConsumePartition("some-topic", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partitionConsumer.Close()

	consumed := 0
	for msg := range partitionConsumer.Messages() {
		// if you want to use the kafka message as a parent span:
		if spanctx, err := tracer.Extract(saramatrace.NewConsumerMessageCarrier(msg)); err == nil {
			// you can create a span using ChildOf(spanctx)
			_ = spanctx
		}

		log.Printf("Consumed message offset %d\n", msg.Offset)
		consumed++
	}
}
Output:

Example (SyncProducer)
package main

import (
	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	producer = saramatrace.WrapSyncProducer(cfg, producer)

	msg := &sarama.ProducerMessage{
		Topic: "some-topic",
		Value: sarama.StringEncoder("Hello World"),
	}
	_, _, err = producer.SendMessage(msg)
	if err != nil {
		panic(err)
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func WrapAsyncProducer

func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer

WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages are traced. It requires the underlying sarama Config so we can know whether or not successes will be returned. Tracing requires at least sarama.V0_11_0_0 version which is the first version that supports headers. Only spans of successfully published messages have partition and offset tags set.

func WrapConsumer

func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer

WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created via Consumer.ConsumePartition.

func WrapPartitionConsumer

func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer

WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received message to be traced.

func WrapSyncProducer

func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer

WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages are traced.

Types

type ConsumerMessageCarrier

type ConsumerMessageCarrier struct {
	// contains filtered or unexported fields
}

A ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage.

func NewConsumerMessageCarrier

func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier

NewConsumerMessageCarrier creates a new ConsumerMessageCarrier.

func (ConsumerMessageCarrier) ForeachKey

func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (ConsumerMessageCarrier) Set

func (c ConsumerMessageCarrier) Set(key, val string)

Set sets a header.

type Option

type Option func(cfg *config)

An Option is used to customize the config for the sarama tracer.

func WithAnalytics

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithDataStreams added in v1.67.0

func WithDataStreams() Option

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

func WithGroupID added in v1.67.0

func WithGroupID(groupID string) Option

WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group)

func WithServiceName

func WithServiceName(name string) Option

WithServiceName sets the given service name for the intercepted client.

type ProducerMessageCarrier

type ProducerMessageCarrier struct {
	// contains filtered or unexported fields
}

A ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage.

func NewProducerMessageCarrier

func NewProducerMessageCarrier(msg *sarama.ProducerMessage) ProducerMessageCarrier

NewProducerMessageCarrier creates a new ProducerMessageCarrier.

func (ProducerMessageCarrier) ForeachKey

func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (ProducerMessageCarrier) Set

func (c ProducerMessageCarrier) Set(key, val string)

Set sets a header.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL