Documentation ¶
Index ¶
- type ChannelQueue
- func (q *ChannelQueue[E]) Clear()
- func (q *ChannelQueue[E]) IsEmpty() bool
- func (q *ChannelQueue[E]) Iterator() Iterator[E]
- func (q *ChannelQueue[E]) Offer(item E) bool
- func (q *ChannelQueue[E]) Peek() (E, bool)
- func (q *ChannelQueue[E]) Poll() (E, bool)
- func (q *ChannelQueue[E]) Size() int64
- func (q *ChannelQueue[E]) ToSlice() []E
- type DeliverMessage
- type Iterator
- type LockFreeQueue
- func (q *LockFreeQueue[E]) Clear()
- func (q *LockFreeQueue[E]) IsEmpty() bool
- func (q *LockFreeQueue[E]) Iterator() Iterator[E]
- func (q *LockFreeQueue[E]) Offer(e E) bool
- func (q *LockFreeQueue[E]) Peek() (E, bool)
- func (q *LockFreeQueue[E]) Poll() (E, bool)
- func (q *LockFreeQueue[E]) Size() int64
- func (q *LockFreeQueue[E]) ToSlice() []E
- func (q *LockFreeQueue[E]) WithRetry(retry int64) *LockFreeRetryQueue[E]
- type LockFreeRetryQueue
- func (q *LockFreeRetryQueue[E]) Clear()
- func (q *LockFreeRetryQueue[E]) IsEmpty() bool
- func (q *LockFreeRetryQueue[E]) Iterator() Iterator[E]
- func (q *LockFreeRetryQueue[E]) Offer(e E) bool
- func (q *LockFreeRetryQueue[E]) Peek() (E, bool)
- func (q *LockFreeRetryQueue[E]) Poll() (E, bool)
- func (q *LockFreeRetryQueue[E]) Size() int64
- func (q *LockFreeRetryQueue[E]) ToSlice() []E
- type Message
- type MessageQueue
- type MessageQueueProvider
- type MutexQueue
- func (q *MutexQueue[E]) Clear()
- func (q *MutexQueue[E]) IsEmpty() bool
- func (q *MutexQueue[E]) Iterator() Iterator[E]
- func (q *MutexQueue[E]) Offer(e E) bool
- func (q *MutexQueue[E]) Peek() (E, bool)
- func (q *MutexQueue[E]) Poll() (E, bool)
- func (q *MutexQueue[E]) Size() int64
- func (q *MutexQueue[E]) ToSlice() []E
- type PoolQueue
- func (p *PoolQueue[E]) Clear()
- func (p *PoolQueue[E]) IsEmpty() bool
- func (p *PoolQueue[E]) Iterator() Iterator[E]
- func (p *PoolQueue[E]) Offer(e E) bool
- func (p *PoolQueue[E]) Peek() (E, bool)
- func (p *PoolQueue[E]) Poll() (E, bool)
- func (p *PoolQueue[E]) Size() int64
- func (p *PoolQueue[E]) ToSlice() []E
- type Provider
- type Queue
- type SegmentPool
- type Subscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelQueue ¶
type ChannelQueue[E any] struct { // contains filtered or unexported fields }
func NewChannelQueue ¶
func NewChannelQueue[E any]() *ChannelQueue[E]
NewChannelQueue creates a new instance of ChannelQueue
func (*ChannelQueue[E]) IsEmpty ¶
func (q *ChannelQueue[E]) IsEmpty() bool
IsEmpty reports whether the queue is empty
func (*ChannelQueue[E]) Iterator ¶
func (q *ChannelQueue[E]) Iterator() Iterator[E]
Iterator implements Queue.
func (*ChannelQueue[E]) Offer ¶
func (q *ChannelQueue[E]) Offer(item E) bool
Offer implements Queue.
type DeliverMessage ¶
type DeliverMessage interface { // Message returns the original message Message() *Message // Ack confirms that the message was successfully processed Ack(ctx context.Context) error // Nack rejects the message, which may cause it to be requeued or discarded Nack(ctx context.Context) error }
DeliverMessage is the interface for a delivered message
type Iterator ¶
type Iterator[E any] interface { // Next advances the iterator and returns true if there is a next element Next() bool // Value returns the current element Value() E }
Iterator is an interface for stepping through a sequence of elements
type LockFreeQueue ¶
type LockFreeQueue[E any] struct { // contains filtered or unexported fields }
LockFreeQueue is a lock-free implementation of a concurrent queue
func NewLockFreeQueue ¶
func NewLockFreeQueue[E any]() *LockFreeQueue[E]
NewLockFreeQueue creates a new LockFreeQueue
func NewLockFreeQueueWithPool ¶
func NewLockFreeQueueWithPool[E any](pool *SegmentPool[E]) *LockFreeQueue[E]
NewLockFreeQueueWithPool creates a new LockFreeQueue with a pre-created segment pool
func (*LockFreeQueue[E]) Clear ¶
func (q *LockFreeQueue[E]) Clear()
Clear removes all elements from the queue
func (*LockFreeQueue[E]) IsEmpty ¶
func (q *LockFreeQueue[E]) IsEmpty() bool
IsEmpty returns true if the queue is empty
func (*LockFreeQueue[E]) Iterator ¶
func (q *LockFreeQueue[E]) Iterator() Iterator[E]
Iterator returns an iterator for the queue
func (*LockFreeQueue[E]) Offer ¶
func (q *LockFreeQueue[E]) Offer(e E) bool
Offer adds the element to the end of the queue
func (*LockFreeQueue[E]) Peek ¶
func (q *LockFreeQueue[E]) Peek() (E, bool)
Peek returns the element at the head of the queue without removing it
func (*LockFreeQueue[E]) Poll ¶
func (q *LockFreeQueue[E]) Poll() (E, bool)
Poll removes the element at the head of the queue and returns it
func (*LockFreeQueue[E]) Size ¶
func (q *LockFreeQueue[E]) Size() int64
Size returns the number of elements in the queue
func (*LockFreeQueue[E]) ToSlice ¶
func (q *LockFreeQueue[E]) ToSlice() []E
ToSlice returns a slice representation of the queue
func (*LockFreeQueue[E]) WithRetry ¶
func (q *LockFreeQueue[E]) WithRetry(retry int64) *LockFreeRetryQueue[E]
type LockFreeRetryQueue ¶
type LockFreeRetryQueue[E any] struct { *LockFreeQueue[E] // contains filtered or unexported fields }
func (*LockFreeRetryQueue[E]) Clear ¶
func (q *LockFreeRetryQueue[E]) Clear()
Clear removes all elements from the queue
func (*LockFreeRetryQueue[E]) IsEmpty ¶
func (q *LockFreeRetryQueue[E]) IsEmpty() bool
IsEmpty returns true if the queue is empty
func (*LockFreeRetryQueue[E]) Iterator ¶
func (q *LockFreeRetryQueue[E]) Iterator() Iterator[E]
Iterator returns an iterator for the queue
func (*LockFreeRetryQueue[E]) Offer ¶
func (q *LockFreeRetryQueue[E]) Offer(e E) bool
Offer adds the element to the end of the queue
func (*LockFreeRetryQueue[E]) Peek ¶
func (q *LockFreeRetryQueue[E]) Peek() (E, bool)
Peek returns the element at the head of the queue without removing it
func (*LockFreeRetryQueue[E]) Poll ¶
func (q *LockFreeRetryQueue[E]) Poll() (E, bool)
Poll removes the element at the head of the queue and returns it
func (*LockFreeRetryQueue[E]) Size ¶
func (q *LockFreeRetryQueue[E]) Size() int64
Size returns the number of elements in the queue
func (*LockFreeRetryQueue[E]) ToSlice ¶
func (q *LockFreeRetryQueue[E]) ToSlice() []E
ToSlice returns a slice representation of the queue
type MessageQueue ¶
type MessageQueue interface { // Publish publishes a message to the queue Publish(ctx context.Context, payload string) (string, error) // Subscribe subscribes to the queue and processes its messages with handler Subscribe(ctx context.Context, handler func(DeliverMessage)) error // Size returns the number of messages in the queue Size() int64 }
MessageQueue is a message queue interface for publishing and subscribing to messages
type MessageQueueProvider ¶
type MessageQueueProvider interface { // MessageQueue returns the MessageQueue for the given subject MessageQueue(subject string) (MessageQueue, error) // PublishToTopic publishes a message to the specified topic PublishToTopic(ctx context.Context, topic string, payload string) error // SubscribeToTopic subscribes to messages for the specified topic SubscribeToTopic(ctx context.Context, topic string, handler func(DeliverMessage)) (Subscription, error) // RequestReply sends a request and waits for a response RequestReply(ctx context.Context, topic string, payload string) (*Message, error) // QueueSubscribe creates a queue subscription QueueSubscribe(ctx context.Context, topic string, handler func(DeliverMessage)) (Subscription, error) // Close closes all MessageQueue connections Close() error }
MessageQueueProvider provides the ability to get MessageQueue by subject
type MutexQueue ¶
type MutexQueue[E any] struct { // contains filtered or unexported fields }
MutexQueue is a mutex based queue implementation
func NewMutexQueue ¶
func NewMutexQueue[E any]() *MutexQueue[E]
NewMutexQueue creates a new MutexQueue
func (*MutexQueue[E]) Clear ¶
func (q *MutexQueue[E]) Clear()
func (*MutexQueue[E]) IsEmpty ¶
func (q *MutexQueue[E]) IsEmpty() bool
func (*MutexQueue[E]) Iterator ¶
func (q *MutexQueue[E]) Iterator() Iterator[E]
func (*MutexQueue[E]) Offer ¶
func (q *MutexQueue[E]) Offer(e E) bool
func (*MutexQueue[E]) Peek ¶
func (q *MutexQueue[E]) Peek() (E, bool)
func (*MutexQueue[E]) Poll ¶
func (q *MutexQueue[E]) Poll() (E, bool)
func (*MutexQueue[E]) Size ¶
func (q *MutexQueue[E]) Size() int64
func (*MutexQueue[E]) ToSlice ¶
func (q *MutexQueue[E]) ToSlice() []E
type PoolQueue ¶
type PoolQueue[E any] struct { // contains filtered or unexported fields }
func NewPoolQueue ¶
type Provider ¶
type Provider[E any] interface { Queue(topic string) Queue[E] // Get a queue by topic Publish(ctx context.Context, topic string, payload E) error Subscribe(ctx context.Context, topic string, handler func(E)) error Close() error }
Provider is an interface for obtaining queues by topic and for publishing and subscribing to topics
type Queue ¶
type Queue[E any] interface { // Offer adds an element to the queue if possible, returning true on success Offer(E) bool // Poll retrieves and removes the head of the queue, or returns false if empty Poll() (E, bool) // Peek retrieves but does not remove the head of the queue, or returns false if empty Peek() (E, bool) // Size returns the number of elements in the queue Size() int64 // IsEmpty returns true if the queue contains no elements IsEmpty() bool // Clear removes all elements from the queue Clear() // ToSlice returns a slice containing all the elements in the queue ToSlice() []E // Iterator returns an Iterator over the elements in this queue Iterator() Iterator[E] }
Queue is the generic interface describing the basic queue operations
type SegmentPool ¶
type SegmentPool[E any] struct { // contains filtered or unexported fields }
func (*SegmentPool[E]) NewPool ¶
func (s *SegmentPool[E]) NewPool()
type Subscription ¶
type Subscription interface { // Unsubscribe cancels the subscription Unsubscribe() error // Topic returns the subscribed topic Topic() string // IsQueue returns whether the subscription is a queue subscription IsQueue() bool // Queue returns the queue name (if it is a queue subscription) Queue() (string, bool) // IsActive returns whether the subscription is active IsActive() bool // Pause suspends receiving messages Pause() error // Resume resumes receiving messages Resume() error // HandlerMessage sets up a new message handler HandlerMessage(handler func(*Message)) error // Pending returns the number of messages to be processed Pending() (int, error) // Delivered returns the number of messages that have been delivered Delivered() (uint64, error) // Dropped returns the number of messages discarded due to client timeouts or disconnections Dropped() (uint64, error) // ClearMaxPending clears statistics about the maximum number of messages to be processed ClearMaxPending() error // MaxPending returns the maximum number of pending messages MaxPending() (int, int, error) }
Subscription represents a subscription