Documentation ¶
Overview ¶
Package queue is an in-memory representation of the message queue.
Constants ¶
This section is empty.
Variables ¶
var DefaultBufferSize = runtime.GOMAXPROCS(0) * 10
DefaultBufferSize is the default size of the in-memory queue buffer.
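The BufferSize field of Queue is documented as falling back to DefaultBufferSize when it is non-positive. The following is a minimal sketch of that rule; effectiveBufferSize and defaultBufferSize are hypothetical names used for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultBufferSize mirrors the documented expression for DefaultBufferSize.
var defaultBufferSize = runtime.GOMAXPROCS(0) * 10

// effectiveBufferSize is a hypothetical helper that applies the documented
// fallback: a non-positive configured size selects the default.
func effectiveBufferSize(configured int) int {
	if configured <= 0 {
		return defaultBufferSize
	}
	return configured
}

func main() {
	fmt.Println(effectiveBufferSize(0) == defaultBufferSize) // unset: default applies
	fmt.Println(effectiveBufferSize(64))                     // explicit size is kept
}
```

Because the default scales with GOMAXPROCS, it grows with the number of CPUs available to the process.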
Functions ¶
This section is empty.
Types ¶
type CommandExecutor ¶
type CommandExecutor struct {
	Queue     *Queue
	Persister persistence.Persister
	Packer    *parcel.Packer
}
CommandExecutor is an implementation of dogma.CommandExecutor that adds commands to the message queue.
func (*CommandExecutor) ExecuteCommand ¶
func (x *CommandExecutor) ExecuteCommand(
	ctx context.Context,
	m dogma.Command,
	_ ...dogma.ExecuteCommandOption,
) error
ExecuteCommand enqueues a command for execution.
type Message ¶
type Message struct {
	persistence.QueueMessage
	Parcel parcel.Parcel
}
Message is a message on the queue.
type Queue ¶
type Queue struct {
	// Repository is used to load messages from the queue whenever the in-memory
	// buffer is exhausted.
	Repository persistence.QueueRepository

	// Marshaler is used to unmarshal the messages loaded via the repository.
	Marshaler marshaler.Marshaler

	// BufferSize is the maximum number of messages to buffer in memory.
	// If it is non-positive, DefaultBufferSize is used.
	//
	// It should be larger than the number of concurrent consumers.
	BufferSize int

	// contains filtered or unexported fields
}
A Queue is a prioritized collection of messages.
It is an in-memory representation of the head of the persisted message queue.
It dispatches to multiple consumers, ensuring each consumer receives a different message.
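To illustrate the "each consumer receives a different message" guarantee, here is a minimal sketch that uses a plain Go channel as a stand-in for the queue. The dispatch function and the string message IDs are assumptions made for this example; the package's actual dispatch mechanism is not described here.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch hands every message ID to exactly one of the given number of
// consumers and returns how many times each ID was received.
// The channel is a stand-in for the queue, not its real implementation.
func dispatch(ids []string, consumers int) map[string]int {
	ch := make(chan string, len(ids))
	for _, id := range ids {
		ch <- id
	}
	close(ch)

	var (
		mu     sync.Mutex
		wg     sync.WaitGroup
		counts = map[string]int{}
	)

	for i := 0; i < consumers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// A receive removes the value from the channel, so no two
			// consumers can observe the same message.
			for id := range ch {
				mu.Lock()
				counts[id]++
				mu.Unlock()
			}
		}()
	}

	wg.Wait()
	return counts
}

func main() {
	counts := dispatch([]string{"m1", "m2", "m3", "m4"}, 3)
	for id, n := range counts {
		fmt.Println(id, "received", n, "time(s)")
	}
}
```

Which consumer handles which message is not deterministic in this sketch; only the "exactly once per message" property is.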
func (*Queue) Ack ¶
Ack stops tracking a message that was obtained via Pop() and has been handled successfully.
func (*Queue) Nack ¶
Nack re-queues a message that was obtained via Pop() but was not handled successfully.
The message is placed in the queue according to the current value of m.NextAttemptAt, on the assumption that the caller has updated it after the failure.
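The effect of re-queueing by NextAttemptAt can be sketched with a min-heap from container/heap. The message and pending types and the nack function below are stand-ins invented for this example; whether the real queue uses a heap internally is an assumption.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// message is a stand-in for Message; only the fields needed here are modeled.
type message struct {
	ID            string
	NextAttemptAt time.Time
}

// pending is a min-heap ordered by NextAttemptAt (earliest first).
type pending []*message

func (p pending) Len() int           { return len(p) }
func (p pending) Less(i, j int) bool { return p[i].NextAttemptAt.Before(p[j].NextAttemptAt) }
func (p pending) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p *pending) Push(x any)        { *p = append(*p, x.(*message)) }
func (p *pending) Pop() any {
	old := *p
	m := old[len(old)-1]
	*p = old[:len(old)-1]
	return m
}

// nack re-inserts m using whatever NextAttemptAt it currently carries.
func nack(p *pending, m *message) { heap.Push(p, m) }

// retryOrder fails message "a", delays it by five seconds, re-queues it,
// and returns the order in which the messages would then be delivered.
func retryOrder() []string {
	now := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	p := &pending{}
	heap.Push(p, &message{ID: "a", NextAttemptAt: now})
	heap.Push(p, &message{ID: "b", NextAttemptAt: now.Add(time.Second)})
	heap.Push(p, &message{ID: "c", NextAttemptAt: now.Add(10 * time.Second)})

	failed := heap.Pop(p).(*message) // "a" is at the front
	failed.NextAttemptAt = now.Add(5 * time.Second)
	nack(p, failed)

	var order []string
	for p.Len() > 0 {
		order = append(order, heap.Pop(p).(*message).ID)
	}
	return order
}

func main() {
	fmt.Println(retryOrder()) // prints "[b a c]"
}
```

If the caller forgets to update NextAttemptAt before re-queueing, the failed message keeps its old position at the front and is retried immediately.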
func (*Queue) Pop ¶
Pop returns the message at the front of the queue.
It blocks until a message is ready to be handled or ctx is canceled.
Once the message has been handled, it must be either removed from the queue entirely by calling q.Ack(), or returned to the pending queue by calling q.Nack().
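The blocking behavior described above ("until a message is ready to be handled or ctx is canceled") follows a common Go pattern. The sketch below shows that pattern with a channel standing in for the queue; pop and demo are hypothetical functions, and the real Pop signature is not reproduced here.

```go
package main

import (
	"context"
	"fmt"
)

// pop is a stand-in for a blocking Pop: it waits until a message is ready
// or ctx is canceled, whichever happens first.
func pop(ctx context.Context, ready <-chan string) (string, error) {
	select {
	case m := <-ready:
		return m, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demo pops one ready message, then attempts a second pop on an empty
// queue with an already-canceled context.
func demo() (first string, firstErr, secondErr error) {
	ready := make(chan string, 1)
	ready <- "m1"

	first, firstErr = pop(context.Background(), ready)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // nothing is ready, so pop can only return via cancellation
	_, secondErr = pop(ctx, ready)

	return first, firstErr, secondErr
}

func main() {
	first, firstErr, secondErr := demo()
	fmt.Println(first, firstErr)
	fmt.Println(secondErr)
}
```

A consumer loop built on this pattern would call the equivalent of Ack after a successful handler run and Nack after a failed one, so that every popped message is accounted for.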
func (*Queue) RemoveTimeoutsByProcessID ¶
RemoveTimeoutsByProcessID removes any timeout messages that originated from a specific process instance ID.
hk is the process's handler key, id is the instance ID.
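As a rough illustration of the selection rule, the sketch below filters an in-memory slice by handler key and instance ID. The queued type, its fields, and removeTimeouts are all hypothetical; how the real queue stores or identifies timeout messages is not stated here.

```go
package main

import "fmt"

// queued is a stand-in for a queued message. All fields are assumptions
// made for this example.
type queued struct {
	ID         string
	IsTimeout  bool
	HandlerKey string
	InstanceID string
}

// removeTimeouts returns msgs without the timeout messages that originated
// from the process instance identified by handler key hk and instance ID id.
// Non-timeout messages are always kept.
func removeTimeouts(msgs []queued, hk, id string) []queued {
	kept := make([]queued, 0, len(msgs))
	for _, m := range msgs {
		if m.IsTimeout && m.HandlerKey == hk && m.InstanceID == id {
			continue
		}
		kept = append(kept, m)
	}
	return kept
}

func main() {
	msgs := []queued{
		{ID: "t1", IsTimeout: true, HandlerKey: "hk-1", InstanceID: "i-1"},
		{ID: "t2", IsTimeout: true, HandlerKey: "hk-1", InstanceID: "i-2"},
		{ID: "c1", IsTimeout: false, HandlerKey: "hk-1", InstanceID: "i-1"},
	}
	for _, m := range removeTimeouts(msgs, "hk-1", "i-1") {
		fmt.Println(m.ID)
	}
}
```

Both values must match for a message to be removed, so timeouts from other instances of the same process handler are left in place.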
type StreamAdaptor ¶
type StreamAdaptor struct {
	Queue            *Queue
	OffsetRepository persistence.OffsetRepository
	Persister        persistence.Persister
}
StreamAdaptor is an eventstream.Handler that adds the consumed events to a queue.
func (*StreamAdaptor) HandleEvent ¶
func (a *StreamAdaptor) HandleEvent(ctx context.Context, o uint64, ev eventstream.Event) error
HandleEvent handles an event obtained from the event stream.
o must be the offset that would be returned by NextOffset(). On success, the next call to NextOffset() will return ev.Offset + 1.
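The offset contract between HandleEvent and NextOffset can be modeled in a few lines. The offsetTracker and event types below are stand-ins for illustration: the real adaptor tracks offsets per source application through its OffsetRepository, and returning an error on a mismatched offset is an assumption, since the behavior in that case is not documented here.

```go
package main

import "fmt"

// event is a stand-in for eventstream.Event; only Offset is modeled.
type event struct {
	Offset uint64
}

// offsetTracker is a single-stream, in-memory model of the offset contract.
type offsetTracker struct {
	next uint64
}

// NextOffset returns the offset of the next event to be consumed.
func (t *offsetTracker) NextOffset() uint64 { return t.next }

// HandleEvent accepts ev only if o matches NextOffset(). On success, the
// next offset becomes ev.Offset + 1. Rejecting a mismatch with an error is
// an assumption of this sketch.
func (t *offsetTracker) HandleEvent(o uint64, ev event) error {
	if o != t.next {
		return fmt.Errorf("unexpected offset %d, expected %d", o, t.next)
	}
	t.next = ev.Offset + 1
	return nil
}

func main() {
	tr := &offsetTracker{}

	fmt.Println(tr.HandleEvent(tr.NextOffset(), event{Offset: 0})) // accepted
	fmt.Println(tr.NextOffset())                                   // now 1
	fmt.Println(tr.HandleEvent(0, event{Offset: 0}))               // stale offset is rejected
}
```

Passing the expected offset explicitly lets the handler detect a consumer that has replayed or skipped events, rather than silently enqueueing duplicates.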
func (*StreamAdaptor) NextOffset ¶
NextOffset returns the offset of the next event to be consumed from a specific application's event stream.
id is the identity of the source application.