middleware

package
v1.1.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0, MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReasonForPoisonedKey  = "reason_poisoned"
	PoisonedTopicKey      = "topic_poisoned"
	PoisonedHandlerKey    = "handler_poisoned"
	PoisonedSubscriberKey = "subscriber_poisoned"
)

Metadata keys which marks the reason and context why the message was deemed poisoned.

View Source
const CorrelationIDMetadataKey = "correlation_id"

CorrelationIDMetadataKey is used to store the correlation ID in metadata.

Variables

View Source
var ErrInvalidPoisonQueueTopic = errors.New("invalid poison queue topic")

ErrInvalidPoisonQueueTopic occurs when the topic supplied to the PoisonQueue constructor is invalid.

Functions

func CorrelationID

func CorrelationID(h message.HandlerFunc) message.HandlerFunc

CorrelationID adds correlation ID to all messages produced by the handler. ID is based on ID from message received by handler.

To make CorrelationID working correctly, SetCorrelationID must be called to first message entering the system.

func Duplicator

func Duplicator(h message.HandlerFunc) message.HandlerFunc

Duplicator is processing messages twice, to ensure that the endpoint is idempotent.

func InstantAck

func InstantAck(h message.HandlerFunc) message.HandlerFunc

InstantAck makes the handler instantly acknowledge the incoming message, regardless of any errors. It may be used to gain throughput, but at a cost: If you had exactly-once delivery, you may expect at-least-once instead. If you had ordered messages, the ordering might be broken.

func MessageCorrelationID

func MessageCorrelationID(message *message.Message) string

MessageCorrelationID returns correlation ID from the message.

func PoisonQueue

func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error)

PoisonQueue provides a middleware that salvages unprocessable messages and published them on a separate topic. The main middleware chain then continues on, business as usual.

func PoisonQueueWithFilter

func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error)

PoisonQueueWithFilter is just like PoisonQueue, but accepts a function that decides which errors qualify for the poison queue.

func RandomFail

func RandomFail(errorProbability float32) message.HandlerMiddleware

RandomFail makes the handler fail with an error based on random chance. Error probability should be in the range (0,1).

func RandomPanic

func RandomPanic(panicProbability float32) message.HandlerMiddleware

RandomPanic makes the handler panic based on random chance. Panic probability should be in the range (0,1).

func Recoverer

Recoverer recovers from any panic in the handler and appends RecoveredPanicError with the stacktrace to any error returned from the handler.

func SetCorrelationID

func SetCorrelationID(id string, msg *message.Message)

SetCorrelationID sets a correlation ID for the message.

SetCorrelationID should be called when the message enters the system. When message is produced in a request (for example HTTP), message correlation ID should be the same as the request's correlation ID.

func Timeout

func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc

Timeout makes the handler cancel the incoming message's context after a specified time. Any timeout-sensitive functionality of the handler should listen on msg.Context().Done() to know when to fail.

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker is a middleware that wraps the handler in a circuit breaker. Based on the configuration, the circuit breaker will fail fast if the handler keeps returning errors. This is useful for preventing cascading failures.

func NewCircuitBreaker

func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker

NewCircuitBreaker returns a new CircuitBreaker middleware. Refer to the gobreaker documentation for the available settings.

func (CircuitBreaker) Middleware

Middleware returns the CircuitBreaker middleware.

type IgnoreErrors

type IgnoreErrors struct {
	// contains filtered or unexported fields
}

IgnoreErrors provides a middleware that makes the handler ignore some explicitly whitelisted errors.

func NewIgnoreErrors

func NewIgnoreErrors(errs []error) IgnoreErrors

NewIgnoreErrors creates a new IgnoreErrors middleware.

func (IgnoreErrors) Middleware

Middleware returns the IgnoreErrors middleware.

type RecoveredPanicError

type RecoveredPanicError struct {
	V          any
	Stacktrace string
}

RecoveredPanicError holds the recovered panic's error along with the stacktrace.

func (RecoveredPanicError) Error

func (p RecoveredPanicError) Error() string

type Retry

type Retry struct {
	// MaxRetries is maximum number of times a retry will be attempted.
	MaxRetries int

	// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
	InitialInterval time.Duration
	// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
	MaxInterval time.Duration
	// Multiplier is the factor by which the waiting interval will be multiplied between retries.
	Multiplier float64
	// MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
	MaxElapsedTime time.Duration
	// RandomizationFactor randomizes the spread of the backoff times within the interval of:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook is an optional function that will be executed on each retry attempt.
	// The number of the current retry is passed as retryNum,
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}

Retry provides a middleware that retries the handler if errors are returned. The retry behaviour is configurable, with exponential backoff and maximum elapsed time.

func (Retry) Middleware

func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc

Middleware returns the Retry middleware.

type Throttle

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle provides a middleware that limits the amount of messages processed per unit of time. This may be done e.g. to prevent excessive load caused by running a handler on a long queue of unprocessed messages.

func NewThrottle

func NewThrottle(count int64, duration time.Duration) *Throttle

NewThrottle creates a new Throttle middleware. Example duration and count: NewThrottle(10, time.Second) for 10 messages per second

func (Throttle) Middleware

Middleware returns the Throttle middleware.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL