kafka

package
Version: v0.0.11
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2022 | License: Apache-2.0 | Imports: 21 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HeadersFromContext

func HeadersFromContext(ctx context.Context, headers ...kafka.Header) ([]kafka.Header, bool)

func NewHeadersContext

func NewHeadersContext(ctx context.Context, headers ...kafka.Header) context.Context

func WithSync

func WithSync() queue.CallOptions

Types

type Conf

// Conf is the configuration value accepted by NewQueue and MustNewQueue.
//
// The struct tags carry option/default annotations (e.g. default=last).
// NOTE(review): which config loader interprets these tags is not visible
// here — confirm. Field meanings below are inferred from names and should be
// verified against the implementation.
type Conf struct {
	// Brokers, Group, and Topic have no tag-declared default.
	// Presumably broker addresses, consumer group, and topic name — TODO confirm.
	Brokers    []string
	Group      string
	Topic      string
	// Offset is tagged options=first|last with default=last.
	Offset     string `json:",options=first|last,default=last"`
	// Conns, Consumers, and Processors are tagged with defaults 1, 8, and 8.
	Conns      int    `json:",default=1"`
	Consumers  int    `json:",default=8"`
	Processors int    `json:",default=8"`
	// MinBytes and MaxBytes are tagged with the numeric defaults noted below.
	MinBytes   int    `json:",default=10240"`    // 10K (10 * 1024)
	MaxBytes   int    `json:",default=10485760"` // 10M (10 * 1024 * 1024)
}

type HeaderKey

// HeaderKey is an empty struct type.
// NOTE(review): presumably the context key used by NewHeadersContext and
// HeadersFromContext — confirm against the implementation.
type HeaderKey struct{}

type Headers added in v0.0.11

type Headers struct {
	// contains filtered or unexported fields
}

func (*Headers) Get added in v0.0.11

func (h *Headers) Get(key string) string

func (*Headers) Keys added in v0.0.11

func (h *Headers) Keys() []string

func (*Headers) Set added in v0.0.11

func (h *Headers) Set(key string, value string)

type PushOption

// PushOption is a function that receives a *chunkOptions. WithChunkSize and
// WithFlushInterval return values of this type, and NewPusher accepts them
// as variadic arguments.
type PushOption func(options *chunkOptions)

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

func (*Pusher) Close

func (p *Pusher) Close() error

func (*Pusher) Name

func (p *Pusher) Name() string

func (*Pusher) Push

func (p *Pusher) Push(ctx context.Context, k, v []byte, opts ...queue.CallOptions) (
	interface{}, error)

type QueueOption

// QueueOption is a function that receives a *queueOptions. WithCommitInterval,
// WithMaxWait, WithMetrics, and WithQueueCapacity return values of this type,
// and NewQueue / MustNewQueue accept them as variadic arguments.
type QueueOption func(*queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

type Queues

type Queues struct {
	// contains filtered or unexported fields
}

func MustNewQueue

func MustNewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) *Queues

func NewQueue

func NewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) (*Queues, error)

func (Queues) Start

func (q Queues) Start()

func (Queues) Stop

func (q Queues) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL