Documentation ¶
Index ¶
- Constants
- Variables
- func InitAsyncKafkaProducer(name string, hosts []string, config *sarama.Config) error
- func InitSyncKafkaProducer(name string, hosts []string, config *sarama.Config) error
- func KafkaMsgValueEncoder(value []byte) sarama.Encoder
- func KafkaMsgValueStrEncoder(value string) sarama.Encoder
- type AsyncProducer
- type KafkaMsg
- type SyncProducer
Constants ¶
View Source
const ( // KafkaProducerConnectd 生产者已连接 KafkaProducerConnectd string = "connected" // KafkaProducerDisconnected 生产者已断开 KafkaProducerDisconnected string = "disconnected" // KafkaProducerClosed 生产者已关闭 KafkaProducerClosed string = "closed" )
Variables ¶
View Source
var ( ErrProduceTimeout = errors.New("push message timeout") KafkaSyncProducers = make(map[string]*SyncProducer) KafkaAsyncProducers = make(map[string]*AsyncProducer) KafkaStdLogger stdLogger )
Functions ¶
func InitAsyncKafkaProducer ¶
InitAsyncKafkaProducer 初始化异步生产者
func InitSyncKafkaProducer ¶
InitSyncKafkaProducer 初始化同步生产者
func KafkaMsgValueEncoder ¶
func KafkaMsgValueStrEncoder ¶
Types ¶
type AsyncProducer ¶
type AsyncProducer struct { AsyncProducer sarama.AsyncProducer // contains filtered or unexported fields }
AsyncProducer 异步生产者
func GetKafkaAsyncProducer ¶
func GetKafkaAsyncProducer(name string) *AsyncProducer
func (*AsyncProducer) Close ¶
func (asyncProducer *AsyncProducer) Close() error
func (*AsyncProducer) Send ¶
func (asyncProducer *AsyncProducer) Send(msg *sarama.ProducerMessage) error
type SyncProducer ¶
type SyncProducer struct { SyncProducer sarama.SyncProducer // contains filtered or unexported fields }
SyncProducer 同步生产者
func GetKafkaSyncProducer ¶
func GetKafkaSyncProducer(name string) *SyncProducer
func (*SyncProducer) Send ¶
func (syncProducer *SyncProducer) Send(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
func (*SyncProducer) SendMessages ¶
func (syncProducer *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) sarama.ProducerErrors
Click to show internal directories.
Click to hide internal directories.