topic

package
v0.0.0-...-45e1a9a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const PartitionCount = 4096

Variables

View Source
var TIME_FORMAT = "2006-01-02-15-04-05"

Functions

This section is empty.

Types

type LocalPartition

type LocalPartition struct {
	ListenersWaits int64
	AckTsNs        int64

	// notifying clients
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond

	Partition
	LogBuffer   *log_buffer.LogBuffer
	Publishers  *LocalPartitionPublishers
	Subscribers *LocalPartitionSubscribers
	// contains filtered or unexported fields
}

func NewLocalPartition

func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition

func (*LocalPartition) GetEarliestInMemoryMessagePosition

func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition

func (*LocalPartition) GetEarliestMessageTimeInMemory

func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time

func (*LocalPartition) HasData

func (p *LocalPartition) HasData() bool

func (*LocalPartition) MaybeConnectToFollowers

func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error)

func (*LocalPartition) MaybeShutdownLocalPartition

func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool)

func (*LocalPartition) NotifyLogFlushed

func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64)

func (*LocalPartition) Publish

func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error

func (*LocalPartition) Shutdown

func (p *LocalPartition) Shutdown()

func (*LocalPartition) Subscribe

func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
	onNoMessageFn func() bool, eachMessageFn log_buffer.EachLogEntryFuncType) error

func (*LocalPartition) WaitUntilNoPublishers

func (p *LocalPartition) WaitUntilNoPublishers()

type LocalPartitionPublishers

type LocalPartitionPublishers struct {
	// contains filtered or unexported fields
}

func NewLocalPartitionPublishers

func NewLocalPartitionPublishers() *LocalPartitionPublishers

func (*LocalPartitionPublishers) AddPublisher

func (p *LocalPartitionPublishers) AddPublisher(clientName string, publisher *LocalPublisher)

func (*LocalPartitionPublishers) RemovePublisher

func (p *LocalPartitionPublishers) RemovePublisher(clientName string)

func (*LocalPartitionPublishers) SignalShutdown

func (p *LocalPartitionPublishers) SignalShutdown()

func (*LocalPartitionPublishers) Size

func (p *LocalPartitionPublishers) Size() int

type LocalPartitionSubscribers

type LocalPartitionSubscribers struct {
	Subscribers     map[string]*LocalSubscriber
	SubscribersLock sync.RWMutex
}

func NewLocalPartitionSubscribers

func NewLocalPartitionSubscribers() *LocalPartitionSubscribers

func (*LocalPartitionSubscribers) AddSubscriber

func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber)

func (*LocalPartitionSubscribers) RemoveSubscriber

func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string)

func (*LocalPartitionSubscribers) SignalShutdown

func (p *LocalPartitionSubscribers) SignalShutdown()

func (*LocalPartitionSubscribers) Size

func (p *LocalPartitionSubscribers) Size() int

type LocalPublisher

type LocalPublisher struct {
}

func NewLocalPublisher

func NewLocalPublisher() *LocalPublisher

func (*LocalPublisher) SignalShutdown

func (p *LocalPublisher) SignalShutdown()

type LocalSubscriber

type LocalSubscriber struct {
	// contains filtered or unexported fields
}

func NewLocalSubscriber

func NewLocalSubscriber() *LocalSubscriber

func (*LocalSubscriber) SignalShutdown

func (p *LocalSubscriber) SignalShutdown()

type LocalTopic

type LocalTopic struct {
	Topic
	Partitions []*LocalPartition
}

func NewLocalTopic

func NewLocalTopic(topic Topic) *LocalTopic

func (*LocalTopic) WaitUntilNoPublishers

func (localTopic *LocalTopic) WaitUntilNoPublishers()

type LocalTopicManager

type LocalTopicManager struct {
	// contains filtered or unexported fields
}

LocalTopicManager manages topics on local broker

func NewLocalTopicManager

func NewLocalTopicManager() *LocalTopicManager

NewLocalTopicManager creates a new LocalTopicManager

func (*LocalTopicManager) AddLocalPartition

func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition)

AddLocalPartition adds a topic to the local topic manager

func (*LocalTopicManager) ClosePublishers

func (manager *LocalTopicManager) ClosePublishers(topic Topic, unixTsNs int64) (removed bool)

func (*LocalTopicManager) CloseSubscribers

func (manager *LocalTopicManager) CloseSubscribers(topic Topic, unixTsNs int64) (removed bool)

func (*LocalTopicManager) CollectStats

func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats

func (*LocalTopicManager) GetLocalPartition

func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition

GetLocalPartition gets a topic from the local topic manager

func (*LocalTopicManager) RemoveLocalPartition

func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool)

func (*LocalTopicManager) RemoveTopic

func (manager *LocalTopicManager) RemoveTopic(topic Topic)

RemoveTopic removes a topic from the local topic manager

func (*LocalTopicManager) WaitUntilNoPublishers

func (manager *LocalTopicManager) WaitUntilNoPublishers(topic Topic)

type Partition

type Partition struct {
	RangeStart int32
	RangeStop  int32 // exclusive
	RingSize   int32
	UnixTimeNs int64 // in nanoseconds
}

func FromPbPartition

func FromPbPartition(partition *mq_pb.Partition) Partition

func NewPartition

func NewPartition(rangeStart, rangeStop, ringSize int32, unixTimeNs int64) *Partition

func SplitPartitions

func SplitPartitions(targetCount int32, ts int64) []*Partition

func (Partition) Equals

func (partition Partition) Equals(other Partition) bool

func (Partition) ToPbPartition

func (partition Partition) ToPbPartition() *mq_pb.Partition

type Topic

type Topic struct {
	Namespace string
	Name      string
}

func FromPbTopic

func FromPbTopic(topic *mq_pb.Topic) Topic

func NewTopic

func NewTopic(namespace string, name string) Topic

func (Topic) String

func (tp Topic) String() string

func (Topic) ToPbTopic

func (tp Topic) ToPbTopic() *mq_pb.Topic

type TopicPartition

type TopicPartition struct {
	Topic
	Partition
}

func (*TopicPartition) String

func (tp *TopicPartition) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL