confluent

package
v0.0.0-...-a8c716a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup 定义消费者组类 nolint

func NewConsumerGroup

func NewConsumerGroup(addrs, topics []string, groupID string, handle HandleFunc, ops ...OptionFunc) (*ConsumerGroup, error)

NewConsumerGroup 创建一个消费者实例 默认从最旧的开始消费

func (*ConsumerGroup) Run

func (cg *ConsumerGroup) Run(ctx context.Context) error

Run 执行消费动作

type Data

type Data struct {
	FaninTime time.Time
	Topic     string
	Key       []byte
	Value     []byte
}

type HandleFunc

type HandleFunc func(context.Context, *Data) error

type Msg

type Msg struct {
	Topic string
	Key   string
	Value []byte
}

Msg 定义消息对象

type OptionFunc

type OptionFunc func(*Config) error

func WitchBaseAuth

func WitchBaseAuth(user, passwd, securityProtocol string) OptionFunc

WitchBaseAuth 设置基础鉴权 用户名, 密码 默认加密协议: SASL/PLAIN

func WithLogger

func WithLogger(log logger.Logger) OptionFunc

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(addrs []string, ops ...OptionFunc) (*Producer, error)

func (*Producer) AsyncPusher

func (p *Producer) AsyncPusher(ctx context.Context, msgChan <-chan *Msg)

AsyncPusher 使用通道向kafka发送数据 TODO: @zcf discuss:这里提供了基于channel的异步生产方式, 但是生产时会产生错误, 这里错误直接打印了; 如果后期有需要, 是否可以考虑加一个用来接收error的channel

func (*Producer) BatchMsgsPush

func (p *Producer) BatchMsgsPush(ctx context.Context, msgs []*Msg) error

PushMsgs 批量的向kafka的同一个topic发送数据 PS: @zcf这里不对推送数量数量做限制和校验

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) SingleMsgPush

func (p *Producer) SingleMsgPush(ctx context.Context, msg *Msg) error

SinglePushMsg 向kafka的一个topic发送一个数据

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL