Documentation ¶
Index ¶
- Variables
- func GetQueueTypeContainer(key string) (func() QueueType, error)
- func Register[T any](key string, v T) error
- type Ack
- type BatchObtainResult
- type Body
- type Broker
- type ComponentRegistry
- type Ctx
- type DeliveryLevelType
- type Header
- type Message
- type MessageCommit
- type MessageFinish
- type MessageId
- type MessageReceived
- type MsgId
- type Node
- type NodeId
- type OutId
- type PartitionId
- type PublishGroup
- type PublishGroupId
- type Queue
- type QueueId
- type QueueOption
- type QueueRecord
- type QueueType
- type RecordOffset
- type ReleaseChanElem
- type Reply
- type ReplyTo
- type Request
- type SubChanElem
- type SubscribeGroup
- type SubscribeGroupId
- type SubscribeGroupOption
- type TagId
- type Topic
- type TopicId
- type TopicOption
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrTopicAlreadyExist = errors.New("topic already exists") ErrQueueAlreadyExist = errors.New("queue already exists") ErrPublishGroupAlreadyExist = errors.New("publishGroup already exists") ErrSubscribeGroupAlreadyExist = errors.New("subscribeGroup already exists") ErrNodeAlreadyExist = errors.New("node already exists") ErrTopicNotExist = errors.New("topic not exists") ErrQueueNotExist = errors.New("queue not exists") ErrPublishGroupNotExist = errors.New("publishGroup not exist") ErrSubscribeGroupNotExist = errors.New("subscribeGroup not exist") ErrNodeNotExist = errors.New("node not exist") ErrTopicInternalIdExceeded = errors.New("topic internal id exceeded") ErrDeliveryLevelNotMatch = errors.New("delivery level not match") ErrDeliveryLevelUnknown = errors.New("delivery level unknown") ErrDeliveryLevelIllegalOperation = errors.New("illegal delivery level operation") ErrQueueStoreUnknown = errors.New("queue store type unknown") ErrQueueStoreNotSupported = errors.New("queue store type not supported") ErrReplyTypeUnknown = errors.New("reply type unknown") ErrReplyDestinationNotExist = errors.New("reply destination not exist") ErrReplyTimeout = errors.New("reply timeout") ErrUnsupportedOperation = errors.New("unsupported operation") ErrAddonAlreadyExist = errors.New("addon already exists") ErrAddonNotExist = errors.New("addon not exists") )
Functions ¶
func GetQueueTypeContainer ¶
Types ¶
type BatchObtainResult ¶
type BatchObtainResult struct {
Requests []*Request
}
type ComponentRegistry ¶
type ComponentRegistry interface { }
type DeliveryLevelType ¶
type DeliveryLevelType byte
const ( AtMostOnce DeliveryLevelType = 0 AtLeastOnce DeliveryLevelType = 1 ExactlyOnce DeliveryLevelType = 2 )
type Header ¶
type Header struct { Dup bool DeliveryLevel DeliveryLevelType TopicId TopicId Partition PartitionId Tags []TagId ReplyTo *ReplyTo }
type MessageCommit ¶
MessageCommit is used for when DeliveryLevelType = 2
type MessageFinish ¶
type MessageFinish struct {
Ack
}
MessageFinish is used for when DeliveryLevelType = 2
type MessageReceived ¶
type MessageReceived struct {
Ack
}
MessageReceived is used for when DeliveryLevelType = 2
type Node ¶
type Node interface { SubscribeGroupInitialize(sg SubscribeGroup) error PublishGroupInitialize(pg PublishGroup) error DirectReply(reply *Reply, ctx *Ctx) error GetReplyChannel() <-chan *Reply GetNodeId() NodeId // Leave means the end of Node lifecycle Leave() error }
type PartitionId ¶
type PublishGroup ¶
type PublishGroup interface { Join(node Node) error PublishMessage(req *Request, ctx *Ctx) (Ack, error) PublishGuaranteeMessage(req *Request, ctx *Ctx) (Ack, error) PrePublishMessage(req *Request, ctx *Ctx) (MessageReceived, error) CommitMessage(req *MessageCommit, ctx *Ctx) (MessageFinish, error) Reply(reply *Reply, ctx *Ctx) error }
type PublishGroupId ¶
type Queue ¶
type Queue interface { DeliveryLevel() DeliveryLevelType QueueId() QueueId }
type QueueOption ¶
type QueueRecord ¶
type QueueRecord struct {
FromId RecordOffset
}
type QueueType ¶
type QueueType interface { PublishMessage(req *Request, ctx *Ctx) error PrePublishMessage(req *Request, ctx *Ctx) error CommitMessages(commit *MessageCommit, ctx *Ctx) error CreateRecord(subscribeGroupId SubscribeGroupId, ctx *Ctx) (*QueueRecord, error) BatchObtain(record *QueueRecord, maxCnt int, ctx *Ctx) (BatchObtainResult, error) BatchObtainReleasing(record *QueueRecord, maxCnt int, ctx *Ctx) (BatchObtainResult, error) ConfirmConsumed(record *QueueRecord, ack *Ack) error ReleaseConsumed(record *QueueRecord, ack *Ack) error Init(queue Queue, option *QueueOption) error Close(ctx context.Context) error }
type RecordOffset ¶
type ReleaseChanElem ¶
type ReleaseChanElem struct { Request *Request Queue Queue //FIXME should not expose kernel component to the api QueueRecord *QueueRecord }
type ReplyTo ¶
type ReplyTo struct { ReplyType byte //0 - non, 1 - nodeId, 2 - publishGroupId ReplyToNode NodeId ReplyToPublishGroup PublishGroupId ReplyIdentifier string }
type SubChanElem ¶
type SubChanElem struct { Request *Request Queue Queue //FIXME should not expose kernel component to the api QueueRecord *QueueRecord }
type SubscribeGroup ¶
type SubscribeGroup interface { SubscribeChannel() <-chan SubChanElem ReleaseChannel() <-chan ReleaseChanElem Join(node Node) error Commit(queueId QueueId, record *QueueRecord, ack *Ack) error Release(queueId QueueId, record *QueueRecord, ack *Ack) error }
type SubscribeGroupId ¶
type SubscribeGroupOption ¶
type Topic ¶
type Topic interface { DeliveryLevel() DeliveryLevelType TopicId() TopicId }
type TopicOption ¶
type TopicOption struct {
DeliveryLevel DeliveryLevelType
}
Click to show internal directories.
Click to hide internal directories.