Documentation ¶
Index ¶
- func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error)
- func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, ...) (*sarama.Config, error)
- type AsyncProducer
- type AsyncReturn
- type ConsumerGroup
- type ConsumerGroupHandler
- func (ch *ConsumerGroupHandler) AppendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage)
- func (ch *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error
- func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string)
- func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData
- func (ch *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error
- type IMsgReceiver
- type IProducer
- type KafkaAdmin
- func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error
- func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail
- func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error)
- func (ka *KafkaAdmin) HasTopic(topic string) bool
- func (ka *KafkaAdmin) RefreshTopic() error
- func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error
- type MsgData
- type Producer
- func (p *Producer) ASyncSetup(addr []string, config *sarama.Config) error
- func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage)
- func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn
- func (p *Producer) Close()
- func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
- func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error
- func (p *Producer) SyncSetup(addr []string, config *sarama.Config) error
- type Sasl
- type SyncProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
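func NewConsumerConfig ¶
func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error)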
func NewProducerConfig ¶
func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, requiredAcks sarama.RequiredAcks, Idempotent bool, partitioner sarama.PartitionerConstructor) (*sarama.Config, error)
NewProducerConfig 新建 producerConfig。参数说明:
- kafkaVersion:kafka 版本。
- returnErr、returnSucc:是否返回错误与成功。
- requiredAcks:确认级别,取值如下:
  - -1(WaitForAll):全量同步确认,强可靠性保证(当所有的 leader 和 follower 都接收成功时)。
  - 1(WaitForLocal):leader 确认收到,默认(仅 leader 反馈)。
  - 0(NoResponse):不确认,但是吞吐量大(不关心结果)。
- Idempotent(幂等):确保信息都准确写入一份副本,用于幂等生产者;当这一项设置为 true 的时候,生产者将保证生产的消息一定是有序且精确一次的。
- partitioner:生成分区器,用于选择向哪个分区发送信息;默认情况下对消息密钥进行散列。
Types ¶
type AsyncProducer ¶
type AsyncProducer struct { }
type AsyncReturn ¶
type AsyncReturn struct { Msg *sarama.ProducerMessage Err error // contains filtered or unexported fields }
func (*AsyncReturn) WaitOk ¶
func (ar *AsyncReturn) WaitOk(ctx context.Context) (*sarama.ProducerMessage, error)
type ConsumerGroup ¶
type ConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
func (*ConsumerGroup) Close ¶
func (c *ConsumerGroup) Close()
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (*ConsumerGroupHandler) AppendMsg ¶
func (ch *ConsumerGroupHandler) AppendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage)
func (*ConsumerGroupHandler) Cleanup ¶
func (ch *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerGroupHandler) Flush ¶
func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string)
func (*ConsumerGroupHandler) GetMsgData ¶
func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData
func (*ConsumerGroupHandler) Setup ¶
func (ch *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error
type IMsgReceiver ¶
type IMsgReceiver interface {
Receiver(msgs []*sarama.ConsumerMessage) bool
}
type KafkaAdmin ¶
type KafkaAdmin struct { sarama.ClusterAdmin // contains filtered or unexported fields }
func (*KafkaAdmin) CreateTopic ¶
func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error
CreateTopic 创建主题。numPartitions:分区数;replicationFactor:副本数;validateOnly:为 true 时只进行参数验证,而不实际执行操作。
func (*KafkaAdmin) GetTopicDetail ¶
func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail
func (*KafkaAdmin) GetTopics ¶
func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error)
func (*KafkaAdmin) HasTopic ¶
func (ka *KafkaAdmin) HasTopic(topic string) bool
func (*KafkaAdmin) RefreshTopic ¶
func (ka *KafkaAdmin) RefreshTopic() error
type Producer ¶
type Producer struct { service.Module sarama.SyncProducer sarama.AsyncProducer }
func (*Producer) ASyncSetup ¶
func (p *Producer) ASyncSetup(addr []string, config *sarama.Config) error
func (*Producer) AsyncPushMessage ¶
func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage)
func (*Producer) AsyncSendMessage ¶
func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn
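func (*Producer) SendMessage ¶
func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)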
func (*Producer) SendMessages ¶
func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error
type SyncProducer ¶
type SyncProducer struct { }
Click to show internal directories.
Click to hide internal directories.