Documentation ¶
Index ¶
- func ComponentName(v CKGoVersion) string
- func IntegrationName(v CKGoVersion) string
- func Package(v CKGoVersion) *instrumentation.Instrumentation
- func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consumer Consumer, translateFn func(E) TE) chan E
- func WrapDeliveryChannel[E any, TE Event](tr *KafkaTracer, deliveryChan chan E, span *tracer.Span, ...) (chan E, chan error)
- func WrapProduceChannel[M any, TM Message](tr *KafkaTracer, out chan M, translateFn func(M) TM) chan M
- func WrapProduceEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, translateFn func(E) TE) chan E
- type CKGoVersion
- type ConfigMap
- type Consumer
- type Event
- type Header
- type KafkaHeader
- type KafkaTracer
- func (tr *KafkaTracer) DSMEnabled() bool
- func (tr *KafkaTracer) SetConsumeCheckpoint(msg Message)
- func (tr *KafkaTracer) SetProduceCheckpoint(msg Message)
- func (tr *KafkaTracer) StartConsumeSpan(msg Message) *tracer.Span
- func (tr *KafkaTracer) StartProduceSpan(msg Message) *tracer.Span
- func (tr *KafkaTracer) TrackCommitOffsets(offsets []TopicPartition, err error)
- func (tr *KafkaTracer) TrackHighWatermarkOffset(offsets []TopicPartition, consumer Consumer)
- func (tr *KafkaTracer) TrackProduceOffsets(msg Message)
- type Message
- type MessageCarrier
- type OffsetsCommitted
- type Option
- type OptionFn
- func WithAnalytics(on bool) OptionFn
- func WithAnalyticsRate(rate float64) OptionFn
- func WithConfig(cg ConfigMap) OptionFn
- func WithContext(ctx context.Context) OptionFn
- func WithCustomTag(tag string, tagFn func(msg Message) interface{}) OptionFn
- func WithDataStreams() OptionFn
- func WithService(serviceName string) OptionFn
- type SpanStore
- type TopicPartition
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComponentName ¶
func ComponentName(v CKGoVersion) string
func IntegrationName ¶
func IntegrationName(v CKGoVersion) string
func Package ¶
func Package(v CKGoVersion) *instrumentation.Instrumentation
func WrapConsumeEventsChannel ¶
func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consumer Consumer, translateFn func(E) TE) chan E
func WrapDeliveryChannel ¶
func WrapProduceChannel ¶
func WrapProduceChannel[M any, TM Message](tr *KafkaTracer, out chan M, translateFn func(M) TM) chan M
func WrapProduceEventsChannel ¶
func WrapProduceEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, translateFn func(E) TE) chan E
Types ¶
type CKGoVersion ¶
type CKGoVersion int32
const ( CKGoVersion1 CKGoVersion = 1 CKGoVersion2 CKGoVersion = 2 )
type Event ¶
type Event interface { KafkaMessage() (Message, bool) KafkaOffsetsCommitted() (OffsetsCommitted, bool) }
type KafkaHeader ¶
func (KafkaHeader) GetKey ¶
func (h KafkaHeader) GetKey() string
func (KafkaHeader) GetValue ¶
func (h KafkaHeader) GetValue() []byte
type KafkaTracer ¶
func NewKafkaTracer ¶
func NewKafkaTracer(ckgoVersion CKGoVersion, librdKafkaVersion int, opts ...Option) *KafkaTracer
func (*KafkaTracer) DSMEnabled ¶
func (tr *KafkaTracer) DSMEnabled() bool
func (*KafkaTracer) SetConsumeCheckpoint ¶
func (tr *KafkaTracer) SetConsumeCheckpoint(msg Message)
func (*KafkaTracer) SetProduceCheckpoint ¶
func (tr *KafkaTracer) SetProduceCheckpoint(msg Message)
func (*KafkaTracer) StartConsumeSpan ¶
func (tr *KafkaTracer) StartConsumeSpan(msg Message) *tracer.Span
func (*KafkaTracer) StartProduceSpan ¶
func (tr *KafkaTracer) StartProduceSpan(msg Message) *tracer.Span
func (*KafkaTracer) TrackCommitOffsets ¶
func (tr *KafkaTracer) TrackCommitOffsets(offsets []TopicPartition, err error)
func (*KafkaTracer) TrackHighWatermarkOffset ¶
func (tr *KafkaTracer) TrackHighWatermarkOffset(offsets []TopicPartition, consumer Consumer)
func (*KafkaTracer) TrackProduceOffsets ¶
func (tr *KafkaTracer) TrackProduceOffsets(msg Message)
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a Kafka message.
func NewMessageCarrier ¶
func NewMessageCarrier(msg Message) MessageCarrier
func (MessageCarrier) ForeachKey ¶
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey conforms to the TextMapReader interface.
func (MessageCarrier) Set ¶
func (c MessageCarrier) Set(key, val string)
Set conforms to the TextMapWriter interface.
type OffsetsCommitted ¶
type OffsetsCommitted interface { GetError() error GetOffsets() []TopicPartition }
type OptionFn ¶
type OptionFn func(*KafkaTracer)
OptionFn represents options applicable to NewConsumer, NewProducer, WrapConsumer and WrapProducer.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithConfig ¶
WithConfig extracts the client's configuration information from cg so that it can be used for tagging.
func WithContext ¶
WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers.
func WithCustomTag ¶
WithCustomTag will cause the given tagFn to be evaluated with the Kafka message and attach the result to the span, tagged by the given key.
func WithDataStreams ¶
func WithDataStreams() OptionFn
WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithService ¶
WithService sets the config service name to serviceName.