queue

package
v0.3.3 Latest
Published: Oct 10, 2024 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Overview

Package queue is an in-memory representation of the message queue.

Constants

This section is empty.

Variables

var DefaultBufferSize = runtime.GOMAXPROCS(0) * 10

DefaultBufferSize is the default size of the in-memory queue buffer.
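
The Queue.BufferSize field is documented as falling back to DefaultBufferSize when it is non-positive. A minimal sketch of that rule follows. The names defaultBufferSize and effectiveBufferSize are hypothetical and exist only for illustration; this is not the package's own code.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultBufferSize mirrors the documented expression for DefaultBufferSize.
var defaultBufferSize = runtime.GOMAXPROCS(0) * 10

// effectiveBufferSize is a hypothetical helper (not part of the package).
// It returns the configured size, or the default when the configured
// value is non-positive, as described for Queue.BufferSize.
func effectiveBufferSize(configured int) int {
	if configured <= 0 {
		return defaultBufferSize
	}
	return configured
}

func main() {
	fmt.Println(effectiveBufferSize(0) == defaultBufferSize) // falls back to the default
	fmt.Println(effectiveBufferSize(64))                     // uses the configured value
}
```

Because the default scales with GOMAXPROCS, its value differs between machines. An explicit BufferSize may be preferable when the number of concurrent consumers is known in advance.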

Functions

This section is empty.

Types

type CommandExecutor

type CommandExecutor struct {
	Queue     *Queue
	Persister persistence.Persister
	Packer    *parcel.Packer
}

CommandExecutor is an implementation of dogma.CommandExecutor that adds commands to the message queue.

func (*CommandExecutor) ExecuteCommand

func (x *CommandExecutor) ExecuteCommand(
	ctx context.Context,
	m dogma.Command,
	_ ...dogma.ExecuteCommandOption,
) error

ExecuteCommand enqueues a command for execution.

type Message

type Message struct {
	persistence.QueueMessage
	Parcel parcel.Parcel
}

Message is a message on the queue.

type Queue

type Queue struct {
	// Repository is used to load messages from the queue whenever the in-memory
	// buffer is exhausted.
	Repository persistence.QueueRepository

	// Marshaler is used to unmarshal the messages loaded via the repository.
	Marshaler marshaler.Marshaler

	// BufferSize is the maximum number of messages to buffer in memory.
	// If it is non-positive, DefaultBufferSize is used.
	//
	// It should be larger than the number of concurrent consumers.
	BufferSize int
	// contains filtered or unexported fields
}

A Queue is a prioritized collection of messages.

It is an in-memory representation of the head of the persisted message queue.

It dispatches to multiple consumers, ensuring each consumer receives a different message.

func (*Queue) Ack

func (q *Queue) Ack(m Message)

Ack stops tracking a message that was obtained via Pop() and has been handled successfully.

func (*Queue) Add

func (q *Queue) Add(messages []Message)

Add begins tracking messages that have already been persisted.

func (*Queue) Nack

func (q *Queue) Nack(m Message)

Nack re-queues a message that was obtained via Pop() but was not handled successfully.

The message is placed in the queue according to the current value of m.NextAttemptAt under the assumption it has been updated after the failure.
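
The effect of re-queuing by NextAttemptAt can be sketched with a min-heap from container/heap. The message and pending types below are hypothetical stand-ins; the documentation does not state how the real Queue stores its messages, so treat this as one possible model rather than the actual implementation.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// message is a hypothetical stand-in for a queued message.
type message struct {
	ID            string
	NextAttemptAt time.Time
}

// pending is a min-heap ordered by NextAttemptAt (earliest first).
type pending []message

func (p pending) Len() int            { return len(p) }
func (p pending) Less(i, j int) bool  { return p[i].NextAttemptAt.Before(p[j].NextAttemptAt) }
func (p pending) Swap(i, j int)       { p[i], p[j] = p[j], p[i] }
func (p *pending) Push(x interface{}) { *p = append(*p, x.(message)) }
func (p *pending) Pop() interface{} {
	old := *p
	n := len(old)
	m := old[n-1]
	*p = old[:n-1]
	return m
}

// requeueOrder pops the first message, simulates a failure, updates its
// NextAttemptAt, re-queues it, and returns the resulting delivery order.
func requeueOrder() []string {
	base := time.Date(2024, 10, 10, 0, 0, 0, 0, time.UTC)
	p := &pending{}
	heap.Push(p, message{ID: "a", NextAttemptAt: base})
	heap.Push(p, message{ID: "b", NextAttemptAt: base.Add(1 * time.Second)})
	heap.Push(p, message{ID: "c", NextAttemptAt: base.Add(2 * time.Second)})

	// "a" is at the front. Handling fails, so the caller updates
	// NextAttemptAt before returning the message to the queue.
	failed := heap.Pop(p).(message)
	failed.NextAttemptAt = base.Add(1500 * time.Millisecond)
	heap.Push(p, failed)

	var order []string
	for p.Len() > 0 {
		order = append(order, heap.Pop(p).(message).ID)
	}
	return order
}

func main() {
	fmt.Println(requeueOrder()) // prints [b a c]
}
```

The failed message lands between "b" and "c" because its new NextAttemptAt falls between theirs. If the caller did not update NextAttemptAt before calling Nack, the message would return to the front of the queue.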

func (*Queue) Pop

func (q *Queue) Pop(ctx context.Context) (Message, error)

Pop returns the message at the front of the queue.

It blocks until a message is ready to be handled or ctx is canceled.

Once the message has been handled it must either be removed from the queue entirely, or returned to the pending queue, by calling q.Ack() or q.Nack(), respectively.
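
The Pop/Ack/Nack contract above suggests a consumer loop along the following lines. This sketch does not import the real package: the message type, the popper interface, and fakeQueue are hypothetical stand-ins that only mirror the documented method shapes, so the example can run on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for queue.Message.
type message struct{ ID string }

// popper mirrors the documented shapes of Pop, Ack and Nack.
type popper interface {
	Pop(ctx context.Context) (message, error)
	Ack(m message)
	Nack(m message)
}

// fakeQueue is an illustrative in-memory queue used only by this demo.
type fakeQueue struct {
	ready         chan message
	acked, nacked []string
}

func (q *fakeQueue) Pop(ctx context.Context) (message, error) {
	select {
	case m := <-q.ready:
		return m, nil
	case <-ctx.Done():
		return message{}, ctx.Err()
	}
}
func (q *fakeQueue) Ack(m message)  { q.acked = append(q.acked, m.ID) }
func (q *fakeQueue) Nack(m message) { q.nacked = append(q.nacked, m.ID) }

// consume pops messages until Pop fails (for example, when ctx is
// canceled). Every popped message is either acked or nacked.
func consume(ctx context.Context, q popper, handle func(message) error) error {
	for {
		m, err := q.Pop(ctx)
		if err != nil {
			return err
		}
		if err := handle(m); err != nil {
			// A real consumer would update the next-attempt time here.
			q.Nack(m)
			continue
		}
		q.Ack(m)
	}
}

// run feeds two messages through consume; the second one fails.
func run() (acked, nacked []string, err error) {
	q := &fakeQueue{ready: make(chan message, 2)}
	q.ready <- message{ID: "ok"}
	q.ready <- message{ID: "bad"}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	err = consume(ctx, q, func(m message) error {
		if m.ID == "bad" {
			cancel() // stop the demo after the last message
			return errors.New("handler failed")
		}
		return nil
	})
	return q.acked, q.nacked, err
}

func main() {
	acked, nacked, err := run()
	fmt.Println(acked, nacked, err)
}
```

The key property is that no code path leaves a popped message without a matching Ack or Nack call.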

func (*Queue) RemoveTimeoutsByProcessID

func (q *Queue) RemoveTimeoutsByProcessID(hk, id string)

RemoveTimeoutsByProcessID removes any timeout messages that originated from a specific process instance.

hk is the process's handler key; id is the instance ID.
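
Conceptually, the removal is a filter over pending messages keyed by handler key and instance ID. The sketch below illustrates that idea with a hypothetical flat message struct. The field names IsTimeout, HandlerKey, and InstanceID are assumptions for illustration and do not reflect the real Message or parcel layout.

```go
package main

import "fmt"

// message is a hypothetical flattened view of a queued message.
type message struct {
	ID         string
	IsTimeout  bool
	HandlerKey string
	InstanceID string
}

// removeTimeoutsByProcessID returns the pending messages without any
// timeouts that originated from the process instance identified by
// handler key hk and instance ID id. Other messages are kept in order.
func removeTimeoutsByProcessID(pending []message, hk, id string) []message {
	kept := make([]message, 0, len(pending))
	for _, m := range pending {
		if m.IsTimeout && m.HandlerKey == hk && m.InstanceID == id {
			continue
		}
		kept = append(kept, m)
	}
	return kept
}

func main() {
	pending := []message{
		{ID: "t1", IsTimeout: true, HandlerKey: "hk-1", InstanceID: "i-1"},
		{ID: "c1", IsTimeout: false, HandlerKey: "hk-1", InstanceID: "i-1"},
		{ID: "t2", IsTimeout: true, HandlerKey: "hk-1", InstanceID: "i-2"},
	}
	for _, m := range removeTimeoutsByProcessID(pending, "hk-1", "i-1") {
		fmt.Println(m.ID)
	}
}
```

In this model only "t1" is removed: "c1" is not a timeout, and "t2" belongs to a different instance.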

func (*Queue) Run

func (q *Queue) Run(ctx context.Context) error

Run starts the queue.

It coordinates the tracking of messages that are loaded from a queue repository or manually added to the queue by Add().

type StreamAdaptor

type StreamAdaptor struct {
	Queue            *Queue
	OffsetRepository persistence.OffsetRepository
	Persister        persistence.Persister
}

StreamAdaptor is an eventstream.Handler that adds the consumed events to a queue.

func (*StreamAdaptor) HandleEvent

func (a *StreamAdaptor) HandleEvent(ctx context.Context, o uint64, ev eventstream.Event) error

HandleEvent handles an event obtained from the event stream.

o must be the offset that would be returned by NextOffset(). On success, the next call to NextOffset() will return ev.Offset + 1.
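
The offset contract can be modeled as a small state machine. In the sketch below, the adaptor and event types are hypothetical, single-stream simplifications without persistence. Returning an error on an offset mismatch is an assumption; the documentation only states the precondition, not what happens when it is violated.

```go
package main

import "fmt"

// event is a hypothetical stand-in for an event stream event.
type event struct{ Offset uint64 }

// adaptor is an illustrative in-memory model of the offset contract.
type adaptor struct{ next uint64 }

// NextOffset returns the offset of the next event to consume.
func (a *adaptor) NextOffset() uint64 { return a.next }

// HandleEvent accepts ev only when o equals the current next offset.
// On success the next offset becomes ev.Offset + 1.
func (a *adaptor) HandleEvent(o uint64, ev event) error {
	if o != a.next {
		// Assumed behavior: reject the event and leave state unchanged.
		return fmt.Errorf("offset mismatch: got %d, expected %d", o, a.next)
	}
	a.next = ev.Offset + 1
	return nil
}

func main() {
	a := &adaptor{}
	fmt.Println(a.HandleEvent(a.NextOffset(), event{Offset: 0})) // <nil>
	fmt.Println(a.NextOffset())                                  // 1
	fmt.Println(a.HandleEvent(0, event{Offset: 1}))              // offset mismatch: got 0, expected 1
}
```

Checking the supplied offset against the stored one lets a consumer detect that it is replaying or skipping events before any message is enqueued.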

func (*StreamAdaptor) NextOffset

func (a *StreamAdaptor) NextOffset(ctx context.Context, id configkit.Identity) (uint64, error)

NextOffset returns the offset of the next event to be consumed from a specific application's event stream.

id is the identity of the source application.
