Documentation ¶
Index ¶
- func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition
- type ConsumerGroup
- func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic)
- func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment)
- func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic)
- func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, ...)
- type ConsumerGroupInstance
- type Coordinator
- func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance
- func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups
- func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
- func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats)
- func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats)
- func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic)
- func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic)
- type PartitionConsumerMapping
- type PartitionSlotToConsumerInstance
- type PartitionSlotToConsumerInstanceList
- type TopicConsumerGroups
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ToPartitions ¶
func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct { // map a consumer group instance id to a consumer group instance ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] // contains filtered or unexported fields }
func NewConsumerGroup ¶
func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup
func (*ConsumerGroup) OnAddConsumerGroupInstance ¶
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic)
func (*ConsumerGroup) OnPartitionListChange ¶
func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment)
func (*ConsumerGroup) OnRemoveConsumerGroupInstance ¶
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic)
func (*ConsumerGroup) RebalanceConsumberGroupInstances ¶
func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string)
type ConsumerGroupInstance ¶
type ConsumerGroupInstance struct { InstanceId string // the consumer group instance may not have an active partition Partitions []*topic.Partition ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse }
func NewConsumerGroupInstance ¶
func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance
type Coordinator ¶
type Coordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] // contains filtered or unexported fields }
func NewCoordinator ¶
func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator
func (*Coordinator) AddSubscriber ¶
func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance
func (*Coordinator) GetTopicConsumerGroups ¶
func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups
func (*Coordinator) OnPartitionChange ¶
func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
func (*Coordinator) OnSubAddBroker ¶
func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats)
OnSubAddBroker is called when a broker is added to the balancer
func (*Coordinator) OnSubRemoveBroker ¶
func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats)
OnSubRemoveBroker is called when a broker is removed from the balancer
func (*Coordinator) RemoveSubscriber ¶
func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic)
func (*Coordinator) RemoveTopic ¶
func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic)
type PartitionConsumerMapping ¶
type PartitionConsumerMapping struct {
// contains filtered or unexported fields
}
func NewPartitionConsumerMapping ¶
func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping
func (*PartitionConsumerMapping) BalanceToConsumerInstanceIds ¶
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string)
type PartitionSlotToConsumerInstanceList ¶
type PartitionSlotToConsumerInstanceList struct { PartitionSlots []*PartitionSlotToConsumerInstance RingSize int32 Version int64 }
func NewPartitionSlotToConsumerInstanceList ¶
func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *PartitionSlotToConsumerInstanceList
type TopicConsumerGroups ¶
type TopicConsumerGroups struct { // map a consumer group name to a consumer group ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] }
Click to show internal directories.
Click to hide internal directories.