Documentation ¶
Overview ¶
Package sarama provides functions to trace the Shopify/sarama package (https://github.com/Shopify/sarama).
Deprecated: github.com/Shopify/sarama is no longer maintained. Please migrate to github.com/IBM/sarama and use the corresponding instrumentation.
Example (AsyncProducer) ¶
package main import ( saramatrace "github.com/0angelic0/dd-trace-go/contrib/Shopify/sarama" "github.com/Shopify/sarama" ) func main() { cfg := sarama.NewConfig() cfg.Version = sarama.V0_11_0_0 // minimum version that supports headers which are required for tracing producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg) if err != nil { panic(err) } defer producer.Close() producer = saramatrace.WrapAsyncProducer(cfg, producer) msg := &sarama.ProducerMessage{ Topic: "some-topic", Value: sarama.StringEncoder("Hello World"), } producer.Input() <- msg }
Output:
Example (Consumer) ¶
package main import ( "log" saramatrace "github.com/0angelic0/dd-trace-go/contrib/Shopify/sarama" "github.com/0angelic0/dd-trace-go/ddtrace/tracer" "github.com/Shopify/sarama" ) func main() { consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } defer consumer.Close() consumer = saramatrace.WrapConsumer(consumer) partitionConsumer, err := consumer.ConsumePartition("some-topic", 0, sarama.OffsetNewest) if err != nil { panic(err) } defer partitionConsumer.Close() consumed := 0 for msg := range partitionConsumer.Messages() { // if you want to use the kafka message as a parent span: if spanctx, err := tracer.Extract(saramatrace.NewConsumerMessageCarrier(msg)); err == nil { // you can create a span using ChildOf(spanctx) _ = spanctx } log.Printf("Consumed message offset %d\n", msg.Offset) consumed++ } }
Output:
Example (SyncProducer) ¶
package main import ( saramatrace "github.com/0angelic0/dd-trace-go/contrib/Shopify/sarama" "github.com/Shopify/sarama" ) func main() { cfg := sarama.NewConfig() cfg.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg) if err != nil { panic(err) } defer producer.Close() producer = saramatrace.WrapSyncProducer(cfg, producer) msg := &sarama.ProducerMessage{ Topic: "some-topic", Value: sarama.StringEncoder("Hello World"), } _, _, err = producer.SendMessage(msg) if err != nil { panic(err) } }
Output:
Index ¶
- func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer
- func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer
- func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer
- func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer
- type ConsumerMessageCarrier
- type Option
- type ProducerMessageCarrier
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WrapAsyncProducer ¶
func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer
WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages are traced. It requires the underlying sarama Config so we can know whether or not successes will be returned. Tracing requires at least sarama.V0_11_0_0 version which is the first version that supports headers. Only spans of successfully published messages have partition and offset tags set.
func WrapConsumer ¶
WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created via Consumer.ConsumePartition.
func WrapPartitionConsumer ¶
func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer
WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received message to be traced.
func WrapSyncProducer ¶
func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer
WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages are traced.
Types ¶
type ConsumerMessageCarrier ¶
type ConsumerMessageCarrier struct {
// contains filtered or unexported fields
}
A ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage.
func NewConsumerMessageCarrier ¶
func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier
NewConsumerMessageCarrier creates a new ConsumerMessageCarrier.
func (ConsumerMessageCarrier) ForeachKey ¶
func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey iterates over every header.
func (ConsumerMessageCarrier) Set ¶
func (c ConsumerMessageCarrier) Set(key, val string)
Set sets a header.
type Option ¶
type Option func(cfg *config)
An Option is used to customize the config for the sarama tracer.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithDataStreams ¶
func WithDataStreams() Option
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithGroupID ¶
WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group)
func WithServiceName ¶
WithServiceName sets the given service name for the intercepted client.
type ProducerMessageCarrier ¶
type ProducerMessageCarrier struct {
// contains filtered or unexported fields
}
A ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage.
func NewProducerMessageCarrier ¶
func NewProducerMessageCarrier(msg *sarama.ProducerMessage) ProducerMessageCarrier
NewProducerMessageCarrier creates a new ProducerMessageCarrier.
func (ProducerMessageCarrier) ForeachKey ¶
func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey iterates over every header.
func (ProducerMessageCarrier) Set ¶
func (c ProducerMessageCarrier) Set(key, val string)
Set sets a header.