Documentation ¶
Index ¶
- type Client
- func (c *Client) Close()
- func (c *Client) Produce(ctx context.Context, msg *kgo.Record, fn func(record *kgo.Record, err error))
- func (c *Client) ProduceSync(ctx context.Context, msgs ...*kgo.Record) kgo.ProduceResults
- func (c *Client) WrapFetchPartition(ctx context.Context, fp kgo.FetchPartition) *FetchPartition
- func (c *Client) WrapFetchesRecordIter(ctx context.Context, i *kgo.FetchesRecordIter) *FetchesRecordIter
- type FetchPartition
- type FetchesRecordIter
- type KafkaClient
- type MessageCarrier
- type Option
Types ¶
type Client ¶
type Client struct {
	KafkaClient
	// contains filtered or unexported fields
}
func WrapClient ¶
func WrapClient(c KafkaClient, opts ...Option) *Client
WrapClient wraps kgo.Client so that produced and consumed messages are traced.
func (*Client) Close ¶
func (c *Client) Close()
Close calls the underlying *kgo.Client.Close and finishes any remaining span.
func (*Client) Produce ¶
func (c *Client) Produce(ctx context.Context, msg *kgo.Record, fn func(record *kgo.Record, err error))
Produce calls the underlying *kgo.Client.Produce and traces the request. Use it to produce messages asynchronously.
func (*Client) ProduceSync ¶
func (c *Client) ProduceSync(ctx context.Context, msgs ...*kgo.Record) kgo.ProduceResults
ProduceSync calls the underlying *kgo.Client.ProduceSync and traces all results.
func (*Client) WrapFetchPartition ¶ added in v1.21.0
func (c *Client) WrapFetchPartition(ctx context.Context, fp kgo.FetchPartition) *FetchPartition
WrapFetchPartition wraps the kgo.FetchPartition and links it to the Client.
func (*Client) WrapFetchesRecordIter ¶
func (c *Client) WrapFetchesRecordIter(ctx context.Context, i *kgo.FetchesRecordIter) *FetchesRecordIter
WrapFetchesRecordIter wraps the kgo.FetchesRecordIter and links it to the Client.
type FetchPartition ¶ added in v1.21.0
type FetchPartition struct {
	kgo.FetchPartition
	// contains filtered or unexported fields
}
func (*FetchPartition) ConsumeRecord ¶ added in v1.21.0
func (fp *FetchPartition) ConsumeRecord(rec *kgo.Record)
ConsumeRecord finishes the span for a particular record.
func (*FetchPartition) EachRecord ¶ added in v1.21.0
func (fp *FetchPartition) EachRecord(fn func(rec *kgo.Record))
EachRecord calls the underlying kgo.FetchPartition.EachRecord and traces each record.
type FetchesRecordIter ¶
type FetchesRecordIter struct {
	*kgo.FetchesRecordIter
	// contains filtered or unexported fields
}
func (*FetchesRecordIter) Done ¶ added in v1.17.1
func (i *FetchesRecordIter) Done() bool
Done calls the underlying kgo.FetchesRecordIter.Done and finishes any remaining span.
func (*FetchesRecordIter) Next ¶
func (i *FetchesRecordIter) Next() *kgo.Record
Next calls the underlying kgo.FetchesRecordIter.Next and traces the message.
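The interplay between Next and Done can be illustrated with a small stand-alone sketch. All types here (recordIter, span, tracedIter) are hypothetical stand-ins rather than the kgo or tracer types. The sketch assumes that each call to Next finishes the span of the previous record before starting a new one, and that Done finishes the last open span once the iterator is exhausted. The real wrapper may differ in detail.

```go
package main

import "fmt"

// recordIter is a hypothetical stand-in for *kgo.FetchesRecordIter.
type recordIter struct {
	recs []string
	pos  int
}

func (i *recordIter) Done() bool { return i.pos >= len(i.recs) }

func (i *recordIter) Next() string {
	r := i.recs[i.pos]
	i.pos++
	return r
}

// span is a hypothetical stand-in for a tracing span.
type span struct {
	name     string
	finished bool
}

// tracedIter embeds the underlying iterator and overrides Next and Done.
type tracedIter struct {
	*recordIter
	prev  *span   // span of the record most recently returned by Next
	spans []*span // kept only so the sketch can be inspected
}

// finishPrev closes the span of the previously returned record, if any.
func (t *tracedIter) finishPrev() {
	if t.prev != nil {
		t.prev.finished = true
		t.prev = nil
	}
}

// Next finishes the previous record's span and starts one for the next record.
func (t *tracedIter) Next() string {
	t.finishPrev()
	rec := t.recordIter.Next()
	t.prev = &span{name: "consume " + rec}
	t.spans = append(t.spans, t.prev)
	return rec
}

// Done reports whether iteration is over and, if so, finishes the last span.
func (t *tracedIter) Done() bool {
	done := t.recordIter.Done()
	if done {
		t.finishPrev()
	}
	return done
}

func main() {
	it := &tracedIter{recordIter: &recordIter{recs: []string{"a", "b", "c"}}}
	for !it.Done() {
		fmt.Println("processing", it.Next())
	}
	for _, s := range it.spans {
		fmt.Println(s.name, "finished:", s.finished)
	}
}
```

Under these assumptions, a loop of the form `for !it.Done() { rec := it.Next(); ... }` leaves no span open, because the final call to Done closes the span of the last record.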
type KafkaClient ¶
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kgo.Record) MessageCarrier
NewMessageCarrier creates a new MessageCarrier.
func (MessageCarrier) ForeachKey ¶
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error
ForeachKey iterates over every header.
type Option ¶
type Option func(cfg *config)
An Option customizes the config.
func WithAnalytics ¶
WithAnalytics enables Trace Analytics for all started spans.
func WithAnalyticsRate ¶
WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.
func WithServiceName ¶
WithServiceName sets the config service name to serviceName.
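The Option type follows Go's functional-options pattern: each option is a function that mutates a config value. The sketch below shows the mechanics in isolation. The config fields, the default service name, the newConfig helper, and the parameter types of the option constructors are assumptions made for illustration, since the real config is unexported and the constructor signatures are not shown above.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the package's unexported config.
type config struct {
	serviceName   string
	analyticsRate float64
}

// Option matches the documented shape: a function that customizes the config.
type Option func(cfg *config)

// WithServiceName sets the service name. The string parameter is an assumption.
func WithServiceName(serviceName string) Option {
	return func(cfg *config) { cfg.serviceName = serviceName }
}

// WithAnalyticsRate sets the sampling rate. The float64 parameter is an assumption.
func WithAnalyticsRate(rate float64) Option {
	return func(cfg *config) { cfg.analyticsRate = rate }
}

// newConfig is a hypothetical helper: it starts from defaults and applies
// each option in order, so later options override earlier ones.
func newConfig(opts ...Option) *config {
	cfg := &config{serviceName: "example-default"} // placeholder default
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(WithServiceName("orders"), WithAnalyticsRate(0.5))
	fmt.Println(cfg.serviceName, cfg.analyticsRate)
}
```

A constructor such as WrapClient, which accepts `opts ...Option`, would typically run a loop like the one in newConfig before building the wrapper.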