Documentation ¶
Overview ¶
Package messagequeue is a generated GoMock package.
Index ¶
- func AddWorker(wg *sync.WaitGroup, pool chan<- *Worker)
- type Chunker
- type ChunkerOptions
- type EnqueueMessageResult
- type Enqueuer
- type EnqueuerOptions
- type HandleFailedEnqueue
- type HandleMessage
- type Handler
- type Job
- type Message
- type MessageQueue
- type MockMessageQueue
- func (m *MockMessageQueue) Delete(arg0 context.Context, arg1 *Message) error
- func (m *MockMessageQueue) Dequeue(arg0 context.Context, arg1 int, arg2 time.Duration) ([]*Message, error)
- func (m *MockMessageQueue) EXPECT() *MockMessageQueueMockRecorder
- func (m *MockMessageQueue) EnqueueBatch(arg0 context.Context, arg1 []*Message) ([]*EnqueueMessageResult, error)
- type MockMessageQueueMockRecorder
- type Poller
- type PollerOptions
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Chunker ¶ added in v2.12.0
type Chunker[T any] interface {
	// Start the chunker
	Start(buffer <-chan T, handler Handler[T]) error
	// Stop the chunker and clean up any resources
	Stop() error
}
Chunker interface for 'chunking' entries on a buffer into slices of a desired size.
func NewChunker ¶ added in v2.12.0
func NewChunker[T any](options *ChunkerOptions) Chunker[T]
NewChunker creates a Chunker with the passed options. Once started, the Chunker creates slices of the configured ChunkSize from the buffer and passes them to the handler. Entries retrieved from the buffer are guaranteed to be delivered to the handler within the configured MaxElementWait duration.
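The chunking behavior described above can be sketched with a channel, a size limit, and a timer. This is a minimal illustration and not the package's implementation: the `chunk` and `collectChunks` functions are hypothetical, and the sketch stops when the buffer is closed rather than through a `Stop` method.

```go
package main

import (
	"fmt"
	"time"
)

// chunk is a hypothetical stand-in for a Chunker's Start loop. It groups
// entries from buffer into slices of at most size elements and hands a partial
// slice to handler once its oldest entry has waited maxWait.
func chunk[T any](buffer <-chan T, size int, maxWait time.Duration, handler func([]T)) {
	var pending []T
	var expiry <-chan time.Time // nil (never ready) while nothing is pending

	flush := func() {
		if len(pending) > 0 {
			handler(pending)
			pending = nil
		}
		expiry = nil
	}

	for {
		select {
		case entry, ok := <-buffer:
			if !ok {
				flush() // buffer closed: deliver whatever is left
				return
			}
			if len(pending) == 0 {
				// The first entry of a chunk starts the clock.
				expiry = time.After(maxWait)
			}
			pending = append(pending, entry)
			if len(pending) >= size {
				flush()
			}
		case <-expiry:
			flush() // partial chunk: the oldest entry has waited maxWait
		}
	}
}

// collectChunks feeds entries through chunk and records every delivered slice.
func collectChunks(entries []int, size int) [][]int {
	buffer := make(chan int, len(entries))
	for _, e := range entries {
		buffer <- e
	}
	close(buffer)

	var chunks [][]int
	chunk(buffer, size, time.Second, func(c []int) { chunks = append(chunks, c) })
	return chunks
}

func main() {
	fmt.Println(collectChunks([]int{1, 2, 3, 4, 5}, 2)) // prints [[1 2] [3 4] [5]]
}
```

The final chunk is smaller than the requested size, which matches the note on MaxElementWait that short waits may produce chunks below the desired size.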
type ChunkerOptions ¶ added in v2.12.0
type ChunkerOptions struct {
	// ChunkSize determines the desired chunk size for entries to be returned
	// from the buffer.
	ChunkSize uint
	// MaxElementWait determines the maximum time an entry will wait to be chunked.
	// Smaller durations may result in chunks that are less than the desired size.
	MaxElementWait time.Duration
}
ChunkerOptions for configuring a Chunker instance
func NewChunkerOptions ¶ added in v2.12.0
func NewChunkerOptions() *ChunkerOptions
NewChunkerOptions creates a ChunkerOptions with default values that can be used to create a NewChunker
func (*ChunkerOptions) Validate ¶ added in v2.12.0
func (o *ChunkerOptions) Validate() error
Validate that the values contained in this ChunkerOptions are complete and within the bounds necessary for operation.
type EnqueueMessageResult ¶
type EnqueueMessageResult struct {
	// Message this result is for
	*Message
	// Success indicates whether the message was successfully enqueued
	Success bool
	// SenderFault when success is false, indicates that the enqueue failed due
	// to a malformed message
	SenderFault bool
	// Error that occurred when enqueueing the message
	Error string
}
EnqueueMessageResult is returned for each message that is enqueued.
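One way to act on a batch of results is to separate failures that might succeed on retry from those caused by a malformed message. The sketch below uses local copies of the documented types, trimmed to the fields it needs. The `splitFailures` helper and the policy of retrying only when SenderFault is false are assumptions made for illustration, not behavior documented by the package.

```go
package main

import "fmt"

// Message is a local copy of the documented type, trimmed to one field.
type Message struct {
	ID string
}

// EnqueueMessageResult is a local copy of the documented type.
type EnqueueMessageResult struct {
	*Message
	Success     bool
	SenderFault bool
	Error       string
}

// splitFailures is a hypothetical helper. It assumes that a failure flagged
// as SenderFault will never succeed unchanged, and that any other failure is
// worth retrying.
func splitFailures(results []*EnqueueMessageResult) (retry, rejected []*Message) {
	for _, r := range results {
		switch {
		case r.Success:
			// Enqueued; nothing to do.
		case r.SenderFault:
			rejected = append(rejected, r.Message)
		default:
			retry = append(retry, r.Message)
		}
	}
	return retry, rejected
}

func main() {
	results := []*EnqueueMessageResult{
		{Message: &Message{ID: "a"}, Success: true},
		{Message: &Message{ID: "b"}, SenderFault: true, Error: "malformed body"},
		{Message: &Message{ID: "c"}, Error: "queue unavailable"},
	}
	retry, rejected := splitFailures(results)
	fmt.Println(retry[0].ID, rejected[0].ID) // prints c b
}
```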
type Enqueuer ¶ added in v2.12.0
type Enqueuer interface {
	// Start validates configurations and creates the resources necessary to
	// handle enqueuing messages.
	Start(messageQueue MessageQueue) error
	// Enqueue the passed message. This operation may be
	// buffered and errors enqueueing will not be available for immediate handling.
	Enqueue(messages *Message) error
	// Stop all active workers, drain queues, and free resources.
	Stop() error
}
Enqueuer for inserting tasks into a MessageQueue
func New ¶
func New(options *EnqueuerOptions) Enqueuer
New enqueuer for inserting messages into a MessageQueue
type EnqueuerOptions ¶ added in v2.12.0
type EnqueuerOptions struct {
	*ChunkerOptions
	// Logger for the enqueuer to use
	Logger log.Logger
	// BufferSize to use for forming batches, must be greater than BatchSize
	BufferSize uint
	// FailedBufferSize for messages that fail to enqueue. This should be at
	// least equal to BatchSize to avoid blocking.
	FailedBufferSize uint
	// FailureHandler receives messages that failed to enqueue, optional.
	FailureHandler HandleFailedEnqueue
}
func NewEnqueuerOptions ¶ added in v2.12.0
func NewEnqueuerOptions() *EnqueuerOptions
NewEnqueuerOptions with default values
func (*EnqueuerOptions) Validate ¶ added in v2.12.0
func (eo *EnqueuerOptions) Validate() error
Validate that the values contained in this Options are complete and within the bounds necessary for operation.
type HandleFailedEnqueue ¶ added in v2.12.0
type HandleFailedEnqueue func(Enqueuer, *EnqueueMessageResult)
HandleFailedEnqueue is called when a message fails to enqueue
type HandleMessage ¶ added in v2.12.0
HandleMessage returns a boolean indicating whether the message was successfully processed. If this function returns true, the message will be deleted from the queue; otherwise it will become available to other handlers after its visibility timeout expires.
type Handler ¶ added in v2.12.0
type Handler[T any] func(chunk []T)
Handler receives chunks that are created by the chunker
type Job ¶ added in v2.12.0
type Job interface {
Work() bool
}
Job done by a worker, returns true if the worker should continue, or false if the worker should exit
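The continue-or-exit contract of `Work` can be shown with a small loop. This is a sketch only: the `countdown` job and the `run` loop are hypothetical and do not reflect how the package's Worker is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// Job mirrors the documented interface.
type Job interface {
	Work() bool
}

// countdown is a hypothetical Job that asks the worker to exit after a fixed
// number of calls.
type countdown struct {
	remaining int
	calls     int
}

func (c *countdown) Work() bool {
	c.calls++
	c.remaining--
	return c.remaining > 0
}

// run is a stand-in for a worker loop: it keeps calling Work until the job
// returns false, then signals completion.
func run(job Job, wg *sync.WaitGroup) {
	defer wg.Done()
	for job.Work() {
	}
}

// runCountdown runs a countdown job on its own goroutine and reports how many
// times Work was called.
func runCountdown(n int) int {
	job := &countdown{remaining: n}
	var wg sync.WaitGroup
	wg.Add(1)
	go run(job, &wg)
	wg.Wait()
	return job.calls
}

func main() {
	fmt.Println(runCountdown(3)) // prints 3
}
```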
type Message ¶
type Message struct {
	// ID uniquely identifies this message
	ID string
	// External field used by the sdk
	External string
	// Trace field for telemetry
	Trace string
	// Delay before this message becomes visible after being enqueued
	Delay time.Duration
	// Service this message is for
	Service string
	// Method that should be invoked to process this message
	Method string
	// Body can contain any structured (JSON, XML) or unstructured text
	// limitations are determined by the implementation
	Body string
	// Deadline for processing this message
	Deadline time.Time
}
Message that can be enqueued in a MessageQueue
type MessageQueue ¶
type MessageQueue interface {
	// Enqueue all the passed messages as a batch
	EnqueueBatch(context.Context, []*Message) ([]*EnqueueMessageResult, error)
	// Dequeue up to the passed count of messages waiting up to the passed
	// duration
	Dequeue(ctx context.Context, count int, wait time.Duration) ([]*Message, error)
	// Delete the passed message from the queue so that it is not processed by
	// other workers
	// TODO: [COR-553] Batch delete messages
	Delete(context.Context, *Message) error
}
MessageQueue for enqueueing and dequeueing messages
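To make the three methods concrete, here is a minimal in-memory queue with the same method signatures. Everything in it is an assumption for illustration: the `memoryQueue` type is hypothetical, the types are local trimmed copies, and the sketch ignores Delay, visibility timeouts, and the wait duration. Treating a message without an ID as a sender fault is also an assumed rule.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Local, trimmed copies of the documented types.
type Message struct{ ID string }

type EnqueueMessageResult struct {
	*Message
	Success     bool
	SenderFault bool
	Error       string
}

// memoryQueue is a hypothetical in-memory queue, not a package type.
type memoryQueue struct {
	mu       sync.Mutex
	ready    []*Message
	inFlight map[string]*Message
}

func newMemoryQueue() *memoryQueue {
	return &memoryQueue{inFlight: make(map[string]*Message)}
}

// EnqueueBatch reports one result per message, in order.
func (q *memoryQueue) EnqueueBatch(_ context.Context, messages []*Message) ([]*EnqueueMessageResult, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	results := make([]*EnqueueMessageResult, 0, len(messages))
	for _, m := range messages {
		if m == nil || m.ID == "" {
			results = append(results, &EnqueueMessageResult{Message: m, SenderFault: true, Error: "message has no ID"})
			continue
		}
		q.ready = append(q.ready, m)
		results = append(results, &EnqueueMessageResult{Message: m, Success: true})
	}
	return results, nil
}

// Dequeue returns up to count ready messages without waiting.
func (q *memoryQueue) Dequeue(_ context.Context, count int, _ time.Duration) ([]*Message, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if count < 0 {
		count = 0
	}
	if count > len(q.ready) {
		count = len(q.ready)
	}
	batch := append([]*Message(nil), q.ready[:count]...)
	q.ready = q.ready[count:]
	for _, m := range batch {
		q.inFlight[m.ID] = m
	}
	return batch, nil
}

// Delete removes a dequeued message so it cannot be handed out again.
func (q *memoryQueue) Delete(_ context.Context, m *Message) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.inFlight[m.ID]; !ok {
		return fmt.Errorf("message %q is not in flight", m.ID)
	}
	delete(q.inFlight, m.ID)
	return nil
}

// demoQueue enqueues three messages (one invalid), then dequeues and deletes.
func demoQueue() (accepted, rejected, deleted int) {
	ctx := context.Background()
	q := newMemoryQueue()
	results, _ := q.EnqueueBatch(ctx, []*Message{{ID: "a"}, {ID: "b"}, {}})
	for _, r := range results {
		if r.Success {
			accepted++
		} else if r.SenderFault {
			rejected++
		}
	}
	batch, _ := q.Dequeue(ctx, 10, time.Second)
	for _, m := range batch {
		if q.Delete(ctx, m) == nil {
			deleted++
		}
	}
	return accepted, rejected, deleted
}

func main() {
	fmt.Println(demoQueue()) // prints 2 1 2
}
```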
type MockMessageQueue ¶ added in v2.12.0
type MockMessageQueue struct {
// contains filtered or unexported fields
}
MockMessageQueue is a mock of MessageQueue interface.
func NewMockMessageQueue ¶ added in v2.12.0
func NewMockMessageQueue(ctrl *gomock.Controller) *MockMessageQueue
NewMockMessageQueue creates a new mock instance.
func (*MockMessageQueue) Delete ¶ added in v2.12.0
func (m *MockMessageQueue) Delete(arg0 context.Context, arg1 *Message) error
Delete mocks base method.
func (*MockMessageQueue) Dequeue ¶ added in v2.12.0
func (m *MockMessageQueue) Dequeue(arg0 context.Context, arg1 int, arg2 time.Duration) ([]*Message, error)
Dequeue mocks base method.
func (*MockMessageQueue) EXPECT ¶ added in v2.12.0
func (m *MockMessageQueue) EXPECT() *MockMessageQueueMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockMessageQueue) EnqueueBatch ¶ added in v2.12.0
func (m *MockMessageQueue) EnqueueBatch(arg0 context.Context, arg1 []*Message) ([]*EnqueueMessageResult, error)
EnqueueBatch mocks base method.
type MockMessageQueueMockRecorder ¶ added in v2.12.0
type MockMessageQueueMockRecorder struct {
// contains filtered or unexported fields
}
MockMessageQueueMockRecorder is the mock recorder for MockMessageQueue.
func (*MockMessageQueueMockRecorder) Delete ¶ added in v2.12.0
func (mr *MockMessageQueueMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call
Delete indicates an expected call of Delete.
func (*MockMessageQueueMockRecorder) Dequeue ¶ added in v2.12.0
func (mr *MockMessageQueueMockRecorder) Dequeue(arg0, arg1, arg2 interface{}) *gomock.Call
Dequeue indicates an expected call of Dequeue.
func (*MockMessageQueueMockRecorder) EnqueueBatch ¶ added in v2.12.0
func (mr *MockMessageQueueMockRecorder) EnqueueBatch(arg0, arg1 interface{}) *gomock.Call
EnqueueBatch indicates an expected call of EnqueueBatch.
type Poller ¶ added in v2.12.0
type Poller interface {
	// Poll for messages on the passed queue
	Poll(HandleMessage, MessageQueue) error
	// Stop polling for messages
	Stop() error
}
Poller retrieves batches of messages from a message queue and handles them using provided functions.
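A single polling pass can be sketched as: dequeue a batch, call the handler for each message, and delete only the messages the handler accepted. The sketch below is sequential and makes several assumptions: the `handle` function type is a guessed shape for HandleMessage (its signature is not shown in this document), and `pollOnce`, `queue`, and `fakeQueue` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message is a local, trimmed copy of the documented type.
type Message struct{ ID string }

// queue is the subset of the documented MessageQueue methods used here.
type queue interface {
	Dequeue(ctx context.Context, count int, wait time.Duration) ([]*Message, error)
	Delete(context.Context, *Message) error
}

// handle is an assumed shape for a message handler: true means processed.
type handle func(*Message) bool

// pollOnce dequeues one batch and deletes each message the handler accepts.
// It returns the number of deleted messages.
func pollOnce(ctx context.Context, q queue, h handle, count int, wait time.Duration) (int, error) {
	messages, err := q.Dequeue(ctx, count, wait)
	if err != nil {
		return 0, err
	}
	deleted := 0
	for _, m := range messages {
		if !h(m) {
			continue // not deleted, so the queue can offer it again later
		}
		if err := q.Delete(ctx, m); err != nil {
			return deleted, err
		}
		deleted++
	}
	return deleted, nil
}

// fakeQueue is a hypothetical queue that records which messages were deleted.
type fakeQueue struct {
	pending []*Message
	deleted []string
}

func (f *fakeQueue) Dequeue(_ context.Context, count int, _ time.Duration) ([]*Message, error) {
	if count > len(f.pending) {
		count = len(f.pending)
	}
	batch := f.pending[:count]
	f.pending = f.pending[count:]
	return batch, nil
}

func (f *fakeQueue) Delete(_ context.Context, m *Message) error {
	f.deleted = append(f.deleted, m.ID)
	return nil
}

// demoPoll runs one pass with a handler that rejects the message named "fail".
func demoPoll() []string {
	q := &fakeQueue{pending: []*Message{{ID: "ok-1"}, {ID: "fail"}, {ID: "ok-2"}}}
	onlyOK := func(m *Message) bool { return m.ID != "fail" }
	if _, err := pollOnce(context.Background(), q, onlyOK, 10, time.Second); err != nil {
		panic(err)
	}
	return q.deleted
}

func main() {
	fmt.Println(demoPoll()) // prints [ok-1 ok-2]
}
```

A real poller would repeat this pass until stopped and, per the ConcurrentMessageHandlers option, may run several handlers at once.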
func NewPoller ¶ added in v2.12.0
func NewPoller(options *PollerOptions) Poller
NewPoller with the passed options, if options are nil, default options will be used.
type PollerOptions ¶ added in v2.12.0
type PollerOptions struct {
	// Logger to use for reporting errors
	Logger log.Logger
	// ConcurrentMessageHandlers that should be running at any given time
	ConcurrentMessageHandlers int
	// WaitForBatch the specified duration before prematurely returning with less
	// than the desired number of messages.
	WaitForBatch time.Duration
	// DequeueCount is the number of messages to attempt to dequeue per request.
	// maximum will vary by implementation
	DequeueCount int
	// QueueOperationTimeout
	QueueOperationTimeout time.Duration
}
func NewPollerOptions ¶ added in v2.12.0
func NewPollerOptions() *PollerOptions
NewPollerOptions with valid values that can be used to initialize a new Poller
func (*PollerOptions) Validate ¶ added in v2.12.0
func (po *PollerOptions) Validate() error
Validate that the values contained in this Options are complete and within the bounds necessary for operation.
type Worker ¶ added in v2.12.0
type Worker struct {
// contains filtered or unexported fields
}
Worker can be used to asynchronously call work added via AddWork until work returns false.