messagequeue

package
v2.16.6-prerelease Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2023 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package messagequeue is a generated GoMock package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddWorker added in v2.12.0

func AddWorker(wg *sync.WaitGroup, pool chan<- *Worker)

AddWorker to the passed pool

Types

type Chunker added in v2.12.0

type Chunker[T any] interface {
	// Start the chunker
	Start(buffer <-chan T, handler Handler[T]) error
	// Stop the chunker and clean up any resources
	Stop() error
}

Chunker interface for 'chunking' entries on a buffer into slices of a desired size.

func NewChunker added in v2.12.0

func NewChunker[T any](options *ChunkerOptions) Chunker[T]

NewChunker with the passed buffer, handler and options. Chunker will create slices of a specified size from the passed buffer and pass them to the handler. Entries retrieved from the buffer are guaranteed to be delivered to the handler within the configured ChunkElementExpiry duration.

type ChunkerOptions added in v2.12.0

type ChunkerOptions struct {
	// ChunkSize determines the desired chunk size for entries to be returned
	// from the buffer.
	ChunkSize uint
	// MaxElementWait determines the maximum time an entry will wait to be chunked.
	// Smaller durations may result in chunks that are less than the desired size.
	MaxElementWait time.Duration
}

ChunkerOptions for configuring a Chunker instance

func NewChunkerOptions added in v2.12.0

func NewChunkerOptions() *ChunkerOptions

NewChunkerOptions creates a ChunkerOptions with default values that can be used to create a NewChunker

func (*ChunkerOptions) Validate added in v2.12.0

func (o *ChunkerOptions) Validate() error

Validate that the values contained in this ChunkerOptions are complete and within the bounds necessary for operation.

type EnqueueMessageResult

type EnqueueMessageResult struct {
	// Message this result is for
	*Message
	// Success indicates whether the message was successfully enqueued
	Success bool
	// SenderFault when success is false, indicates that the enqueue failed due
	// to a malformed message
	SenderFault bool
	// Error that occurred when enqueueing the message
	Error string
}

EnqueueMessageResult is returned on for each message that is enqueued

type Enqueuer added in v2.12.0

type Enqueuer interface {
	// Start validates configurations and creates the resources necessary to
	// handle enqueuing messages.
	Start(messageQueue MessageQueue) error
	// Enqueue the passed message. This operation may be
	// buffered and errors enqueueing will not be available for immediate handling.
	Enqueue(messages *Message) error
	// Stop all active workers, drain queues, and free resources.
	Stop() error
}

Enqueuer for inserting tasks into a MessageQueue

func New

func New(options *EnqueuerOptions) Enqueuer

New enqueuer for inserting messages into a MessageQueue

type EnqueuerOptions added in v2.12.0

type EnqueuerOptions struct {
	*ChunkerOptions
	// Logger for the enqueuer to use
	Logger log.Logger
	// BufferSize to use for forming batches, must be greater than BatchSize
	BufferSize uint
	// FailedBufferSize for messages that fail to enqueue. This should be at
	// least equal to BatchSize to avoid blocking.
	FailedBufferSize uint
	// FailureHandler receives messages that failed to enqueue, optional.
	FailureHandler HandleFailedEnqueue
}

func NewEnqueuerOptions added in v2.12.0

func NewEnqueuerOptions() *EnqueuerOptions

NewEnqueuerOptions with default values

func (*EnqueuerOptions) Validate added in v2.12.0

func (eo *EnqueuerOptions) Validate() error

Validate that the values contained in this Options are complete and within the bounds necessary for operation.

type HandleFailedEnqueue added in v2.12.0

type HandleFailedEnqueue func(Enqueuer, *EnqueueMessageResult)

HandleFailedEnqueue is called when a message fails to enqueue

type HandleMessage added in v2.12.0

type HandleMessage func(context.Context, *Message) bool

HandleMessage returning a boolean indicating if the message was successfully processed. If this function returns true the message will be deleted from the queue, otherwise it will become available for other handlers after it's visibility timeout expires.

type Handler added in v2.12.0

type Handler[T any] func(chunk []T)

Handler receives chunks that are created by the chunker

type Job added in v2.12.0

type Job interface {
	Work() bool
}

Job done by a worker, returns true if the worker should continue, or false if the worker should exit

type Message

type Message struct {
	// ID uniquely identifies this message
	ID string
	// External field used by the sdk
	External string
	// Trace field for telemetry
	Trace string
	// Delay before this message becomes visible after being enqueued
	Delay time.Duration
	// Service this message is for
	Service string
	// Method that should be invoked to process this message
	Method string
	// Body can contain any structured (JSON, XML) or unstructured text
	// limitations are determined by the implementation
	Body string
	// Deadline for processing this message
	Deadline time.Time
}

Message that can be enqueued in a MessageQueue

type MessageQueue

type MessageQueue interface {
	// Enqueue all the passed messages as a batch
	EnqueueBatch(context.Context, []*Message) ([]*EnqueueMessageResult, error)
	// Dequeue up to the passed count of messages waiting up to the passed
	// duration
	Dequeue(ctx context.Context, count int, wait time.Duration) ([]*Message, error)
	// Delete the passed message from the queue so that it is not processed by
	// other workers
	// TODO: [COR-553] Batch delete messages
	Delete(context.Context, *Message) error
}

MessageQueue for enqueueing and dequeueing messages

type MockMessageQueue added in v2.12.0

type MockMessageQueue struct {
	// contains filtered or unexported fields
}

MockMessageQueue is a mock of MessageQueue interface.

func NewMockMessageQueue added in v2.12.0

func NewMockMessageQueue(ctrl *gomock.Controller) *MockMessageQueue

NewMockMessageQueue creates a new mock instance.

func (*MockMessageQueue) Delete added in v2.12.0

func (m *MockMessageQueue) Delete(arg0 context.Context, arg1 *Message) error

Delete mocks base method.

func (*MockMessageQueue) Dequeue added in v2.12.0

func (m *MockMessageQueue) Dequeue(arg0 context.Context, arg1 int, arg2 time.Duration) ([]*Message, error)

Dequeue mocks base method.

func (*MockMessageQueue) EXPECT added in v2.12.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockMessageQueue) EnqueueBatch added in v2.12.0

func (m *MockMessageQueue) EnqueueBatch(arg0 context.Context, arg1 []*Message) ([]*EnqueueMessageResult, error)

EnqueueBatch mocks base method.

type MockMessageQueueMockRecorder added in v2.12.0

type MockMessageQueueMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageQueueMockRecorder is the mock recorder for MockMessageQueue.

func (*MockMessageQueueMockRecorder) Delete added in v2.12.0

func (mr *MockMessageQueueMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call

Delete indicates an expected call of Delete.

func (*MockMessageQueueMockRecorder) Dequeue added in v2.12.0

func (mr *MockMessageQueueMockRecorder) Dequeue(arg0, arg1, arg2 interface{}) *gomock.Call

Dequeue indicates an expected call of Dequeue.

func (*MockMessageQueueMockRecorder) EnqueueBatch added in v2.12.0

func (mr *MockMessageQueueMockRecorder) EnqueueBatch(arg0, arg1 interface{}) *gomock.Call

EnqueueBatch indicates an expected call of EnqueueBatch.

type Poller added in v2.12.0

type Poller interface {
	// Poll for messages on the passed queue
	Poll(HandleMessage, MessageQueue) error
	// Stop polling for messages
	Stop() error
}

Poller retrieves batches of messages from a message queue and handles them using provided functions.

func NewPoller added in v2.12.0

func NewPoller(options *PollerOptions) Poller

NewPoller with the passed options, if options are nil, default options will be used.

type PollerOptions added in v2.12.0

type PollerOptions struct {
	// Logger to use for reporting errors
	Logger log.Logger
	// ConcurrentMessageHandlers that should be running at any given time
	ConcurrentMessageHandlers int
	// WaitForBatch the specified duration before prematurely returning with less
	// than the desired number of messages.
	WaitForBatch time.Duration
	// DequeueCount is the number of messages to attempt to dequeue per request.
	// maximum will vary by implementation
	DequeueCount int
	// QueueOperationTimeout
	QueueOperationTimeout time.Duration
}

func NewPollerOptions added in v2.12.0

func NewPollerOptions() *PollerOptions

NewPollerOptions with valid values that can be used to initialize a new Poller

func (*PollerOptions) Validate added in v2.12.0

func (po *PollerOptions) Validate() error

Validate that the values contained in this Options are complete and within the bounds necessary for operation.

type Worker added in v2.12.0

type Worker struct {
	// contains filtered or unexported fields
}

Worker can be used to asynchronously call work added via AddWork until work returns false.

func (*Worker) Add added in v2.12.0

func (w *Worker) Add(job Job)

Add work to this worker. If the work returns false this workers routine will exit. This function will block.

func (*Worker) Exit added in v2.12.0

func (w *Worker) Exit()

Exit this workers internal routine once current processing ends. This function will not block.

Directories

Path Synopsis
Package sqs is a generated GoMock package.
Package sqs is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL