Documentation ¶
Index ¶
- func LogEventMiddlewareWithContext(ctx context.Context) message.HandlerMiddleware
- func PanicRecovererMiddleware(ctx context.Context) message.HandlerMiddleware
- func PipelineStepStartCommandDelayMiddlewareWithContext(ctx context.Context) message.HandlerMiddleware
- type PlannerControl
- type RecoveredPanicError
- type Retry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func LogEventMiddlewareWithContext ¶
func LogEventMiddlewareWithContext(ctx context.Context) message.HandlerMiddleware
This middleware writes each command and event to the JSONL event log file.
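As a rough illustration of the idea, the sketch below appends one JSON object per line to a writer before passing the message on. It is not the package's implementation: Message, HandlerFunc and HandlerMiddleware are local stand-ins shaped like the Watermill message types, and JSONLLogMiddleware, logEntry and demoLog are hypothetical names. The record layout is an assumption, and the payload is assumed to already be valid JSON.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"sync"
)

// Message, HandlerFunc and HandlerMiddleware are local stand-ins shaped like
// the Watermill message types, so the sketch needs only the standard library.
type Message struct {
	UUID    string
	Payload []byte
}

type HandlerFunc func(msg *Message) ([]*Message, error)

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// logEntry is a hypothetical record layout; the real event log may differ.
type logEntry struct {
	ID      string          `json:"id"`
	Payload json.RawMessage `json:"payload"`
}

// JSONLLogMiddleware (hypothetical name) writes one JSON object per line to w
// and then calls the wrapped handler. The payload must already be valid JSON.
func JSONLLogMiddleware(w io.Writer) HandlerMiddleware {
	var mu sync.Mutex         // handlers may run concurrently
	enc := json.NewEncoder(w) // Encode appends a newline after each value
	return func(h HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			mu.Lock()
			err := enc.Encode(logEntry{ID: msg.UUID, Payload: msg.Payload})
			mu.Unlock()
			if err != nil {
				return nil, err
			}
			return h(msg)
		}
	}
}

// demoLog runs two sample messages through the middleware and returns the
// text that was written to the in-memory log.
func demoLog() string {
	var buf bytes.Buffer
	handler := JSONLLogMiddleware(&buf)(func(msg *Message) ([]*Message, error) {
		return nil, nil
	})
	handler(&Message{UUID: "m1", Payload: []byte(`{"event":"started"}`)})
	handler(&Message{UUID: "m2", Payload: []byte(`{"event":"finished"}`)})
	return buf.String()
}
```

Writing one record per line keeps the log appendable and lets a reader process it line by line without parsing the whole file.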
func PanicRecovererMiddleware ¶
func PanicRecovererMiddleware(ctx context.Context) message.HandlerMiddleware
Middleware that recovers from Go panics. Based on the Watermill Recoverer middleware.
The panic is wrapped in a Flowpipe error and marked as fatal (non-retryable).
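The recovery pattern can be sketched with a deferred recover that turns the panic into a returned error. This is a minimal sketch under assumptions, not the package's code: Message and HandlerFunc are local stand-ins shaped like the Watermill types, RecoveredPanic and Recoverer are hypothetical names, the error text is made up, and the wrapping into a Flowpipe fatal error is left out because that error type is not shown here.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// Message and HandlerFunc are local stand-ins shaped like the Watermill types.
type Message struct{ Payload []byte }

type HandlerFunc func(msg *Message) ([]*Message, error)

// RecoveredPanic (hypothetical) carries the recovered value and a stack trace,
// mirroring the V and Stacktrace fields of RecoveredPanicError.
type RecoveredPanic struct {
	V          interface{}
	Stacktrace string
}

// Error formats the panic value and stack trace; the wording is an assumption.
func (p RecoveredPanic) Error() string {
	return fmt.Sprintf("panic occurred: %#v, stacktrace:\n%s", p.V, p.Stacktrace)
}

// Recoverer (hypothetical name) converts a panic raised by h into an error
// so that one failing handler does not take down the whole process.
func Recoverer(h HandlerFunc) HandlerFunc {
	return func(msg *Message) (produced []*Message, err error) {
		defer func() {
			if r := recover(); r != nil {
				produced = nil
				err = RecoveredPanic{V: r, Stacktrace: string(debug.Stack())}
			}
		}()
		return h(msg)
	}
}
```

Named return values are what allow the deferred function to replace the result after the panic has unwound the handler.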
func PipelineStepStartCommandDelayMiddlewareWithContext ¶
func PipelineStepStartCommandDelayMiddlewareWithContext(ctx context.Context) message.HandlerMiddleware
Middleware that delays execution of the PipelineStepStart command (for backoff purposes).
TODO: make it generic?
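One way such a delay could work is to wait on a timer before calling the handler while still honoring context cancellation. The sketch below is an assumption about the general shape only: Message, HandlerFunc and HandlerMiddleware are local stand-ins shaped like the Watermill types, and DelayMiddleware, delayFor and demoDelay are hypothetical names. How the real middleware determines the delay is not documented here.

```go
package main

import (
	"context"
	"time"
)

// Message, HandlerFunc and HandlerMiddleware are local stand-ins shaped like
// the Watermill message types.
type Message struct{ Payload []byte }

type HandlerFunc func(msg *Message) ([]*Message, error)

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// DelayMiddleware (hypothetical name) waits for delayFor(msg) before calling
// the wrapped handler. The wait is abandoned if ctx is cancelled first.
func DelayMiddleware(ctx context.Context, delayFor func(*Message) time.Duration) HandlerMiddleware {
	return func(h HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			if d := delayFor(msg); d > 0 {
				timer := time.NewTimer(d)
				defer timer.Stop()
				select {
				case <-timer.C: // delay elapsed, continue to the handler
				case <-ctx.Done():
					return nil, ctx.Err()
				}
			}
			return h(msg)
		}
	}
}

// demoDelay runs one message through the middleware with the given delay and
// reports whether the handler ran. With cancelFirst set, the context is
// cancelled before the call.
func demoDelay(cancelFirst bool, delayMillis int) (handlerRan bool, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	mw := DelayMiddleware(ctx, func(*Message) time.Duration {
		return time.Duration(delayMillis) * time.Millisecond
	})
	_, err = mw(func(*Message) ([]*Message, error) {
		handlerRan = true
		return nil, nil
	})(&Message{})
	return handlerRan, err
}
```

Using a timer inside a select, rather than time.Sleep, lets a shutdown interrupt a long backoff instead of blocking until it expires.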
Types ¶
type PlannerControl ¶
type PlannerControl struct {
// contains filtered or unexported fields
}
func NewPlannerControl ¶
func NewPlannerControl(ctx context.Context) *PlannerControl
func (*PlannerControl) Middleware ¶
func (p *PlannerControl) Middleware(h message.HandlerFunc) message.HandlerFunc
type RecoveredPanicError ¶
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}
Holds the recovered panic's error along with the stacktrace.
func (RecoveredPanicError) Error ¶
func (p RecoveredPanicError) Error() string
type Retry ¶
type Retry struct {
	Ctx context.Context
	// MaxRetries is the maximum number of times a retry will be attempted.
	MaxRetries int
	// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
	InitialInterval time.Duration
	// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
	MaxInterval time.Duration
	// Multiplier is the factor by which the waiting interval will be multiplied between retries.
	Multiplier float64
	// MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
	MaxElapsedTime time.Duration
	// RandomizationFactor randomizes the spread of the backoff times within the interval of:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64
	// OnRetryHook is an optional function that will be executed on each retry attempt.
	// The number of the current retry is passed as retryNum.
	OnRetryHook func(retryNum int, delay time.Duration)
}
Custom retry middleware, intended for use with the Go Channel pub/sub. The message must be acked once the number of retries exceeds the maximum. Otherwise the message is re-delivered and processing ends up in an infinite loop.
The majority of this code is based on the Watermill Retry middleware.
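To make the interval fields concrete, the following sketch computes the wait before each retry as the field comments describe: start at InitialInterval, scale by Multiplier, cap at MaxInterval, then spread the result by RandomizationFactor. The type backoffConfig and its methods are hypothetical names for illustration, MaxElapsedTime is not modeled, and the actual middleware may compute its delays differently. For example, with InitialInterval 100ms, Multiplier 2 and MaxInterval 1s, the un-randomized intervals are 100ms, 200ms, 400ms, 800ms and then 1s for every later retry.

```go
package main

import (
	"math/rand"
	"time"
)

// backoffConfig (hypothetical) holds the Retry fields that shape the delay.
type backoffConfig struct {
	InitialInterval     time.Duration
	MaxInterval         time.Duration
	Multiplier          float64
	RandomizationFactor float64
}

// baseInterval returns the un-randomized wait before retry number retryNum,
// counting from 1. Each step multiplies the interval, capped at MaxInterval.
func (c backoffConfig) baseInterval(retryNum int) time.Duration {
	interval := float64(c.InitialInterval)
	for i := 1; i < retryNum; i++ {
		interval *= c.Multiplier
		if interval >= float64(c.MaxInterval) {
			return c.MaxInterval
		}
	}
	return time.Duration(interval)
}

// jitter maps r, a random number in [0, 1), onto the range
// [d * (1 - RandomizationFactor), d * (1 + RandomizationFactor)).
func (c backoffConfig) jitter(d time.Duration, r float64) time.Duration {
	lo := float64(d) * (1 - c.RandomizationFactor)
	hi := float64(d) * (1 + c.RandomizationFactor)
	return time.Duration(lo + r*(hi-lo))
}

// delay combines both steps, drawing the random number from math/rand.
func (c backoffConfig) delay(retryNum int) time.Duration {
	return c.jitter(c.baseInterval(retryNum), rand.Float64())
}
```

Randomizing around the current interval keeps many failing handlers from retrying in lockstep.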
func (Retry) Middleware ¶
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc
Middleware function returns the Retry middleware.