kafka

package
v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 | License: Apache-2.0 | Imports: 23 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HeadersFromContext

func HeadersFromContext(ctx context.Context) ([]kafka.Header, bool)

func NewHeadersContext

func NewHeadersContext(ctx context.Context, headers ...kafka.Header) context.Context

func WithSync

func WithSync() queue.CallOptions

Types

type Balancer added in v0.1.3

type Balancer = kafka.Balancer

type BalancerFunc added in v0.1.3

type BalancerFunc = kafka.BalancerFunc

type CRC32Balancer added in v0.1.3

type CRC32Balancer = kafka.CRC32Balancer

type Conf

type Conf struct {
	Brokers    []string
	Group      string
	Topic      string
	Offset     string `json:",options=first|last,default=last"`
	Conns      int    `json:",default=1"`
	Consumers  int    `json:",default=8"`
	Processors int    `json:",default=8"`
	MinBytes   int    `json:",default=10240"`    // 10K
	MaxBytes   int    `json:",default=10485760"` // 10M
}

type Hash added in v0.1.3

type Hash = kafka.Hash

type Headers added in v0.0.11

type Headers struct {
	// contains filtered or unexported fields
}

func (*Headers) Get added in v0.0.11

func (h *Headers) Get(key string) string

func (*Headers) Keys added in v0.0.11

func (h *Headers) Keys() []string

func (*Headers) Set added in v0.0.11

func (h *Headers) Set(key string, value string)

type LeastBytes added in v0.1.3

type LeastBytes = kafka.LeastBytes

type Murmur2Balancer added in v0.1.3

type Murmur2Balancer = kafka.Murmur2Balancer

type PushOption

type PushOption func(options *pushOptions)

func WithBalancer added in v0.1.3

func WithBalancer(balancer Balancer) PushOption

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

func WithCompletion added in v0.0.12

func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption

func WithDisableAutoTopicCreation added in v0.1.5

func WithDisableAutoTopicCreation() PushOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

func (*Pusher) Close

func (p *Pusher) Close() error

func (*Pusher) Name

func (p *Pusher) Name() string

func (*Pusher) Push

func (p *Pusher) Push(ctx context.Context, k, v []byte, opts ...queue.CallOptions) (
	interface{}, error)

func (*Pusher) Start added in v0.1.4

func (p *Pusher) Start()

func (*Pusher) Stop added in v0.1.4

func (p *Pusher) Stop()

type QueueOption

type QueueOption func(*queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

type Queues

type Queues struct {
	// contains filtered or unexported fields
}

func MustNewQueue

func MustNewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) *Queues

func NewQueue

func NewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) (*Queues, error)

func (Queues) Start

func (q Queues) Start()

func (Queues) Stop

func (q Queues) Stop()

type ReferenceHash added in v0.1.3

type ReferenceHash = kafka.ReferenceHash

type RoundRobin added in v0.1.3

type RoundRobin = kafka.RoundRobin

type XXHashBalancer added in v0.1.4

type XXHashBalancer struct {
	// contains filtered or unexported fields
}

func (*XXHashBalancer) Balance added in v0.1.4

func (x *XXHashBalancer) Balance(msg kafka.Message, partitions ...int) (partition int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL