Documentation ¶
Overview ¶
Example (AsyncProducer) ¶
This example demonstrates how to instrument an async Kafka producer using instasarama. Error handling is omitted for brevity.
package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
    "github.com/opentracing/opentracing-go/ext"
)

func main() {
    sensor := instana.NewSensor("my-service")
    brokers := []string{"localhost:9092"}

    config := sarama.NewConfig()
    // enable the use of record headers, added in Kafka v0.11.0 and used to
    // propagate trace context
    config.Version = sarama.V0_11_0_0

    // create a new instrumented instance of sarama.AsyncProducer
    producer, _ := instasarama.NewAsyncProducer(brokers, config, sensor)

    // start a new entry span
    sp := sensor.Tracer().StartSpan("my-producing-method")
    ext.SpanKind.Set(sp, "entry")

    msg := &sarama.ProducerMessage{
        // ...
    }

    // inject the span before passing the message to the producer
    producer.Input() <- instasarama.ProducerMessageWithSpan(msg, sp)
}
Output:
Example (Consumer) ¶
This example demonstrates how to instrument a Kafka consumer using instasarama and extract the trace context to ensure continuation. Error handling is omitted for brevity.
// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2020

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
    "github.com/opentracing/opentracing-go"
)

// This example demonstrates how to instrument a Kafka consumer using instasarama
// and extract the trace context to ensure continuation. Error handling is omitted for brevity.
func main() {
    sensor := instana.NewSensor("my-service")
    brokers := []string{"localhost:9092"}

    conf := sarama.NewConfig()
    conf.Version = sarama.V0_11_0_0

    // create a new instrumented instance of sarama.Consumer
    consumer, _ := instasarama.NewConsumer(brokers, conf, sensor)

    c, _ := consumer.ConsumePartition("test-topic-1", 0, sarama.OffsetNewest)
    defer c.Close()

    for msg := range c.Messages() {
        fmt.Println("Got message", msg)
        processMessage(msg, sensor)
    }
}

func processMessage(msg *sarama.ConsumerMessage, sensor *instana.Sensor) {
    // extract trace context and start a new span
    parentCtx, _ := instasarama.SpanContextFromConsumerMessage(msg, sensor)

    sp := sensor.Tracer().StartSpan("process-message", opentracing.ChildOf(parentCtx))
    defer sp.Finish()

    // process message
}
Output:
Example (ConsumerGroup) ¶
This example demonstrates how to instrument a Kafka consumer group using instasarama and extract the trace context to ensure continuation. Error handling is omitted for brevity.
// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2020

package main

import (
    "context"

    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
    "github.com/opentracing/opentracing-go"
)

// This example demonstrates how to instrument a Kafka consumer group using instasarama
// and extract the trace context to ensure continuation. Error handling is omitted for brevity.
func main() {
    sensor := instana.NewSensor("my-service")
    brokers := []string{"localhost:9092"}
    topics := []string{"records", "more-records"}

    conf := sarama.NewConfig()
    conf.Version = sarama.V0_11_0_0

    client, _ := sarama.NewConsumerGroup(brokers, "my-service-consumers", conf)
    defer client.Close()

    ctx := context.Background()
    consumer := instasarama.WrapConsumerGroupHandler(&Consumer{sensor: sensor}, sensor)

    // start consuming
    for {
        _ = client.Consume(ctx, topics, consumer)
        // ...
    }
}

type Consumer struct {
    sensor *instana.Sensor
}

func (*Consumer) Setup(sarama.ConsumerGroupSession) error {
    // setup consumer
    return nil
}

func (*Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    // cleanup consumer
    return nil
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        c.processMessage(msg)
        session.MarkMessage(msg, "")
    }

    return nil
}

func (c *Consumer) processMessage(msg *sarama.ConsumerMessage) {
    // extract trace context and start a new span
    parentCtx, _ := instasarama.SpanContextFromConsumerMessage(msg, c.sensor)

    sp := c.sensor.Tracer().StartSpan("process-message", opentracing.ChildOf(parentCtx))
    defer sp.Finish()

    // process message
}
Output:
Example (SyncProducer) ¶
This example demonstrates how to instrument a sync Kafka producer using instasarama. Error handling is omitted for brevity.
package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
    "github.com/opentracing/opentracing-go/ext"
)

func main() {
    sensor := instana.NewSensor("my-service")
    brokers := []string{"localhost:9092"}

    config := sarama.NewConfig()
    // sarama requires Producer.Return.Successes to be set for sync producers
    config.Producer.Return.Successes = true
    // enable the use of record headers, added in Kafka v0.11.0 and used to
    // propagate trace context
    config.Version = sarama.V0_11_0_0

    // create a new instrumented instance of sarama.SyncProducer
    producer, _ := instasarama.NewSyncProducer(brokers, config, sensor)

    // start a new entry span
    sp := sensor.Tracer().StartSpan("my-producing-method")
    ext.SpanKind.Set(sp, "entry")

    msg := &sarama.ProducerMessage{
        Topic:  "test-topic-1",
        Offset: sarama.OffsetNewest,
        Value:  sarama.StringEncoder("I am a message"),
        // ...
    }

    // inject the span before passing the message to the producer
    msg = instasarama.ProducerMessageWithSpan(msg, sp)

    // pass it to the producer
    producer.SendMessage(msg)
}
Output:
Index ¶
- Constants
- func NewAsyncProducer(addrs []string, conf *sarama.Config, sensor *instana.Sensor) (sarama.AsyncProducer, error)
- func NewAsyncProducerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.AsyncProducer, error)
- func NewConsumer(addrs []string, config *sarama.Config, sensor *instana.Sensor) (sarama.Consumer, error)
- func NewConsumerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.Consumer, error)
- func NewConsumerGroup(addrs []string, groupID string, config *sarama.Config, sensor *instana.Sensor) (sarama.ConsumerGroup, error)
- func NewConsumerGroupFromClient(groupID string, client sarama.Client, sensor *instana.Sensor) (sarama.ConsumerGroup, error)
- func NewSyncProducer(addrs []string, config *sarama.Config, sensor *instana.Sensor) (sarama.SyncProducer, error)
- func NewSyncProducerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.SyncProducer, error)
- func PackTraceContextHeader(traceID, spanID string) []byte
- func PackTraceLevelHeader(val string) []byte
- func ProducerMessageWithSpan(pm *sarama.ProducerMessage, sp ot.Span) *sarama.ProducerMessage
- func ProducerMessageWithSpanFromContext(ctx context.Context, pm *sarama.ProducerMessage) *sarama.ProducerMessage
- func SpanContextFromConsumerMessage(cm *sarama.ConsumerMessage, sensor *instana.Sensor) (ot.SpanContext, bool)
- func UnpackTraceContextHeader(val []byte) (string, string, error)
- func UnpackTraceLevelHeader(val []byte) (string, error)
- type AsyncProducer
- type Consumer
- type ConsumerGroupHandler
- type ConsumerMessageCarrier
- type PartitionConsumer
- type ProducerMessageCarrier
- type SyncProducer
Examples ¶
Constants ¶
const (
    // FieldC is the trace context header key
    FieldC = "X_INSTANA_C"
    // FieldL is the trace level header key
    FieldL = "X_INSTANA_L"
    // FieldT is the trace id
    FieldT = "X_INSTANA_T"
    // FieldS is the span id
    FieldS = "X_INSTANA_S"
    // FieldLS is the trace level
    FieldLS = "X_INSTANA_L_S"
)
const KafkaHeaderEnvVarKey = "INSTANA_KAFKA_HEADER_FORMAT"
const Version = "1.4.0"
Version is the instrumentation module semantic version
Variables ¶
This section is empty.
Functions ¶
func NewAsyncProducer ¶
func NewAsyncProducer(addrs []string, conf *sarama.Config, sensor *instana.Sensor) (sarama.AsyncProducer, error)
NewAsyncProducer creates a new sarama.AsyncProducer using the given broker addresses and configuration, and instruments its calls
func NewAsyncProducerFromClient ¶
func NewAsyncProducerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.AsyncProducer, error)
NewAsyncProducerFromClient creates a new sarama.AsyncProducer using the given client, and instruments its calls
func NewConsumer ¶
func NewConsumer(addrs []string, config *sarama.Config, sensor *instana.Sensor) (sarama.Consumer, error)
NewConsumer creates a new consumer using the given broker addresses and configuration, and instruments its calls
func NewConsumerFromClient ¶
func NewConsumerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.Consumer, error)
NewConsumerFromClient creates a new consumer using the given client and instruments its calls
func NewConsumerGroup ¶
func NewConsumerGroup(addrs []string, groupID string, config *sarama.Config, sensor *instana.Sensor) (sarama.ConsumerGroup, error)
NewConsumerGroup creates an instrumented sarama.ConsumerGroup
func NewConsumerGroupFromClient ¶
func NewConsumerGroupFromClient(groupID string, client sarama.Client, sensor *instana.Sensor) (sarama.ConsumerGroup, error)
NewConsumerGroupFromClient creates an instrumented sarama.ConsumerGroup from sarama.Client
func NewSyncProducer ¶
func NewSyncProducer(addrs []string, config *sarama.Config, sensor *instana.Sensor) (sarama.SyncProducer, error)
NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration, and instruments its calls
func NewSyncProducerFromClient ¶
func NewSyncProducerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.SyncProducer, error)
NewSyncProducerFromClient creates a new SyncProducer using the given client, and instruments its calls
func PackTraceContextHeader ¶
func PackTraceContextHeader(traceID, spanID string) []byte
PackTraceContextHeader packs the trace and span ID into a byte slice to be used as (sarama.RecordHeader).Value. The returned slice is always 24 bytes long.
func PackTraceLevelHeader ¶
func PackTraceLevelHeader(val string) []byte
PackTraceLevelHeader packs the X-INSTANA-L value into a byte slice to be used as (sarama.RecordHeader).Value. It returns a 1-byte slice containing 0x00 if the passed value is "0", and 0x01 otherwise.
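These pack helpers are normally invoked by the instrumented producers, but they can also be used to build the binary Instana record headers by hand. A minimal sketch combining PackTraceContextHeader and PackTraceLevelHeader with the FieldC/FieldL constants above; the trace and span IDs are placeholders, in real code they come from the active span context:

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
)

func main() {
    // placeholder IDs; in instrumented code these come from the active span context
    traceID, spanID := "0000000000000001", "0000000000000002"

    msg := &sarama.ProducerMessage{
        Topic: "test-topic-1",
        Value: sarama.StringEncoder("I am a message"),
        Headers: []sarama.RecordHeader{
            {Key: []byte(instasarama.FieldC), Value: instasarama.PackTraceContextHeader(traceID, spanID)},
            {Key: []byte(instasarama.FieldL), Value: instasarama.PackTraceLevelHeader("1")},
        },
    }

    fmt.Println(len(msg.Headers)) // 2
}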
func ProducerMessageWithSpan ¶
func ProducerMessageWithSpan(pm *sarama.ProducerMessage, sp ot.Span) *sarama.ProducerMessage
ProducerMessageWithSpan injects the tracing context into producer message headers to propagate them through the Kafka requests made with instasarama producers.
func ProducerMessageWithSpanFromContext ¶
func ProducerMessageWithSpanFromContext(ctx context.Context, pm *sarama.ProducerMessage) *sarama.ProducerMessage
ProducerMessageWithSpanFromContext injects the tracing context carried by the provided context, if present, into the producer message headers.
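A minimal sketch of using ProducerMessageWithSpanFromContext; it assumes the current span has been stored in the context beforehand (here via instana.ContextWithSpan() from go-sensor, the counterpart of the instana.SpanFromContext() call shown further below), and the topic and value are placeholders:

package main

import (
    "context"

    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
)

func main() {
    sensor := instana.NewSensor("my-service")

    // start an entry span and store it in the context, the way an HTTP or RPC
    // instrumentation typically would
    sp := sensor.Tracer().StartSpan("my-producing-method")
    defer sp.Finish()
    ctx := instana.ContextWithSpan(context.Background(), sp)

    msg := &sarama.ProducerMessage{
        Topic: "test-topic-1",
        Value: sarama.StringEncoder("I am a message"),
    }

    // inject the trace context carried by ctx (if any) into the message headers
    msg = instasarama.ProducerMessageWithSpanFromContext(ctx, msg)

    _ = msg // pass it to an instrumented producer as in the examples above
}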
func SpanContextFromConsumerMessage ¶
func SpanContextFromConsumerMessage(cm *sarama.ConsumerMessage, sensor *instana.Sensor) (ot.SpanContext, bool)
SpanContextFromConsumerMessage extracts the tracing context from consumer message
func UnpackTraceContextHeader ¶
func UnpackTraceContextHeader(val []byte) (string, string, error)
UnpackTraceContextHeader unpacks and returns the trace and span ID, padding them with zeroes to 32 and 16 characters respectively. It expects the provided buffer to be exactly 24 bytes long.
func UnpackTraceLevelHeader ¶
func UnpackTraceLevelHeader(val []byte) (string, error)
UnpackTraceLevelHeader returns "1" if the value contains a non-zero byte, and "0" otherwise. It expects the provided buffer to be exactly 1 byte long.
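A round-trip sketch of the pack/unpack helpers with placeholder IDs, showing the zero-padding of the returned values:

package main

import (
    "fmt"

    "github.com/mier85/go-sensor/instrumentation/instasarama"
)

func main() {
    // pack placeholder IDs into the 24-byte binary header value...
    buf := instasarama.PackTraceContextHeader("0000000000000001", "0000000000000002")

    // ...and unpack them again; the returned IDs are zero-padded to 32 and 16 characters
    traceID, spanID, err := instasarama.UnpackTraceContextHeader(buf)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(traceID, spanID)

    // the trace level header is a single byte
    lvl, _ := instasarama.UnpackTraceLevelHeader(instasarama.PackTraceLevelHeader("1"))
    fmt.Println(lvl)
}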
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
    sarama.AsyncProducer
    // contains filtered or unexported fields
}
AsyncProducer is a wrapper for sarama.AsyncProducer that instruments its calls using provided instana.Sensor
func WrapAsyncProducer ¶
func WrapAsyncProducer(p sarama.AsyncProducer, conf *sarama.Config, sensor *instana.Sensor) *AsyncProducer
WrapAsyncProducer wraps an existing sarama.AsyncProducer and instruments its calls. It requires the same config that was used to create this producer to detect the Kafka version and whether it's supposed to return successes/errors. To initialize a new async producer instance use instasarama.NewAsyncProducer() and instasarama.NewAsyncProducerFromClient() convenience methods instead
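A minimal sketch of wrapping an async producer that was created elsewhere with plain sarama; the broker address is a placeholder:

package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
)

func main() {
    sensor := instana.NewSensor("my-service")

    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0

    // an async producer created elsewhere with plain sarama
    ap, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }

    // wrap it, passing the same config it was created with
    producer := instasarama.WrapAsyncProducer(ap, config, sensor)
    defer producer.AsyncClose()

    // producer.Input(), producer.Successes() and producer.Errors() are then
    // used exactly as in the AsyncProducer example above
}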
func (*AsyncProducer) Errors ¶
func (p *AsyncProducer) Errors() <-chan *sarama.ProducerError
Errors is the error output channel back to the user
func (*AsyncProducer) Input ¶
func (p *AsyncProducer) Input() chan<- *sarama.ProducerMessage
Input is the input channel for the user to write messages to that they wish to send. The async producer will then create a new exit span for each message that has trace context added with instasarama.ProducerMessageWithSpan()
func (*AsyncProducer) Successes ¶
func (p *AsyncProducer) Successes() <-chan *sarama.ProducerMessage
Successes is the success output channel back to the user
type Consumer ¶
Consumer is a wrapper for sarama.Consumer that wraps and returns instrumented partition consumers
func WrapConsumer ¶
WrapConsumer wraps an existing sarama.Consumer instance and instruments its calls. To initialize a new instance of sarama.Consumer use instasarama.NewConsumer() and instasarama.NewConsumerFromClient() convenience methods instead
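A minimal sketch of wrapping a consumer that was created elsewhere with plain sarama; the signature is not shown above, so this assumes WrapConsumer takes the consumer and the sensor, following the same pattern as the other Wrap* helpers in this package:

package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
)

func main() {
    sensor := instana.NewSensor("my-service")

    conf := sarama.NewConfig()
    conf.Version = sarama.V0_11_0_0

    // a consumer created elsewhere with plain sarama
    sc, err := sarama.NewConsumer([]string{"localhost:9092"}, conf)
    if err != nil {
        panic(err)
    }

    consumer := instasarama.WrapConsumer(sc, sensor)

    // partition consumers returned by the wrapper are instrumented
    pc, _ := consumer.ConsumePartition("test-topic-1", 0, sarama.OffsetNewest)
    defer pc.Close()
}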
func (*Consumer) ConsumePartition ¶
func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
ConsumePartition instruments and returns the partition consumer returned by the underlying sarama.Consumer
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
ConsumerGroupHandler is a wrapper for sarama.ConsumerGroupHandler that creates an entry span for each incoming Kafka message, ensuring the extraction and continuation of the existing trace context if provided
func WrapConsumerGroupHandler ¶
func WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler, sensor *instana.Sensor) *ConsumerGroupHandler
WrapConsumerGroupHandler wraps the existing group handler and instruments its calls
func (*ConsumerGroupHandler) Cleanup ¶
func (h *ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error
Cleanup calls the underlying handler's Cleanup() method
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (h *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim injects the trace context into incoming message headers and delegates further processing to the underlying handler
func (*ConsumerGroupHandler) Setup ¶
func (h *ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error
Setup calls the underlying handler's Setup() method
type ConsumerMessageCarrier ¶
type ConsumerMessageCarrier struct {
Message *sarama.ConsumerMessage
}
ConsumerMessageCarrier is a trace context carrier that extracts Instana OpenTracing headers from Kafka consumer messages
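A sketch of using the carrier directly with the tracer, assuming the opentracing.TextMap format; in application code SpanContextFromConsumerMessage() already covers this common case:

package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
    "github.com/opentracing/opentracing-go"
)

func main() {
    sensor := instana.NewSensor("my-service")

    msg := &sarama.ConsumerMessage{
        Topic: "test-topic-1",
        // Headers would normally carry the trace context written by an
        // instrumented producer
    }

    // the carrier exposes the message headers as an opentracing.TextMapReader,
    // so it can be passed directly to Tracer.Extract()
    carrier := instasarama.ConsumerMessageCarrier{Message: msg}

    if spanCtx, err := sensor.Tracer().Extract(opentracing.TextMap, carrier); err == nil {
        sp := sensor.Tracer().StartSpan("process-message", opentracing.ChildOf(spanCtx))
        defer sp.Finish()
    }
}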
func (ConsumerMessageCarrier) ForeachKey ¶
func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey implements opentracing.TextMapReader for ConsumerMessageCarrier
func (ConsumerMessageCarrier) RemoveAll ¶
func (c ConsumerMessageCarrier) RemoveAll()
RemoveAll removes all tracing headers previously set by Set()
func (ConsumerMessageCarrier) Set ¶
func (c ConsumerMessageCarrier) Set(key, val string)
Set implements opentracing.TextMapWriter for ConsumerMessageCarrier
type PartitionConsumer ¶
type PartitionConsumer struct {
    sarama.PartitionConsumer
    // contains filtered or unexported fields
}
PartitionConsumer is a wrapper for sarama.PartitionConsumer that instruments its calls using provided instana.Sensor
func WrapPartitionConsumer ¶
func WrapPartitionConsumer(c sarama.PartitionConsumer, sensor *instana.Sensor) *PartitionConsumer
WrapPartitionConsumer wraps an existing sarama.PartitionConsumer instance and instruments its calls
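A minimal sketch of wrapping a partition consumer obtained from a plain sarama.Consumer; in most cases the instrumented Consumer above already does this for you via ConsumePartition(), and the broker address and topic are placeholders:

package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
)

func main() {
    sensor := instana.NewSensor("my-service")

    conf := sarama.NewConfig()
    conf.Version = sarama.V0_11_0_0

    // a plain sarama consumer and partition consumer created elsewhere
    sc, _ := sarama.NewConsumer([]string{"localhost:9092"}, conf)
    spc, _ := sc.ConsumePartition("test-topic-1", 0, sarama.OffsetNewest)

    // wrap the partition consumer and read from its Messages() channel
    pc := instasarama.WrapPartitionConsumer(spc, sensor)
    defer pc.Close()

    for msg := range pc.Messages() {
        _ = msg // process message
    }
}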
func (*PartitionConsumer) Messages ¶
func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages returns a channel of consumer messages of the underlying partition consumer
type ProducerMessageCarrier ¶
type ProducerMessageCarrier struct {
Message *sarama.ProducerMessage
}
ProducerMessageCarrier is a trace context carrier that propagates Instana OpenTracing headers throughout Kafka producer messages
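A sketch of using the carrier directly with the tracer, assuming the opentracing.TextMap format; in application code ProducerMessageWithSpan() already covers this common case:

package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
    "github.com/opentracing/opentracing-go"
)

func main() {
    sensor := instana.NewSensor("my-service")

    sp := sensor.Tracer().StartSpan("my-producing-method")
    defer sp.Finish()

    msg := &sarama.ProducerMessage{
        Topic: "test-topic-1",
        Value: sarama.StringEncoder("I am a message"),
    }

    // the carrier exposes the message headers as an opentracing.TextMapWriter,
    // so it can be passed directly to Tracer.Inject()
    carrier := instasarama.ProducerMessageCarrier{Message: msg}
    if err := sensor.Tracer().Inject(sp.Context(), opentracing.TextMap, carrier); err == nil {
        // msg.Headers now carries the Instana trace context
        _ = msg
    }
}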
func (ProducerMessageCarrier) ForeachKey ¶
func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey implements opentracing.TextMapReader for ProducerMessageCarrier
func (ProducerMessageCarrier) RemoveAll ¶
func (c ProducerMessageCarrier) RemoveAll()
RemoveAll removes all tracing headers previously set by Set()
func (ProducerMessageCarrier) Set ¶
func (c ProducerMessageCarrier) Set(key, val string)
Set implements opentracing.TextMapWriter for ProducerMessageCarrier
type SyncProducer ¶
type SyncProducer struct {
    sarama.SyncProducer
    // contains filtered or unexported fields
}
SyncProducer is a wrapper for sarama.SyncProducer that instruments its calls using provided instana.Sensor
func WrapSyncProducer ¶
func WrapSyncProducer(sp sarama.SyncProducer, config *sarama.Config, sensor *instana.Sensor) *SyncProducer
WrapSyncProducer wraps an existing sarama.SyncProducer instance and instruments its calls. It requires the same config that was used to create this producer to detect the Kafka version and whether it's supposed to return successes/errors. To initialize a new sync producer instance use instasarama.NewSyncProducer() and instasarama.NewSyncProducerFromClient() convenience methods instead
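A minimal sketch of wrapping a sync producer that was created elsewhere with plain sarama, passing the same config it was created with; the broker address, topic and value are placeholders:

package main

import (
    "github.com/Shopify/sarama"
    instana "github.com/mier85/go-sensor"
    "github.com/mier85/go-sensor/instrumentation/instasarama"
    "github.com/opentracing/opentracing-go/ext"
)

func main() {
    sensor := instana.NewSensor("my-service")

    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Version = sarama.V0_11_0_0

    // a sync producer created elsewhere with plain sarama
    p, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }

    // wrap it, passing the same config it was created with
    producer := instasarama.WrapSyncProducer(p, config, sensor)
    defer producer.Close()

    // from here on it is used exactly as in the SyncProducer example above
    sp := sensor.Tracer().StartSpan("my-producing-method")
    ext.SpanKind.Set(sp, "entry")
    defer sp.Finish()

    msg := instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{
        Topic: "test-topic-1",
        Value: sarama.StringEncoder("I am a message"),
    }, sp)

    producer.SendMessage(msg)
}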
func (*SyncProducer) SendMessage ¶
func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error)
SendMessage picks up the trace context previously added to the message with instasarama.ProducerMessageWithSpan(), starts a new child span and injects its context into the message headers before sending it to the underlying producer. The call will not be traced if the message does not contain trace context
func (*SyncProducer) SendMessages ¶
func (p *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error
SendMessages starts a new span and injects its context into messages headers before sending them with the underlying producer.
This method attempts to use the existing trace context found in message headers. There will be NO SPAN CREATED for this call if the messages originate from different trace contexts. A possible use case that results in such behavior is when messages resulting from different HTTP requests are buffered and later sent in one batch asynchronously. If you want your batch publish operation to be part of a specific trace, make sure that you inject the parent span of this trace explicitly before calling `SendMessages()`, for example:
type MessageCollector struct {
    CollectedMessages []*sarama.ProducerMessage
    producer          *instasarama.SyncProducer
    // ...
}
func (c MessageCollector) Flush(ctx context.Context) error {
    // extract the parent span from context and use it to continue the trace
    if parentSpan, ok := instana.SpanFromContext(ctx); ok {
        // start a new span for the batch send job
        sp := parentSpan.Tracer().StartSpan("batch-send", ot.ChildOf(parentSpan.Context()))
        defer sp.Finish()

        // inject the trace context into every collected message, overriding the existing one
        for i, msg := range c.CollectedMessages {
            c.CollectedMessages[i] = instasarama.ProducerMessageWithSpan(msg, sp)
        }
    }

    return c.producer.SendMessages(c.CollectedMessages)
}