Versions in this module Expand all Collapse all v1 v1.0.1 Aug 3, 2023 v1.0.0 Aug 3, 2023 Changes in this version + const CustomerCollectName + const DefaultMaxTopicBacklogNum + const DefaultMemoryQueueLen + const DefaultOnceProcessTopicDataNum + const DefaultOneBatchQuantity + const MaxDays + type CustomerSubscriber struct + StartIndex uint64 + func (cs *CustomerSubscriber) LoadLastIndex() + func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, ...) error + func (cs *CustomerSubscriber) SubscribeRun() + func (cs *CustomerSubscriber) UnSubscribe() + type DataType interface + type MemoryQueue struct + func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool) + func (mq *MemoryQueue) Init(cap int32) + func (mq *MemoryQueue) Push(topicData *TopicData) bool + type MessageQueueService struct + func (ms *MessageQueueService) GetTopicRoom(topic string) *TopicRoom + func (ms *MessageQueueService) OnInit() error + func (ms *MessageQueueService) OnRelease() + func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outParam *rpc.DBQueuePublishRes) error + func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error + func (ms *MessageQueueService) ReadCfg() error + func (ms *MessageQueueService) Setup(dataPersist QueueDataPersist) + type MongoPersist struct + func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData + func (mp *MongoPersist) GetDateByIndex(startIndex uint64) string + func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 + func (mp *MongoPersist) GetNextIndex(startIndex uint64, addDay int) uint64 + func (mp *MongoPersist) GetNowTime() string + func (mp *MongoPersist) IsSameDay(timestamp1 int64, timestamp2 int64) bool + func (mp *MongoPersist) IsYesterday(startIndex uint64) (bool, string)
+ func (mp *MongoPersist) LoadCustomerIndex(topic string, customerId string) (uint64, bool) + func (mp *MongoPersist) OnExit() + func (mp *MongoPersist) OnInit() error + func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) + func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData) + func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint64) + func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) + func (mp *MongoPersist) ReadCfg() error + type QueueDataPersist interface + FindTopicData func(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData + GetIndex func(topicData *TopicData) uint64 + LoadCustomerIndex func(topic string, customerId string) (uint64, bool) + OnExit func() + OnPushTopicDataToCustomer func(topic string, topicData []TopicData) + OnReceiveTopicData func(topic string, topicData []TopicData) + PersistIndex func(topic string, customerId string, index uint64) + PersistTopicData func(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) + type SubscribeMethod = int32 + const MethodCustom + const MethodLast + type Subscriber struct + func (ss *Subscriber) Init(memoryQueueCap int32) + func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool) + func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) + func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, ...) error + func (ss *Subscriber) UnSubscribe(customerId string) + type TopicData struct + ExtendParam interface{} + RawData []byte + Seq uint64 + func (t TopicData) GetValue() uint64 + type TopicRoom struct + StagingBuff []TopicData + func (tr *TopicRoom) Init(maxTopicBacklogNum int32, memoryQueueLen int32, topic string, ...) 
+ func (tr *TopicRoom) NextSeq() uint64 + func (tr *TopicRoom) Publish(data [][]byte) error + func (tr *TopicRoom) Stop()