Documentation ¶
Index ¶
- Constants
- type CustomerSubscriber
- type DataType
- type MemoryQueue
- type MessageQueueService
- func (ms *MessageQueueService) GetTopicRoom(topic string) *TopicRoom
- func (ms *MessageQueueService) OnInit() error
- func (ms *MessageQueueService) OnRelease()
- func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outParam *rpc.DBQueuePublishRes) error
- func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error
- func (ms *MessageQueueService) ReadCfg() error
- func (ms *MessageQueueService) Setup(dataPersist QueueDataPersist)
- type MongoPersist
- func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData
- func (mp *MongoPersist) GetDateByIndex(startIndex uint64) string
- func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64
- func (mp *MongoPersist) GetNextIndex(startIndex uint64, addDay int) uint64
- func (mp *MongoPersist) GetNowTime() string
- func (mp *MongoPersist) IsSameDay(timestamp1 int64, timestamp2 int64) bool
- func (mp *MongoPersist) IsYesterday(startIndex uint64) (bool, string)
- func (mp *MongoPersist) LoadCustomerIndex(topic string, customerId string) (uint64, bool)
- func (mp *MongoPersist) OnExit()
- func (mp *MongoPersist) OnInit() error
- func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData)
- func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData)
- func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint64)
- func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool)
- func (mp *MongoPersist) ReadCfg() error
- type QueueDataPersist
- type SubscribeMethod
- type Subscriber
- func (ss *Subscriber) Init(memoryQueueCap int32)
- func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool)
- func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData)
- func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, ...) error
- func (ss *Subscriber) UnSubscribe(customerId string)
- type TopicData
- type TopicRoom
Constants ¶
View Source
const CustomerCollectName = "SysCustomer"
View Source
const DefaultMaxTopicBacklogNum = 100000 //处理的channel最大数量
View Source
const DefaultMemoryQueueLen = 50000 //内存的最大长度
View Source
const DefaultOnceProcessTopicDataNum = 1024 //一次处理的topic数量,考虑批量落地的数量
View Source
const DefaultOneBatchQuantity = 1000
View Source
const MaxDays = 180
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CustomerSubscriber ¶
type CustomerSubscriber struct {
	rpc.IRpcHandler
	StartIndex uint64
	// contains filtered or unexported fields
}
func (*CustomerSubscriber) LoadLastIndex ¶
func (cs *CustomerSubscriber) LoadLastIndex()
func (*CustomerSubscriber) Subscribe ¶
func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error
开始订阅
func (*CustomerSubscriber) SubscribeRun ¶
func (cs *CustomerSubscriber) SubscribeRun()
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
func (*MemoryQueue) FindData ¶
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool)
FindData 返回值 []TopicData 表示查找到的数据,nil 表示无数据;bool 表示是否不应该在内存中查找。
func (*MemoryQueue) Init ¶
func (mq *MemoryQueue) Init(cap int32)
type MessageQueueService ¶
type MessageQueueService struct {
	service.Service
	sync.Mutex
	// contains filtered or unexported fields
}
func (*MessageQueueService) GetTopicRoom ¶
func (ms *MessageQueueService) GetTopicRoom(topic string) *TopicRoom
func (*MessageQueueService) OnInit ¶
func (ms *MessageQueueService) OnInit() error
func (*MessageQueueService) OnRelease ¶
func (ms *MessageQueueService) OnRelease()
func (*MessageQueueService) RPC_Publish ¶
func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outParam *rpc.DBQueuePublishRes) error
func (*MessageQueueService) RPC_Subscribe ¶
func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error
func (*MessageQueueService) ReadCfg ¶
func (ms *MessageQueueService) ReadCfg() error
func (*MessageQueueService) Setup ¶
func (ms *MessageQueueService) Setup(dataPersist QueueDataPersist)
type MongoPersist ¶
func (*MongoPersist) FindTopicData ¶
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData
FindTopicData 查找数据
func (*MongoPersist) GetDateByIndex ¶
func (mp *MongoPersist) GetDateByIndex(startIndex uint64) string
func (*MongoPersist) GetIndex ¶
func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64
GetIndex 通过topic数据获取进度索引号
func (*MongoPersist) GetNextIndex ¶
func (mp *MongoPersist) GetNextIndex(startIndex uint64, addDay int) uint64
func (*MongoPersist) GetNowTime ¶
func (mp *MongoPersist) GetNowTime() string
func (*MongoPersist) IsSameDay ¶ added in v1.19.1
func (mp *MongoPersist) IsSameDay(timestamp1 int64, timestamp2 int64) bool
func (*MongoPersist) IsYesterday ¶
func (mp *MongoPersist) IsYesterday(startIndex uint64) (bool, string)
func (*MongoPersist) LoadCustomerIndex ¶
func (mp *MongoPersist) LoadCustomerIndex(topic string, customerId string) (uint64, bool)
LoadCustomerIndex false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)
func (*MongoPersist) OnExit ¶
func (mp *MongoPersist) OnExit()
func (*MongoPersist) OnInit ¶
func (mp *MongoPersist) OnInit() error
func (*MongoPersist) OnPushTopicDataToCustomer ¶
func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData)
OnPushTopicDataToCustomer 当推送数据到Customer时回调
func (*MongoPersist) OnReceiveTopicData ¶
func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData)
OnReceiveTopicData 当收到推送过来的数据时
func (*MongoPersist) PersistIndex ¶
func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint64)
PersistIndex 持久化进度索引号
func (*MongoPersist) PersistTopicData ¶
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool)
PersistTopicData 持久化数据
func (*MongoPersist) ReadCfg ¶
func (mp *MongoPersist) ReadCfg() error
type QueueDataPersist ¶
type QueueDataPersist interface {
	service.IModule
	OnExit()
	OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时
	OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调
	PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true
	FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData //查找数据,参数bool代表数据库查找是否成功
	LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)
	GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号
	PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号
}
type SubscribeMethod ¶
type SubscribeMethod = int32
const (
	MethodCustom SubscribeMethod = 0 //自定义模式,以消费者设置的StartIndex开始获取或订阅
	MethodLast   SubscribeMethod = 1 //Last模式,以该消费者上次记录的位置开始订阅
)
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
订阅器
func (*Subscriber) Init ¶
func (ss *Subscriber) Init(memoryQueueCap int32)
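func (*Subscriber) PersistTopicData ¶
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool)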
func (*Subscriber) PushTopicDataToQueue ¶
func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData)
func (*Subscriber) TopicSubscribe ¶
func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId int, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error
func (*Subscriber) UnSubscribe ¶
func (ss *Subscriber) UnSubscribe(customerId string)
type TopicRoom ¶
type TopicRoom struct {
	Subscriber //订阅器
	//onceProcessTopicDataNum int //一次处理的订阅数据最大量,方便订阅器Subscriber和QueueDataProcessor批量处理
	StagingBuff []TopicData
	// contains filtered or unexported fields
}
Click to show internal directories.
Click to hide internal directories.