Documentation ¶
Index ¶
- Constants
- Variables
- func CorrelationID(h message.HandlerFunc) message.HandlerFunc
- func Duplicator(h message.HandlerFunc) message.HandlerFunc
- func InstantAck(h message.HandlerFunc) message.HandlerFunc
- func MessageCorrelationID(message *message.Message) string
- func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error)
- func PoisonQueueWithFilter(pub message.Publisher, topic string, ...) (message.HandlerMiddleware, error)
- func RandomFail(errorProbability float32) message.HandlerMiddleware
- func RandomPanic(panicProbability float32) message.HandlerMiddleware
- func Recoverer(h message.HandlerFunc) message.HandlerFunc
- func SetCorrelationID(id string, msg *message.Message)
- func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc
- type CircuitBreaker
- type Deduplicator
- type DelayOnError
- type ExpiringKeyRepository
- type IgnoreErrors
- type MessageHasher
- type RecoveredPanicError
- type Retry
- type Throttle
Constants ¶
const (
	ReasonForPoisonedKey  = "reason_poisoned"
	PoisonedTopicKey      = "topic_poisoned"
	PoisonedHandlerKey    = "handler_poisoned"
	PoisonedSubscriberKey = "subscriber_poisoned"
)
Metadata keys that mark the reason and context for why the message was deemed poisoned.
const CorrelationIDMetadataKey = "correlation_id"
CorrelationIDMetadataKey is used to store the correlation ID in metadata.
const MessageHasherReadLimitMinimum = 64
MessageHasherReadLimitMinimum specifies the minimum number of bytes of a message.Message that are used for calculating its hash value using a MessageHasher.
Variables ¶
var ErrInvalidPoisonQueueTopic = errors.New("invalid poison queue topic")
ErrInvalidPoisonQueueTopic occurs when the topic supplied to the PoisonQueue constructor is invalid.
Functions ¶
func CorrelationID ¶
func CorrelationID(h message.HandlerFunc) message.HandlerFunc
CorrelationID adds a correlation ID to all messages produced by the handler. The ID is taken from the message received by the handler.
For CorrelationID to work correctly, SetCorrelationID must be called on the first message entering the system.
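The propagation described above can be illustrated without the library. The following is a minimal sketch, assuming simplified stand-in types (`msg`, `handlerFunc`) and a hypothetical helper `withCorrelationID`; none of these are part of the package, and the real middleware may differ in detail.

```go
package main

import "fmt"

// correlationIDKey has the same value as CorrelationIDMetadataKey.
const correlationIDKey = "correlation_id"

// msg is a simplified stand-in for message.Message (assumption for this sketch).
type msg struct {
	Metadata map[string]string
}

// handlerFunc is a simplified stand-in for message.HandlerFunc.
type handlerFunc func(in *msg) ([]*msg, error)

// withCorrelationID copies the incoming message's correlation ID
// into the metadata of every message the wrapped handler produces.
func withCorrelationID(h handlerFunc) handlerFunc {
	return func(in *msg) ([]*msg, error) {
		produced, err := h(in)
		id := in.Metadata[correlationIDKey]
		for _, out := range produced {
			if out.Metadata == nil {
				out.Metadata = map[string]string{}
			}
			out.Metadata[correlationIDKey] = id
		}
		return produced, err
	}
}

func main() {
	// The first message entering the system has its ID set explicitly.
	in := &msg{Metadata: map[string]string{correlationIDKey: "request-42"}}

	h := withCorrelationID(func(*msg) ([]*msg, error) {
		return []*msg{{}, {}}, nil
	})

	produced, _ := h(in)
	for _, out := range produced {
		fmt.Println(out.Metadata[correlationIDKey])
	}
}
```

If the first message carries no correlation ID, every produced message in this sketch inherits an empty one, which is why the ID has to be set at the point where a message enters the system.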
func Duplicator ¶ added in v1.0.2
func Duplicator(h message.HandlerFunc) message.HandlerFunc
Duplicator processes each message twice to verify that the endpoint is idempotent.
func InstantAck ¶
func InstantAck(h message.HandlerFunc) message.HandlerFunc
InstantAck makes the handler instantly acknowledge the incoming message, regardless of any errors. It may be used to gain throughput, but at a cost: If you had exactly-once delivery, you may expect at-least-once instead. If you had ordered messages, the ordering might be broken.
func MessageCorrelationID ¶
func MessageCorrelationID(message *message.Message) string
MessageCorrelationID returns the correlation ID from the message.
func PoisonQueue ¶
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error)
PoisonQueue provides a middleware that salvages unprocessable messages and publishes them on a separate topic. The main middleware chain then continues on, business as usual.
func PoisonQueueWithFilter ¶ added in v0.4.0
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error)
PoisonQueueWithFilter is just like PoisonQueue, but accepts a function that decides which errors qualify for the poison queue.
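The filter is an ordinary `func(err error) bool`. A minimal sketch of one follows, assuming a hypothetical sentinel error `ErrMalformedPayload` (not part of the package) that marks messages which can never be processed successfully:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrMalformedPayload is a hypothetical sentinel error used for illustration only.
var ErrMalformedPayload = errors.New("malformed payload")

// shouldGoToPoisonQueue has the signature expected by PoisonQueueWithFilter.
// It reports true only for errors that are, or wrap, ErrMalformedPayload.
func shouldGoToPoisonQueue(err error) bool {
	return errors.Is(err, ErrMalformedPayload)
}

func main() {
	wrapped := fmt.Errorf("decoding order: %w", ErrMalformedPayload)
	fmt.Println(shouldGoToPoisonQueue(wrapped))                        // true
	fmt.Println(shouldGoToPoisonQueue(errors.New("database is down"))) // false
}
```

Using `errors.Is` rather than `==` keeps the filter working when the handler wraps the sentinel with extra context.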
func RandomFail ¶
func RandomFail(errorProbability float32) message.HandlerMiddleware
RandomFail makes the handler fail with an error based on random chance. Error probability should be in the range (0,1).
func RandomPanic ¶
func RandomPanic(panicProbability float32) message.HandlerMiddleware
RandomPanic makes the handler panic based on random chance. Panic probability should be in the range (0,1).
func Recoverer ¶
func Recoverer(h message.HandlerFunc) message.HandlerFunc
Recoverer recovers from any panic in the handler and appends RecoveredPanicError with the stacktrace to any error returned from the handler.
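The general recover-and-wrap pattern can be sketched as follows. The types `handlerFunc` and `recoveredPanic` and the function `recoverer` are simplified stand-ins written for this example; they only mirror the shape of the package's types and are not its implementation.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// handlerFunc is a simplified stand-in for message.HandlerFunc.
type handlerFunc func() error

// recoveredPanic mirrors the shape of RecoveredPanicError:
// the recovered value plus a stack trace.
type recoveredPanic struct {
	V          interface{}
	Stacktrace string
}

func (p recoveredPanic) Error() string {
	return fmt.Sprintf("panic occurred: %v, stacktrace:\n%s", p.V, p.Stacktrace)
}

// recoverer wraps h so that a panic is turned into a returned error
// instead of crashing the process.
func recoverer(h handlerFunc) handlerFunc {
	return func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = recoveredPanic{V: r, Stacktrace: string(debug.Stack())}
			}
		}()
		return h()
	}
}

func main() {
	h := recoverer(func() error { panic("boom") })
	err := h()
	rp, ok := err.(recoveredPanic)
	fmt.Println(ok, rp.V)
}
```

The named return value `err` is what lets the deferred function replace the result after the panic has been recovered.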
func SetCorrelationID ¶
func SetCorrelationID(id string, msg *message.Message)
SetCorrelationID sets a correlation ID for the message.
SetCorrelationID should be called when the message enters the system. When a message is produced in a request (for example HTTP), the message's correlation ID should be the same as the request's correlation ID.
func Timeout ¶ added in v1.0.0
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc
Timeout makes the handler cancel the incoming message's context after a specified time. Any timeout-sensitive functionality of the handler should listen on msg.Context().Done() to know when to fail.
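Listening on the context's Done channel can look like the sketch below. It uses only the standard library; `slowWork` and `runWithTimeout` are hypothetical names, and a plain `context.Context` stands in for `msg.Context()`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowWork simulates a handler body that needs workTime to finish,
// but gives up as soon as ctx is cancelled.
func slowWork(ctx context.Context, workTime time.Duration) error {
	select {
	case <-time.After(workTime):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWithTimeout cancels the context after timeout, which is similar
// in spirit to what the Timeout middleware does for the message context.
func runWithTimeout(timeout, workTime time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return slowWork(ctx, workTime)
}

func main() {
	err := runWithTimeout(10*time.Millisecond, time.Second)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

A handler that never checks the Done channel keeps running after the deadline, so the timeout only takes effect where the handler cooperates.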
Types ¶
type CircuitBreaker ¶ added in v1.3.3
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker is a middleware that wraps the handler in a circuit breaker. Based on the configuration, the circuit breaker will fail fast if the handler keeps returning errors. This is useful for preventing cascading failures.
func NewCircuitBreaker ¶ added in v1.3.3
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker
NewCircuitBreaker returns a new CircuitBreaker middleware. Refer to the gobreaker documentation for the available settings.
func (CircuitBreaker) Middleware ¶ added in v1.3.3
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc
Middleware returns the CircuitBreaker middleware.
type Deduplicator ¶ added in v1.3.6
type Deduplicator struct {
	KeyFactory MessageHasher
	Repository ExpiringKeyRepository
	Timeout    time.Duration
}
Deduplicator drops similar messages if they are present in an ExpiringKeyRepository. The similarity is determined by a MessageHasher. The timeout is applied to repository operations using context.WithTimeout.
Call Deduplicator.Middleware for a new middleware or Deduplicator.PublisherDecorator for a message.PublisherDecorator.
KeyFactory defaults to NewMessageHasherAdler32 with the read limit set to math.MaxInt64 for fast tagging. Use NewMessageHasherSHA256 for minimal collisions.
Repository defaults to NewMapExpiringKeyRepository with a one-minute retention window. This default setting is performant but **does not support distributed operations**. If you implement an ExpiringKeyRepository backed by Redis, please submit a pull request.
Timeout defaults to one minute. If lower than five milliseconds, it is set to five milliseconds.
ExpiringKeyRepository must expire values in a certain time window. If there is no expiration, only one unique message will ever be delivered as long as the repository keeps its state.
func (*Deduplicator) IsDuplicate ¶ added in v1.3.6
func (d *Deduplicator) IsDuplicate(m *message.Message) (bool, error)
IsDuplicate returns true if the message hash tag calculated using a MessageHasher was seen within the deduplication time window.
func (*Deduplicator) Middleware ¶ added in v1.3.6
func (d *Deduplicator) Middleware(h message.HandlerFunc) message.HandlerFunc
Middleware returns the message.HandlerMiddleware that drops similar messages in a given time window.
func (*Deduplicator) PublisherDecorator ¶ added in v1.3.6
func (d *Deduplicator) PublisherDecorator() message.PublisherDecorator
PublisherDecorator returns a decorator that acknowledges and drops every message.Message that was recognized by a Deduplicator.
The returned decorator provides the same functionality to a message.Publisher as Deduplicator.Middleware to a message.Router.
type DelayOnError ¶ added in v1.4.0
type DelayOnError struct {
	// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
	InitialInterval time.Duration
	// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
	MaxInterval time.Duration
	// Multiplier is the factor by which the waiting interval will be multiplied between retries.
	Multiplier float64
}
DelayOnError is a middleware that adds the delay metadata to the message if an error occurs.
IMPORTANT: The delay metadata doesn't cause delays with all Pub/Subs! Using it won't have any effect on Pub/Subs that don't support it. See the list of supported Pub/Subs in the documentation: https://watermill.io/advanced/delayed-messages/
func (*DelayOnError) Middleware ¶ added in v1.4.0
func (d *DelayOnError) Middleware(h message.HandlerFunc) message.HandlerFunc
type ExpiringKeyRepository ¶ added in v1.3.6
type ExpiringKeyRepository interface {
	// IsDuplicate returns `true` if the key
	// was already checked in the recent past.
	// The key must expire in a certain time window.
	IsDuplicate(ctx context.Context, key string) (ok bool, err error)
}
ExpiringKeyRepository is a state container for checking the existence of a key in a certain time window. All operations must be safe for concurrent use.
func NewMapExpiringKeyRepository ¶ added in v1.3.6
func NewMapExpiringKeyRepository(window time.Duration) (ExpiringKeyRepository, error)
NewMapExpiringKeyRepository returns a memory store backed by a regular hash map protected by a sync.Mutex. The state **cannot be shared or synchronized between instances** by design for performance.
If you need to drop duplicate messages across several orchestrated instances, implement the ExpiringKeyRepository interface backed by Redis or a similar store.
Window specifies the minimum duration for which duplicate tags are remembered. The real duration can be up to 50% longer because it depends on the clean-up cycle.
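For readers writing their own repository, the contract can be sketched with an in-memory store. `memoryRepository` below is an illustrative type written for this example, not the library's implementation: it expires keys lazily on lookup and has no background clean-up cycle.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// memoryRepository is an illustrative in-memory store (an assumption
// of this sketch). Each key maps to the time at which it expires.
type memoryRepository struct {
	mu     sync.Mutex
	window time.Duration
	seen   map[string]time.Time
}

func newMemoryRepository(window time.Duration) *memoryRepository {
	return &memoryRepository{window: window, seen: make(map[string]time.Time)}
}

// IsDuplicate has the same signature as the ExpiringKeyRepository method.
// It returns true if the key was recorded and has not expired yet;
// otherwise it records the key and returns false.
func (r *memoryRepository) IsDuplicate(ctx context.Context, key string) (bool, error) {
	if err := ctx.Err(); err != nil {
		return false, err
	}
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	if expiry, ok := r.seen[key]; ok && now.Before(expiry) {
		return true, nil
	}
	r.seen[key] = now.Add(r.window)
	return false, nil
}

// demo checks the same key twice, waits past the window, and checks again.
func demo() (first, second, afterExpiry bool) {
	repo := newMemoryRepository(20 * time.Millisecond)
	ctx := context.Background()

	first, _ = repo.IsDuplicate(ctx, "order-1")
	second, _ = repo.IsDuplicate(ctx, "order-1")
	time.Sleep(40 * time.Millisecond)
	afterExpiry, _ = repo.IsDuplicate(ctx, "order-1")
	return first, second, afterExpiry
}

func main() {
	first, second, afterExpiry := demo()
	fmt.Println(first, second, afterExpiry)
}
```

A store shared between instances would follow the same contract, with the check and the write performed atomically on the remote side.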
type IgnoreErrors ¶
type IgnoreErrors struct {
// contains filtered or unexported fields
}
IgnoreErrors provides a middleware that makes the handler ignore some explicitly whitelisted errors.
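The idea of swallowing listed errors can be sketched as below. The sketch uses a simplified `handlerFunc` stand-in, hypothetical errors `errNotFound` and `errFatal`, and matches errors with `errors.Is`; how the package itself compares errors is not stated here, so treat the matching rule as an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// handlerFunc is a simplified stand-in for message.HandlerFunc.
type handlerFunc func() error

// Hypothetical errors used for illustration only.
var (
	errNotFound = errors.New("not found")
	errFatal    = errors.New("fatal")
)

// ignoreErrors wraps h so that any error matching one of the ignored
// errors is replaced with nil. All other errors pass through unchanged.
func ignoreErrors(ignored []error, h handlerFunc) handlerFunc {
	return func() error {
		err := h()
		for _, target := range ignored {
			if errors.Is(err, target) {
				return nil
			}
		}
		return err
	}
}

func main() {
	ignored := []error{errNotFound}

	fmt.Println(ignoreErrors(ignored, func() error { return errNotFound })())
	fmt.Println(ignoreErrors(ignored, func() error { return errFatal })())
}
```

Because an ignored error becomes nil, the message is treated as successfully handled, so only errors that are safe to drop belong in the list.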
func NewIgnoreErrors ¶
func NewIgnoreErrors(errs []error) IgnoreErrors
NewIgnoreErrors creates a new IgnoreErrors middleware.
func (IgnoreErrors) Middleware ¶
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc
Middleware returns the IgnoreErrors middleware.
type MessageHasher ¶ added in v1.3.6
MessageHasher returns a short tag that describes a message. The tag should be unique per message, but avoiding hash collisions entirely is not practical for performance reasons. Used for powering Deduplicators.
func NewMessageHasherAdler32 ¶ added in v1.3.6
func NewMessageHasherAdler32(readLimit int64) MessageHasher
NewMessageHasherAdler32 generates message hashes using a fast Adler-32 checksum of the message.Message body. Read limit specifies how many bytes of the message are used for calculating the hash.
A lower limit improves performance but results in more false positives. Read limit must be greater than MessageHasherReadLimitMinimum.
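The trade-off behind the read limit can be seen in a small standard-library sketch. `hashPayload` is a hypothetical helper that checksums a raw byte slice; the tag format and the very small limit of 5 bytes are chosen only to keep the demonstration short and do not reflect the package's actual output or its minimum.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/adler32"
	"io"
)

// hashPayload computes an Adler-32 tag over at most readLimit bytes
// of payload and returns it as a hexadecimal string.
func hashPayload(payload []byte, readLimit int64) (string, error) {
	h := adler32.New()
	limited := io.LimitReader(bytes.NewReader(payload), readLimit)
	if _, err := io.Copy(h, limited); err != nil {
		return "", err
	}
	return fmt.Sprintf("%08x", h.Sum32()), nil
}

func main() {
	// With a limit of 5, only "hello" is hashed, so both payloads collide.
	a, _ := hashPayload([]byte("hello world"), 5)
	b, _ := hashPayload([]byte("hello there"), 5)
	fmt.Println(a == b) // true

	// With a limit covering the whole payload, the tags differ.
	c, _ := hashPayload([]byte("hello world"), 64)
	d, _ := hashPayload([]byte("hello there"), 64)
	fmt.Println(c == d) // false
}
```

Messages that share a long common prefix, such as an envelope or header, are the typical source of false positives when the limit is set too low.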
func NewMessageHasherFromMetadataField ¶ added in v1.3.6
func NewMessageHasherFromMetadataField(field string) MessageHasher
NewMessageHasherFromMetadataField looks for a hash value inside message metadata instead of calculating a new one. Useful if a MessageHasher was applied in a previous message.HandlerFunc.
func NewMessageHasherSHA256 ¶ added in v1.3.6
func NewMessageHasherSHA256(readLimit int64) MessageHasher
NewMessageHasherSHA256 generates message hashes using a slower but more resilient hashing of the message.Message body. Read limit specifies how many bytes of the message are used for calculating the hash.
A lower limit improves performance but results in more false positives. Read limit must be greater than MessageHasherReadLimitMinimum.
type RecoveredPanicError ¶ added in v0.3.0
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}
RecoveredPanicError holds the recovered panic's error along with the stacktrace.
func (RecoveredPanicError) Error ¶ added in v0.3.0
func (p RecoveredPanicError) Error() string
type Retry ¶
type Retry struct {
	// MaxRetries is the maximum number of times a retry will be attempted.
	MaxRetries int

	// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
	InitialInterval time.Duration
	// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
	MaxInterval time.Duration
	// Multiplier is the factor by which the waiting interval will be multiplied between retries.
	Multiplier float64
	// MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
	MaxElapsedTime time.Duration
	// RandomizationFactor randomizes the spread of the backoff times within the interval of:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook is an optional function that will be executed on each retry attempt.
	// The number of the current retry is passed as retryNum.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
Retry provides a middleware that retries the handler if errors are returned. The retry behaviour is configurable, with exponential backoff and maximum elapsed time.
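How InitialInterval, Multiplier, MaxInterval, and RandomizationFactor interact can be illustrated with a little arithmetic. The helpers `backoffIntervals` and `randomizationBounds` are hypothetical and follow only the field comments above; the exact schedule used by the middleware depends on its implementation.

```go
package main

import (
	"fmt"
	"time"
)

// backoffIntervals returns the un-randomized waiting intervals for
// maxRetries retries: each interval is the previous one scaled by
// multiplier, capped at maxInterval.
func backoffIntervals(initial, maxInterval time.Duration, multiplier float64, maxRetries int) []time.Duration {
	intervals := make([]time.Duration, 0, maxRetries)
	current := initial
	for i := 0; i < maxRetries; i++ {
		if current > maxInterval {
			current = maxInterval
		}
		intervals = append(intervals, current)
		current = time.Duration(float64(current) * multiplier)
	}
	return intervals
}

// randomizationBounds returns the range described for RandomizationFactor:
// [current * (1 - factor), current * (1 + factor)].
func randomizationBounds(current time.Duration, factor float64) (low, high time.Duration) {
	delta := factor * float64(current)
	return time.Duration(float64(current) - delta), time.Duration(float64(current) + delta)
}

func main() {
	fmt.Println(backoffIntervals(100*time.Millisecond, time.Second, 2, 5))
	// [100ms 200ms 400ms 800ms 1s]

	low, high := randomizationBounds(time.Second, 0.5)
	fmt.Println(low, high)
	// 500ms 1.5s
}
```

With these example values the fifth interval would be 1.6s without the cap, so MaxInterval is what keeps long retry chains from waiting ever longer.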
func (Retry) Middleware ¶
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc
Middleware returns the Retry middleware.
type Throttle ¶
type Throttle struct {
// contains filtered or unexported fields
}
Throttle provides a middleware that limits the number of messages processed per unit of time. This may be done, for example, to prevent excessive load caused by running a handler on a long queue of unprocessed messages.
func NewThrottle ¶ added in v0.1.2
NewThrottle creates a new Throttle middleware. Example count and duration: NewThrottle(10, time.Second) allows 10 messages per second.
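One common way to enforce such a rate is a ticker that releases one handler call per tick. The sketch below shows that technique with stand-in types (`handlerFunc`, `throttle`, `newThrottle`) and a hypothetical measuring helper `runThrottled`; it is an illustration of the idea and not necessarily how the package is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// handlerFunc is a simplified stand-in for message.HandlerFunc.
type handlerFunc func() error

// throttle releases one handler call per ticker tick.
type throttle struct {
	ticker *time.Ticker
}

// newThrottle allows count calls per duration by ticking every duration/count.
func newThrottle(count int64, duration time.Duration) *throttle {
	return &throttle{ticker: time.NewTicker(duration / time.Duration(count))}
}

// middleware blocks each call until the next tick arrives.
func (t *throttle) middleware(h handlerFunc) handlerFunc {
	return func() error {
		<-t.ticker.C
		return h()
	}
}

// runThrottled calls a no-op handler n times through the throttle
// and returns how long that took.
func runThrottled(count int64, duration time.Duration, n int) time.Duration {
	t := newThrottle(count, duration)
	defer t.ticker.Stop()

	h := t.middleware(func() error { return nil })
	start := time.Now()
	for i := 0; i < n; i++ {
		_ = h()
	}
	return time.Since(start)
}

func main() {
	// 100 calls per second means one call every 10ms,
	// so 5 calls need at least roughly 50ms.
	elapsed := runThrottled(100, time.Second, 5)
	fmt.Println(elapsed >= 40*time.Millisecond)
}
```

A ticker spreads calls evenly over the interval rather than allowing a burst of `count` calls at the start of each period.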
func (Throttle) Middleware ¶
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc
Middleware returns the Throttle middleware.