kq

package
v1.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustNewQueue

func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue

func NewQueue

func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)

Types

type ConsumeErrorHandler

type ConsumeErrorHandler func(msg kafka.Message, err error)

type ConsumeHandle

type ConsumeHandle func(key, value string) error

type ConsumeHandler

type ConsumeHandler interface {
	Consume(key, value string) error
}

func WithHandle

func WithHandle(handle ConsumeHandle) ConsumeHandler

type KqConf

type KqConf struct {
	service.ServiceConf
	Brokers     []string
	Group       string
	Topic       string
	Offset      string `json:",options=first|last,default=last"`
	Conns       int    `json:",default=1"`
	Consumers   int    `json:",default=8"`
	Processors  int    `json:",default=8"`
	MinBytes    int    `json:",default=10240"`    // 10K
	MaxBytes    int    `json:",default=10485760"` // 10M
	Username    string `json:",optional"`
	Password    string `json:",optional"`
	ForceCommit bool   `json:",default=true"`
}

type PushOption

type PushOption func(options *chunkOptions)

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

func (*Pusher) Close

func (p *Pusher) Close() error

func (*Pusher) Name

func (p *Pusher) Name() string

func (*Pusher) Push

func (p *Pusher) Push(v string) error

type QueueOption

type QueueOption func(*queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithErrorHandler

func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL