kafka

package
v0.0.0-...-8fa5db6 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 22, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

func NewKafkaInstance

func NewKafkaInstance() *Kafka

NewKafkaInstance returns a new Kafka instance.

func (*Kafka) Close

func (k *Kafka) Close()

func (*Kafka) Consume

func (k *Kafka) Consume(ctx context.Context, topic string, consumerNum int, groupID string, chanCap ...int) <-chan *Message

Consume starts consumerNum goroutines and delivers the messages they receive through the returned channel.

Note: do not close the returned channel manually.
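The following is a minimal sketch of a receive loop over the channel returned by Consume. It does not import the real package: the consumer interface copies only the documented signature, and fakeKafka, drain and runConsumeDemo are hypothetical names introduced for illustration. The sketch assumes the channel may be closed by its producer once the context is cancelled or the input ends, which the documentation does not confirm, so the loop also watches ctx.Done() and never closes the channel itself.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message mirrors the documented type.
type Message struct {
	K, V []byte
}

// consumer captures only the documented Consume signature so that this
// sketch compiles without the real package.
type consumer interface {
	Consume(ctx context.Context, topic string, consumerNum int, groupID string, chanCap ...int) <-chan *Message
}

// fakeKafka is a hypothetical in-memory stand-in, NOT the real implementation.
type fakeKafka struct{ msgs []*Message }

func (f *fakeKafka) Consume(ctx context.Context, topic string, consumerNum int, groupID string, chanCap ...int) <-chan *Message {
	capacity := 0 // assumed default; the real default is not documented
	if len(chanCap) > 0 {
		capacity = chanCap[0]
	}
	in := make(chan *Message)
	out := make(chan *Message, capacity)

	// Feed the canned messages to the workers.
	go func() {
		defer close(in)
		for _, m := range f.msgs {
			select {
			case in <- m:
			case <-ctx.Done():
				return
			}
		}
	}()

	// consumerNum workers forward messages to the shared output channel.
	var wg sync.WaitGroup
	for i := 0; i < consumerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range in {
				select {
				case out <- m:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Only the producer side closes the channel, never the caller.
	go func() { wg.Wait(); close(out) }()
	return out
}

// drain reads until the channel is closed or ctx is cancelled.
func drain(ctx context.Context, c consumer, topic, groupID string) []string {
	var got []string
	ch := c.Consume(ctx, topic, 2, groupID, 16)
	for {
		select {
		case <-ctx.Done():
			return got
		case m, ok := <-ch:
			if !ok {
				return got
			}
			got = append(got, string(m.V))
		}
	}
}

// runConsumeDemo returns how many messages the loop received.
func runConsumeDemo() int {
	f := &fakeKafka{msgs: []*Message{{V: []byte("a")}, {V: []byte("b")}, {V: []byte("c")}}}
	return len(drain(context.Background(), f, "orders", "group-1"))
}

func main() {
	fmt.Println("received:", runConsumeDemo())
}
```

With more than one worker goroutine the arrival order is not guaranteed, so the sketch only counts messages.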

func (*Kafka) Send

func (k *Kafka) Send(ctx context.Context, topic string, messages []*Message) []error

Send sends the messages to the specified topic.
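The documentation does not say whether the returned []error is aligned with the input messages or whether it can contain nil entries. A minimal sketch that works either way is to collapse the slice with errors.Join (Go 1.20+), which drops nil values and returns nil when none remain. The sender interface copies only the documented signature; fakeSender, sendAll and runSendDemo are hypothetical names used for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message mirrors the documented type.
type Message struct {
	K, V []byte
}

// sender captures only the documented Send signature.
type sender interface {
	Send(ctx context.Context, topic string, messages []*Message) []error
}

// fakeSender is a hypothetical stand-in that rejects messages with an
// empty value. It is NOT the real implementation.
type fakeSender struct{}

func (fakeSender) Send(ctx context.Context, topic string, messages []*Message) []error {
	var errs []error
	for i, m := range messages {
		if len(m.V) == 0 {
			errs = append(errs, fmt.Errorf("message %d: empty value", i))
		}
	}
	return errs
}

// sendAll collapses the []error result into a single error.
// errors.Join ignores nil entries and returns nil if nothing is left.
func sendAll(ctx context.Context, s sender, topic string, msgs []*Message) error {
	return errors.Join(s.Send(ctx, topic, msgs)...)
}

// runSendDemo sends one valid batch and one batch with an invalid message.
func runSendDemo() (okErr, badErr error) {
	ctx := context.Background()
	okErr = sendAll(ctx, fakeSender{}, "orders", []*Message{{K: []byte("k"), V: []byte("v")}})
	badErr = sendAll(ctx, fakeSender{}, "orders", []*Message{{K: []byte("k")}})
	return okErr, badErr
}

func main() {
	okErr, badErr := runSendDemo()
	fmt.Println("valid batch error:", okErr)
	fmt.Println("invalid batch error:", badErr)
}
```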

func (*Kafka) SetWriter

func (k *Kafka) SetWriter(topic string, asyncWrite ...bool) error

SetWriter creates a concurrency-safe writer for the given topic. If the topic does not exist, SetWriter creates it.
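SetWriter takes asyncWrite as an optional variadic argument, the same pattern Consume uses for chanCap. The sketch below shows one common way such a parameter is resolved in Go (requires Go 1.18+ for generics). It is not the package's code: optional and describeWriter are hypothetical helpers, and the false default is an assumption, since the documentation does not state which mode applies when the argument is omitted.

```go
package main

import "fmt"

// optional returns the first variadic value, or def when none was passed.
func optional[T any](def T, opt ...T) T {
	if len(opt) > 0 {
		return opt[0]
	}
	return def
}

// describeWriter is a hypothetical stand-in with the same parameter shape
// as SetWriter. It only reports which mode would be selected.
func describeWriter(topic string, asyncWrite ...bool) string {
	// Assumption: synchronous writes when asyncWrite is omitted.
	if optional(false, asyncWrite...) {
		return topic + ": async"
	}
	return topic + ": sync"
}

func main() {
	fmt.Println(describeWriter("orders"))
	fmt.Println(describeWriter("orders", true))
}
```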

type Message

type Message struct {
	K, V []byte
}

Message is the general-purpose message type of the domain layer. DO NOT EDIT.
