Documentation
¶
Overview ¶
Package kgo provides a Kafka broker implementation built on the kgo client
Index ¶
- Variables
- func ClientID(id string) broker.Option
- func ClientPublishKey(key []byte) client.PublishOption
- func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption
- func CommitInterval(td time.Duration) broker.Option
- func Group(id string) broker.Option
- func Options(opts ...kgo.Opt) broker.Option
- func PublishKey(key []byte) broker.PublishOption
- func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption
- func SubscribeContext(ctx context.Context) broker.SubscribeOption
- func SubscribeMaxInFlight(n int) broker.SubscribeOption
- func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption
- type Broker
- func (k *Broker) Address() string
- func (k *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error
- func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, ...) (broker.Subscriber, error)
- func (k *Broker) Client() *kgo.Client
- func (k *Broker) Connect(ctx context.Context) error
- func (k *Broker) Disconnect(ctx context.Context) error
- func (r *Broker) Health() bool
- func (k *Broker) Init(opts ...broker.Option) error
- func (r *Broker) Live() bool
- func (k *Broker) Name() string
- func (k *Broker) Options() broker.Options
- func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, ...) error
- func (r *Broker) Ready() bool
- func (k *Broker) String() string
- func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, ...) (broker.Subscriber, error)
- func (k *Broker) TopicExists(ctx context.Context, topic string) error
- type RecordCarrier
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultCommitInterval specifies how often offsets are committed to kafka DefaultCommitInterval = 5 * time.Second // DefaultStatsInterval specifies how often consumer lag is checked DefaultStatsInterval = 5 * time.Second // DefaultSubscribeMaxInflight specifies how many messages are kept in flight DefaultSubscribeMaxInflight = 100 )
var DefaultRetryBackoffFn = func() func(int) time.Duration { var rngMu sync.Mutex return func(fails int) time.Duration { const ( min = 100 * time.Millisecond max = time.Second ) if fails <= 0 { return min } if fails > 10 { return max } backoff := min * time.Duration(1<<(fails-1)) rngMu.Lock() jitter := 0.8 + 0.4*rand.Float64() rngMu.Unlock() backoff = time.Duration(float64(backoff) * jitter) if backoff > max { return max } return backoff } }()
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
Functions ¶
func ClientPublishKey ¶
func ClientPublishKey(key []byte) client.PublishOption
ClientPublishKey sets the kafka message key (client option)
func ClientPublishPromise ¶ added in v3.8.10
func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption
ClientPublishPromise sets the kafka promise func for Produce (client option)
func CommitInterval ¶
CommitInterval specifies the interval at which commits are sent
func PublishKey ¶
func PublishKey(key []byte) broker.PublishOption
PublishKey sets the kafka message key (broker option)
func PublishPromise ¶ added in v3.8.10
func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption
PublishPromise sets the kafka promise func for Produce
func SubscribeContext ¶
func SubscribeContext(ctx context.Context) broker.SubscribeOption
SubscribeContext sets the context for broker.SubscribeOption
func SubscribeMaxInFlight ¶
func SubscribeMaxInFlight(n int) broker.SubscribeOption
SubscribeMaxInFlight sets the maximum number of queued (in-flight) messages
func SubscribeOptions ¶ added in v3.8.2
func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption
SubscribeOptions passes additional kgo options to the broker in Subscribe
Types ¶
type Broker ¶ added in v3.8.4
func (*Broker) BatchPublish ¶ added in v3.8.4
func (*Broker) BatchSubscribe ¶ added in v3.8.4
func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error)
type RecordCarrier ¶ added in v3.8.10
type RecordCarrier struct {
// contains filtered or unexported fields
}
RecordCarrier injects and extracts traces from a kgo.Record.
This type exists to satisfy the otel/propagation.TextMapCarrier interface.
func NewRecordCarrier ¶ added in v3.8.10
func NewRecordCarrier(record *kgo.Record) RecordCarrier
NewRecordCarrier creates a new RecordCarrier.
func (RecordCarrier) Get ¶ added in v3.8.10
func (c RecordCarrier) Get(key string) string
Get retrieves a single value for a given key if it exists.
func (RecordCarrier) Keys ¶ added in v3.8.10
func (c RecordCarrier) Keys() []string
Keys returns a slice of all key identifiers in the carrier.
func (RecordCarrier) Set ¶ added in v3.8.10
func (c RecordCarrier) Set(key, val string)
Set sets a header.
type Subscriber ¶ added in v3.8.42
func (*Subscriber) Client ¶ added in v3.8.42
func (s *Subscriber) Client() *kgo.Client
func (*Subscriber) Options ¶ added in v3.8.42
func (s *Subscriber) Options() broker.SubscribeOptions
func (*Subscriber) Topic ¶ added in v3.8.42
func (s *Subscriber) Topic() string
func (*Subscriber) Unsubscribe ¶ added in v3.8.42
func (s *Subscriber) Unsubscribe(ctx context.Context) error