messagequeueservice

package
v1.21.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const CustomerCollectName = "SysCustomer"
View Source
const DefaultMaxTopicBacklogNum = 100000 //处理的channel最大数量
View Source
const DefaultMemoryQueueLen = 50000 //内存的最大长度
View Source
const DefaultOnceProcessTopicDataNum = 1024 //一次处理的topic数量,考虑批量落地的数量
View Source
const DefaultOneBatchQuantity = 1000
View Source
const MaxDays = 180

Variables

This section is empty.

Functions

This section is empty.

Types

type CustomerSubscriber

type CustomerSubscriber struct {
	rpc.IRpcHandler

	StartIndex uint64
	// contains filtered or unexported fields
}

func (*CustomerSubscriber) LoadLastIndex

func (cs *CustomerSubscriber) LoadLastIndex()

func (*CustomerSubscriber) Subscribe

func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error

开始订阅

func (*CustomerSubscriber) SubscribeRun

func (cs *CustomerSubscriber) SubscribeRun()

func (*CustomerSubscriber) UnSubscribe

func (cs *CustomerSubscriber) UnSubscribe()

取消订阅

type DataType added in v1.19.8

type DataType interface {
	int | uint | int64 | uint64 | float32 | float64 | int32 | uint32 | int16 | uint16
}

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

func (*MemoryQueue) FindData

func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool)

FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查

func (*MemoryQueue) Init

func (mq *MemoryQueue) Init(cap int32)

func (*MemoryQueue) Push

func (mq *MemoryQueue) Push(topicData *TopicData) bool

从队尾Push数据

type MessageQueueService

type MessageQueueService struct {
	service.Service

	sync.Mutex
	// contains filtered or unexported fields
}

func (*MessageQueueService) GetTopicRoom

func (ms *MessageQueueService) GetTopicRoom(topic string) *TopicRoom

func (*MessageQueueService) OnInit

func (ms *MessageQueueService) OnInit() error

func (*MessageQueueService) OnRelease

func (ms *MessageQueueService) OnRelease()

func (*MessageQueueService) RPC_Publish

func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outParam *rpc.DBQueuePublishRes) error

func (*MessageQueueService) RPC_Subscribe

func (*MessageQueueService) ReadCfg

func (ms *MessageQueueService) ReadCfg() error

func (*MessageQueueService) Setup

func (ms *MessageQueueService) Setup(dataPersist QueueDataPersist)

type MongoPersist

type MongoPersist struct {
	service.Module
	// contains filtered or unexported fields
}

func (*MongoPersist) FindTopicData

func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData

FindTopicData 查找数据

func (*MongoPersist) GetDateByIndex

func (mp *MongoPersist) GetDateByIndex(startIndex uint64) string

func (*MongoPersist) GetIndex

func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64

GetIndex 通过topic数据获取进度索引号

func (*MongoPersist) GetNextIndex

func (mp *MongoPersist) GetNextIndex(startIndex uint64, addDay int) uint64

func (*MongoPersist) GetNowTime

func (mp *MongoPersist) GetNowTime() string

func (*MongoPersist) IsSameDay added in v1.19.1

func (mp *MongoPersist) IsSameDay(timestamp1 int64, timestamp2 int64) bool

func (*MongoPersist) IsYesterday

func (mp *MongoPersist) IsYesterday(startIndex uint64) (bool, string)

func (*MongoPersist) LoadCustomerIndex

func (mp *MongoPersist) LoadCustomerIndex(topic string, customerId string) (uint64, bool)

LoadCustomerIndex false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)

func (*MongoPersist) OnExit

func (mp *MongoPersist) OnExit()

func (*MongoPersist) OnInit

func (mp *MongoPersist) OnInit() error

func (*MongoPersist) OnPushTopicDataToCustomer

func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData)

OnPushTopicDataToCustomer 当推送数据到Customer时回调

func (*MongoPersist) OnReceiveTopicData

func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData)

OnReceiveTopicData 当收到推送过来的数据时

func (*MongoPersist) PersistIndex

func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint64)

PersistIndex 持久化进度索引号

func (*MongoPersist) PersistTopicData

func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool)

PersistTopicData 持久化数据

func (*MongoPersist) ReadCfg

func (mp *MongoPersist) ReadCfg() error

type QueueDataPersist

type QueueDataPersist interface {
	service.IModule

	OnExit()
	OnReceiveTopicData(topic string, topicData []TopicData)                                                //当收到推送过来的数据时
	OnPushTopicDataToCustomer(topic string, topicData []TopicData)                                         //当推送数据到Customer时回调
	PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true
	FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData         //查找数据,参数bool代表数据库查找是否成功
	LoadCustomerIndex(topic string, customerId string) (uint64, bool)                                      //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)
	GetIndex(topicData *TopicData) uint64                                                                  //通过topic数据获取进度索引号
	PersistIndex(topic string, customerId string, index uint64)                                            //持久化进度索引号
}

type SubscribeMethod

type SubscribeMethod = int32
const (
	MethodCustom SubscribeMethod = 0 //自定义模式,以消费者设置的StartIndex开始获取或订阅
	MethodLast   SubscribeMethod = 1 //Last模式,以该消费者上次记录的位置开始订阅
)

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

订阅器

func (*Subscriber) Init

func (ss *Subscriber) Init(memoryQueueCap int32)

func (*Subscriber) PersistTopicData

func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool)

func (*Subscriber) PushTopicDataToQueue

func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData)

func (*Subscriber) TopicSubscribe

func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId int, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error

func (*Subscriber) UnSubscribe

func (ss *Subscriber) UnSubscribe(customerId string)

type TopicData

type TopicData struct {
	Seq     uint64 //序号
	RawData []byte //原始数据

	ExtendParam interface{} //扩展参数
}

func (TopicData) GetValue

func (t TopicData) GetValue() uint64

type TopicRoom

type TopicRoom struct {
	Subscriber //订阅器

	//onceProcessTopicDataNum int //一次处理的订阅数据最大量,方便订阅器Subscriber和QueueDataProcessor批量处理
	StagingBuff []TopicData
	// contains filtered or unexported fields
}

func (*TopicRoom) Init

func (tr *TopicRoom) Init(maxTopicBacklogNum int32, memoryQueueLen int32, topic string, queueWait *sync.WaitGroup, dataPersist QueueDataPersist)

maxProcessTopicBacklogNum:主题最大积压数量

func (*TopicRoom) NextSeq

func (tr *TopicRoom) NextSeq() uint64

func (*TopicRoom) Publish

func (tr *TopicRoom) Publish(data [][]byte) error

func (*TopicRoom) Stop

func (tr *TopicRoom) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL