kgo

package module
v3.8.53 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: Apache-2.0 Imports: 26 Imported by: 3

README

broker-kgo

TODO:

  1. экспортируем текущий оффсет для каждой партиции в группе
  2. экспортируем лаг для группы
  3. мониторим
    1. если есть лаг больше нуля
    2. если дельта оффсета за нужное нам время не

Documentation

Overview

Package kgo provides a kafka broker using kgo

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultCommitInterval is the default interval at which offset commits
	// are sent to Kafka.
	DefaultCommitInterval = 5 * time.Second

	// DefaultStatsInterval is the default interval at which consumer lag is
	// checked.
	DefaultStatsInterval = 5 * time.Second

	// DefaultSubscribeMaxInflight is the default number of messages kept
	// in flight per subscription.
	DefaultSubscribeMaxInflight = 100
)
View Source
// DefaultRetryBackoffFn maps a count of failures to the delay to wait
// before the next retry.
//
// The delay starts at 100ms and doubles with every additional failure, is
// scaled by a random jitter factor in [0.8, 1.2), and is capped at one second.
// A non-positive count yields exactly 100ms and a count above 10 yields
// exactly one second; neither of those paths draws a random number.
var DefaultRetryBackoffFn = func() func(int) time.Duration {
	const (
		floor   = 100 * time.Millisecond
		ceiling = time.Second
	)

	// jitterMu serializes the calls that draw the jitter factor.
	// NOTE(review): if rand is math/rand, its top-level functions are already
	// safe for concurrent use, so this lock may be redundant — confirm.
	var jitterMu sync.Mutex

	return func(fails int) time.Duration {
		switch {
		case fails <= 0:
			return floor
		case fails > 10:
			return ceiling
		}

		// Exponential growth: floor, 2*floor, 4*floor, ...
		multiplier := time.Duration(1) << (fails - 1)
		delay := floor * multiplier

		jitterMu.Lock()
		factor := 0.8 + 0.4*rand.Float64()
		jitterMu.Unlock()

		delay = time.Duration(float64(delay) * factor)
		if delay > ceiling {
			return ceiling
		}
		return delay
	}
}()
View Source
// ErrLostMessage reports that a message was not marked for offset commit
// and will therefore be lost in the next iteration.
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")

Functions

func ClientID added in v3.8.10

func ClientID(id string) broker.Option

func ClientPublishKey

func ClientPublishKey(key []byte) client.PublishOption

ClientPublishKey sets the kafka message key (client option)

func ClientPublishPromise added in v3.8.10

func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption

ClientPublishPromise sets the kafka promise func for Produce (client option)

func CommitInterval

func CommitInterval(td time.Duration) broker.Option

CommitInterval specifies the interval at which commits are sent

func Group added in v3.8.10

func Group(id string) broker.Option

func Options

func Options(opts ...kgo.Opt) broker.Option

Options passes additional options to the broker

func PublishKey

func PublishKey(key []byte) broker.PublishOption

PublishKey sets the kafka message key (broker option)

func PublishPromise added in v3.8.10

func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption

PublishPromise sets the kafka promise func for Produce

func SubscribeContext

func SubscribeContext(ctx context.Context) broker.SubscribeOption

SubscribeContext sets the context for broker.SubscribeOption

func SubscribeMaxInFlight

func SubscribeMaxInFlight(n int) broker.SubscribeOption

SubscribeMaxInFlight sets the maximum number of queued messages

func SubscribeOptions added in v3.8.2

func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption

SubscribeOptions passes additional options to the broker in Subscribe

Types

type Broker added in v3.8.4

type Broker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(opts ...broker.Option) *Broker

func (*Broker) Address added in v3.8.4

func (k *Broker) Address() string

func (*Broker) BatchPublish added in v3.8.4

func (k *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error

func (*Broker) BatchSubscribe added in v3.8.4

func (*Broker) Client added in v3.8.41

func (k *Broker) Client() *kgo.Client

func (*Broker) Connect added in v3.8.4

func (k *Broker) Connect(ctx context.Context) error

func (*Broker) Disconnect added in v3.8.4

func (k *Broker) Disconnect(ctx context.Context) error

func (*Broker) Health added in v3.8.50

func (r *Broker) Health() bool

func (*Broker) Init added in v3.8.4

func (k *Broker) Init(opts ...broker.Option) error

func (*Broker) Live added in v3.8.50

func (r *Broker) Live() bool

func (*Broker) Name added in v3.8.4

func (k *Broker) Name() string

func (*Broker) Options added in v3.8.4

func (k *Broker) Options() broker.Options

func (*Broker) Publish added in v3.8.4

func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error

func (*Broker) Ready added in v3.8.50

func (r *Broker) Ready() bool

func (*Broker) String added in v3.8.4

func (k *Broker) String() string

func (*Broker) Subscribe added in v3.8.4

func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error)

func (*Broker) TopicExists added in v3.8.41

func (k *Broker) TopicExists(ctx context.Context, topic string) error

type RecordCarrier added in v3.8.10

type RecordCarrier struct {
	// contains filtered or unexported fields
}

RecordCarrier injects and extracts traces from a kgo.Record.

This type exists to satisfy the otel/propagation.TextMapCarrier interface.

func NewRecordCarrier added in v3.8.10

func NewRecordCarrier(record *kgo.Record) RecordCarrier

NewRecordCarrier creates a new RecordCarrier.

func (RecordCarrier) Get added in v3.8.10

func (c RecordCarrier) Get(key string) string

Get retrieves a single value for a given key if it exists.

func (RecordCarrier) Keys added in v3.8.10

func (c RecordCarrier) Keys() []string

Keys returns a slice of all key identifiers in the carrier.

func (RecordCarrier) Set added in v3.8.10

func (c RecordCarrier) Set(key, val string)

Set sets a header.

type Subscriber added in v3.8.42

type Subscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Subscriber) Client added in v3.8.42

func (s *Subscriber) Client() *kgo.Client

func (*Subscriber) Options added in v3.8.42

func (s *Subscriber) Options() broker.SubscribeOptions

func (*Subscriber) Topic added in v3.8.42

func (s *Subscriber) Topic() string

func (*Subscriber) Unsubscribe added in v3.8.42

func (s *Subscriber) Unsubscribe(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL