mqimpl

package
v0.0.0-...-d0e3dbc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2024 License: LGPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	EmptyMessageIdList = mqapi.Ack{}
)

Functions

func LogError

func LogError(v ...interface{})

func LogInfo

func LogInfo(v ...interface{})

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(option *BrokerOption) *Broker

func (*Broker) AddNode

func (b *Broker) AddNode() (mqapi.Node, error)

func (*Broker) BindPublishGroupToTopic

func (b *Broker) BindPublishGroupToTopic(publishGroupId mqapi.PublishGroupId, topicId mqapi.TopicId) error

func (*Broker) BindSubscribeGroupToQueue

func (b *Broker) BindSubscribeGroupToQueue(subscribeGroupId mqapi.SubscribeGroupId, queueId mqapi.QueueId) error

func (*Broker) BindTopicAndQueue

func (b *Broker) BindTopicAndQueue(topicId mqapi.TopicId, queueId mqapi.QueueId, tags []mqapi.TagId) error

func (*Broker) DefineNewPublishGroup

func (b *Broker) DefineNewPublishGroup(publishGroupId mqapi.PublishGroupId) (mqapi.PublishGroup, error)

func (*Broker) DefineNewQueue

func (b *Broker) DefineNewQueue(queueId mqapi.QueueId, option *mqapi.QueueOption) (mqapi.Queue, error)

func (*Broker) DefineNewSubscribeGroup

func (b *Broker) DefineNewSubscribeGroup(subscribeGroupId mqapi.SubscribeGroupId, option *mqapi.SubscribeGroupOption) (mqapi.SubscribeGroup, error)

func (*Broker) DefineNewTopic

func (b *Broker) DefineNewTopic(topicId mqapi.TopicId, option *mqapi.TopicOption) (mqapi.Topic, error)

func (*Broker) DeletePublishGroup

func (b *Broker) DeletePublishGroup(publishGroupId mqapi.PublishGroupId) error

func (*Broker) DeleteQueue

func (b *Broker) DeleteQueue(queueId mqapi.QueueId) error

func (*Broker) DeleteSubscribeGroup

func (b *Broker) DeleteSubscribeGroup(subscribeGroupId mqapi.SubscribeGroupId) error

func (*Broker) DeleteTopic

func (b *Broker) DeleteTopic(topicId mqapi.TopicId) error

func (*Broker) GenNewInternalTopicId

func (b *Broker) GenNewInternalTopicId() (int32, error)

func (*Broker) GetNode

func (b *Broker) GetNode(nodeId mqapi.NodeId) mqapi.Node

func (*Broker) GetPublishGroup

func (b *Broker) GetPublishGroup(publishGroupId mqapi.PublishGroupId) mqapi.PublishGroup

func (*Broker) GetSubscribeGroup

func (b *Broker) GetSubscribeGroup(subscribeGroupId mqapi.SubscribeGroupId) mqapi.SubscribeGroup

func (*Broker) Shutdown

func (b *Broker) Shutdown(ctx context.Context) error

func (*Broker) Start

func (b *Broker) Start() error

func (*Broker) UnbindPublishGroupFromTopic

func (b *Broker) UnbindPublishGroupFromTopic(publishGroupId mqapi.PublishGroupId, topic mqapi.TopicId) error

func (*Broker) UnbindSubscribeGroupFromQueue

func (b *Broker) UnbindSubscribeGroupFromQueue(subscribeGroupId mqapi.SubscribeGroupId, queueId mqapi.QueueId) error

func (*Broker) UnbindTopicAndQueue

func (b *Broker) UnbindTopicAndQueue(topicId mqapi.TopicId, queueId mqapi.QueueId) error

type BrokerOption

type BrokerOption struct {
	NodeId int16
}

type Node

type Node struct {
	InitSubscribeGroupFn func(sub mqapi.SubscribeGroup)
	InitPublishGroupFn   func(pub mqapi.PublishGroup)
	// contains filtered or unexported fields
}

func (*Node) DirectReply

func (n *Node) DirectReply(reply *mqapi.Reply, ctx *mqapi.Ctx) error

func (*Node) GetNodeId

func (n *Node) GetNodeId() mqapi.NodeId

func (*Node) GetReplyChannel

func (n *Node) GetReplyChannel() <-chan *mqapi.Reply

func (*Node) Leave

func (n *Node) Leave() error

func (*Node) PublishGroupInitialize

func (n *Node) PublishGroupInitialize(pg mqapi.PublishGroup) error

func (*Node) SubscribeGroupInitialize

func (n *Node) SubscribeGroupInitialize(sg mqapi.SubscribeGroup) error

type Partition

type Partition struct {
}

type PublishGroup

type PublishGroup struct {
	// contains filtered or unexported fields
}

func (*PublishGroup) CommitMessage

func (pg *PublishGroup) CommitMessage(req *mqapi.MessageCommit, ctx *mqapi.Ctx) (mqapi.MessageFinish, error)

CommitMessage exactly once

func (*PublishGroup) Join

func (pg *PublishGroup) Join(node mqapi.Node) error

func (*PublishGroup) Leave

func (pg *PublishGroup) Leave(node *Node)

func (*PublishGroup) PrePublishMessage

func (pg *PublishGroup) PrePublishMessage(req *mqapi.Request, ctx *mqapi.Ctx) (mqapi.MessageReceived, error)

PrePublishMessage exactly once

func (*PublishGroup) PublishGuaranteeMessage

func (pg *PublishGroup) PublishGuaranteeMessage(req *mqapi.Request, ctx *mqapi.Ctx) (mqapi.Ack, error)

PublishGuaranteeMessage at least once

func (*PublishGroup) PublishMessage

func (pg *PublishGroup) PublishMessage(req *mqapi.Request, ctx *mqapi.Ctx) (mqapi.Ack, error)

PublishMessage at most once

func (*PublishGroup) Reply

func (pg *PublishGroup) Reply(reply *mqapi.Reply, ctx *mqapi.Ctx) error

type Queue

type Queue struct {
	UncommittedMessageRetainTime int // in seconds, default 7 * 24 * 3600
	InitBatchObtainCount         int // 16
	MaxBatchObtainCount          int // 1024

	QueueChannel chan *mqapi.Request

	// internal
	mqapi.QueueType
	// contains filtered or unexported fields
}

func (*Queue) DeliveryLevel

func (this *Queue) DeliveryLevel() mqapi.DeliveryLevelType

func (*Queue) QueueId

func (this *Queue) QueueId() mqapi.QueueId

type SgQMap

type SgQMap struct {
	// contains filtered or unexported fields
}

type Subject

type Subject struct {
}

special queue 1. non-durable 2. always fan out to all subscribers 3. can have responses to publisher

type SubscribeGroup

type SubscribeGroup struct {
	SubCh     chan mqapi.SubChanElem
	ReleaseCh chan mqapi.ReleaseChanElem
	// contains filtered or unexported fields
}

func (*SubscribeGroup) Commit

func (sg *SubscribeGroup) Commit(queueId mqapi.QueueId, record *mqapi.QueueRecord, ack *mqapi.Ack) error

Commit support the following QoS qos - at least once - ack qos - exactly once - commit

func (*SubscribeGroup) Join

func (sg *SubscribeGroup) Join(node mqapi.Node) error

func (*SubscribeGroup) Leave

func (sg *SubscribeGroup) Leave(node *Node)

func (*SubscribeGroup) PumpLoop

func (sg *SubscribeGroup) PumpLoop(record *mqapi.QueueRecord, queue *Queue)

func (*SubscribeGroup) PumpReleasingLoop

func (sg *SubscribeGroup) PumpReleasingLoop(record *mqapi.QueueRecord, queue *Queue)

func (*SubscribeGroup) Release

func (sg *SubscribeGroup) Release(queueId mqapi.QueueId, record *mqapi.QueueRecord, ack *mqapi.Ack) error

Release support the following QoS qos - exactly once - release

func (*SubscribeGroup) ReleaseChannel

func (sg *SubscribeGroup) ReleaseChannel() <-chan mqapi.ReleaseChanElem

func (*SubscribeGroup) SubscribeChannel

func (sg *SubscribeGroup) SubscribeChannel() <-chan mqapi.SubChanElem

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

func (*Topic) CommitMessages

func (topic *Topic) CommitMessages(req *mqapi.MessageCommit, ctx *mqapi.Ctx) (mqapi.Ack, error)

CommitMessages apply to exactly once omit dup flag

func (*Topic) DeliveryLevel

func (topic *Topic) DeliveryLevel() mqapi.DeliveryLevelType

func (*Topic) PrePublishMessage

func (topic *Topic) PrePublishMessage(req *mqapi.Request, ctx *mqapi.Ctx) (mqapi.Ack, error)

PrePublishMessage apply to exactly once omit dup flag

func (*Topic) PreQueue

func (topic *Topic) PreQueue(req *mqapi.Request)

func (*Topic) PublishMessage

func (topic *Topic) PublishMessage(req *mqapi.Request, ctx *mqapi.Ctx) (mqapi.Ack, error)

func (*Topic) PublishMessageWithResponse

func (topic *Topic) PublishMessageWithResponse(req *mqapi.Request, ctx *mqapi.Ctx) (mqapi.Ack, error)

PublishMessageWithResponse apply to at least once

func (*Topic) TopicId

func (topic *Topic) TopicId() mqapi.TopicId

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL