kafka

package
v1.2.0
Published: May 20, 2020 License: BSD-3-Clause Imports: 4 Imported by: 0

Documentation

Overview

Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

A Consumer wraps a kafka.Consumer.

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error)

NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.

func WrapConsumer

func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and, if polling is enabled, finishes any remaining span.

func (*Consumer) Events

func (c *Consumer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). Message events will be traced.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)

Poll polls the consumer for messages or events. Message events will be traced.
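
The wrapping described for Consumer can be sketched with struct embedding: the wrapper embeds the raw consumer and overrides Poll and Close. This is a minimal, standard-library-only illustration, not the package's implementation. The types event, message, rawConsumer, span and tracedConsumer are hypothetical stand-ins, and keeping a span open until the next Poll or Close is an assumption made for the sketch.

```go
package main

import "fmt"

// event and message are hypothetical stand-ins for kafka.Event and *kafka.Message.
type event interface{}

type message struct{ Topic string }

// rawConsumer is a hypothetical stand-in for kafka.Consumer backed by a queue.
type rawConsumer struct{ queue []event }

// Poll returns the next queued event, or nil when the queue is empty.
func (r *rawConsumer) Poll(timeoutMS int) event {
	if len(r.queue) == 0 {
		return nil
	}
	ev := r.queue[0]
	r.queue = r.queue[1:]
	return ev
}

// Close is a no-op in this sketch.
func (r *rawConsumer) Close() error { return nil }

// span is a hypothetical, minimal span that only records whether it was finished.
type span struct {
	topic    string
	finished bool
}

// tracedConsumer embeds the raw consumer and overrides Poll and Close.
type tracedConsumer struct {
	*rawConsumer
	prev  *span   // span for the most recently polled message, if any
	spans []*span // every span started, kept only for inspection
}

// finishPrev finishes the span left open by the previous Poll, if there is one.
func (c *tracedConsumer) finishPrev() {
	if c.prev != nil {
		c.prev.finished = true
		c.prev = nil
	}
}

// Poll finishes the previous span, then starts a new one for message events only.
func (c *tracedConsumer) Poll(timeoutMS int) event {
	c.finishPrev()
	ev := c.rawConsumer.Poll(timeoutMS)
	if msg, ok := ev.(*message); ok {
		c.prev = &span{topic: msg.Topic}
		c.spans = append(c.spans, c.prev)
	}
	return ev
}

// Close closes the raw consumer and finishes any span still open.
func (c *tracedConsumer) Close() error {
	err := c.rawConsumer.Close()
	c.finishPrev()
	return err
}

func main() {
	raw := &rawConsumer{queue: []event{&message{Topic: "orders"}, "stats"}}
	c := &tracedConsumer{rawConsumer: raw}
	c.Poll(100) // message event: a span is started
	c.Poll(100) // non-message event: the previous span is finished, no new span
	fmt.Println(len(c.spans), c.spans[0].finished)
	_ = c.Close()
}
```

Because the raw consumer is embedded, every method the wrapper does not override is still reachable on the wrapper unchanged.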

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

A MessageCarrier injects and extracts traces from a kafka.Message.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

func (MessageCarrier) ForeachKey

func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set sets a header.
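
A carrier of this shape can be sketched over a plain slice of headers. The example below is an illustration using only the standard library: header, message and carrier are hypothetical stand-ins, the header name "trace-id" is made up, and replacing an existing header with the same key in Set is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// header mirrors the usual shape of a message header: a key plus raw bytes.
type header struct {
	Key   string
	Value []byte
}

// message is a hypothetical stand-in for kafka.Message, reduced to its headers.
type message struct{ Headers []header }

// carrier adapts a message's headers to a text-map style carrier.
type carrier struct{ msg *message }

// ForeachKey calls handler once per header and stops at the first error.
func (c carrier) ForeachKey(handler func(key, val string) error) error {
	for _, h := range c.msg.Headers {
		if err := handler(h.Key, string(h.Value)); err != nil {
			return err
		}
	}
	return nil
}

// Set drops any existing header with the same key, then appends the new value.
// Overwriting instead of duplicating is an assumption made for this sketch.
func (c carrier) Set(key, val string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if h.Key != key {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, header{Key: key, Value: []byte(val)})
}

func main() {
	msg := &message{}
	c := carrier{msg: msg}
	c.Set("trace-id", "123")
	c.Set("trace-id", "456") // replaces the first value
	_ = c.ForeachKey(func(k, v string) error {
		fmt.Println(k, v)
		return nil
	})
}
```

The carrier holds a pointer to the message, so a value receiver is enough for Set to modify the message's headers.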

type Option

type Option func(cfg *config)

An Option customizes the config.
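
Option follows the functional options pattern: each option is a function that mutates a config, and a constructor applies them in order over a set of defaults. The sketch below shows the pattern with the standard library only. The config fields, the newConfig helper and the default values are hypothetical, since the real config type is unexported and its fields are not shown on this page.

```go
package main

import (
	"context"
	"fmt"
)

// config is a hypothetical stand-in for the package's unexported config type.
type config struct {
	ctx           context.Context
	serviceName   string
	analyticsRate float64
}

// Option customizes the config.
type Option func(cfg *config)

// WithContext sets the config context to ctx.
func WithContext(ctx context.Context) Option {
	return func(cfg *config) { cfg.ctx = ctx }
}

// WithServiceName sets the config service name to serviceName.
func WithServiceName(serviceName string) Option {
	return func(cfg *config) { cfg.serviceName = serviceName }
}

// WithAnalyticsRate sets the analytics sampling rate.
func WithAnalyticsRate(rate float64) Option {
	return func(cfg *config) { cfg.analyticsRate = rate }
}

// newConfig starts from defaults (assumed values for this sketch) and applies
// each option in the order given, so later options override earlier ones.
func newConfig(opts ...Option) *config {
	cfg := &config{ctx: context.Background(), serviceName: "kafka"}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(WithServiceName("billing"), WithAnalyticsRate(0.5))
	fmt.Println(cfg.serviceName, cfg.analyticsRate)
}
```

The variadic opts parameter is what lets constructors such as NewConsumer and NewProducer accept zero or more options without changing their signature.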

func WithAnalytics

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithContext

func WithContext(ctx context.Context) Option

WithContext sets the config context to ctx.

func WithServiceName

func WithServiceName(serviceName string) Option

WithServiceName sets the config service name to serviceName.

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

A Producer wraps a kafka.Producer.

func NewProducer

func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error)

NewProducer calls kafka.NewProducer and wraps the resulting Producer.

func WrapProducer

func WrapProducer(p *kafka.Producer, opts ...Option) *Producer

WrapProducer wraps a kafka.Producer so requests are traced.

func (*Producer) Close

func (p *Producer) Close()

Close calls the underlying Producer.Close and also closes the internal wrapping producer channel.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.
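
The forwarding behaviour described for ProduceChannel can be sketched as a goroutine that reads from a wrapper channel, does some work per message, and passes the message on. This is a standard-library-only illustration: message, forward and the process callback are hypothetical, and closing the downstream channel when the wrapper channel closes is a choice made so the demo can terminate.

```go
package main

import "fmt"

// message is a hypothetical stand-in for kafka.Message.
type message struct{ Value string }

// forward returns a channel; each message sent on it is handed to process
// (where tracing would happen) and then forwarded to out.
func forward(out chan<- *message, process func(*message)) chan *message {
	in := make(chan *message, 1)
	go func() {
		for msg := range in {
			process(msg)
			out <- msg
		}
		// Closing out here is specific to this sketch; it lets the reader
		// of out know that no more messages will arrive.
		close(out)
	}()
	return in
}

func main() {
	out := make(chan *message, 4)
	traced := 0
	in := forward(out, func(*message) { traced++ })

	in <- &message{Value: "a"}
	in <- &message{Value: "b"}
	close(in) // stops the forwarding goroutine once the channel is drained

	for msg := range out {
		fmt.Println(msg.Value)
	}
	fmt.Println("traced:", traced)
}
```

Messages keep their order because a single goroutine drains the wrapper channel sequentially.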
