mqapi

package
v0.0.0-...-d0e3dbc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2024 License: LGPL-3.0 Imports: 3 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by this package. Each is created with errors.New,
// so callers can compare against them with errors.Is.
//
// NOTE(review): the message wording is inconsistent ("not exists" vs
// "not exist"). The strings are left unchanged here because callers may match
// on the text.
var (
	// An entity of the named kind already exists.
	ErrTopicAlreadyExist          = errors.New("topic already exists")
	ErrQueueAlreadyExist          = errors.New("queue already exists")
	ErrPublishGroupAlreadyExist   = errors.New("publishGroup already exists")
	ErrSubscribeGroupAlreadyExist = errors.New("subscribeGroup already exists")
	ErrNodeAlreadyExist           = errors.New("node already exists")

	// An entity of the named kind does not exist.
	ErrTopicNotExist          = errors.New("topic not exists")
	ErrQueueNotExist          = errors.New("queue not exists")
	ErrPublishGroupNotExist   = errors.New("publishGroup not exist")
	ErrSubscribeGroupNotExist = errors.New("subscribeGroup not exist")
	ErrNodeNotExist           = errors.New("node not exist")

	// The topic internal id limit was exceeded.
	ErrTopicInternalIdExceeded = errors.New("topic internal id exceeded")

	// Delivery level problems (see DeliveryLevelType).
	ErrDeliveryLevelNotMatch         = errors.New("delivery level not match")
	ErrDeliveryLevelUnknown          = errors.New("delivery level unknown")
	ErrDeliveryLevelIllegalOperation = errors.New("illegal delivery level operation")

	// Queue store type problems.
	ErrQueueStoreUnknown      = errors.New("queue store type unknown")
	ErrQueueStoreNotSupported = errors.New("queue store type not supported")

	// Reply handling problems (see ReplyTo and Reply).
	ErrReplyTypeUnknown         = errors.New("reply type unknown")
	ErrReplyDestinationNotExist = errors.New("reply destination not exist")
	ErrReplyTimeout             = errors.New("reply timeout")

	// The requested operation is not supported.
	ErrUnsupportedOperation = errors.New("unsupported operation")

	// Addon registration problems.
	ErrAddonAlreadyExist = errors.New("addon already exists")
	ErrAddonNotExist     = errors.New("addon not exists")
)

Functions

func GetQueueTypeContainer

func GetQueueTypeContainer(key string) (func() QueueType, error)

func Register

func Register[T any](key string, v T) error

Types

type Ack

// Ack bundles a list of message identifiers with a pointer to a Reply.
// It is returned by PublishGroup.PublishMessage and
// PublishGroup.PublishGuaranteeMessage, and embedded in MessageCommit,
// MessageFinish and MessageReceived.
type Ack struct {
	// AckIdList holds the MessageId values this acknowledgement refers to.
	AckIdList []MessageId
	// Reply points to an accompanying Reply; presumably nil when there is
	// none — confirm with the producers of Ack.
	Reply     *Reply
}

Ack is used when DeliveryLevelType = 1 (AtLeastOnce).

type BatchObtainResult

// BatchObtainResult wraps the requests produced by one batch obtain call.
// It is the result type of QueueType.BatchObtain and
// QueueType.BatchObtainReleasing.
type BatchObtainResult struct {
	// Requests is the slice of obtained requests.
	Requests []*Request
}

type Body

// Body is a raw byte slice; it is the type of the Message.Body field.
type Body []byte

type Broker

// Broker is an interface whose method set is not shown in this listing: every
// method is either unexported or filtered out of the documentation.
type Broker interface {
	// contains filtered or unexported methods
}

type ComponentRegistry

// ComponentRegistry is declared here as an empty interface, so it places no
// method requirements on implementations.
type ComponentRegistry interface {
}

type Ctx

// Ctx wraps a context.Context. A *Ctx is the trailing parameter of the
// publish, obtain and reply methods declared in this package.
type Ctx struct {
	// Context is the wrapped standard-library context.
	Context context.Context
}

type DeliveryLevelType

// DeliveryLevelType identifies a delivery level. Its underlying type is byte.
type DeliveryLevelType byte

// The defined delivery levels. The numeric values are 0, 1 and 2 in
// declaration order.
const (
	AtMostOnce  DeliveryLevelType = iota // 0
	AtLeastOnce                          // 1
	ExactlyOnce                          // 2
)
// Header carries the per-request metadata. It is the Header field of Request
// and is embedded in MessageCommit.
type Header struct {
	// Dup is a boolean flag; presumably it marks a duplicate (redelivered)
	// request — confirm with the code that sets it.
	Dup           bool
	// DeliveryLevel is the delivery level attached to this header.
	DeliveryLevel DeliveryLevelType

	// TopicId identifies the topic.
	TopicId   TopicId
	// Partition identifies the partition.
	Partition PartitionId
	// Tags lists the tag ids attached to this header.
	Tags      []TagId
	// ReplyTo points to the reply destination; presumably nil when no reply
	// is requested — confirm.
	ReplyTo   *ReplyTo
}

type Message

// Message is a single message: an embedded MessageId, a multi-valued
// attribute map and a body. Request.BatchMessage is a slice of Message, and
// Reply embeds one.
type Message struct {
	// MessageId is embedded, so its MsgId and OutId fields are promoted.
	MessageId

	// Attributes maps an attribute name to a list of string values.
	Attributes map[string][]string
	// Body is the message payload as raw bytes.
	Body       Body
}

type MessageCommit

// MessageCommit combines a Header with an Ack by embedding both. It is the
// argument of PublishGroup.CommitMessage and QueueType.CommitMessages.
type MessageCommit struct {
	Header
	Ack
}

MessageCommit is used when DeliveryLevelType = 2 (ExactlyOnce).

type MessageFinish

// MessageFinish embeds an Ack. It is the result type of
// PublishGroup.CommitMessage.
type MessageFinish struct {
	Ack
}

MessageFinish is used when DeliveryLevelType = 2 (ExactlyOnce).

type MessageId

// MessageId pairs two identifiers for a message. It is embedded in Message
// and is the element type of Ack.AckIdList.
type MessageId struct {
	// MsgId is the first identifier.
	// NOTE(review): presumably the broker-assigned id — confirm.
	MsgId MsgId
	// OutId is the second identifier.
	// NOTE(review): presumably an externally supplied id — confirm.
	OutId OutId
}

type MessageReceived

// MessageReceived embeds an Ack. It is the result type of
// PublishGroup.PrePublishMessage.
type MessageReceived struct {
	Ack
}

MessageReceived is used when DeliveryLevelType = 2 (ExactlyOnce).

type MsgId

// MsgId is a distinct named type whose underlying type is idgen.IdType.
// It is the type of the MessageId.MsgId field.
type MsgId idgen.IdType

type Node

// Node is the interface for a participant that joins publish and subscribe
// groups; it is the argument of PublishGroup.Join and SubscribeGroup.Join.
type Node interface {
	// SubscribeGroupInitialize receives a SubscribeGroup and reports an error.
	// NOTE(review): presumably invoked when the node is bound to that group —
	// confirm with the implementation.
	SubscribeGroupInitialize(sg SubscribeGroup) error
	// PublishGroupInitialize receives a PublishGroup and reports an error.
	// NOTE(review): presumably invoked when the node is bound to that group —
	// confirm with the implementation.
	PublishGroupInitialize(pg PublishGroup) error

	// DirectReply passes a reply to this node under the given Ctx.
	DirectReply(reply *Reply, ctx *Ctx) error
	// GetReplyChannel returns a receive-only channel of *Reply.
	GetReplyChannel() <-chan *Reply

	// GetNodeId returns the NodeId of this node.
	GetNodeId() NodeId

	// Leave means the end of Node lifecycle
	Leave() error
}

type NodeId

// NodeId is a distinct named type whose underlying type is idgen.IdType.
// It is returned by Node.GetNodeId and used in ReplyTo.ReplyToNode.
type NodeId idgen.IdType

type OutId

// OutId is a distinct named type whose underlying type is idgen.IdType.
// It is the type of the MessageId.OutId field.
type OutId idgen.IdType

type PartitionId

// PartitionId is a distinct named type whose underlying type is idgen.IdType.
// It is the type of the Header.Partition field.
type PartitionId idgen.IdType

type PublishGroup

// PublishGroup is the publishing-side interface: nodes join it, publish
// requests through it, and send replies through it.
//
// NOTE(review): which publish method applies to which DeliveryLevelType is not
// stated in these signatures. The result types suggest PrePublishMessage and
// CommitMessage form a two-step flow — confirm with the implementation.
type PublishGroup interface {
	// Join adds the given node to this group.
	Join(node Node) error

	// PublishMessage publishes req and returns an Ack.
	PublishMessage(req *Request, ctx *Ctx) (Ack, error)
	// PublishGuaranteeMessage publishes req and returns an Ack.
	PublishGuaranteeMessage(req *Request, ctx *Ctx) (Ack, error)
	// PrePublishMessage submits req and returns a MessageReceived.
	PrePublishMessage(req *Request, ctx *Ctx) (MessageReceived, error)
	// CommitMessage submits a MessageCommit and returns a MessageFinish.
	CommitMessage(req *MessageCommit, ctx *Ctx) (MessageFinish, error)

	// Reply sends the given reply under ctx.
	Reply(reply *Reply, ctx *Ctx) error
}

type PublishGroupId

// PublishGroupId is a distinct named type whose underlying type is
// idgen.IdType. It is the type of the ReplyTo.ReplyToPublishGroup field.
type PublishGroupId idgen.IdType

type Queue

// Queue exposes the read-only identity of a queue. It is passed to
// QueueType.Init and carried in SubChanElem and ReleaseChanElem.
type Queue interface {
	// DeliveryLevel returns the queue's delivery level.
	DeliveryLevel() DeliveryLevelType
	// QueueId returns the queue's identifier.
	QueueId() QueueId
}

type QueueId

// QueueId is a distinct named type whose underlying type is idgen.IdType.
// It is returned by Queue.QueueId and accepted by SubscribeGroup.Commit and
// SubscribeGroup.Release.
type QueueId idgen.IdType

type QueueOption

// QueueOption holds the configuration of a queue. A *QueueOption is passed to
// QueueType.Init.
type QueueOption struct {
	// QueueChannelSize and QueueInboundChannelSize are integer sizes.
	// NOTE(review): presumably channel buffer capacities — confirm.
	QueueChannelSize        int
	QueueInboundChannelSize int

	// DeliveryLevel is the delivery level configured for the queue.
	DeliveryLevel DeliveryLevelType
	// QueueType is a string key.
	// NOTE(review): presumably the key looked up by GetQueueTypeContainer —
	// confirm.
	QueueType     string

	UncommittedMessageRetainTime int // in seconds, default 7 * 24 * 3600
	RedeliverIntervalTime        int // in seconds, default 5 seconds
}

type QueueRecord

// QueueRecord holds a position within a queue. It is created by
// QueueType.CreateRecord and passed to the obtain, confirm and release
// methods.
type QueueRecord struct {
	// FromId is the record offset.
	// NOTE(review): presumably the offset the next obtain starts from —
	// confirm.
	FromId RecordOffset
}

type QueueType

// QueueType is the interface a queue implementation satisfies.
// GetQueueTypeContainer returns a factory (func() QueueType) for a given key.
type QueueType interface {
	// PublishMessage accepts req for publishing.
	PublishMessage(req *Request, ctx *Ctx) error
	// PrePublishMessage accepts req.
	// NOTE(review): presumably the first step of a two-step publish that
	// CommitMessages completes — confirm.
	PrePublishMessage(req *Request, ctx *Ctx) error
	// CommitMessages accepts a MessageCommit.
	CommitMessages(commit *MessageCommit, ctx *Ctx) error

	// CreateRecord returns a new QueueRecord for the given subscribe group.
	CreateRecord(subscribeGroupId SubscribeGroupId, ctx *Ctx) (*QueueRecord, error)
	// BatchObtain returns a BatchObtainResult for record.
	// NOTE(review): maxCnt presumably caps the number of requests returned —
	// confirm.
	BatchObtain(record *QueueRecord, maxCnt int, ctx *Ctx) (BatchObtainResult, error)
	// BatchObtainReleasing has the same signature as BatchObtain.
	// NOTE(review): presumably it obtains requests that were released —
	// confirm.
	BatchObtainReleasing(record *QueueRecord, maxCnt int, ctx *Ctx) (BatchObtainResult, error)
	// ConfirmConsumed applies ack to record as a confirmation.
	ConfirmConsumed(record *QueueRecord, ack *Ack) error
	// ReleaseConsumed applies ack to record as a release.
	ReleaseConsumed(record *QueueRecord, ack *Ack) error

	// Init configures the implementation with its Queue and options.
	Init(queue Queue, option *QueueOption) error
	// Close shuts the implementation down. It takes a plain context.Context
	// rather than a *Ctx.
	Close(ctx context.Context) error
}

type RecordOffset

// RecordOffset is a distinct named type whose underlying type is
// idgen.IdType. It is the type of the QueueRecord.FromId field.
type RecordOffset idgen.IdType

type ReleaseChanElem

// ReleaseChanElem is the element type of the channel returned by
// SubscribeGroup.ReleaseChannel. It has the same fields as SubChanElem.
type ReleaseChanElem struct {
	// Request is the request carried by this element.
	Request     *Request
	Queue       Queue // FIXME: should not expose kernel component to the api
	// QueueRecord is the queue record associated with the request.
	QueueRecord *QueueRecord
}

type Reply

// Reply is a reply message: it embeds the ReplyTo destination and the Message
// being sent. It is passed to Node.DirectReply and PublishGroup.Reply,
// delivered on Node.GetReplyChannel, and referenced from Ack.Reply.
type Reply struct {
	ReplyTo
	Message
}

type ReplyTo

// ReplyTo describes where a reply is sent. It is referenced from
// Header.ReplyTo and embedded in Reply.
type ReplyTo struct {
	ReplyType           byte // 0 - none, 1 - nodeId, 2 - publishGroupId
	// ReplyToNode is the destination node.
	// NOTE(review): presumably used when ReplyType is 1 — confirm.
	ReplyToNode         NodeId
	// ReplyToPublishGroup is the destination publish group.
	// NOTE(review): presumably used when ReplyType is 2 — confirm.
	ReplyToPublishGroup PublishGroupId
	// ReplyIdentifier is a string identifier.
	// NOTE(review): presumably correlates a reply with its request — confirm.
	ReplyIdentifier     string
}

type Request

// Request is one publish or obtain unit: a Header plus a batch of messages.
type Request struct {
	// Header is the metadata shared by every message in the batch.
	Header       Header
	// BatchMessage is the list of messages carried by this request.
	BatchMessage []Message
}

type SubChanElem

// SubChanElem is the element type of the channel returned by
// SubscribeGroup.SubscribeChannel. It has the same fields as ReleaseChanElem.
type SubChanElem struct {
	// Request is the request carried by this element.
	Request     *Request
	Queue       Queue // FIXME: should not expose kernel component to the api
	// QueueRecord is the queue record associated with the request.
	QueueRecord *QueueRecord
}

type SubscribeGroup

// SubscribeGroup is the consuming-side interface: nodes join it, receive
// elements from its channels, and commit or release them.
type SubscribeGroup interface {
	// SubscribeChannel returns a receive-only channel of SubChanElem.
	SubscribeChannel() <-chan SubChanElem
	// ReleaseChannel returns a receive-only channel of ReleaseChanElem.
	ReleaseChannel() <-chan ReleaseChanElem

	// Join adds the given node to this group.
	Join(node Node) error

	// Commit applies ack to record on the queue identified by queueId.
	// NOTE(review): presumably forwards to QueueType.ConfirmConsumed —
	// confirm.
	Commit(queueId QueueId, record *QueueRecord, ack *Ack) error
	// Release applies ack to record on the queue identified by queueId.
	// NOTE(review): presumably forwards to QueueType.ReleaseConsumed —
	// confirm.
	Release(queueId QueueId, record *QueueRecord, ack *Ack) error
}

type SubscribeGroupId

// SubscribeGroupId is a distinct named type whose underlying type is
// idgen.IdType. It is the first parameter of QueueType.CreateRecord.
type SubscribeGroupId idgen.IdType

type SubscribeGroupOption

// SubscribeGroupOption holds the configuration of a subscribe group.
type SubscribeGroupOption struct {
	// SubscribeChannelSize is an integer size.
	// NOTE(review): presumably the buffer capacity of the channel returned by
	// SubscribeGroup.SubscribeChannel — confirm.
	SubscribeChannelSize    int
	ObtainFailRetryInterval int // milliseconds, default 100ms
}

type TagId

// TagId is a distinct named type whose underlying type is idgen.IdType.
// It is the element type of the Header.Tags field.
type TagId idgen.IdType

type Topic

// Topic exposes the read-only identity of a topic.
type Topic interface {
	// DeliveryLevel returns the topic's delivery level.
	DeliveryLevel() DeliveryLevelType
	// TopicId returns the topic's identifier.
	TopicId() TopicId
}

type TopicId

// TopicId is a distinct named type whose underlying type is idgen.IdType.
// It is returned by Topic.TopicId and is the type of the Header.TopicId field.
type TopicId idgen.IdType

type TopicOption

// TopicOption holds the configuration of a topic.
type TopicOption struct {
	// DeliveryLevel is the delivery level configured for the topic.
	DeliveryLevel DeliveryLevelType
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL