sarama

package module
v2.0.0-rc.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0, BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Overview

Package sarama provides functions to trace the IBM/sarama package (https://github.com/IBM/sarama).

Example (AsyncProducer)
package main

import (
	saramatrace "github.com/DataDog/dd-trace-go/contrib/IBM/sarama/v2"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // minimum version that supports headers which are required for tracing

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	producer = saramatrace.WrapAsyncProducer(cfg, producer)

	msg := &sarama.ProducerMessage{
		Topic: "some-topic",
		Value: sarama.StringEncoder("Hello World"),
	}
	producer.Input() <- msg
}
Output:

Example (Consumer)
package main

import (
	"log"

	saramatrace "github.com/DataDog/dd-trace-go/contrib/IBM/sarama/v2"
	"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"

	"github.com/IBM/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	consumer = saramatrace.WrapConsumer(consumer)

	partitionConsumer, err := consumer.ConsumePartition("some-topic", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partitionConsumer.Close()

	consumed := 0
	for msg := range partitionConsumer.Messages() {
		// if you want to use the kafka message as a parent span:
		if spanctx, err := tracer.Extract(saramatrace.NewConsumerMessageCarrier(msg)); err == nil {
			// you can create a span using ChildOf(spanctx)
			_ = spanctx
		}

		log.Printf("Consumed message offset %d\n", msg.Offset)
		consumed++
	}
}
Output:

Example (SyncProducer)
package main

import (
	saramatrace "github.com/DataDog/dd-trace-go/contrib/IBM/sarama/v2"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	producer = saramatrace.WrapSyncProducer(cfg, producer)

	msg := &sarama.ProducerMessage{
		Topic: "some-topic",
		Value: sarama.StringEncoder("Hello World"),
	}
	_, _, err = producer.SendMessage(msg)
	if err != nil {
		panic(err)
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func WrapAsyncProducer

func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer

WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages are traced. It requires the underlying sarama Config so we can know whether or not successes will be returned. Tracing requires at least sarama.V0_11_0_0 version which is the first version that supports headers. Only spans of successfully published messages have partition and offset tags set.

func WrapConsumer

func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer

WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created via Consumer.ConsumePartition.

func WrapPartitionConsumer

func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer

WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received message to be traced.

func WrapSyncProducer

func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer

WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages are traced.

Types

type ConsumerMessageCarrier

type ConsumerMessageCarrier struct {
	// contains filtered or unexported fields
}

A ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage.

func NewConsumerMessageCarrier

func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier

NewConsumerMessageCarrier creates a new ConsumerMessageCarrier.

func (ConsumerMessageCarrier) ForeachKey

func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (ConsumerMessageCarrier) Set

func (c ConsumerMessageCarrier) Set(key, val string)

Set sets a header.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option describes options for the Sarama integration.

type OptionFn

type OptionFn func(*config)

OptionFn represents options applicable to WrapConsumer, WrapPartitionConsumer, WrapAsyncProducer and WrapSyncProducer.

func WithAnalytics

func WithAnalytics(on bool) OptionFn

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) OptionFn

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithDataStreams

func WithDataStreams() OptionFn

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

func WithGroupID

func WithGroupID(groupID string) OptionFn

WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group)

func WithService

func WithService(name string) OptionFn

WithService sets the given service name for the intercepted client.

type ProducerMessageCarrier

type ProducerMessageCarrier struct {
	// contains filtered or unexported fields
}

A ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage.

func NewProducerMessageCarrier

func NewProducerMessageCarrier(msg *sarama.ProducerMessage) ProducerMessageCarrier

NewProducerMessageCarrier creates a new ProducerMessageCarrier.

func (ProducerMessageCarrier) ForeachKey

func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (ProducerMessageCarrier) Set

func (c ProducerMessageCarrier) Set(key, val string)

Set sets a header.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL