sub_coordinator

package
v0.0.0-...-45e1a9a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ToPartitions

func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition

Types

type ConsumerGroup

type ConsumerGroup struct {

	// map a consumer group instance id to a consumer group instance
	ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup

func (*ConsumerGroup) OnAddConsumerGroupInstance

func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic)

func (*ConsumerGroup) OnPartitionListChange

func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment)

func (*ConsumerGroup) OnRemoveConsumerGroupInstance

func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic)

func (*ConsumerGroup) RebalanceConsumberGroupInstances

func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string)

type ConsumerGroupInstance

type ConsumerGroupInstance struct {
	InstanceId string
	// the consumer group instance may not have an active partition
	Partitions   []*topic.Partition
	ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
}

func NewConsumerGroupInstance

func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance

type Coordinator

type Coordinator struct {
	// map topic name to consumer groups
	TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator

func (*Coordinator) AddSubscriber

func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance

func (*Coordinator) GetTopicConsumerGroups

func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups

func (*Coordinator) OnPartitionChange

func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)

func (*Coordinator) OnSubAddBroker

func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats)

OnSubAddBroker is called when a broker is added to the balancer

func (*Coordinator) OnSubRemoveBroker

func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats)

OnSubRemoveBroker is called when a broker is removed from the balancer

func (*Coordinator) RemoveSubscriber

func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic)

func (*Coordinator) RemoveTopic

func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic)

type PartitionConsumerMapping

type PartitionConsumerMapping struct {
	// contains filtered or unexported fields
}

func NewPartitionConsumerMapping

func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping

func (*PartitionConsumerMapping) BalanceToConsumerInstanceIds

func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string)

type PartitionSlotToConsumerInstance

type PartitionSlotToConsumerInstance struct {
	RangeStart         int32
	RangeStop          int32
	UnixTimeNs         int64
	Broker             string
	AssignedInstanceId string
}

type PartitionSlotToConsumerInstanceList

type PartitionSlotToConsumerInstanceList struct {
	PartitionSlots []*PartitionSlotToConsumerInstance
	RingSize       int32
	Version        int64
}

func NewPartitionSlotToConsumerInstanceList

func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *PartitionSlotToConsumerInstanceList

type TopicConsumerGroups

type TopicConsumerGroups struct {
	// map a consumer group name to a consumer group
	ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL