kafka

package
v0.0.0-...-1dedb3c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume(c kq.KqConf, consumerName string, handler kq.ConsumeHandler)

Consume Brokers: kafka 的多个 Broker 节点 Group:消费者组 Topic:订阅的 Topic 主题 Offset:如果新的 topic kafka 没有对应的 offset 信息,或者当前的 offset 无效了(历史数据被删除),那么需要指定从头(first)消费还是从尾(last)部消费 Conns: 一个 kafka queue 对应可对应多个 consumer,Conns 对应 kafka queue 数量,可以同时初始化多个 kafka queue,默认只启动一个 Consumers : go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量) Processors: 当 Consumers 中的多个 goroutine 将 kafka 消息拉取到进程内部的 channel 后,我们要真正消费消息写入我们自己逻辑,go-queue 内部通过此参数控制当前消费的并发 goroutine 数量 MinBytes: fetch 一次返回的最小字节数,如果不够这个字节数就等待. MaxBytes: fetch 一次返回的最大字节数,如果第一条消息的大小超过了这个限制仍然会继续拉取保证 consumer 的正常运行.因此并不是一个绝对的配置,消息的大小还需要受到 broker 的message.max.bytes限制,以及 topic 的max.message.bytes的限制 CommitInOrder: 顺序处理

func DelaySetUp

func DelaySetUp(redis *redis.Redis, batchSize int, prefix string)

func NewProducer

func NewProducer(topic string, opts ...kq.PushOption) (*kq.Pusher, error)

func Push

func Push(ctx context.Context, topic string, data any, sync ...bool) error

func PushDelay

func PushDelay(ctx context.Context, topic string, data any, delayDuration time.Duration) error

Types

This section is empty.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL