Documentation ¶
Overview ¶
Package ppsarama instruments the Shopify/sarama package (https://github.com/Shopify/sarama).
This package instruments Kafka consumers and producers.
To instrument a Kafka consumer, use ConsumeMessageContext. In order to display the kafka broker on the pinpoint screen, a context with broker addresses must be created and delivered using NewContext.
ConsumePartition example:
ctx := ppsarama.NewContext(context.Background(), broker) pc, _ := consumer.ConsumePartition(topic, partition, offset) for msg := range pc.Messages() { ppsarama.ConsumeMessageContext(processMessage, ctx, msg) }
ConsumerGroupHandler example:
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { ctx := sess.Context() for msg := range claim.Messages() { _ = ppsarama.ConsumeMessageContext(process, ctx, msg) }
ConsumeMessageContext passes a context added pinpoint.Tracer to HandlerContextFunc. In HandlerContextFunc, this tracer can be obtained by using the pinpoint.FromContext function.
func process(ctx context.Context, msg *sarama.ConsumerMessage) error { tracer := pinpoint.FromContext(ctx) defer tracer.NewSpanEvent("process").EndSpanEvent() fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
To instrument a Kafka producer, use NewSyncProducer or NewAsyncProducer.
config := sarama.NewConfig() producer, err = ppsarama.NewSyncProducer(brokers, config)
It is necessary to pass the context containing the pinpoint.Tracer to sarama.SyncProducer (or sarama.AsyncProducer) using WithContext function.
ppsarama.WithContext(pinpoint.NewContext(context.Background(), tracer), producer) partition, offset, err := producer.SendMessage(msg)
The WithContext function() function is not thread-safe, so use the SendMessageContext function() if you have a data trace.
partition, offset, err := producer.SendMessageContext(r.Context(), msg)
Index ¶
- Constants
- func ConsumeMessage(handler HandlerFunc, msg *sarama.ConsumerMessage) error
- func ConsumeMessageContext(handler HandlerContextFunc, ctx context.Context, msg *sarama.ConsumerMessage) error
- func NewContext(ctx context.Context, addrs []string) context.Context
- func WithContext(ctx context.Context, producer interface{})
- type AsyncProducer
- type Consumer
- type ConsumerMessage
- type HandlerContextFunc
- type HandlerFunc
- type PartitionConsumer
- type SyncProducer
Constants ¶
const HeaderAsyncSpanId = "Pinpoint-AsyncSpanID"
Variables ¶
This section is empty.
Functions ¶
func ConsumeMessage ¶
func ConsumeMessage(handler HandlerFunc, msg *sarama.ConsumerMessage) error
ConsumeMessage is deprecated. ConsumeMessage creates a pinpoint.Tracer that instruments the sarama.ConsumerMessage. The tracer extracts the pinpoint header from message header, and then creates a span that initiates or continues the transaction. ConsumeMessage passes a ConsumerMessage having pinpoint.Tracer to HandlerFunc.
func ConsumeMessageContext ¶
func ConsumeMessageContext(handler HandlerContextFunc, ctx context.Context, msg *sarama.ConsumerMessage) error
ConsumeMessageContext creates a pinpoint.Tracer that instruments the sarama.ConsumerMessage. The tracer extracts the pinpoint header from message header, and then creates a span that initiates or continues the transaction. ConsumeMessageContext passes a context added pinpoint.Tracer to HandlerContextFunc.
func NewContext ¶
NewContext returns a new Context that contains the given broker addresses.
func WithContext ¶
WithContext is deprecated and not thread-safe. WithContext passes the context to the provided producer. It is possible to trace only when the given context contains a pinpoint.Tracer.
Types ¶
type AsyncProducer ¶ added in v1.4.3
type AsyncProducer interface { sarama.AsyncProducer InputContext(ctx context.Context, msg *sarama.ProducerMessage) }
AsyncProducer wraps the sarama.AsyncProducer and provides additional function InputContext for trace.
func NewAsyncProducer ¶
func NewAsyncProducer(addrs []string, config *sarama.Config) (AsyncProducer, error)
NewAsyncProducer wraps sarama.NewAsyncProducer and returns a AsyncProducer ready to instrument. It requires the underlying sarama Config.Producer.Return.Successes, so we can know whether successes will be returned.
type Consumer ¶
Consumer is deprecated.
func NewConsumer ¶
NewConsumer is deprecated.
func (*Consumer) ConsumePartition ¶
func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (*PartitionConsumer, error)
ConsumePartition is deprecated.
type ConsumerMessage ¶
type ConsumerMessage struct { *sarama.ConsumerMessage // contains filtered or unexported fields }
ConsumerMessage is deprecated.
func WrapConsumerMessage ¶
func WrapConsumerMessage(msg *sarama.ConsumerMessage) *ConsumerMessage
WrapConsumerMessage is deprecated. WrapConsumerMessage wraps a sarama.ConsumerMessage and creates a pinpoint.Tracer that instruments the sarama.ConsumerMessage. The tracer extracts the pinpoint header from message header, and then creates a span that initiates or continues the transaction.
func (*ConsumerMessage) SpanTracer ¶
func (c *ConsumerMessage) SpanTracer() pinpoint.Tracer
SpanTracer is deprecated. Use Tracer.
func (*ConsumerMessage) Tracer ¶
func (c *ConsumerMessage) Tracer() pinpoint.Tracer
Tracer returns the pinpoint.Tracer.
type HandlerContextFunc ¶
type HandlerContextFunc func(context.Context, *sarama.ConsumerMessage) error
type PartitionConsumer ¶
type PartitionConsumer struct { sarama.PartitionConsumer // contains filtered or unexported fields }
PartitionConsumer is deprecated.
func WrapPartitionConsumer ¶
func WrapPartitionConsumer(pc sarama.PartitionConsumer) *PartitionConsumer
WrapPartitionConsumer is deprecated.
func (*PartitionConsumer) Messages ¶
func (pc *PartitionConsumer) Messages() <-chan *ConsumerMessage
Messages is deprecated.
type SyncProducer ¶ added in v1.4.0
type SyncProducer interface { sarama.SyncProducer SendMessageContext(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error) SendMessagesContext(ctx context.Context, msgs []*sarama.ProducerMessage) error }
SyncProducer wraps the sarama.SyncProducer and provides additional functions (SendMessageContext, SendMessagesContext) for trace.
func NewSyncProducer ¶
func NewSyncProducer(addrs []string, config *sarama.Config) (SyncProducer, error)
NewSyncProducer wraps sarama.NewSyncProducer and returns a sarama.SyncProducer ready to instrument.