kafka

package
v1.15.4

Published: Jul 28, 2022 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	KafkaClient
	// contains filtered or unexported fields
}

func NewClient

func NewClient(conf []kgo.Opt, opts ...Option) (*Client, error)

NewClient calls kgo.NewClient and wraps the resulting Client.

func WrapClient

func WrapClient(c KafkaClient, opts ...Option) *Client

WrapClient wraps kgo.Client so that produced and consumed messages are traced.

func (*Client) Close

func (c *Client) Close()

Close calls the underlying *kgo.Client.Close and finishes the remaining span.

func (*Client) Produce

func (c *Client) Produce(ctx context.Context, msg *kgo.Record, fn func(record *kgo.Record, err error))

Produce calls the underlying *kgo.Client.Produce and traces the request. Use it to produce a message asynchronously.
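One way an asynchronous produce call can be traced is to wrap the caller's promise so that the span is finished when the delivery result arrives. The sketch below illustrates that idea with the standard library only. Record, span, produceFunc, tracedProduce and failingProduce are hypothetical stand-ins, not part of this package or of kgo, and the fake producer invokes the promise synchronously to keep the example deterministic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Record is a hypothetical stand-in for kgo.Record.
type Record struct {
	Topic string
	Value []byte
}

// span is a hypothetical stand-in for a tracer span.
type span struct {
	finished bool
	err      error
}

// produceFunc has the same shape as the asynchronous Produce method.
type produceFunc func(ctx context.Context, r *Record, promise func(*Record, error))

// tracedProduce starts a span and wraps the caller's promise so that the
// span is finished with the delivery error before the caller's callback runs.
func tracedProduce(ctx context.Context, next produceFunc, r *Record, promise func(*Record, error)) *span {
	s := &span{}
	next(ctx, r, func(rec *Record, err error) {
		s.finished, s.err = true, err
		if promise != nil {
			promise(rec, err)
		}
	})
	return s
}

// failingProduce is a fake producer that reports a delivery error at once.
func failingProduce(_ context.Context, r *Record, promise func(*Record, error)) {
	promise(r, errors.New("broker unavailable"))
}

// demo runs one traced produce call and reports what the span and the
// caller's callback observed.
func demo() (finished bool, spanErr, callbackErr string) {
	var got error
	s := tracedProduce(context.Background(), failingProduce, &Record{Topic: "orders"},
		func(_ *Record, err error) { got = err })
	return s.finished, s.err.Error(), got.Error()
}

func main() {
	finished, spanErr, callbackErr := demo()
	fmt.Println(finished, spanErr, callbackErr)
}
```

The caller's callback still receives the original record and error, so wrapping the promise does not change the contract of Produce.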

func (*Client) ProduceSync

func (c *Client) ProduceSync(ctx context.Context, msgs ...*kgo.Record) kgo.ProduceResults

ProduceSync calls the underlying *kgo.Client.ProduceSync and traces all results.

func (*Client) WrapFetchesRecordIter

func (c *Client) WrapFetchesRecordIter(ctx context.Context, i *kgo.FetchesRecordIter) *FetchesRecordIter

WrapFetchesRecordIter wraps the kgo.FetchesRecordIter and links it to the Client.

type FetchesRecordIter

type FetchesRecordIter struct {
	*kgo.FetchesRecordIter
	// contains filtered or unexported fields
}

func (*FetchesRecordIter) Next

func (i *FetchesRecordIter) Next() *kgo.Record

Next calls the underlying kgo.FetchesRecordIter.Next and traces the message.
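The sketch below shows one plausible way a wrapping iterator can trace each record: it embeds the inner iterator, overrides Next, and closes the span of the previous record before starting the next one, leaving the last span to be finished by an explicit final call. This is an assumption about the mechanism, not a description of this package's internals. Record, recordIter, tracedIter and consumeAll are hypothetical stand-ins built on the standard library.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for kgo.Record.
type Record struct {
	Topic  string
	Offset int64
}

// recordIter is a hypothetical stand-in for kgo.FetchesRecordIter.
type recordIter struct {
	records []*Record
	pos     int
}

func (it *recordIter) Done() bool { return it.pos >= len(it.records) }

func (it *recordIter) Next() *Record {
	r := it.records[it.pos]
	it.pos++
	return r
}

// tracedIter embeds the inner iterator: Done is promoted, Next is overridden.
type tracedIter struct {
	*recordIter
	open   string   // name of the span that is still open, "" if none
	events []string // recorded span events, in order
}

// finishOpen closes the span of the previously returned record, if any.
func (it *tracedIter) finishOpen() {
	if it.open != "" {
		it.events = append(it.events, "finish "+it.open)
		it.open = ""
	}
}

// Next finishes the previous span, fetches the next record and starts a span for it.
func (it *tracedIter) Next() *Record {
	it.finishOpen()
	r := it.recordIter.Next()
	it.open = fmt.Sprintf("%s@%d", r.Topic, r.Offset)
	it.events = append(it.events, "start "+it.open)
	return r
}

// consumeAll drains a two-record iterator and returns the recorded events.
func consumeAll() []string {
	it := &tracedIter{recordIter: &recordIter{records: []*Record{
		{Topic: "orders", Offset: 0},
		{Topic: "orders", Offset: 1},
	}}}
	for !it.Done() {
		it.Next()
	}
	it.finishOpen() // in this sketch, the step a final Close would perform
	return it.events
}

func main() {
	for _, e := range consumeAll() {
		fmt.Println(e)
	}
}
```

Under this design the span of the last record stays open until something finishes it explicitly, which would be consistent with Close being documented as finishing the remaining span.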

type KafkaClient

type KafkaClient interface {
	Produce(ctx context.Context, r *kgo.Record, promise func(*kgo.Record, error))
	ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults
	PollRecords(ctx context.Context, num int) kgo.Fetches
	Flush(ctx context.Context) error
	Close()
}
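Because Client embeds the KafkaClient interface, any method the wrapper does not redefine is promoted from the wrapped value, while redefined methods can add behaviour around the inner call. The sketch below shows that Go mechanism in isolation. The client interface is a hypothetical, trimmed-down version of KafkaClient, and fakeClient, traced and run are illustration-only names.

```go
package main

import "fmt"

// client is a hypothetical, trimmed-down version of the KafkaClient interface.
type client interface {
	Flush() error
	Close()
}

// fakeClient records which of its methods were called.
type fakeClient struct{ calls []string }

func (f *fakeClient) Flush() error { f.calls = append(f.calls, "Flush"); return nil }
func (f *fakeClient) Close()       { f.calls = append(f.calls, "Close") }

// traced embeds the interface: Flush is promoted unchanged, Close is overridden.
type traced struct {
	client
	log []string
}

// Close calls the wrapped Close and then records that the open span was finished.
func (t *traced) Close() {
	t.client.Close()
	t.log = append(t.log, "finish span")
}

// run exercises one promoted and one overridden method.
func run() (calls, log []string) {
	f := &fakeClient{}
	t := &traced{client: f}
	_ = t.Flush() // promoted: goes straight to fakeClient.Flush
	t.Close()     // overridden: wrapper logic plus fakeClient.Close
	return f.calls, t.log
}

func main() {
	calls, log := run()
	fmt.Println(calls, log)
}
```

Accepting an interface rather than a concrete client type also means a test double can be passed to a wrapping constructor such as WrapClient.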

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *kgo.Record) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

func (MessageCarrier) ForeachKey

func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set sets a header.
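A carrier of this kind typically lets a tracer write propagation data into record headers on the producing side and read it back on the consuming side. The sketch below is a minimal stand-in built on the standard library: header mirrors the key/value shape of a Kafka record header, and record, carrier and roundTrip are hypothetical. The replace-or-append behaviour of Set and the header names are assumptions made for the example, not confirmed behaviour of MessageCarrier.

```go
package main

import "fmt"

// header mirrors the shape of a Kafka record header: a string key and a byte value.
type header struct {
	Key   string
	Value []byte
}

// record is a hypothetical stand-in for kgo.Record; only Headers matters here.
type record struct {
	Headers []header
}

// carrier holds a pointer to the record, so a value receiver can still mutate it.
type carrier struct{ msg *record }

// ForeachKey calls handler for every header and stops at the first error.
func (c carrier) ForeachKey(handler func(key, val string) error) error {
	for _, h := range c.msg.Headers {
		if err := handler(h.Key, string(h.Value)); err != nil {
			return err
		}
	}
	return nil
}

// Set replaces the header with the same key, or appends a new one.
// (Assumed behaviour for this sketch.)
func (c carrier) Set(key, val string) {
	for i := range c.msg.Headers {
		if c.msg.Headers[i].Key == key {
			c.msg.Headers[i].Value = []byte(val)
			return
		}
	}
	c.msg.Headers = append(c.msg.Headers, header{Key: key, Value: []byte(val)})
}

// roundTrip writes headers through one carrier and reads them through another.
func roundTrip() map[string]string {
	msg := &record{}
	producerSide := carrier{msg: msg}
	producerSide.Set("trace-id", "1")
	producerSide.Set("parent-id", "7")
	producerSide.Set("trace-id", "42") // overwrites the first value

	got := map[string]string{}
	_ = carrier{msg: msg}.ForeachKey(func(k, v string) error {
		got[k] = v
		return nil
	})
	return got
}

func main() {
	fmt.Println(roundTrip())
}
```

The value receivers work here for the same reason they can on MessageCarrier: the carrier only holds a pointer to the record, so copies of the carrier still modify the same headers.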

type Option

type Option func(cfg *config)

An Option customizes the config.

func WithAnalytics

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithServiceName

func WithServiceName(serviceName string) Option

WithServiceName sets the config service name to serviceName.
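The Option type follows Go's functional options pattern: each option is a function that mutates a private config, and the constructor applies them in order over a set of defaults. The sketch below shows how such options might be defined and applied. The config fields, the default values, the range check and the newConfig helper are assumptions for illustration; the package's real config is unexported and not documented here.

```go
package main

import (
	"fmt"
	"math"
)

// config is a hypothetical version of the unexported config type.
type config struct {
	serviceName   string
	analyticsRate float64 // NaN means "analytics not configured" in this sketch
}

// Option customizes the config.
type Option func(cfg *config)

// WithServiceName sets the service name.
func WithServiceName(serviceName string) Option {
	return func(cfg *config) { cfg.serviceName = serviceName }
}

// WithAnalytics turns analytics fully on (rate 1.0) or leaves it unset.
func WithAnalytics(on bool) Option {
	return func(cfg *config) {
		if on {
			cfg.analyticsRate = 1.0
		} else {
			cfg.analyticsRate = math.NaN()
		}
	}
}

// WithAnalyticsRate sets the rate, ignoring values outside [0, 1] (assumed rule).
func WithAnalyticsRate(rate float64) Option {
	return func(cfg *config) {
		if rate >= 0 && rate <= 1 {
			cfg.analyticsRate = rate
		} else {
			cfg.analyticsRate = math.NaN()
		}
	}
}

// newConfig applies the options in order on top of assumed defaults.
func newConfig(opts ...Option) *config {
	cfg := &config{serviceName: "kafka", analyticsRate: math.NaN()}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(WithServiceName("orders"), WithAnalyticsRate(0.5))
	fmt.Println(cfg.serviceName, cfg.analyticsRate)
}
```

Since options are applied in order, a later option overrides an earlier one that touches the same field, for example WithAnalytics(true) followed by WithAnalyticsRate(0.5).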
