memory

package
v0.0.0-...-fd7de33 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// ExitGroupEvent consumer=>consumer_group 表示消费者退出消费组的事件
	ExitGroupEvent = "exit_group"
	// ReportOffsetEvent consumer=>consumer_group  表示消费者向消费组上报消费进度事件
	ReportOffsetEvent = "report_offset"
	// RejoinEvent consumer_group=>consumer  表示消费组通知消费者重新加入消费组
	RejoinEvent = "rejoin"
	// RejoinAckEvent  consumer=>consumer_group  表示消费者收到重新加入消费组的指令并将offset进行上报
	RejoinAckEvent = "rejoin_ack"
	// CloseEvent consumer_group=>consumer 表示消费组关闭所有消费者,向所有消费者发出关闭事件
	CloseEvent = "close"
	// PartitionNotifyEvent consumer_group=>consumer 表示消费组向消费者下发分区情况
	PartitionNotifyEvent = "partition_notify"
	// PartitionNotifyAckEvent consumer=>consumer_group  表示消费者对消费组下发分区情况事件的确认
	PartitionNotifyAckEvent = "partition_notify_ack"

	StatusStable    = 1 // 稳定状态,可以正常的进行消费数据
	StatusBalancing = 2
	// 消费组关闭
	StatusStop = 3
	// 一个消费者正在退出消费组
	StatusStopping = 4
)

Variables

View Source
var (
	ErrReportOffsetFail    = errors.New("非平衡状态,无法上报偏移量")
	ErrConsumerGroupClosed = errors.New("消费组已经关闭")
)

Functions

func NewMQ

func NewMQ() mq.MQ

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) (*mq.Message, error)

func (*Consumer) ConsumeChan

func (c *Consumer) ConsumeChan(ctx context.Context) (<-chan *mq.Message, error)

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup 表示消费组是并发安全的

func (*ConsumerGroup) Close

func (c *ConsumerGroup) Close()

func (*ConsumerGroup) JoinGroup

func (c *ConsumerGroup) JoinGroup() (*Consumer, error)

JoinGroup 加入消费组

type ConsumerPartitionAssigner

type ConsumerPartitionAssigner interface {
	// AssignPartition partitions表示分区数,返回值为map[消费者名称][]分区索引
	AssignPartition(consumers []string, partitions int) map[string][]int
}

ConsumerPartitionAssigner 此抽象是给消费组使用,用于将分区分配给消费组内的消费者。

type Event

type Event struct {
	// 事件类型
	Type string
	// 事件所需要处理的数据
	Data any
}

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

func (*MQ) Close

func (m *MQ) Close() error

func (*MQ) Consumer

func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error)

func (*MQ) CreateTopic

func (m *MQ) CreateTopic(ctx context.Context, topic string, partitions int) error

func (*MQ) DeleteTopics

func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error

func (*MQ) Producer

func (m *MQ) Producer(topic string) (mq.Producer, error)

type Partition

type Partition struct {
	// contains filtered or unexported fields
}

func NewPartition

func NewPartition() *Partition

type PartitionIDGetter

type PartitionIDGetter interface {
	// PartitionID 用于Producer获取分区号,返回值就是分区号
	PartitionID(key string) int64
}

PartitionIDGetter 此抽象用于Producer获取对应分区号

type PartitionRecord

type PartitionRecord struct {
	// 属于哪个分区
	Index int
	// 消费进度
	Offset int
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, m *mq.Message) (*mq.ProducerResult, error)

func (*Producer) ProduceWithPartition

func (p *Producer) ProduceWithPartition(ctx context.Context, m *mq.Message, partition int) (*mq.ProducerResult, error)

type ReportData

type ReportData struct {
	Records []PartitionRecord
	ErrChan chan error
}

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

func (*Topic) Close

func (t *Topic) Close() error

Directories

Path Synopsis
consumerpartitionassigner
produceridgetter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL