kgo

package module
v4.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

README

micro-broker-kgo

yet another micro kafka broker alternative

TODO:

  • don't always append options from context on Init and New
  • add SubscriberOptions(...kgo.Opt)
  • add ServerSubscribeOptions(...kgo.Opt)
  • check PublisherOptions(...kgo.Opt)
  • check ClientPublisherOptions(...kgo.Opt)

Documentation

Overview

Package kgo provides a kafka broker using kgo

Index

Constants

This section is empty.

Variables

View Source
// DefaultCommitInterval is the package-level default commit interval, set to
// five seconds.
var DefaultCommitInterval = 5 * time.Second

DefaultCommitInterval specifies how often offset commits are sent to Kafka.

View Source
// DefaultRetryBackoffFn maps a failure count to the delay to wait before the
// next attempt.
//
// The delay starts at 100ms and doubles with every additional failure, is
// scaled by a random jitter factor in [0.8, 1.2), and never exceeds one
// second. A non-positive count yields exactly 100ms and a count above ten
// yields exactly one second; neither of those paths touches the random
// source.
var DefaultRetryBackoffFn = func() func(int) time.Duration {
	const (
		floor   = 100 * time.Millisecond
		ceiling = time.Second
	)

	// The random source is created once and shared by every call of the
	// returned function; mu serializes access to it.
	var (
		mu  sync.Mutex
		src = rand.New(rand.NewSource(time.Now().UnixNano()))
	)

	return func(fails int) time.Duration {
		switch {
		case fails <= 0:
			return floor
		case fails > 10:
			return ceiling
		}

		// fails is in 1..10 here, so the shift count is in 0..9.
		base := floor << (fails - 1)

		mu.Lock()
		factor := 0.8 + 0.4*src.Float64()
		mu.Unlock()

		delay := time.Duration(float64(base) * factor)
		if delay > ceiling {
			return ceiling
		}
		return delay
	}
}()
View Source
// DefaultSubscribeMaxInflight is a package-level default set to 10.
// NOTE(review): presumably the value used when SubscribeMaxInFlight is not
// supplied — confirm against the subscriber implementation.
var DefaultSubscribeMaxInflight = 10
View Source
// ErrLostMessage is a sentinel error whose text reports that a message was
// not marked for offset commit and will be lost in the next iteration.
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")

Functions

func ClientPublishKey

func ClientPublishKey(key []byte) client.PublishOption

ClientPublishKey set the kafka message key (client option)

func ClientPublishPromise

func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption

ClientPublishPromise sets the kafka promise func for Produce (client option)

func CommitInterval

func CommitInterval(td time.Duration) broker.Option

CommitInterval specifies interval to send commits

func Options

func Options(opts ...kgo.Opt) broker.Option

Options pass additional options to broker

func PublishKey

func PublishKey(key []byte) broker.PublishOption

PublishKey set the kafka message key (broker option)

func PublishPromise

func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption

PublishPromise set the kafka promise func for Produce

func SubscribeContext

func SubscribeContext(ctx context.Context) broker.SubscribeOption

SubscribeContext set the context for broker.SubscribeOption

func SubscribeMaxInFlight

func SubscribeMaxInFlight(n int) broker.SubscribeOption

SubscribeMaxInFlight sets the maximum number of queued messages

func SubscribeOptions

func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption

SubscribeOptions pass additional options to broker in Subscribe

Types

type Broker

type Broker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(opts ...broker.Option) *Broker

func (*Broker) Address

func (k *Broker) Address() string

func (*Broker) BatchPublish

func (k *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error

func (*Broker) BatchSubscribe

func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error)

func (*Broker) Connect

func (k *Broker) Connect(ctx context.Context) error

func (*Broker) Disconnect

func (k *Broker) Disconnect(ctx context.Context) error

func (*Broker) Init

func (k *Broker) Init(opts ...broker.Option) error

func (*Broker) Name

func (k *Broker) Name() string

func (*Broker) Options

func (k *Broker) Options() broker.Options

func (*Broker) Publish

func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error

func (*Broker) String

func (k *Broker) String() string

func (*Broker) Subscribe

func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL