Documentation
¶
Index ¶
Constants ¶
const ( // MessageConsumedKey is the key used to record the number of messages // successfully consumed. MessageConsumedKey = "messages_consumed_total" // MessageConsumedFailedKey is the key used to record the number of // messages that failed to be consumed. MessageConsumedFailedKey = "messages_consumed_failed_total" )
const ( DefaultMaxAttempts = 5 DefaultAttemptTimeout = 10 * time.Second )
DefaultMaxAttempts is the default maximum number of attempts.
const MaxExponentialBackoff = 5 * time.Minute
Variables ¶
var ErrNoMessage = errors.New("no message in mailbox")
Error returned when the mailbox is empty.
var ( // ErrNoRouteMatch is returned when no route matches the message. ErrNoRouteMatch = errors.New("no route matches the message") )
var ( // ErrNoTx is returned when the transaction is not found in the context. ErrNoTx = errors.New("no transaction found in context") )
Functions ¶
Types ¶
type ConsumeFn ¶
ConsumeFn is the function that is called by the consumer on a message. If the function returns an error, the message is re-queued and will be processed again until success.
func RoutingConsumer ¶
RoutingConsumer returns a ConsumeFn that routes the message to the first route that matches the message according to the order of the routes. If no route matches the message, it returns ErrNoRouteMatch.
func WithMoveToMailbox ¶
WithMoveToMailbox returns a ConsumeFn that moves the message to the mailbox. This can be paired with WithRetryConsume to implement a dead-letter queue when this ConsumeFn is used as the Final function. In order to use this ConsumeFn, the consumer's transactor must be configured with recursive transactions. The enqueueing of the message is done in the same transaction as the processing of the message. This ensures that the message is not lost if the transaction is rolled back.
type ConsumeMiddleware ¶
ConsumeMiddleware is a function that wraps a ConsumeFn. It is used to implement middleware that can be applied to the ConsumeFn.
func WithObservabilityConsume ¶
func WithObservabilityConsume(p ObservabilityPolicy) ConsumeMiddleware
WithObservabilityConsume returns a ConsumeMiddleware that wraps the ConsumeFn with observability, notably logging and metrics. See ObservabilityPolicy for more details. Ideally, this middleware should immediately wrap the ConsumeFn such that retry attempts are also logged.
func WithRetryPolicyConsume ¶
func WithRetryPolicyConsume(p RetryPolicy) ConsumeMiddleware
WithRetryConsume returns a ConsumeMiddleware that wraps the ConsumeFn with a retry policy. See RetryPolicy for more details.
type Consumer ¶
type Consumer interface { // Consume consumes messages from the mailbox. It is safe to call this // method concurrently. This method is meant to be called in a loop, the // caller is responsible to apply back-pressure if needed. // // It is recommended that the function: // - is idempotent, it may be called multiple times for the same message // - returns quickly to avoid holding the row lock for too long // - does not call other methods on the consumer to avoid deadlocks // // The following errors are returned: // - nil: a message was successfully consumed // - ErrNoMessage: the mailbox is empty and no message was consumed // - Any other error: an error occurred while consuming a message // // The consumer does not have any dead-letter mechanism. If the function // returns an error, the message is re-queued and will be processed again. // It is the responsibility of the caller to handle a maximum number of // retries and/or to move the message to a dead-letter queue, see the // various middlewares for more details, e.g. WithTimeoutConsume, // WithRetryPolicyConsume. Consume(context.Context) error // Size returns the number of messages in the mailbox. Size(context.Context) (size int64, err error) }
Consumer consumes messages from the mailbox. Once a message is processed, it is removed from the mailbox. The draining is controlled by the caller via the Consume method. The caller is responsible to call this method in a loop. The consumer does not have any background goroutines.
func NewConsumer ¶
func NewConsumer(ctx context.Context, transactor tx.Transactor, table string, consume ConsumeFn) (Consumer, error)
NewConsumer creates a new consumer. The table must exist and have the same schema as required by Mailbox. The consumer does not have any background goroutines, the caller is responsible to drive the draining in an infinite loop.
The context is not persisted and is only used to validate the database connection and schema validation.
type Mailbox ¶
type Mailbox interface { // Put adds a message to the mailbox. If the method returns an error, // the message is not added to the mailbox and caller is responsible // for retrying and/or rolling back the transaction. The transaction is // explicitly passed allowing the caller to control the transaction boundaries. Put(context.Context, *sql.Tx, Message) error }
Mailbox is a message queue backed by Postgres.
func NewMailbox ¶
NewMailbox creates a new mailbox. The table must exist and have the following schema:
- id VARCHAR PRIMARY KEY - metadata JSONB - payload BYTEA - create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
The table name must be fully qualified, for example: "public.mailbox".
type MatchFn ¶
Filter is a function that returns true if the message should be given to this consumer.
type Message ¶
type Message struct { // The message identifier. ID string // The message metadata. Metadata map[string]string // The message payload. Payload []byte }
Message is a message in the mailbox.
type ObservabilityPolicy ¶
type ObservabilityPolicy struct { // Metrics is the expvar map used to record metrics. If nil, no metrics // are recorded. Two metrics are recorded: // - messages_consumed_total: the number of messages successfully consumed // - messages_consumed_failed_total: the number of messages that failed to be consumed // The metrics keys are statically defined by MessageConsumedKey and // MessageConsumedFailedKey. Metrics *expvar.Map // Logger is the logger used to log events. If nil, no logging is done. // By default, only errors are logged. See LogSuccess for more details. Logger *slog.Logger // Attrs is the function that returns the attributes to log. If nil, no // attributes are extracted and logged. See slog's documentation for more // details on attributes. Attrs func(context.Context, Message) []any // If set, successful message consumption is logged. Errors are always // logged. If nil, no logging is done. LogSuccess bool }
ObservabilityPolicy controls how a message is logged with the WithObservabilityConsume middleware. It also supports recording metrics. It exposes, at a high level, the following:
- the logger used to log errors
- the attributes to log
- whether successful message consumption is logged
- the expvar map used to record metrics
type RetryPolicy ¶
type RetryPolicy struct { // Backoff is the function that returns the duration to wait before // retrying. The argument is the number of retries and the message. If // the function returns 0, it retries immediately. By default, it uses // an exponential backoff similar to Kubernetes, e.g. 10s, 20s, 40s, ..., // capped at 5 minutes. Backoff func(int, Message) time.Duration // Final is invoked after all retries have been exhausted. For example, // it could be used to move the message to a dead-letter queue, or it could // be used to log the message. If the function returns an error, the // message is retried. By default, it swallows the error. Note that this // function is not recovered from. Final ConsumeFn // MaxAttempts is the maximum number of attempts before giving up. If the // value not strictly positive, it is set to DefaultMaxAttempts. MaxAttempts int // AttemptTimeout is the timeout for each attempt. If the value is not // strictly positive, it is set to DefaultAttemptTimeout. AttemptTimeout time.Duration }
RetryPolicy controls how a message is retried with the WithRetryConsume middleware. It exposes, at a high level, the following:
- the backoff function
- the final function, invoked after all retries have been exhausted
- the maximum number of attempts
- the timeout for each attempt
- it recovers from panics and return an error