transmanager

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2023 License: BSD-3-Clause Imports: 5 Imported by: 0

README

Transaction Manager

Package transactionmanager provides structures and interfaces for handling message processing in an asynchronous messaging system. Specifically, the outbox_processor is responsible for managing the lifecycle of outgoing messages, ensuring they are published and marked accordingly in the outbox store.

Provides middleware functionalities for managing message persistence and publication in an asynchronous messaging system. The outbox components handle outgoing message transactions ensuring that messages are stored before being published to prevent message loss.

Implements middleware functionality for processing incoming and outgoing messages within an asynchronous messaging system.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InboxHandler

func InboxHandler(store InboxStore) async.MessageHandlerMiddleware

InboxHandler returns a new instance of MessageHandlerMiddleware that uses the provided InboxStore to save incoming messages before passing them to the next handler in the chain. It ensures that duplicate messages are handled gracefully by acknowledging them without returning an error, thus preventing multiple processing of the same message.

The middleware wraps around the next MessageHandler to intercept the call, perform operations on the message, and delegate the message to the next handler if appropriate.

func OutboxPublisher

func OutboxPublisher(store OutboxStore) async.MessagePublisherMiddleware

OutboxPublisher creates a new MessagePublisherMiddleware using the provided OutboxStore. The middleware ensures that messages are saved to the OutboxStore before they are published. This is a critical step in guaranteeing that messages are not lost in the event of a failure during the publication process.

If the Save operation detects a duplicate message, it is silently acknowledged to maintain idempotency and prevent duplicate message delivery.

The middleware intercepts the message publication process, applies the store's Save operation, and proceeds with the publication if successful.

Types

type ErrDuplicateMessage

type ErrDuplicateMessage string

ErrDuplicateMessage defines a custom error type that is used to represent a duplicate message scenario within the message processing.

func (ErrDuplicateMessage) Error

func (e ErrDuplicateMessage) Error() string

Error implements the error interface for ErrDuplicateMessage. It formats the error message to indicate a duplicate message has been encountered.

type InboxStore

type InboxStore interface {
	// Save attempts to save an incoming message to the store.
	// It returns an error if the saving process fails.
	Save(context.Context, async.IncomingMessage) error
}

InboxStore is an interface that abstracts the storage mechanism for incoming messages. Implementations of this interface should handle the persistence of message data.

type OutboxProcessor

type OutboxProcessor interface {
	Start(ctx context.Context) error
}

OutboxProcessor is an interface that outlines the start process for message processing. Implementations of this interface must be able to initiate the processing of outgoing messages.

func NewOutboxProcessor

func NewOutboxProcessor(publisher async.MessagePublisher, store OutboxStore) OutboxProcessor

NewOutboxProcessor creates and returns a new instance of an outboxProcessor with the given message publisher and outbox store. This processor is responsible for retrieving unpublished messages from the store and publishing them using the publisher.

type OutboxStore

type OutboxStore interface {
	// Save persists a message in the outbox store and returns an error
	// if the save operation fails.
	Save(context.Context, async.Message) error

	// FindUnpublished retrieves a slice of messages that have not yet been
	// published, up to a maximum number specified by the limit argument.
	FindUnpublished(context.Context, int) ([]async.Message, error)

	// MarkPublished updates the status of a batch of messages, identified by
	// their unique IDs, to indicate that they have been successfully published
	MarkPublished(context.Context, ...string) error
}

OutboxStore defines the interface for an outbox storage mechanism. Implementations of this interface are responsible for storing and retrieving messages that are pending publication.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL