otkafka

package
v0.13.1-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: MIT Imports: 15 Imported by: 1

Documentation

Overview

Package otkafka contains the opentracing integrated a kafka transport for package Core. The underlying kafka library is kafka-go: https://github.com/segmentio/kafka-go.

Integration

otkafka exports the configuration factoryIn this format:

kafka:
  writer:
	foo:
	  brokers:
		- localhost:9092
	  topic: foo
  reader:
	bar:
	  brokers:
		- localhost:9092
	  topic: bar
	  groupID: bar-group

For a complete overview of all available options, call the config init command.

To use package otkafka with package core, add:

var c *core.C = core.New()
c.Provide(otkafka.Providers())

The reader and writer factories are bundled into that single provider.

Standalone Usage

factoryIn some scenarios, the whole go kit family might be overkill. To directly interact with kafka, use the factory to make writers and readers. Those writers/readers are provided by github.com/segmentio/kafka-go.

c.Invoke(func(writer *kafka.Writer) {
	writer.WriteMessage(kafka.Message{})
})
Example (Reader)
if os.Getenv("KAFKA_ADDR") == "" {
	fmt.Println("set KAFKA_ADDR to run this example")
	return
}
brokers := strings.Split(os.Getenv("KAFKA_ADDR"), ",")
conf := map[string]any{
	"log": map[string]any{
		"level": "none",
	},
	"kafka": map[string]any{
		"reader": map[string]any{
			"default": otkafka.ReaderConfig{
				Brokers: brokers,
				Topic:   "example",
			},
		},
		"writer": map[string]any{
			"default": otkafka.WriterConfig{
				Brokers: brokers,
				Topic:   "example",
			},
		},
	},
}
c := core.Default(core.WithConfigStack(confmap.Provider(conf, "."), nil))
c.Provide(otkafka.Providers())
c.Invoke(func(writer *kafka.Writer) {
	err := writer.WriteMessages(context.Background(), kafka.Message{Value: []byte(`hello`)})
	if err != nil {
		panic(err)
	}
})
c.Invoke(func(reader *kafka.Reader) {
	msg, err := reader.ReadMessage(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg.Value))
})
Output:

hello

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Providers

func Providers(optionFunc ...ProvidersOptionFunc) di.Deps

Providers is a set of dependencies including ReaderMaker, WriterMaker and exported configs.

Depends On:
	contract.ConfigAccessor
	log.Logger
Provide:
	ReaderFactory
	WriterFactory
	ReaderMaker
	WriterMaker
	*kafka.Reader
	*kafka.Writer
	*readerCollector
	*writerCollector

func SpanFromMessage

func SpanFromMessage(ctx context.Context, tracer opentracing.Tracer, message *kafka.Message) (opentracing.Span, context.Context, error)

SpanFromMessage reads the message

Types

type AggStats added in v0.9.0

type AggStats struct {
	Min metrics.Gauge
	Max metrics.Gauge
	Avg metrics.Gauge
}

AggStats is a gauge group struct.

type KafkaLogAdapter

type KafkaLogAdapter struct {
	Logging log.Logger
}

KafkaLogAdapter is an log adapter bridging kitlog and kafka.

func (KafkaLogAdapter) Printf

func (k KafkaLogAdapter) Printf(s string, i ...any)

Printf implements kafka log interface.

type ProvidersOptionFunc added in v0.9.0

type ProvidersOptionFunc func(options *providersOption)

ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.

func WithReaderInterceptor added in v0.9.0

func WithReaderInterceptor(interceptor ReaderInterceptor) ProvidersOptionFunc

WithReaderInterceptor instructs the Providers to accept the ReaderInterceptor so that users can change reader config during runtime. This can be useful when some dynamic computations on configs are required.

func WithReaderReload added in v0.10.0

func WithReaderReload(shouldReload bool) ProvidersOptionFunc

WithReaderReload toggles whether the reader factory should reload cached instances upon OnReload event.

func WithWriterInterceptor added in v0.9.0

func WithWriterInterceptor(interceptor WriterInterceptor) ProvidersOptionFunc

WithWriterInterceptor instructs the Providers to accept the WriterInterceptor so that users can change reader config during runtime. This can be useful when some dynamic computations on configs are required.

func WithWriterReload added in v0.10.0

func WithWriterReload(shouldReload bool) ProvidersOptionFunc

WithWriterReload toggles whether the writer factory should reload cached instances upon OnReload event.

type ReaderConfig

type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string `json:"brokers" yaml:"brokers"`

	// GroupID holds the optional consumer group id.  If GroupID is specified, then
	// Partition should NOT be specified e.g. 0
	GroupID string `json:"groupId" yaml:"groupID"`

	// The topic to read messages from.
	Topic string `json:"topic" yaml:"topic"`

	// Partition to read messages from.  Either Partition or GroupID may
	// be assigned, but not both
	Partition int `json:"partition" yaml:"partition"`

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int `json:"queue_capacity" yaml:"queue_capacity"`

	// Min and max number of bytes to fetch from kafka factoryIn each request.
	MinBytes int `json:"minBytes" yaml:"minBytes"`
	MaxBytes int `json:"maxBytes" yaml:"maxBytes"`

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	MaxWait time.Duration `json:"maxWait" yaml:"maxWait"`

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration `json:"readLagInterval" yaml:"readLagInterval"`

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration `json:"heartbeatInterval" yaml:"heartbeatInterval"`

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker.  If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration `json:"commitInterval" yaml:"commitInterval"`

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration `json:"partitionWatchInterval" yaml:"partitionWatchInterval"`

	// WatchForPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool `json:"watchPartitionChanges" yaml:"watchPartitionChanges"`

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration `json:"sessionTimeout" yaml:"sessionTimeout"`

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance.  For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration `json:"rebalanceTimeout" yaml:"rebalanceTimeout"`

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration `json:"joinGroupBackoff" yaml:"joinGroupBackoff"`

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration `json:"retentionTime" yaml:"retentionTime"`

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset.  If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64 `json:"startOffset" yaml:"startOffset"`

	// BackoffDelayMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 100ms
	ReadBackoffMin time.Duration `json:"readBackoffMin" yaml:"readBackoffMin"`

	// BackoffDelayMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 1s
	ReadBackoffMax time.Duration `json:"readBackoffMax" yaml:"readBackoffMax"`

	// Limit of how many attempts will be made before delivering the error.
	//
	// The default is to try 3 times.
	MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"`
}

ReaderConfig is a configuration object used to create new instances of Reader.

type ReaderFactory

type ReaderFactory = di.Factory[*kafka.Reader]

ReaderFactory is a di.Factory[*kafka.Reader] that creates *kafka.Reader.

Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use Topic name as the identifier of kafka config rather than an opaque name such as default.

type ReaderInterceptor

type ReaderInterceptor func(name string, reader *kafka.ReaderConfig)

ReaderInterceptor is an interceptor that makes last minute change to a *kafka.ReaderConfig during kafka.Reader's creation

type ReaderMaker

type ReaderMaker interface {
	Make(name string) (*kafka.Reader, error)
}

ReaderMaker models a ReaderFactory

type ReaderStats added in v0.6.0

type ReaderStats struct {
	Dials      metrics.Counter
	Fetches    metrics.Counter
	Messages   metrics.Counter
	Bytes      metrics.Counter
	Rebalances metrics.Counter
	Timeouts   metrics.Counter
	Errors     metrics.Counter

	Offset        metrics.Gauge
	Lag           metrics.Gauge
	MinBytes      metrics.Gauge
	MaxBytes      metrics.Gauge
	MaxWait       metrics.Gauge
	QueueLength   metrics.Gauge
	QueueCapacity metrics.Gauge

	DialTime   AggStats
	ReadTime   AggStats
	WaitTime   AggStats
	FetchSize  AggStats
	FetchBytes AggStats
	// contains filtered or unexported fields
}

ReaderStats is a collection of metrics for kafka reader info.

func (*ReaderStats) Observe added in v0.9.0

func (r *ReaderStats) Observe(stats kafka.ReaderStats)

Observe records the reader stats. It should be called periodically.

func (*ReaderStats) Reader added in v0.9.0

func (r *ReaderStats) Reader(reader string) *ReaderStats

Reader sets the writer label in WriterStats.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is a type which traces the interacting with kafka brokers.

func NewTransport

func NewTransport(underlying kafka.RoundTripper, tracer opentracing.Tracer) *Transport

NewTransport creates a new kafka transport

func (*Transport) RoundTrip

func (t *Transport) RoundTrip(ctx context.Context, addr net.Addr, request kafka.Request) (kafka.Response, error)

RoundTrip implements kafka.RoundTripper factoryIn kafka-go. It wraps the original kafka.RoundTripper and adds a tracing span to it.

type Writer

type Writer struct {
	*kafka.Writer
	// contains filtered or unexported fields
}

Writer is a decorator around kafka.Writer that provides tracing capabilities.

func Trace

func Trace(writer *kafka.Writer, tracer opentracing.Tracer, opts ...WriterOption) *Writer

Trace takes a kafka.Writer and returns a decorated Writer.

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error

WriteMessages writes a batch of messages to the kafka topic configured on this writer. Each message written has been injected tracing headers. The upstream consumer can extract tracing spans from kafka headers, forming a distributed tracing via messaging.

type WriterConfig

type WriterConfig struct {
	// The list of brokers used to discover the partitions available on the
	// kafka cluster.
	//
	// This field is required, attempting to create a writer with an empty list
	// of brokers will panic.
	Brokers []string `json:"brokers" yaml:"brokers"`

	// The topic that the writer will produce messages to.
	//
	// If provided, this will be used to set the topic for all produced messages.
	// If not provided, each Message must specify a topic for itself. This must be
	// mutually exclusive, otherwise the Writer will return an error.
	Topic string `json:"topic" yaml:"topic"`

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"`

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int `json:"batchSize" yaml:"batchSize"`

	// Limit the maximum size of a request factoryIn bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int `json:"batchBytes" yaml:"batchBytes"`

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration `json:"batchTimeout" yaml:"batchTimeout"`

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration `json:"readTimeout" yaml:"readTimeout"`

	// Timeout for write operation performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration `json:"writeTimeout" yaml:"writeTimeout"`

	// DEPRECATED: factoryIn versions prior to 0.4, the writer used to maintain a cache
	// the topic layout. With the change to use a transport to manage connections,
	// the responsibility of syncing the cluster layout has been delegated to the
	// transport.
	RebalanceInterval time.Duration `json:"rebalanceInterval" yaml:"rebalanceInterval"`

	// Number of acknowledges from partition replicas required before receiving
	// a response to a produce request. The default is -1, which means to wait for
	// all replicas, and a value above 0 is required to indicate how many replicas
	// should acknowledge a message to be considered successful.
	//
	// This version of kafka-go (v0.3) does not support 0 required acks, due to
	// some internal complexity implementing this with the Kafka protocol. If you
	// need that functionality specifically, you'll need to upgrade to v0.4.
	RequiredAcks int `json:"requiredAcks" yaml:"requiredAcks"`

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool `json:"async" yaml:"async"`
}

WriterConfig is a configuration type used to create new instances of Writer.

type WriterFactory

type WriterFactory = di.Factory[*kafka.Writer]

WriterFactory is a di.Factory[*kafka.Writer] that creates *kafka.Writer.

Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use Topic name as the identifier of kafka config rather than an opaque name such as default.

type WriterInterceptor

type WriterInterceptor func(name string, writer *kafka.Writer)

WriterInterceptor is an interceptor that makes last minute change to a *kafka.Writer during its creation

type WriterMaker

type WriterMaker interface {
	Make(name string) (*kafka.Writer, error)
}

WriterMaker models a WriterFactory

type WriterOption

type WriterOption func(writer *Writer)

WriterOption is type that configures the Writer.

func WithLogger

func WithLogger(logger log.Logger) WriterOption

WithLogger is an option that provides logging to writer.

type WriterStats added in v0.6.0

type WriterStats struct {
	Writes   metrics.Counter
	Messages metrics.Counter
	Bytes    metrics.Counter
	Errors   metrics.Counter

	MaxAttempts  metrics.Gauge
	MaxBatchSize metrics.Gauge
	BatchTimeout metrics.Gauge
	ReadTimeout  metrics.Gauge
	WriteTimeout metrics.Gauge
	RequiredAcks metrics.Gauge
	Async        metrics.Gauge

	BatchTime  AggStats
	WriteTime  AggStats
	WaitTime   AggStats
	Retries    AggStats
	BatchSize  AggStats
	BatchBytes AggStats
	// contains filtered or unexported fields
}

WriterStats is a collection of metrics for kafka writer info.

func (*WriterStats) Observe added in v0.9.0

func (w *WriterStats) Observe(stats kafka.WriterStats) *WriterStats

Observe records the writer stats. It should called periodically.

func (*WriterStats) Writer added in v0.9.0

func (w *WriterStats) Writer(writer string) *WriterStats

Writer sets the writer label in WriterStats.

Directories

Path Synopsis
Package mock_metrics is a generated GoMock package.
Package mock_metrics is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL