Documentation ¶
Index ¶
- Constants
- Variables
- type AsyncProducer
- func (p *AsyncProducer) AsyncClose()
- func (p *AsyncProducer) Close() error
- func (p *AsyncProducer) Errors() <-chan *ProducerError
- func (p *AsyncProducer) GetClient() *AsyncProducer
- func (p *AsyncProducer) SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)
- func (p *AsyncProducer) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)
- func (p *AsyncProducer) SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error)
- func (p *AsyncProducer) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)
- func (p *AsyncProducer) Successes() <-chan *ProducerMessage
- type AsyncProducerRepo
- type CloseFunc
- type ConsumeGroupHandler
- type ConsumeHandler
- type ConsumerError
- type ConsumerGroup
- type ConsumerGroupHandler
- type ConsumerGroupRepo
- type ConsumerMessage
- type GroupConfigInfo
- type Info
- type MarkMessageFunc
- type OptionHandler
- type PartitionConsumerRepo
- type Producer
- func (p *Producer) Close() error
- func (p *Producer) GetClient() *Producer
- func (p *Producer) GetConfig() Info
- func (p *Producer) SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)
- func (p *Producer) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)
- func (p *Producer) SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error)
- func (p *Producer) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)
- type ProducerConfigInfo
- type ProducerError
- type ProducerMessage
- type ProducerRepo
- type TLSConfigInfo
- type Topic
- type Trace
Constants ¶
const ( // DefaultRetryMax 默认最大重试次数. DefaultRetryMax = 10 // DefaultFlushFrequency 默认刷新频率, 500毫秒. DefaultFlushFrequency = 500 // DefaultVersion 默认卡夫卡版本 DefaultVersion = "2.8.0" )
Variables ¶
var ( ProducerEnableErr = errno.NewError("cfg.Producer.Enable is false") //nolint:nolintlint,errname AsyncProducerEnableErr = errno.NewError("cfg.AsyncProducer.Enable is false") //nolint:nolintlint,errname )
Functions ¶
This section is empty.
Types ¶
type AsyncProducer ¶
type AsyncProducer struct { Client sarama.AsyncProducer // contains filtered or unexported fields }
func (*AsyncProducer) AsyncClose ¶
func (p *AsyncProducer) AsyncClose()
func (*AsyncProducer) Close ¶
func (p *AsyncProducer) Close() error
Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before the process shuts down, or you may lose messages. You must call this before calling Close on the underlying client.
func (*AsyncProducer) Errors ¶
func (p *AsyncProducer) Errors() <-chan *ProducerError
Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock when the channel is full. Alternatively, you can set Producer.Return.Errors in your config to false, which prevents errors from being returned.
func (*AsyncProducer) GetClient ¶
func (p *AsyncProducer) GetClient() *AsyncProducer
func (*AsyncProducer) SendMessage ¶
func (p *AsyncProducer) SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)
func (*AsyncProducer) SendMessageByte ¶
func (p *AsyncProducer) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)
func (*AsyncProducer) SendMessages ¶
func (p *AsyncProducer) SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error)
func (*AsyncProducer) SendMessagesByte ¶
func (p *AsyncProducer) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)
func (*AsyncProducer) Successes ¶
func (p *AsyncProducer) Successes() <-chan *ProducerMessage
Successes is the success output channel back to the user when Return.Successes is enabled. If Return.Successes is true, you MUST read from this channel or the Producer will deadlock. It is suggested that you send and read messages together in a single select statement.
type AsyncProducerRepo ¶
type AsyncProducerRepo interface { GetClient() *AsyncProducer // AsyncClose triggers a shutdown of the producer. The shutdown has completed // when both the Errors and Successes channels have been closed. When calling // AsyncClose, you *must* continue to read from those channels in order to // drain the results of any messages in flight. AsyncClose() // Close shuts down the producer and waits for any buffered messages to be // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. You must call this before the process // shuts down, or you may lose messages. You must call this before calling // Close on the underlying client. Close() error // SendMessage 发送一条消息, 返回一个消息指针. 关注 msg.Partition, msg.Offset 两个变量. SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error) // SendMessages 批量发送消息, 返回消息指针数组. 关注 msg.Partition, msg.Offset 两个变量. SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error) // Successes is the success output channel back to the user when Return.Successes is // enabled. If Return.Successes is true, you MUST read from this channel or the // Producer will deadlock. It is suggested that you send and read messages // together in a single select statement. Successes() <-chan *ProducerMessage // Errors is the error output channel back to the user. You MUST read from this // channel or the Producer will deadlock when the channel is full. Alternatively, // you can set Producer.Return.Errors in your config to false, which prevents // errors from being returned. Errors() <-chan *ProducerError }
func DefaultAsyncProducer ¶
func DefaultAsyncProducer() AsyncProducerRepo
func NewAsyncProducer ¶
func NewAsyncProducer(cfg Info, optionHandlers ...OptionHandler) (AsyncProducerRepo, error)
type ConsumeGroupHandler ¶
type ConsumeGroupHandler func(msg *ConsumerMessage) (error, bool)
type ConsumeHandler ¶
type ConsumeHandler func(msgChan <-chan *ConsumerMessage, errChan <-chan *ConsumerError, markFunc MarkMessageFunc) error
type ConsumerError ¶
type ConsumerError struct {
sarama.ConsumerError
}
type ConsumerGroup ¶
type ConsumerGroup struct { Client sarama.ConsumerGroup // contains filtered or unexported fields }
func (*ConsumerGroup) Close ¶
func (g *ConsumerGroup) Close() (err error)
func (*ConsumerGroup) Consume ¶
func (g *ConsumerGroup) Consume(topics []Topic, handler ConsumeGroupHandler) error
Consume 消费者, 这个是一个阻塞的动作. 应该包裹在一个for循环中. for循环结束记得调用cancel.
func (*ConsumerGroup) Errors ¶
func (g *ConsumerGroup) Errors() <-chan error
Errors returns a read channel of errors that occurred during the Consumer life-cycle. By default, errors are logged and not returned over this channel. If you want to implement any custom error handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
func (*ConsumerGroup) GetClient ¶
func (g *ConsumerGroup) GetClient() *ConsumerGroup
func (*ConsumerGroup) GetConfig ¶
func (p *ConsumerGroup) GetConfig() Info
type ConsumerGroupHandler ¶
type ConsumerGroupHandler interface { sarama.ConsumerGroupHandler }
type ConsumerGroupRepo ¶
type ConsumerGroupRepo interface { Close() error GetClient() *ConsumerGroup // Consume 消费者, 这个是一个阻塞的动作. 应该包裹在一个for循环中. Consume(topics []Topic, handler ConsumeGroupHandler) error Errors() <-chan error GetConfig() Info }
func DefaultConsumerGroup ¶
func DefaultConsumerGroup() ConsumerGroupRepo
func NewConsumerGroup ¶
func NewConsumerGroup(cfg Info, optionHandlers ...OptionHandler) (ConsumerGroupRepo, error)
type ConsumerMessage ¶
type ConsumerMessage struct {
*sarama.ConsumerMessage
}
type GroupConfigInfo ¶
type Info ¶
type Info struct { // 连接地址, 集群则配置多个. 必填. BrokerList []string `toml:"BrokerList" json:"BrokerList"` // 生产者相关配置. Producer ProducerConfigInfo `toml:"Producer" json:"Producer"` AsyncProducer ProducerConfigInfo `toml:"AsyncProducer" json:"AsyncProducer"` TLS TLSConfigInfo `toml:"TLS" json:"TLS"` Group GroupConfigInfo `toml:"Group" json:"Group"` Version string `toml:"Version" json:"Version"` }
type MarkMessageFunc ¶
type MarkMessageFunc func(msg *ConsumerMessage, metadata string)
type OptionHandler ¶
type OptionHandler func(*option)
type PartitionConsumerRepo ¶
type PartitionConsumerRepo interface { // AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you // should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this // function, or Close before a Consumer object passes out of scope, as it will otherwise leak memory. You must call // this before calling Close on the underlying client. AsyncClose() // Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain // the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service // the Messages channel when this function is called, you will be competing with Close for messages; consider // calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a Consumer object passes // out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client. Close() error // Messages returns the read channel for the messages that are returned by // the broker. Messages() <-chan *ConsumerMessage // Errors returns a read channel of errors that occurred during consuming, if // enabled. By default, errors are logged and not returned over this channel. // If you want to implement any custom error handling, set your config's // Consumer.Return.Errors setting to true, and read from this channel. Errors() <-chan *ConsumerError // HighWaterMarkOffset returns the high water mark offset of the partition, // i.e. the offset that will be used for the next message that will be produced. // You can use this to determine how far behind the processing is. HighWaterMarkOffset() int64 }
type Producer ¶
type Producer struct { Client sarama.SyncProducer // contains filtered or unexported fields }
func (*Producer) SendMessage ¶
func (p *Producer) SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error)
SendMessage 发送一条消息, 返回一个消息指针. 关注 msg.Partition, msg.Offset 两个变量.
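func (*Producer) SendMessageByte ¶
func (p *Producer) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error)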
func (*Producer) SendMessages ¶
func (p *Producer) SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error)
SendMessages 批量发送消息, 返回消息指针数组. 关注 msg.Partition, msg.Offset 两个变量.
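func (*Producer) SendMessagesByte ¶
func (p *Producer) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error)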
type ProducerConfigInfo ¶
type ProducerConfigInfo struct { Enable bool `toml:"Enable" json:"Enable"` // RequiredAcks 两种模式都要配置.要求填数字. 因为有0值, 所以不用int型, 以免产生歧义. 不填有默认值. RequiredAcks string `toml:"RequiredAcks" json:"RequiredAcks"` // RetryMax 同步模式时的重试次数. 要求填数字, 不填有默认值 RetryMax int `toml:"RetryMax" json:"RetryMax"` // Compression 异步模式时配置 要求填数字. 因为有0值, 所以不用int型, 以免产生歧义. 不填有默认值. Compression string `toml:"Compression" json:"Compression"` // 异步模式时刷新频率, 单位毫秒. 不填有默认值 FlushFrequency int64 `toml:"FlushFrequency" json:"FlushFrequency"` }
type ProducerError ¶
type ProducerError struct {
sarama.ProducerError
}
type ProducerMessage ¶
type ProducerMessage struct {
sarama.ProducerMessage
}
type ProducerRepo ¶
type ProducerRepo interface { // Close shuts down the producer; you must call this function before a producer // object passes out of scope, as it may otherwise leak memory. // You must call this before calling Close on the underlying client. Close() error GetClient() *Producer // SendMessage 发送一条消息, 返回一个消息指针. 关注 msg.Partition, msg.Offset 两个变量. SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error) // SendMessages 批量发送消息, 返回消息指针数组. 关注 msg.Partition, msg.Offset 两个变量. SendMessages(topic Topic, key string, values ...interface{}) (msgList []*ProducerMessage, err error) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error) }
func DefaultProducer ¶
func DefaultProducer() ProducerRepo
func NewProducer ¶
func NewProducer(cfg Info, optionHandlers ...OptionHandler) (ProducerRepo, error)