kq

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2024 License: MIT Imports: 25 Imported by: 26

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustNewQueue

func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue

func NewQueue

func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)

Types

type ConsumeErrorHandler added in v1.2.0

type ConsumeErrorHandler func(ctx context.Context, msg kafka.Message, err error)

type ConsumeHandle

type ConsumeHandle func(ctx context.Context, key, value string) error

type ConsumeHandler

type ConsumeHandler interface {
	Consume(ctx context.Context, key, value string) error
}

func WithHandle

func WithHandle(handle ConsumeHandle) ConsumeHandler

type KqConf

type KqConf struct {
	service.ServiceConf
	Brokers       []string
	Group         string
	Topic         string
	CaFile        string `json:",optional"`
	Offset        string `json:",options=first|last,default=last"`
	Conns         int    `json:",default=1"`
	Consumers     int    `json:",default=8"`
	Processors    int    `json:",default=8"`
	MinBytes      int    `json:",default=10240"`    // 10K
	MaxBytes      int    `json:",default=10485760"` // 10M
	Username      string `json:",optional"`
	Password      string `json:",optional"`
	ForceCommit   bool   `json:",default=true"`
	CommitInOrder bool   `json:",default=false"`
}

type PushOption

type PushOption func(options *pushOptions)

func WithAllowAutoTopicCreation added in v1.2.0

func WithAllowAutoTopicCreation() PushOption

WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.

func WithBalancer added in v1.2.2

func WithBalancer(balancer kafka.Balancer) PushOption

WithBalancer customizes the Pusher with the given balancer.

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

WithChunkSize customizes the Pusher with the given chunk size.

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

WithFlushInterval customizes the Pusher with the given flush interval.

func WithSyncPush added in v1.2.2

func WithSyncPush() PushOption

WithSyncPush enables the Pusher to push messages synchronously.

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

NewPusher returns a Pusher with the given Kafka addresses and topic.

func (*Pusher) Close

func (p *Pusher) Close() error

Close closes the Pusher and releases any resources used by it.

func (*Pusher) KPush added in v1.2.2

func (p *Pusher) KPush(ctx context.Context, k, v string) error

KPush sends a message to the Kafka topic.

func (*Pusher) Name

func (p *Pusher) Name() string

Name returns the name of the Kafka topic that the Pusher is sending messages to.

func (*Pusher) Push

func (p *Pusher) Push(ctx context.Context, v string) error

Push sends a message to the Kafka topic.

func (*Pusher) PushWithKey added in v1.2.2

func (p *Pusher) PushWithKey(ctx context.Context, key, v string) error

PushWithKey sends a message with the given key to the Kafka topic.

type QueueOption

type QueueOption func(*queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithErrorHandler added in v1.2.0

func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL