tracing

package
v2.0.0-beta.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 10 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComponentName

func ComponentName(v CKGoVersion) string

func IntegrationName

func IntegrationName(v CKGoVersion) string

func WrapConsumeEventsChannel

func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consumer Consumer, translateFn func(E) TE) chan E

func WrapDeliveryChannel

func WrapDeliveryChannel[E any, TE Event](tr *KafkaTracer, deliveryChan chan E, span *tracer.Span, translateFn func(E) TE) (chan E, chan error)

func WrapProduceChannel

func WrapProduceChannel[M any, TM Message](tr *KafkaTracer, out chan M, translateFn func(M) TM) chan M

func WrapProduceEventsChannel

func WrapProduceEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, translateFn func(E) TE) chan E

Types

type CKGoVersion

type CKGoVersion int32
const (
	CKGoVersion1 CKGoVersion = 1
	CKGoVersion2 CKGoVersion = 2
)

type ConfigMap

type ConfigMap interface {
	Get(key string, defval any) (any, error)
}

type Consumer

type Consumer interface {
	GetWatermarkOffsets(topic string, partition int32) (low int64, high int64, err error)
}

type Event

type Event interface {
	KafkaMessage() (Message, bool)
	KafkaOffsetsCommitted() (OffsetsCommitted, bool)
}
type Header interface {
	GetKey() string
	GetValue() []byte
}

type KafkaHeader

type KafkaHeader struct {
	Key   string
	Value []byte
}

func (KafkaHeader) GetKey

func (h KafkaHeader) GetKey() string

func (KafkaHeader) GetValue

func (h KafkaHeader) GetValue() []byte

type KafkaTracer

type KafkaTracer struct {
	PrevSpan *tracer.Span
	// contains filtered or unexported fields
}

func NewKafkaTracer

func NewKafkaTracer(ckgoVersion CKGoVersion, librdKafkaVersion int, opts ...Option) *KafkaTracer

func (*KafkaTracer) DSMEnabled

func (tr *KafkaTracer) DSMEnabled() bool

func (*KafkaTracer) SetConsumeCheckpoint

func (tr *KafkaTracer) SetConsumeCheckpoint(msg Message)

func (*KafkaTracer) SetProduceCheckpoint

func (tr *KafkaTracer) SetProduceCheckpoint(msg Message)

func (*KafkaTracer) StartConsumeSpan

func (tr *KafkaTracer) StartConsumeSpan(msg Message) *tracer.Span

func (*KafkaTracer) StartProduceSpan

func (tr *KafkaTracer) StartProduceSpan(msg Message) *tracer.Span

func (*KafkaTracer) TrackCommitOffsets

func (tr *KafkaTracer) TrackCommitOffsets(offsets []TopicPartition, err error)

func (*KafkaTracer) TrackHighWatermarkOffset

func (tr *KafkaTracer) TrackHighWatermarkOffset(offsets []TopicPartition, consumer Consumer)

func (*KafkaTracer) TrackProduceOffsets

func (tr *KafkaTracer) TrackProduceOffsets(msg Message)

type Message

type Message interface {
	GetValue() []byte
	GetKey() []byte
	GetHeaders() []Header
	SetHeaders([]Header)
	GetTopicPartition() TopicPartition
	Unwrap() any
}

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.msg

func NewMessageCarrier

func NewMessageCarrier(msg Message) MessageCarrier

func (MessageCarrier) ForeachKey

func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey conforms to the TextMapReader interface.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set implements TextMapWriter

type OffsetsCommitted

type OffsetsCommitted interface {
	GetError() error
	GetOffsets() []TopicPartition
}

type Option

type Option interface {
	// contains filtered or unexported methods
}

type OptionFn

type OptionFn func(*KafkaTracer)

OptionFn represents options applicable to NewConsumer, NewProducer, WrapConsumer and WrapProducer.

func WithAnalytics

func WithAnalytics(on bool) OptionFn

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) OptionFn

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithConfig

func WithConfig(cg ConfigMap) OptionFn

WithConfig extracts the config information for the client to be tagged

func WithContext

func WithContext(ctx context.Context) OptionFn

WithContext sets the config context to ctx. Deprecated: This is deprecated in favor of passing the context via the message headers

func WithCustomTag

func WithCustomTag(tag string, tagFn func(msg Message) interface{}) OptionFn

WithCustomTag will cause the given tagFn to be evaluated after executing a query and attach the result to the span tagged by the key.

func WithDataStreams

func WithDataStreams() OptionFn

WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/

func WithService

func WithService(serviceName string) OptionFn

WithService sets the config service name to serviceName.

type SpanStore

type SpanStore struct {
	Prev *tracer.Span
}

type TopicPartition

type TopicPartition interface {
	GetTopic() string
	GetPartition() int32
	GetOffset() int64
	GetError() error
}

Directories

Path Synopsis
kafka module
kafka.v2 module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL