Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTxNil      = errors.New("tx is nil")
	ErrTableEmpty = errors.New("table is empty")
)
Functions ¶
This section is empty.
Types ¶
type ForwardOption ¶
type ForwardOption func(forwarder *forwarder)
func WithForwardFilter ¶
func WithForwardFilter(filter types.MessageFilter) ForwardOption
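ForwardOption has the shape of a functional option. The following is a minimal sketch of how such an option could be applied, not the package's actual implementation. MessageFilter with its Topics field, the fields of forwarder, and the newForwarder helper are hypothetical stand-ins, since the real types.MessageFilter and the unexported forwarder are not shown here.

```go
package main

import "fmt"

// MessageFilter is a hypothetical stand-in for types.MessageFilter;
// the real type's fields are not shown in this documentation.
type MessageFilter struct {
	Topics []string
}

// forwarder is a stand-in for the package's unexported forwarder type.
type forwarder struct {
	filter MessageFilter
}

// ForwardOption mirrors the documented signature.
type ForwardOption func(forwarder *forwarder)

// WithForwardFilter returns an option that stores the filter on the forwarder.
func WithForwardFilter(filter MessageFilter) ForwardOption {
	return func(f *forwarder) { f.filter = filter }
}

// newForwarder (hypothetical) starts from defaults and applies options in order.
func newForwarder(opts ...ForwardOption) *forwarder {
	f := &forwarder{}
	for _, opt := range opts {
		opt(f)
	}
	return f
}

func main() {
	f := newForwarder(WithForwardFilter(MessageFilter{Topics: []string{"orders"}}))
	fmt.Println(f.filter.Topics)
}
```

Options applied later override earlier ones, so callers can layer defaults and overrides without changing the constructor signature.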
type Forwarder ¶
Forwarder reads unpublished messages from the outbox table, publishes them, and then marks them as published. It is recommended to run a single Forwarder instance per outbox table, e.g. as a Kubernetes CronJob, or at least to isolate different Forwarder instances acting on the same outbox table by giving each a different filter in outbox.Reader.
func NewForwarder ¶
func NewForwarder(reader Reader, publisher Publisher, opts ...ForwardOption) (Forwarder, error)
func NewForwarderFromPool ¶
type ReadOption ¶
type ReadOption func(*reader)
func WithReadFilter ¶
func WithReadFilter(filter types.MessageFilter) ReadOption
type Reader ¶
type Reader interface {
	// Read reads unpublished messages from the outbox table that match the filter.
	// limit is the maximum number of messages to read.
	// Limit and frequency of Read invocations should be considered carefully to avoid overloading the database.
	Read(ctx context.Context, limit int) ([]types.Message, error)

	// Ack acknowledges / marks the messages by ids as published in a single transaction.
	// ids can be obtained from the Read method output.
	// It returns the number of messages acknowledged.
	Ack(ctx context.Context, ids []int64) (int, error)
}
Reader reads unpublished outbox messages from a single outbox table. Users should prefer to interact with a Forwarder instance rather than with Reader directly. Read and Ack happen in different transactions.
type Tx ¶
type Tx interface{}
Tx is a transaction interface that supports both pgx.Tx and *sql.Tx. Because pgx.Tx and *sql.Tx share no common method signatures, it is an empty interface.
type WriteOption ¶
type WriteOption func(*writer)
func WithDisablePreparedBatch ¶
func WithDisablePreparedBatch() WriteOption
type Writer ¶
type Writer interface {
	// Write writes the message to the outbox table.
	// It returns the ID of the newly inserted message.
	Write(ctx context.Context, tx Tx, message types.Message) (int64, error)

	// WriteBatch writes multiple messages to the outbox table.
	// It returns the IDs of the newly inserted messages.
	// It returns an error if any of the messages fail to write.
	WriteBatch(ctx context.Context, tx pgx.Tx, messages []types.Message) ([]int64, error)
}
Writer writes outbox messages to a single outbox table. To write messages to multiple outbox tables, create multiple Writer instances. An outbox message must be written in the same transaction as the business entities it belongs to, hence the tx argument, which in Write supports both pgx.Tx and *sql.Tx (WriteBatch accepts only pgx.Tx). Implementations must be safe for concurrent use by multiple goroutines.