Documentation ¶
Index ¶
- type AsyncProducer
- type Consumer
- func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error
- func (consumer *Consumer) Consume(topics string, callback func(*ConsumerMessage) bool) (err error)
- func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error
- type ConsumerConf
- type ConsumerMessage
- type ProducerConf
- type SyncProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
sarama.AsyncProducer
}
异步生产者
func (AsyncProducer) SendMessage ¶
func (producer AsyncProducer) SendMessage(topic, text string)
异步发送消息
type Consumer ¶
type Consumer struct { sarama.ConsumerGroup // contains filtered or unexported fields }
消费者
func (*Consumer) Cleanup ¶
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Consumer) Consume ¶
func (consumer *Consumer) Consume(topics string, callback func(*ConsumerMessage) bool) (err error)
开始消费
func (*Consumer) ConsumeClaim ¶
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
type ConsumerConf ¶
type ConsumerConf struct { Brokers string `yaml:"brokers" json:"brokers"` Version string `yaml:"version" json:"version"` Group string `yaml:"group" json:"group"` ClientID string `yaml:"clientID" json:"clientID"` Oldest bool `yaml:"oldest" json:"oldest"` MaxWaitTimeMs int64 `yaml:"maxWaitTimeMs" json:"maxWaitTimeMs"` //fetch.wait.max.ms SessionTimeoutMs int64 `yaml:"sessionTimeoutMs" json:"sessionTimeoutMs"` //session.timeout.ms KeepAliveMs int64 `yaml:"keepAliveMs" json:"keepAliveMs"` Logger sarama.StdLogger }
消费者配置
func NewConsumerConf ¶
func NewConsumerConf() *ConsumerConf
type ConsumerMessage ¶
type ConsumerMessage struct {
*sarama.ConsumerMessage
}
type ProducerConf ¶
type ProducerConf struct { Hosts string `yaml:"hosts" json:"hosts"` ClientID string `yaml:"clientID" json:"clientID"` KeepAliveMs int64 `yaml:"keepAliveMs" json:"keepAliveMs"` ReqTimeoutMs int64 `yaml:"reqTimeoutMs" json:"reqTimeoutMs"` RetSuccesses bool `yaml:"retSuccesses" json:"retSuccesses"` }
生产者配置
func (ProducerConf) NewAsyncProducer ¶
func (x ProducerConf) NewAsyncProducer() *AsyncProducer
创建异步生产者
type SyncProducer ¶
type SyncProducer struct {
sarama.SyncProducer
}
同步生产者
func (SyncProducer) SendMessage ¶
func (producer SyncProducer) SendMessage(topic, text string) (partition int32, offset int64, err error)
同步发送消息
Click to show internal directories.
Click to hide internal directories.