Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup 定义消费者组类 nolint
func NewConsumerGroup ¶
func NewConsumerGroup(addrs, topics []string, groupID string, handle HandleFunc, ops ...OptionFunc) (*ConsumerGroup, error)
NewConsumerGroup 创建一个消费者实例 默认从最旧的开始消费
type OptionFunc ¶
func WitchBaseAuth ¶
func WitchBaseAuth(user, passwd, securityProtocol string) OptionFunc
WitchBaseAuth 设置基础鉴权 用户名, 密码 默认加密协议: SASL/PLAIN
func WithLogger ¶
func WithLogger(log logger.Logger) OptionFunc
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(addrs []string, ops ...OptionFunc) (*Producer, error)
func (*Producer) AsyncPusher ¶
AsyncPusher 使用通道向kafka发送数据 TODO: @zcf discuss:这里提供了基于channel的异步生产方式, 但是生产时会产生错误, 这里错误直接打印了; 如果后期有需要, 是否可以考虑加一个用来接收error的channel
func (*Producer) BatchMsgsPush ¶
PushMsgs 批量的向kafka的同一个topic发送数据 PS: @zcf这里不对推送数量数量做限制和校验
Click to show internal directories.
Click to hide internal directories.