Documentation
¶
Index ¶
- Variables
- type Client
- func (kc *Client) CreateTopic(ctx context.Context, topicName string, ops ...TopicOption) (err error)
- func (kc *Client) CreateTopics(ctx context.Context, tarTopics []string, ops ...TopicOption) (err error)
- func (kc *Client) GetBrokers(ctx context.Context) (brokers []*sarama.Broker, err error)
- func (kc *Client) GetTopics(ctx context.Context) (topicList []string, err error)
- type ClientOptionFunc
- type Config
- type ConsumerGroup
- type CreateTopicsErr
- type Data
- type Fetch
- type HandleFunc
- type Msg
- type OptionFunc
- func ConsumerBeginAt(at int64) OptionFunc
- func ConsumerWithFetch(f Fetch) OptionFunc
- func ProducerWithBatchSize(size int) OptionFunc
- func ProducerWithMaxMessageBytes(size int) OptionFunc
- func WitchBaseAuth(user, passwd string, mechanism sarama.SASLMechanism) OptionFunc
- func WithLogger(log logger.Logger) OptionFunc
- func WithVersion(v sarama.KafkaVersion) OptionFunc
- type Producer
- type TopicOption
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
View Source
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client 定义KafkaClient
func NewKafkaClient ¶
func NewKafkaClient(addrs []string, ops ...OptionFunc) (*Client, func(), error)
NewKafkaClient 实例化kafka连接实例 return: kafkaClient, kafkaCloseFunc, error
func (*Client) CreateTopic ¶
func (kc *Client) CreateTopic(ctx context.Context, topicName string, ops ...TopicOption) (err error)
CreateTopic 创建单个Topic
func (*Client) CreateTopics ¶
func (kc *Client) CreateTopics(ctx context.Context, tarTopics []string, ops ...TopicOption) (err error)
CreateTopics 创建topics
func (*Client) GetBrokers ¶
GetBrokers 获取集群中的broker list
type ClientOptionFunc ¶
func ClientWithLogger ¶
func ClientWithLogger(logg logger.Logger) ClientOptionFunc
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
func DefaultConfig ¶
func DefaultConfig() *Config
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup 定义消费者组类
func NewConsumerGroup ¶
func NewConsumerGroup(addrs, topics []string, groupID string, handle HandleFunc, ops ...OptionFunc) (*ConsumerGroup, error)
NewConsumerGroup 创建一个消费者实例 默认从最旧的开始消费
type CreateTopicsErr ¶
type CreateTopicsErr struct {
// contains filtered or unexported fields
}
func NewCreateTopicsErr ¶
func NewCreateTopicsErr(errs map[string]error) *CreateTopicsErr
func (CreateTopicsErr) Error ¶
func (err CreateTopicsErr) Error() string
type OptionFunc ¶
func ConsumerBeginAt ¶
func ConsumerBeginAt(at int64) OptionFunc
func ConsumerWithFetch ¶
func ConsumerWithFetch(f Fetch) OptionFunc
func ProducerWithBatchSize ¶
func ProducerWithBatchSize(size int) OptionFunc
func ProducerWithMaxMessageBytes ¶
func ProducerWithMaxMessageBytes(size int) OptionFunc
func WitchBaseAuth ¶
func WitchBaseAuth(user, passwd string, mechanism sarama.SASLMechanism) OptionFunc
WitchBaseAuth 设置基础鉴权 用户名, 密码 默认加密协议: SASL/PLAIN
func WithLogger ¶
func WithLogger(log logger.Logger) OptionFunc
func WithVersion ¶
func WithVersion(v sarama.KafkaVersion) OptionFunc
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(addrs []string, ops ...OptionFunc) (*Producer, error)
func (*Producer) AsyncPusher ¶
AsyncPusher 使用通道向kafka发送数据 TODO: @zcf discuss:这里提供了基于channel的异步生产方式, 但是生产时会产生错误, 这里错误直接打印了; 如果后期有需要, 是否可以考虑加一个用来接收error的channel
func (*Producer) BatchMsgsPush ¶
BatchMsgsPush 批量地向kafka的同一个topic发送数据 PS: @zcf这里不对推送数量做限制和校验
type TopicOption ¶
type TopicOption func(*sarama.TopicDetail)
TopicOption 创建topic的配置
func TopicWithConfigEntries ¶
func TopicWithConfigEntries(key, value string) TopicOption
func TopicWithNumOfReplication ¶
func TopicWithNumOfReplication(num int16) TopicOption
func TopicWithOfPartitions ¶
func TopicWithOfPartitions(num int32) TopicOption
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.