queue

package
v2.1.2+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2021 License: Apache-2.0 Imports: 3 Imported by: 19

Documentation

Overview

Package queue provides implementations of server-side queues.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Queue manager.

func NewManager

func NewManager(qstore Storage) *Manager

Create a queue manager with the specified queue storage mechanism

func (*Manager) Find

func (qm *Manager) Find(destination string) *Queue

Finds the queue for the given destination, and creates it if necessary.

type MemoryQueueStorage

type MemoryQueueStorage struct {
	// contains filtered or unexported fields
}

In-memory implementation of the QueueStorage interface.

func (*MemoryQueueStorage) Dequeue

func (m *MemoryQueueStorage) Dequeue(queue string) (*frame.Frame, error)

Removes a frame from the head of the queue. Returns nil if no frame is available.

func (*MemoryQueueStorage) Enqueue

func (m *MemoryQueueStorage) Enqueue(queue string, frame *frame.Frame) error

func (*MemoryQueueStorage) Requeue

func (m *MemoryQueueStorage) Requeue(queue string, frame *frame.Frame) error

Pushes a frame to the head of the queue. Sets the "message-id" header of the frame if it is not already set.

func (*MemoryQueueStorage) Start

func (m *MemoryQueueStorage) Start()

Called at server startup. Allows the queue storage to perform any initialization.

func (*MemoryQueueStorage) Stop

func (m *MemoryQueueStorage) Stop()

Called prior to server shutdown. Allows the queue storage to perform any cleanup.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue for storing message frames.

func (*Queue) Enqueue

func (q *Queue) Enqueue(f *frame.Frame) error

Send a message to the queue. If a subscription is available to receive the message, it is sent to the subscription without making it to the queue. Otherwise, the message is queued until a message is available.

func (*Queue) Requeue

func (q *Queue) Requeue(f *frame.Frame) error

Send a message to the front of the queue, probably because it failed to be sent to a client. If a subscription is available to receive the message, it is sent to the subscription without making it to the queue. Otherwise, the message is queued until a message is available.

func (*Queue) Subscribe

func (q *Queue) Subscribe(sub *client.Subscription) error

Add a subscription to a queue. The subscription is removed whenever a frame is sent to the subscription and needs to be re-added when the subscription decides that the message has been received by the client.

func (*Queue) Unsubscribe

func (q *Queue) Unsubscribe(sub *client.Subscription)

Unsubscribe a subscription.

type Storage

type Storage interface {
	// Pushes a MESSAGE frame to the end of the queue. Sets
	// the "message-id" header of the frame before adding to
	// the queue.
	Enqueue(queue string, frame *frame.Frame) error

	// Pushes a MESSAGE frame to the head of the queue. Sets
	// the "message-id" header of the frame if it is not
	// already set.
	Requeue(queue string, frame *frame.Frame) error

	// Removes a frame from the head of the queue.
	// Returns nil if no frame is available.
	Dequeue(queue string) (*frame.Frame, error)

	// Called at server startup. Allows the queue storage
	// to perform any initialization.
	Start()

	// Called prior to server shutdown. Allows the queue storage
	// to perform any cleanup.
	Stop()
}

Interface for queue storage. The intent is that different queue storage implementations can be used, depending on preference. Queue storage mechanisms could include in-memory, and various persistent storage mechanisms (eg file system, DB, etc)

func NewMemoryQueueStorage

func NewMemoryQueueStorage() Storage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL