outbox

Version: v0.0.0-...-d170e9e
Published: Dec 29, 2024 License: MIT Imports: 9 Imported by: 1

README

pgx-outbox

This is a simple Go implementation of the transactional outbox pattern for PostgreSQL using the jackc/pgx driver.

More advanced options are described in the article "Revisiting the Outbox Pattern" by Gunnar Morling.

The motivation behind this library is to provide a generic, extensible implementation that avoids boilerplate code in projects.

Note: this is not a general-purpose Postgres queue, even though the internal implementation is based on a table with a queue-like structure.

How to use

1. Add a database migration to the project:
CREATE TABLE IF NOT EXISTS outbox_messages
(
    id           BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,

    broker       TEXT                                NOT NULL,
    topic        TEXT                                NOT NULL,
    metadata     JSONB,
    payload      JSONB                               NOT NULL,

    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    published_at TIMESTAMP
);

CREATE INDEX idx_outbox_messages_published_at ON outbox_messages (published_at);

The outbox table name can be customized, but the table structure should remain exactly the same.

2. Add outbox.Writer to the repository layer:
type repo struct {
	pool *pgxpool.Pool
	
	writer outbox.Writer
	messageMapper types.ToMessageFunc[User]
}

To map a domain model, e.g. User, to an outbox message, implement a types.ToMessageFunc function in the service layer and pass it to the repository, either in the New function or as a repository method parameter.
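A minimal sketch of such a mapper follows. It assumes the mapper has the shape used by the repository in this README (it takes a User and returns a message and an error). The Message struct is a local stand-in for types.Message whose fields are guessed from the outbox table columns; the User fields, the broker value and the topic name are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local stand-in for types.Message (assumption): its fields
// mirror the broker, topic, metadata and payload columns of the outbox table.
type Message struct {
	Broker   string
	Topic    string
	Metadata map[string]string
	Payload  []byte
}

// User is a hypothetical domain model.
type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// userToMessage maps a User to an outbox message by serializing it to JSON,
// which fits the JSONB payload column.
func userToMessage(user User) (Message, error) {
	payload, err := json.Marshal(user)
	if err != nil {
		return Message{}, fmt.Errorf("json.Marshal: %w", err)
	}

	return Message{
		Broker:  "sns",          // hypothetical broker name
		Topic:   "user-created", // hypothetical topic name
		Payload: payload,
	}, nil
}
```

With the real library the function would return types.Message instead of the stand-in, and could then be passed wherever a types.ToMessageFunc[User] is expected.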

Then call writer.Write in the repository methods that should produce outbox messages.

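func (r *repo) CreateUser(ctx context.Context, user User) (u User, txErr error) {
	// begin a transaction; commit or roll back in defer depending on txErr
	tx, err := r.pool.Begin(ctx)
	if err != nil {
		return u, fmt.Errorf("pool.Begin: %w", err)
	}
	defer func() {
		if txErr != nil {
			_ = tx.Rollback(ctx)
			return
		}
		txErr = tx.Commit(ctx)
	}()

	// persist the business entity
	user, err = r.createUser(ctx, tx, user)
	if err != nil {
		return u, fmt.Errorf("createUser: %w", err)
	}

	// map the entity to an outbox message
	message, err := r.messageMapper(user)
	if err != nil {
		return u, fmt.Errorf("messageMapper: %w", err)
	}

	// write the outbox message in the same transaction
	if _, err := r.writer.Write(ctx, tx, message); err != nil {
		return u, fmt.Errorf("writer.Write: %w", err)
	}

	return user, nil
}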

See the outbox.Writer example in repo.go in the examples/01_sns directory.

3. Add outbox.Forwarder to a cronjob:
forwarder, err := outbox.NewForwarderFromPool("outbox_messages", pool, publisher)
// handle err

// forward at most 10 unpublished messages per invocation
stats, err := forwarder.Forward(ctx, 10)
// handle err
slog.Info("forwarded", "stats", stats)

where pool is a *pgxpool.Pool and publisher is an implementation of outbox.Publisher.

This library provides a reference publisher implementation for AWS SNS in the sns module.

publisher, err := outboxSns.NewPublisher(awsSnsCli, messageTransformer{})

where messageTransformer is an implementation of the outboxSns.MessageTransformer interface, for example:

func (mt messageTransformer) Transform(message types.Message) (*awsSns.PublishInput, error) {
	// tc is not defined in this snippet; it stands for the configuration
	// that holds the AWS region and account ID
	topicARN := fmt.Sprintf("arn:aws:sns:%s:%s:%s", tc.region, tc.accountID, message.Topic)

	return &awsSns.PublishInput{
		Message:  aws.String(string(message.Payload)),
		TopicArn: &topicARN,
	}, nil
}

See the outbox.Forwarder example in main.go in the examples/01_sns directory.

Examples

1. SNS

Source code and instructions for the example are located in the examples/01_sns directory.

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrTxNil = errors.New("tx is nil")

	ErrTableEmpty = errors.New("table is empty")
)

Functions

This section is empty.

Types

type ForwardOption

type ForwardOption func(forwarder *forwarder)

func WithForwardFilter

func WithForwardFilter(filter types.MessageFilter) ForwardOption

type Forwarder

type Forwarder interface {
	Forward(ctx context.Context, limit int) (types.ForwardStats, error)
}

Forwarder reads unpublished messages from the outbox table, publishes them and then marks them as published. It is recommended to run a single Forwarder instance per outbox table, e.g. in a Kubernetes CronJob, or at least to isolate different Forwarder instances acting on the same outbox table by using different filters in outbox.Reader.
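The read, publish, acknowledge sequence described above can be sketched as follows. This is an illustration under stated assumptions, not the library's actual implementation: Message is a local stand-in for types.Message with a guessed ID field, Reader and Publisher mirror the documented interfaces, and forwardOnce, memoryOutbox and failOn are hypothetical names. Stopping at the first publish error is also an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a local stand-in for types.Message (assumption).
type Message struct {
	ID      int64
	Topic   string
	Payload []byte
}

// Reader and Publisher mirror the documented interfaces with the stand-in Message.
type Reader interface {
	Read(ctx context.Context, limit int) ([]Message, error)
	Ack(ctx context.Context, ids []int64) (int, error)
}

type Publisher interface {
	Publish(ctx context.Context, message Message) error
}

// forwardOnce reads up to limit unpublished messages, publishes them in order
// and acknowledges the ones that were published. It stops at the first publish error.
func forwardOnce(ctx context.Context, r Reader, p Publisher, limit int) (acked int, err error) {
	messages, err := r.Read(ctx, limit)
	if err != nil {
		return 0, fmt.Errorf("read: %w", err)
	}

	var ids []int64
	var pubErr error
	for _, m := range messages {
		if pubErr = p.Publish(ctx, m); pubErr != nil {
			pubErr = fmt.Errorf("publish message %d: %w", m.ID, pubErr)
			break
		}
		ids = append(ids, m.ID)
	}

	if len(ids) > 0 {
		// Ack runs after publishing, so a crash in between leaves the messages
		// unacknowledged and they are read and published again on a later run.
		if acked, err = r.Ack(ctx, ids); err != nil {
			return acked, errors.Join(pubErr, fmt.Errorf("ack: %w", err))
		}
	}

	return acked, pubErr
}

// memoryOutbox is a hypothetical in-memory Reader used only for this sketch.
type memoryOutbox struct {
	messages  []Message
	published map[int64]bool
}

func (o *memoryOutbox) Read(_ context.Context, limit int) ([]Message, error) {
	var out []Message
	for _, m := range o.messages {
		if len(out) == limit {
			break
		}
		if !o.published[m.ID] {
			out = append(out, m)
		}
	}
	return out, nil
}

func (o *memoryOutbox) Ack(_ context.Context, ids []int64) (int, error) {
	for _, id := range ids {
		o.published[id] = true
	}
	return len(ids), nil
}

// failOn is a hypothetical Publisher that fails for a single message ID.
type failOn struct{ id int64 }

func (f failOn) Publish(_ context.Context, m Message) error {
	if m.ID == f.id {
		return errors.New("broker unavailable")
	}
	return nil
}

// demo forwards three messages where publishing the third one fails.
func demo() (int, error) {
	box := &memoryOutbox{
		messages:  []Message{{ID: 1}, {ID: 2}, {ID: 3}},
		published: map[int64]bool{},
	}
	return forwardOnce(context.Background(), box, failOn{id: 3}, 10)
}
```

In this sketch demo acknowledges messages 1 and 2 and returns the publish error for message 3, which stays unpublished for the next run. Because publishing and acknowledging are separate steps, consumers should expect that a message may be delivered more than once.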

func NewForwarder

func NewForwarder(reader Reader, publisher Publisher, opts ...ForwardOption) (Forwarder, error)

func NewForwarderFromPool

func NewForwarderFromPool(table string, pool *pgxpool.Pool, publisher Publisher, opts ...ForwardOption) (Forwarder, error)

type Publisher

type Publisher interface {
	Publish(ctx context.Context, message types.Message) error
}
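For tests or local development, a custom Publisher might simply record messages in memory. The sketch below assumes a local stand-in for types.Message and redeclares the Publisher interface against it; memoryPublisher, errEmptyTopic and publishAll are hypothetical names that are not part of this package.

```go
package main

import (
	"context"
	"errors"
	"sync"
)

// Message is a local stand-in for types.Message (assumption).
type Message struct {
	Topic   string
	Payload []byte
}

// Publisher mirrors the interface above, using the stand-in Message.
type Publisher interface {
	Publish(ctx context.Context, message Message) error
}

// errEmptyTopic is a hypothetical validation error.
var errEmptyTopic = errors.New("topic is empty")

// memoryPublisher is a hypothetical Publisher that records messages in memory.
type memoryPublisher struct {
	mu        sync.Mutex
	published []Message
}

// Compile-time check that memoryPublisher satisfies Publisher.
var _ Publisher = (*memoryPublisher)(nil)

func (p *memoryPublisher) Publish(ctx context.Context, message Message) error {
	// Respect cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return err
	}
	if message.Topic == "" {
		return errEmptyTopic
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	p.published = append(p.published, message)

	return nil
}

// publishAll publishes the messages in order and stops at the first error.
func publishAll(p Publisher, messages ...Message) error {
	ctx := context.Background()
	for _, m := range messages {
		if err := p.Publish(ctx, m); err != nil {
			return err
		}
	}
	return nil
}
```

Returning an error from Publish is the only signal available through this interface, so a real publisher would return the broker's error unchanged or wrapped.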

type ReadOption

type ReadOption func(*reader)

func WithReadFilter

func WithReadFilter(filter types.MessageFilter) ReadOption

type Reader

type Reader interface {
	// Read reads unpublished messages from the outbox table that match the filter.
	// limit is the maximum number of messages to read.
	// Limit and frequency of Read invocations should be considered carefully to avoid overloading the database.
	Read(ctx context.Context, limit int) ([]types.Message, error)

	// Ack acknowledges / marks the messages by ids as published in a single transaction.
	// ids can be obtained from the Read method output.
	// It returns the number of messages acknowledged.
	Ack(ctx context.Context, ids []int64) (int, error)
}

Reader reads unpublished outbox messages from a single outbox table. Users should prefer to interact with a Forwarder instance rather than with Reader directly. Read and Ack happen in different transactions.

func NewReader

func NewReader(table string, pool *pgxpool.Pool, opts ...ReadOption) (Reader, error)

type Tx

type Tx interface{}

Tx is a transaction interface that supports both pgx.Tx and *sql.Tx. As pgx.Tx and *sql.Tx do not have common method signatures, this is an empty interface.
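Because Tx is an empty interface, an implementation has to inspect the concrete type at run time. The sketch below shows one way to do that with a type switch; it is not taken from the library's source. To compile with the standard library only, pgxTx is a reduced local stand-in for pgx.Tx, and txKind, errTxUnsupported and fakePgxTx are hypothetical names. Returning ErrTxNil for a nil transaction is an assumption based on the error's message.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// Tx mirrors the empty interface above.
type Tx interface{}

// pgxTx is a reduced local stand-in for pgx.Tx (assumption);
// the real interface has more methods.
type pgxTx interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

var (
	// ErrTxNil mirrors the package variable of the same name.
	ErrTxNil = errors.New("tx is nil")

	// errTxUnsupported is a hypothetical error for unknown transaction types.
	errTxUnsupported = errors.New("tx type is not supported")
)

// txKind reports which driver a Tx value belongs to.
func txKind(tx Tx) (string, error) {
	switch t := tx.(type) {
	case nil:
		return "", ErrTxNil
	case *sql.Tx:
		// A typed nil pointer still matches this case, so check it explicitly.
		if t == nil {
			return "", ErrTxNil
		}
		return "database/sql", nil
	case pgxTx:
		return "pgx", nil
	default:
		return "", fmt.Errorf("%w: %T", errTxUnsupported, tx)
	}
}

// fakePgxTx is a hypothetical value that satisfies the pgxTx stand-in.
type fakePgxTx struct{}

func (fakePgxTx) Commit(context.Context) error   { return nil }
func (fakePgxTx) Rollback(context.Context) error { return nil }
```

The trade-off of an empty interface is that a wrong argument type is detected only at run time, so callers should check the returned error.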

type WriteOption

type WriteOption func(*writer)

func WithDisablePreparedBatch

func WithDisablePreparedBatch() WriteOption

type Writer

type Writer interface {
	// Write writes the message to the outbox table.
	// It returns the ID of the newly inserted message.
	Write(ctx context.Context, tx Tx, message types.Message) (int64, error)

	// WriteBatch writes multiple messages to the outbox table.
	// It returns the IDs of the newly inserted messages.
	// It returns an error if any of the messages fail to write.
	WriteBatch(ctx context.Context, tx pgx.Tx, messages []types.Message) ([]int64, error)
}

Writer writes outbox messages to a single outbox table. To write messages to multiple outbox tables, create multiple Writer instances. An outbox message must be written in the same transaction as business entities, hence the tx argument which supports both pgx.Tx and *sql.Tx. Implementations must be safe for concurrent use by multiple goroutines.

func NewWriter

func NewWriter(table string, opts ...WriteOption) (Writer, error)

Directories

Path Synopsis
internal
sns module
