Documentation ¶
Index ¶
- Constants
- Variables
- func NewMQ() mq.MQ
- type Consumer
- type ConsumerGroup
- type ConsumerPartitionAssigner
- type Event
- type MQ
- func (m *MQ) Close() error
- func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error)
- func (m *MQ) CreateTopic(ctx context.Context, topic string, partitions int) error
- func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error
- func (m *MQ) Producer(topic string) (mq.Producer, error)
- type Partition
- type PartitionIDGetter
- type PartitionRecord
- type Producer
- type ReportData
- type Topic
Constants ¶
View Source
const ( // ExitGroupEvent consumer=>consumer_group 表示消费者退出消费组的事件 ExitGroupEvent = "exit_group" // ReportOffsetEvent consumer=>consumer_group 表示消费者向消费组上报消费进度事件 ReportOffsetEvent = "report_offset" // RejoinEvent consumer_group=>consumer 表示消费组通知消费者重新加入消费组 RejoinEvent = "rejoin" // RejoinAckEvent consumer=>consumer_group 表示消费者收到重新加入消费组的指令并将offset进行上报 RejoinAckEvent = "rejoin_ack" // CloseEvent consumer_group=>consumer 表示消费组关闭所有消费者,向所有消费者发出关闭事件 CloseEvent = "close" // PartitionNotifyEvent consumer_group=>consumer 表示消费组向消费者下发分区情况 PartitionNotifyEvent = "partition_notify" // PartitionNotifyAckEvent consumer=>consumer_group 表示消费者对消费组下发分区情况事件的确认 PartitionNotifyAckEvent = "partition_notify_ack" StatusStable = 1 // 稳定状态,可以正常的进行消费数据 StatusBalancing = 2 // 消费组关闭 StatusStop = 3 // 一个消费者正在退出消费组 StatusStopping = 4 )
Variables ¶
View Source
var ( ErrReportOffsetFail = errors.New("非平衡状态,无法上报偏移量") ErrConsumerGroupClosed = errors.New("消费组已经关闭") )
Functions ¶
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup 表示消费组是并发安全的
func (*ConsumerGroup) Close ¶
func (c *ConsumerGroup) Close()
func (*ConsumerGroup) JoinGroup ¶
func (c *ConsumerGroup) JoinGroup() (*Consumer, error)
JoinGroup 加入消费组
type ConsumerPartitionAssigner ¶
type ConsumerPartitionAssigner interface { // AssignPartition partitions表示分区数,返回值为map[消费者名称][]分区索引 AssignPartition(consumers []string, partitions int) map[string][]int }
ConsumerPartitionAssigner 此抽象是给消费组使用,用于将分区分配给消费组内的消费者。
type Partition ¶
type Partition struct {
// contains filtered or unexported fields
}
func NewPartition ¶
func NewPartition() *Partition
type PartitionIDGetter ¶
type PartitionIDGetter interface { // PartitionID 用于Producer获取分区号,返回值就是分区号 PartitionID(key string) int64 }
PartitionIDGetter 此抽象用于Producer获取对应分区号
type PartitionRecord ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
type ReportData ¶
type ReportData struct { Records []PartitionRecord ErrChan chan error }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.