kafkamodule

package
Version: v2.1.0 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2024 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConsumerConfig

func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error)

func NewProducerConfig

func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, requiredAcks sarama.RequiredAcks, Idempotent bool,
	partitioner sarama.PartitionerConstructor) (*sarama.Config, error)

NewProducerConfig 新建 producerConfig。

- kafkaVersion:kafka 版本。
- returnErr、returnSucc:是否返回错误与成功。
- requiredAcks:
  - -1(WaitForAll):全量同步确认,强可靠性保证(当所有的 leader 和 follower 都接收成功时)。
  - 1(WaitForLocal):leader 确认收到,默认(仅 leader 反馈)。
  - 0(NoResponse):不确认,但是吞吐量大(不关心结果)。
- Idempotent(幂等):确保信息都准确写入一份副本,用于幂等生产者;当这一项设置为 true 的时候,生产者将保证生产的消息一定是有序且精确一次的。
- partitioner:生成分区器,用于选择向哪个分区发送信息,默认情况下对消息密钥进行散列。

Types

type AsyncProducer

type AsyncProducer struct {
}

type AsyncReturn

type AsyncReturn struct {
	Msg *sarama.ProducerMessage
	Err error
	// contains filtered or unexported fields
}

func (*AsyncReturn) WaitOk

func (ar *AsyncReturn) WaitOk(ctx context.Context) (*sarama.ProducerMessage, error)

type ConsumerGroup

type ConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func (*ConsumerGroup) Close

func (c *ConsumerGroup) Close()

func (*ConsumerGroup) Setup

func (c *ConsumerGroup) Setup(addr []string, topics []string, groupId string, config *sarama.Config, receiverInterval time.Duration, maxReceiverNum int, msgReceiver IMsgReceiver) error

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

func (*ConsumerGroupHandler) AppendMsg

func (*ConsumerGroupHandler) Cleanup

func (*ConsumerGroupHandler) ConsumeClaim

func (*ConsumerGroupHandler) Flush

func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string)

func (*ConsumerGroupHandler) GetMsgData

func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData

func (*ConsumerGroupHandler) Setup

type IMsgReceiver

type IMsgReceiver interface {
	Receiver(msgs []*sarama.ConsumerMessage) bool
}

type IProducer

type IProducer interface {
}

type KafkaAdmin

type KafkaAdmin struct {
	sarama.ClusterAdmin
	// contains filtered or unexported fields
}

func (*KafkaAdmin) CreateTopic

func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error

CreateTopic 创建主题。numPartitions 为分区数,replicationFactor 为副本数;validateOnly 为 true 时只进行参数验证而不实际执行操作。

func (*KafkaAdmin) GetTopicDetail

func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail

func (*KafkaAdmin) GetTopics

func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error)

func (*KafkaAdmin) HasTopic

func (ka *KafkaAdmin) HasTopic(topic string) bool

func (*KafkaAdmin) RefreshTopic

func (ka *KafkaAdmin) RefreshTopic() error

func (*KafkaAdmin) Setup

func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error

type MsgData

type MsgData struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type Producer

func (*Producer) ASyncSetup

func (p *Producer) ASyncSetup(addr []string, config *sarama.Config) error

func (*Producer) AsyncPushMessage

func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage)

func (*Producer) AsyncSendMessage

func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) SendMessage

func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

func (*Producer) SendMessages

func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error

func (*Producer) SyncSetup

func (p *Producer) SyncSetup(addr []string, config *sarama.Config) error

type Sasl

type Sasl struct {
	UserName   string `json:"UserName"`
	Passwd     string `json:"Passwd"`
	InstanceId string `json:"InstanceId"`
}

type SyncProducer

type SyncProducer struct {
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL