kafka

package
v0.0.0-...-a8c716a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 定义KafkaClient

func NewKafkaClient

func NewKafkaClient(addrs []string, ops ...OptionFunc) (*Client, func(), error)

NewKafkaClient 实例化kafka连接实例。return: kafkaClient, kafkaCloseFunc, error

func (*Client) CreateTopic

func (kc *Client) CreateTopic(ctx context.Context, topicName string, ops ...TopicOption) (err error)

CreateTopic 创建单个Topic

func (*Client) CreateTopics

func (kc *Client) CreateTopics(ctx context.Context, tarTopics []string, ops ...TopicOption) (err error)

CreateTopics 创建topics

func (*Client) GetBrokers

func (kc *Client) GetBrokers(ctx context.Context) (brokers []*sarama.Broker, err error)

GetBrokers 获取集群中的broker list

func (*Client) GetTopics

func (kc *Client) GetTopics(ctx context.Context) (topicList []string, err error)

GetTopics 获取当前kafka中的topic

type ClientOptionFunc

type ClientOptionFunc func(*Client) error

func ClientWithLogger

func ClientWithLogger(logg logger.Logger) ClientOptionFunc

type Config

type Config struct {
	// contains filtered or unexported fields
}

func DefaultConfig

func DefaultConfig() *Config

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup 定义消费者组类

func NewConsumerGroup

func NewConsumerGroup(addrs, topics []string, groupID string, handle HandleFunc, ops ...OptionFunc) (*ConsumerGroup, error)

NewConsumerGroup 创建一个消费者实例 默认从最旧的开始消费

func (*ConsumerGroup) Run

func (cg *ConsumerGroup) Run(ctx context.Context) error

Run 执行消费动作

type CreateTopicsErr

type CreateTopicsErr struct {
	// contains filtered or unexported fields
}

func NewCreateTopicsErr

func NewCreateTopicsErr(errs map[string]error) *CreateTopicsErr

func (CreateTopicsErr) Error

func (err CreateTopicsErr) Error() string

type Data

type Data struct {
	FaninTime time.Time
	Topic     string
	Key       []byte
	Value     []byte
}

func (*Data) String

func (d *Data) String() string

String converts the Data to a string.

type Fetch

type Fetch struct {
	Min, Default, Max int32
}

Fetch 定义消费者拉取缓存的配置对象

type HandleFunc

type HandleFunc func(context.Context, *Data) error

type Msg

type Msg struct {
	Topic string
	Key   string
	Value []byte
}

Msg 定义消息对象

type OptionFunc

type OptionFunc func(*Config) error

func ConsumerBeginAt

func ConsumerBeginAt(at int64) OptionFunc

func ConsumerWithFetch

func ConsumerWithFetch(f Fetch) OptionFunc

func ProducerWithBatchSize

func ProducerWithBatchSize(size int) OptionFunc

func ProducerWithMaxMessageBytes

func ProducerWithMaxMessageBytes(size int) OptionFunc

func WitchBaseAuth

func WitchBaseAuth(user, passwd string, mechanism sarama.SASLMechanism) OptionFunc

WitchBaseAuth 设置基础鉴权 用户名, 密码 默认加密协议: SASL/PLAIN

func WithLogger

func WithLogger(log logger.Logger) OptionFunc

func WithVersion

func WithVersion(v sarama.KafkaVersion) OptionFunc

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(addrs []string, ops ...OptionFunc) (*Producer, error)

func (*Producer) AsyncPusher

func (p *Producer) AsyncPusher(ctx context.Context, msgChan <-chan *Msg)

AsyncPusher 使用通道向kafka发送数据 TODO: @zcf discuss:这里提供了基于channel的异步生产方式, 但是生产时会产生错误, 这里错误直接打印了; 如果后期有需要, 是否可以考虑加一个用来接收error的channel

func (*Producer) BatchMsgsPush

func (p *Producer) BatchMsgsPush(ctx context.Context, msgs []*Msg) error

BatchMsgsPush 批量地向kafka的同一个topic发送数据 PS: @zcf 这里不对推送数量做限制和校验

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) SingleMsgPush

func (p *Producer) SingleMsgPush(ctx context.Context, msg *Msg) (err error)

SingleMsgPush 向kafka的一个topic发送一个数据

type TopicOption

type TopicOption func(*sarama.TopicDetail)

TopicOption 创建topic的配置

func TopicWithConfigEntries

func TopicWithConfigEntries(key, value string) TopicOption

func TopicWithNumOfReplication

func TopicWithNumOfReplication(num int16) TopicOption

func TopicWithOfPartitions

func TopicWithOfPartitions(num int32) TopicOption

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL