Documentation ¶
Index ¶
- Constants
- Variables
- func InitAsyncKafkaProducer(name string, hosts []string, config *sarama.Config) error
- func InitSyncKafkaProducer(name string, hosts []string, config *sarama.Config) error
- func KafkaMsgValueEncoder(value []byte) sarama.Encoder
- func KafkaMsgValueStrEncoder(value string) sarama.Encoder
- type AsyncProducer
- type Consumer
- type KafkaMessageHandler
- type KafkaMsg
- type KafkaProducer
- type SyncProducer
Constants ¶
View Source
const ( //KafkaConsumerConnected 消费者已连接 KafkaConsumerConnected string = "connected" //KafkaConsumerDisconnected 消费者断开 KafkaConsumerDisconnected string = "disconnected" )
View Source
const ( //生产者已连接 KafkaProducerConnected string = "connected" //生产者已断开 KafkaProducerDisconnected string = "disconnected" //生产者已关闭 KafkaProducerClosed string = "closed" DefaultKafkaAsyncProducer = "default-kafka-async-producer" DefaultKafkaSyncProducer = "default-kafka-sync-producer" )
Variables ¶
View Source
var ( ErrProduceTimeout = errors.New("push message timeout") KafkaStdLogger stdLogger )
Functions ¶
func InitAsyncKafkaProducer ¶
初始化异步生产者
func InitSyncKafkaProducer ¶
func KafkaMsgValueEncoder ¶
func KafkaMsgValueStrEncoder ¶
Types ¶
type AsyncProducer ¶
type AsyncProducer struct { KafkaProducer AsyncProducer *sarama.AsyncProducer }
异步生产者
func GetKafkaAsyncProducer ¶
func GetKafkaAsyncProducer(name string) *AsyncProducer
func (*AsyncProducer) Close ¶
func (asyncProducer *AsyncProducer) Close() error
func (*AsyncProducer) Send ¶
func (asyncProducer *AsyncProducer) Send(msg *sarama.ProducerMessage) error
Send 通过异步生产者发送消息到 kafka
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func StartKafkaConsumer ¶
func StartKafkaConsumer(hosts, topics []string, groupID string, config *cluster.Config, f KafkaMessageHandler) (*Consumer, error)
启动消费者
type KafkaMessageHandler ¶
type KafkaMessageHandler func(message *sarama.ConsumerMessage) (bool, error)
KafkaMessageHandler 消费者回调函数
type KafkaProducer ¶
type SyncProducer ¶
type SyncProducer struct { KafkaProducer SyncProducer *sarama.SyncProducer }
同步生产者
func GetKafkaSyncProducer ¶
func GetKafkaSyncProducer(name string) *SyncProducer
func (*SyncProducer) Close ¶
func (syncProducer *SyncProducer) Close() error
func (*SyncProducer) Send ¶
func (syncProducer *SyncProducer) Send(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
Send 同步发送消息到 kafka
func (*SyncProducer) SendMessages ¶
func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) (errs sarama.ProducerErrors)
SendMessages 同步批量发送消息到 kafka
Click to show internal directories.
Click to hide internal directories.