Documentation ¶
Index ¶
- Constants
- Variables
- type LocalPartition
- func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition
- func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time
- func (p *LocalPartition) HasData() bool
- func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, ...) (err error)
- func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool)
- func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64)
- func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error
- func (p *LocalPartition) Shutdown()
- func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition, ...) error
- func (p *LocalPartition) WaitUntilNoPublishers()
- type LocalPartitionPublishers
- type LocalPartitionSubscribers
- type LocalPublisher
- type LocalSubscriber
- type LocalTopic
- type LocalTopicManager
- func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition)
- func (manager *LocalTopicManager) ClosePublishers(topic Topic, unixTsNs int64) (removed bool)
- func (manager *LocalTopicManager) CloseSubscribers(topic Topic, unixTsNs int64) (removed bool)
- func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats
- func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition
- func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool)
- func (manager *LocalTopicManager) RemoveTopic(topic Topic)
- func (manager *LocalTopicManager) WaitUntilNoPublishers(topic Topic)
- type Partition
- type Topic
- type TopicPartition
Constants ¶
View Source
const PartitionCount = 4096
Variables ¶
View Source
var TIME_FORMAT = "2006-01-02-15-04-05"
Functions ¶
This section is empty.
Types ¶
type LocalPartition ¶
type LocalPartition struct { ListenersWaits int64 AckTsNs int64 // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond Partition LogBuffer *log_buffer.LogBuffer Publishers *LocalPartitionPublishers Subscribers *LocalPartitionSubscribers // contains filtered or unexported fields }
func NewLocalPartition ¶
func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition
func (*LocalPartition) GetEarliestInMemoryMessagePosition ¶
func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition
func (*LocalPartition) GetEarliestMessageTimeInMemory ¶
func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time
func (*LocalPartition) HasData ¶
func (p *LocalPartition) HasData() bool
func (*LocalPartition) MaybeConnectToFollowers ¶
func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error)
func (*LocalPartition) MaybeShutdownLocalPartition ¶
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool)
func (*LocalPartition) NotifyLogFlushed ¶
func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64)
func (*LocalPartition) Publish ¶
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error
func (*LocalPartition) Shutdown ¶
func (p *LocalPartition) Shutdown()
func (*LocalPartition) Subscribe ¶
func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition, onNoMessageFn func() bool, eachMessageFn log_buffer.EachLogEntryFuncType) error
func (*LocalPartition) WaitUntilNoPublishers ¶
func (p *LocalPartition) WaitUntilNoPublishers()
type LocalPartitionPublishers ¶
type LocalPartitionPublishers struct {
// contains filtered or unexported fields
}
func NewLocalPartitionPublishers ¶
func NewLocalPartitionPublishers() *LocalPartitionPublishers
func (*LocalPartitionPublishers) AddPublisher ¶
func (p *LocalPartitionPublishers) AddPublisher(clientName string, publisher *LocalPublisher)
func (*LocalPartitionPublishers) RemovePublisher ¶
func (p *LocalPartitionPublishers) RemovePublisher(clientName string)
func (*LocalPartitionPublishers) SignalShutdown ¶
func (p *LocalPartitionPublishers) SignalShutdown()
func (*LocalPartitionPublishers) Size ¶
func (p *LocalPartitionPublishers) Size() int
type LocalPartitionSubscribers ¶
type LocalPartitionSubscribers struct { Subscribers map[string]*LocalSubscriber SubscribersLock sync.RWMutex }
func NewLocalPartitionSubscribers ¶
func NewLocalPartitionSubscribers() *LocalPartitionSubscribers
func (*LocalPartitionSubscribers) AddSubscriber ¶
func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber)
func (*LocalPartitionSubscribers) RemoveSubscriber ¶
func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string)
func (*LocalPartitionSubscribers) SignalShutdown ¶
func (p *LocalPartitionSubscribers) SignalShutdown()
func (*LocalPartitionSubscribers) Size ¶
func (p *LocalPartitionSubscribers) Size() int
type LocalPublisher ¶
type LocalPublisher struct { }
func NewLocalPublisher ¶
func NewLocalPublisher() *LocalPublisher
func (*LocalPublisher) SignalShutdown ¶
func (p *LocalPublisher) SignalShutdown()
type LocalSubscriber ¶
type LocalSubscriber struct {
// contains filtered or unexported fields
}
func NewLocalSubscriber ¶
func NewLocalSubscriber() *LocalSubscriber
func (*LocalSubscriber) SignalShutdown ¶
func (p *LocalSubscriber) SignalShutdown()
type LocalTopic ¶
type LocalTopic struct { Topic Partitions []*LocalPartition }
func NewLocalTopic ¶
func NewLocalTopic(topic Topic) *LocalTopic
func (*LocalTopic) WaitUntilNoPublishers ¶
func (localTopic *LocalTopic) WaitUntilNoPublishers()
type LocalTopicManager ¶
type LocalTopicManager struct {
// contains filtered or unexported fields
}
LocalTopicManager manages topics on local broker
func NewLocalTopicManager ¶
func NewLocalTopicManager() *LocalTopicManager
NewLocalTopicManager creates a new LocalTopicManager
func (*LocalTopicManager) AddLocalPartition ¶
func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition)
AddLocalPartition adds a topic to the local topic manager
func (*LocalTopicManager) ClosePublishers ¶
func (manager *LocalTopicManager) ClosePublishers(topic Topic, unixTsNs int64) (removed bool)
func (*LocalTopicManager) CloseSubscribers ¶
func (manager *LocalTopicManager) CloseSubscribers(topic Topic, unixTsNs int64) (removed bool)
func (*LocalTopicManager) CollectStats ¶
func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats
func (*LocalTopicManager) GetLocalPartition ¶
func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition
GetLocalPartition gets a topic from the local topic manager
func (*LocalTopicManager) RemoveLocalPartition ¶
func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool)
func (*LocalTopicManager) RemoveTopic ¶
func (manager *LocalTopicManager) RemoveTopic(topic Topic)
RemoveTopic removes a topic from the local topic manager
func (*LocalTopicManager) WaitUntilNoPublishers ¶
func (manager *LocalTopicManager) WaitUntilNoPublishers(topic Topic)
type Partition ¶
type Partition struct { RangeStart int32 RangeStop int32 // exclusive RingSize int32 UnixTimeNs int64 // in nanoseconds }
func FromPbPartition ¶
func NewPartition ¶
func SplitPartitions ¶
func (Partition) ToPbPartition ¶
type Topic ¶
func FromPbTopic ¶
type TopicPartition ¶
func (*TopicPartition) String ¶
func (tp *TopicPartition) String() string
Click to show internal directories.
Click to hide internal directories.