Documentation ¶
Index ¶
- type ConsumerGroup
- func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *ConsumerGroup) Group() string
- func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
- func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) Start()
- func (c *ConsumerGroup) Stop()
- func (c *ConsumerGroup) Topics() []string
- type KafkaConsumerConf
- type KafkaProducerConf
- type MessageHandlerF
- type Producer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
ConsumerGroup kafka consumer
func MustKafkaConsumer ¶
func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup
func (*ConsumerGroup) Cleanup ¶
func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerGroup) ConsumeClaim ¶
func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerGroup) Group ¶
func (c *ConsumerGroup) Group() string
func (*ConsumerGroup) RegisterHandlers ¶
func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
func (*ConsumerGroup) Setup ¶
func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ConsumerGroup) Start ¶
func (c *ConsumerGroup) Start()
Start start consume messages, watch signals
func (*ConsumerGroup) Stop ¶
func (c *ConsumerGroup) Stop()
Stop Stop consume messages, watch signals
func (*ConsumerGroup) Topics ¶
func (c *ConsumerGroup) Topics() []string
type KafkaConsumerConf ¶
KafkaConsumerConf kafka client settings.
type KafkaProducerConf ¶
KafkaProducerConf kafka producer settings.
type Producer ¶
type Producer struct { SyncProducer sarama.SyncProducer // contains filtered or unexported fields }
func GetCachedMQClient ¶
func GetCachedMQClient(c *KafkaProducerConf) *Producer
func MustKafkaProducer ¶
func MustKafkaProducer(c *KafkaProducerConf) *Producer
func (*Producer) SendMessage ¶
SendMessage Input send msg to kafka NOTE: If producer has beed created failed, the message will lose.
Click to show internal directories.
Click to hide internal directories.