Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InboxHandler ¶
func InboxHandler(store InboxStore) async.MessageHandlerMiddleware
InboxHandler returns a MessageHandlerMiddleware that uses the provided InboxStore to save each incoming message before passing it to the next handler in the chain. Duplicate messages are acknowledged without returning an error, so the same message is not processed more than once.
The middleware wraps around the next MessageHandler to intercept the call, perform operations on the message, and delegate the message to the next handler if appropriate.
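The save-then-delegate flow described above can be sketched as follows. This is a minimal illustration, not the package's implementation: Message, Handler, Middleware, Store, memoryStore, and inboxSketch are hypothetical stand-ins for the async and inbox/outbox types, whose real definitions may differ, and the Error text is assumed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Message, Handler and Middleware are hypothetical stand-ins for
// async.IncomingMessage, async.MessageHandler and
// async.MessageHandlerMiddleware; the real definitions may differ.
type Message struct {
	ID      string
	Payload string
}

type Handler func(ctx context.Context, msg Message) error

type Middleware func(next Handler) Handler

// ErrDuplicateMessage mirrors the documented string-based error type.
// The message format is an assumption.
type ErrDuplicateMessage string

func (e ErrDuplicateMessage) Error() string { return "duplicate message: " + string(e) }

// Store is a local stand-in for InboxStore.
type Store interface {
	Save(ctx context.Context, msg Message) error
}

// memoryStore remembers message IDs and reports repeats as duplicates.
type memoryStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func (s *memoryStore) Save(_ context.Context, msg Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[msg.ID] {
		return ErrDuplicateMessage(msg.ID)
	}
	s.seen[msg.ID] = true
	return nil
}

// inboxSketch saves each message first. Duplicates are acknowledged with a
// nil error and never reach next; any other save error is returned as-is.
func inboxSketch(store Store) Middleware {
	return func(next Handler) Handler {
		return func(ctx context.Context, msg Message) error {
			err := store.Save(ctx, msg)
			var dup ErrDuplicateMessage
			if errors.As(err, &dup) {
				return nil
			}
			if err != nil {
				return err
			}
			return next(ctx, msg)
		}
	}
}

// deliver sends the given IDs through the middleware and returns how many
// times the wrapped handler actually ran.
func deliver(ids ...string) int {
	calls := 0
	h := inboxSketch(&memoryStore{seen: map[string]bool{}})(
		func(context.Context, Message) error { calls++; return nil })
	for _, id := range ids {
		if err := h(context.Background(), Message{ID: id}); err != nil {
			panic(err)
		}
	}
	return calls
}

func main() {
	// "a" arrives twice, so the handler runs for "a" and "b" only.
	fmt.Println(deliver("a", "a", "b"))
}
```

Returning nil for a duplicate matters because many brokers redeliver a message whose handler returned an error; acknowledging it ends the redelivery.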
func OutboxPublisher ¶
func OutboxPublisher(store OutboxStore) async.MessagePublisherMiddleware
OutboxPublisher creates a new MessagePublisherMiddleware using the provided OutboxStore. The middleware ensures that messages are saved to the OutboxStore before they are published. This is a critical step in guaranteeing that messages are not lost in the event of a failure during the publication process.
If the Save operation reports a duplicate message, the middleware silently acknowledges the duplicate to maintain idempotency and prevent duplicate message delivery.
The middleware intercepts the message publication process, applies the store's Save operation, and proceeds with the publication if successful.
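To illustrate why saving before publishing protects against loss, the sketch below pushes a message through a publisher that always fails and then checks that the message is still in the store. Message, Publisher, PublisherMiddleware, saver, mapStore, and outboxSketch are hypothetical names; the real async.MessagePublisherMiddleware signature is not shown in this documentation and may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message, Publisher and PublisherMiddleware are hypothetical stand-ins for
// the async package's message and publisher types.
type Message struct{ ID, Body string }

type Publisher func(ctx context.Context, msg Message) error

type PublisherMiddleware func(next Publisher) Publisher

// ErrDuplicateMessage mirrors the documented error type; the text is assumed.
type ErrDuplicateMessage string

func (e ErrDuplicateMessage) Error() string { return "duplicate message: " + string(e) }

// saver is the single OutboxStore method this sketch needs.
type saver interface {
	Save(ctx context.Context, msg Message) error
}

// mapStore is a toy store keyed by message ID. It is not safe for
// concurrent use.
type mapStore map[string]Message

func (s mapStore) Save(_ context.Context, msg Message) error {
	if _, ok := s[msg.ID]; ok {
		return ErrDuplicateMessage(msg.ID)
	}
	s[msg.ID] = msg
	return nil
}

// outboxSketch persists the message, then publishes it. A duplicate is
// acknowledged with a nil error and is not published again.
func outboxSketch(store saver) PublisherMiddleware {
	return func(next Publisher) Publisher {
		return func(ctx context.Context, msg Message) error {
			err := store.Save(ctx, msg)
			var dup ErrDuplicateMessage
			if errors.As(err, &dup) {
				return nil
			}
			if err != nil {
				return err
			}
			return next(ctx, msg)
		}
	}
}

// survivesBrokerFailure reports whether a message is still in the store
// after the underlying publisher failed.
func survivesBrokerFailure() bool {
	store := mapStore{}
	broken := func(context.Context, Message) error {
		return errors.New("broker unavailable")
	}
	publish := outboxSketch(store)(broken)
	err := publish(context.Background(), Message{ID: "42", Body: "hello"})
	_, saved := store["42"]
	return err != nil && saved
}

func main() {
	fmt.Println(survivesBrokerFailure())
}
```

Because the record outlives the failed publish, a separate process can later find it and retry.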
Types ¶
type ErrDuplicateMessage ¶
type ErrDuplicateMessage string
ErrDuplicateMessage is a custom error type that represents a duplicate message encountered during message processing.
func (ErrDuplicateMessage) Error ¶
func (e ErrDuplicateMessage) Error() string
Error implements the error interface for ErrDuplicateMessage. It formats the error message to indicate a duplicate message has been encountered.
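Because ErrDuplicateMessage is a distinct type, callers can detect it with errors.As even when it has been wrapped. The sketch below redeclares the type locally so that it runs on its own; the text returned by Error and the helpers isDuplicate and wrap are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrDuplicateMessage is redeclared here to keep the example self-contained.
type ErrDuplicateMessage string

// Error formats the message. The exact wording used by the package is not
// documented, so this string is an assumption.
func (e ErrDuplicateMessage) Error() string { return "duplicate message: " + string(e) }

// isDuplicate reports whether err is, or wraps, an ErrDuplicateMessage.
func isDuplicate(err error) bool {
	var dup ErrDuplicateMessage
	return errors.As(err, &dup)
}

// wrap adds context to an error the way a store implementation might.
func wrap(err error) error {
	return fmt.Errorf("save failed: %w", err)
}

func main() {
	fmt.Println(isDuplicate(wrap(ErrDuplicateMessage("msg-1"))))
	fmt.Println(isDuplicate(errors.New("connection timeout")))
}
```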
type InboxStore ¶
type InboxStore interface {
	// Save attempts to save an incoming message to the store.
	// It returns an error if the saving process fails.
	Save(context.Context, async.IncomingMessage) error
}
InboxStore is an interface that abstracts the storage mechanism for incoming messages. Implementations of this interface should handle the persistence of message data.
type OutboxProcessor ¶
OutboxProcessor is an interface for the component that processes outgoing messages. Implementations must provide a way to start that processing.
func NewOutboxProcessor ¶
func NewOutboxProcessor(publisher async.MessagePublisher, store OutboxStore) OutboxProcessor
NewOutboxProcessor creates and returns an OutboxProcessor that uses the given message publisher and outbox store. The processor retrieves unpublished messages from the store and publishes them using the publisher.
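One pass of such a processor might look like the sketch below: fetch a batch of pending messages, publish each one, then mark the published IDs. How the real processor schedules passes, sizes batches, or handles partial failures is not documented here, so relayOnce, its stop-on-first-error policy, and the Message, Publisher, Store, fakeStore, and recorder types are all assumptions.

```go
package main

import (
	"context"
	"fmt"
)

// Message and Publisher are hypothetical stand-ins for async.Message and
// async.MessagePublisher.
type Message struct{ ID, Body string }

type Publisher interface {
	Publish(ctx context.Context, msg Message) error
}

// Store lists the two OutboxStore methods the relay needs.
type Store interface {
	FindUnpublished(ctx context.Context, limit int) ([]Message, error)
	MarkPublished(ctx context.Context, ids ...string) error
}

// relayOnce publishes up to limit pending messages. It stops at the first
// publish error but still marks the messages that were already sent.
func relayOnce(ctx context.Context, pub Publisher, store Store, limit int) (int, error) {
	pending, err := store.FindUnpublished(ctx, limit)
	if err != nil {
		return 0, err
	}
	var sent []string
	var pubErr error
	for _, msg := range pending {
		if pubErr = pub.Publish(ctx, msg); pubErr != nil {
			break
		}
		sent = append(sent, msg.ID)
	}
	if len(sent) > 0 {
		if err := store.MarkPublished(ctx, sent...); err != nil {
			return len(sent), err
		}
	}
	return len(sent), pubErr
}

// fakeStore keeps pending messages in a slice, for demonstration only.
type fakeStore struct{ pending []Message }

func (s *fakeStore) FindUnpublished(_ context.Context, limit int) ([]Message, error) {
	if limit < 0 {
		limit = 0
	}
	if limit > len(s.pending) {
		limit = len(s.pending)
	}
	return append([]Message(nil), s.pending[:limit]...), nil
}

func (s *fakeStore) MarkPublished(_ context.Context, ids ...string) error {
	done := make(map[string]bool, len(ids))
	for _, id := range ids {
		done[id] = true
	}
	var rest []Message
	for _, m := range s.pending {
		if !done[m.ID] {
			rest = append(rest, m)
		}
	}
	s.pending = rest
	return nil
}

// recorder is a publisher that remembers the IDs it was asked to publish.
type recorder struct{ ids []string }

func (r *recorder) Publish(_ context.Context, msg Message) error {
	r.ids = append(r.ids, msg.ID)
	return nil
}

// drain runs relayOnce in batches of two until nothing is pending and
// returns the published IDs in order.
func drain() []string {
	store := &fakeStore{pending: []Message{{ID: "1"}, {ID: "2"}, {ID: "3"}}}
	pub := &recorder{}
	for {
		n, err := relayOnce(context.Background(), pub, store, 2)
		if err != nil {
			panic(err)
		}
		if n == 0 {
			return pub.ids
		}
	}
}

func main() {
	fmt.Println(drain())
}
```

If MarkPublished fails after a successful publish, the next pass sends the same message again, so consumers of such a relay should tolerate duplicates.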
type OutboxStore ¶
type OutboxStore interface {
	// Save persists a message in the outbox store and returns an error
	// if the save operation fails.
	Save(context.Context, async.Message) error

	// FindUnpublished retrieves a slice of messages that have not yet been
	// published, up to a maximum number specified by the limit argument.
	FindUnpublished(context.Context, int) ([]async.Message, error)

	// MarkPublished updates the status of a batch of messages, identified by
	// their unique IDs, to indicate that they have been successfully published.
	MarkPublished(context.Context, ...string) error
}
OutboxStore defines the interface for an outbox storage mechanism. Implementations of this interface are responsible for storing and retrieving messages that are pending publication.
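For tests or prototypes, a store with this shape can be kept in memory. The sketch below is one possible implementation under stated assumptions: Message stands in for async.Message, the message ID is taken to be the deduplication key, and MemoryOutbox, NewMemoryOutbox, entry, and pendingIDs are hypothetical names. A production store would normally sit on a database so that Save can share a transaction with the business data.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for async.Message.
type Message struct{ ID, Body string }

// ErrDuplicateMessage mirrors the documented error type; the text is assumed.
type ErrDuplicateMessage string

func (e ErrDuplicateMessage) Error() string { return "duplicate message: " + string(e) }

type entry struct {
	msg       Message
	published bool
}

// MemoryOutbox keeps messages in insertion order and tracks their status.
type MemoryOutbox struct {
	mu      sync.Mutex
	order   []string
	entries map[string]*entry
}

func NewMemoryOutbox() *MemoryOutbox {
	return &MemoryOutbox{entries: make(map[string]*entry)}
}

// Save stores msg, rejecting an ID that was saved before.
func (o *MemoryOutbox) Save(_ context.Context, msg Message) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	if _, ok := o.entries[msg.ID]; ok {
		return ErrDuplicateMessage(msg.ID)
	}
	o.entries[msg.ID] = &entry{msg: msg}
	o.order = append(o.order, msg.ID)
	return nil
}

// FindUnpublished returns at most limit pending messages, oldest first.
func (o *MemoryOutbox) FindUnpublished(_ context.Context, limit int) ([]Message, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	var out []Message
	for _, id := range o.order {
		if len(out) >= limit {
			break
		}
		if e := o.entries[id]; !e.published {
			out = append(out, e.msg)
		}
	}
	return out, nil
}

// MarkPublished flags the given IDs as published.
func (o *MemoryOutbox) MarkPublished(_ context.Context, ids ...string) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	for _, id := range ids {
		e, ok := o.entries[id]
		if !ok {
			return fmt.Errorf("unknown message id %q", id)
		}
		e.published = true
	}
	return nil
}

// pendingIDs saves a, b, c, marks b as published, and returns what is left.
func pendingIDs() []string {
	ctx := context.Background()
	box := NewMemoryOutbox()
	for _, id := range []string{"a", "b", "c"} {
		if err := box.Save(ctx, Message{ID: id}); err != nil {
			panic(err)
		}
	}
	if err := box.MarkPublished(ctx, "b"); err != nil {
		panic(err)
	}
	pending, err := box.FindUnpublished(ctx, 10)
	if err != nil {
		panic(err)
	}
	ids := make([]string, 0, len(pending))
	for _, m := range pending {
		ids = append(ids, m.ID)
	}
	return ids
}

func main() {
	fmt.Println(pendingIDs()) // prints "[a c]"
}
```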