Documentation
¶
Overview ¶
Package kgo provides a kafka broker using kgo
Index ¶
- Variables
- func ClientPublishKey(key []byte) client.PublishOption
- func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption
- func CommitInterval(td time.Duration) broker.Option
- func Options(opts ...kgo.Opt) broker.Option
- func PublishKey(key []byte) broker.PublishOption
- func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption
- func SubscribeContext(ctx context.Context) broker.SubscribeOption
- func SubscribeMaxInFlight(n int) broker.SubscribeOption
- func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption
- type Broker
- func (k *Broker) Address() string
- func (k *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error
- func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, ...) (broker.Subscriber, error)
- func (k *Broker) Connect(ctx context.Context) error
- func (k *Broker) Disconnect(ctx context.Context) error
- func (k *Broker) Init(opts ...broker.Option) error
- func (k *Broker) Name() string
- func (k *Broker) Options() broker.Options
- func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, ...) error
- func (k *Broker) String() string
- func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, ...) (broker.Subscriber, error)
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultCommitInterval = 5 * time.Second
DefaultCommitInterval specifies how often offset commits are sent to kafka
View Source
// DefaultRetryBackoffFn maps a count of failed attempts to the delay to wait
// before the next retry.
//
// The delay starts at 100ms and doubles with each additional failure. It is
// then scaled by a random jitter factor in [0.8, 1.2) and capped at one
// second. A non-positive count yields the 100ms minimum, and a count above
// ten yields the one-second maximum directly.
var DefaultRetryBackoffFn = func() func(int) time.Duration {
	// rngMu guards rng, which is shared by every call of the returned function.
	var rngMu sync.Mutex
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	return func(fails int) time.Duration {
		// Named minBackoff/maxBackoff rather than min/max so the builtin
		// functions of the same name are not shadowed.
		const (
			minBackoff = 100 * time.Millisecond
			maxBackoff = time.Second
		)

		if fails <= 0 {
			return minBackoff
		}
		// Short-circuit large counts: the result would be capped anyway, and
		// this keeps the shift below small.
		if fails > 10 {
			return maxBackoff
		}

		backoff := minBackoff * time.Duration(1<<(fails-1))

		rngMu.Lock()
		jitter := 0.8 + 0.4*rng.Float64()
		rngMu.Unlock()

		backoff = time.Duration(float64(backoff) * jitter)
		if backoff > maxBackoff {
			return maxBackoff
		}
		return backoff
	}
}()
View Source
var DefaultSubscribeMaxInflight = 10
View Source
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
Functions ¶
func ClientPublishKey ¶
func ClientPublishKey(key []byte) client.PublishOption
ClientPublishKey sets the kafka message key (client option)
func ClientPublishPromise ¶
func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption
ClientPublishPromise sets the kafka promise func for Produce (client option)
func CommitInterval ¶
CommitInterval specifies the interval at which commits are sent
func PublishKey ¶
func PublishKey(key []byte) broker.PublishOption
PublishKey sets the kafka message key (broker option)
func PublishPromise ¶
func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption
PublishPromise sets the kafka promise func for Produce
func SubscribeContext ¶
func SubscribeContext(ctx context.Context) broker.SubscribeOption
SubscribeContext sets the context for broker.SubscribeOption
func SubscribeMaxInFlight ¶
func SubscribeMaxInFlight(n int) broker.SubscribeOption
SubscribeMaxInFlight sets the maximum number of queued messages
func SubscribeOptions ¶
func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption
SubscribeOptions passes additional options to the broker in Subscribe
Types ¶
type Broker ¶
func (*Broker) BatchPublish ¶
func (*Broker) BatchSubscribe ¶
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error)
Click to show internal directories.
Click to hide internal directories.