Documentation ¶
Index ¶
- Variables
- type Barrier
- func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJobID *int64)
- func (b *Barrier) Leave(key string, jobID int64)
- func (b *Barrier) Peek(key string) (previousFailedJobID *int64)
- func (b *Barrier) Size() int
- func (b *Barrier) StateChanged(key string, jobID int64, state string) error
- func (b *Barrier) String() string
- func (b *Barrier) Sync() int
- func (b *Barrier) Wait(key string, jobID int64) (wait bool, previousFailedJobID *int64)
- type OptFn
Constants ¶
This section is empty.
Variables ¶
var ErrUnsupportedState = errors.New("unsupported state")
Functions ¶
This section is empty.
Types ¶
type Barrier ¶
type Barrier struct {
// contains filtered or unexported fields
}
Barrier is an abstraction for applying event ordering guarantees in the router.
Events for the same key need to be processed in order, thus when an event fails but will be retried later by the router, we need to put all subsequent events for that key on hold until the failed event succeeds or fails in a terminal way.
A barrier controls the concurrency of events in two places:
1. At entrance, before the event enters the pipeline.
2. Before actually trying to send the event, since events after being accepted by the router, they are processed asynchronously through buffered channels by separate goroutine(s) aka workers.
func NewBarrier ¶
NewBarrier creates a new properly initialized Barrier
func (*Barrier) Enter ¶
Enter the barrier for this key and jobID. If there is not already a barrier for this key returns true, otherwise false along with the previous failed jobID if this is the cause of the barrier. Another scenario where a barrier might exist for a key is when the previous job has failed in an unrecoverable manner and the concurrency limiter is enabled.
func (*Barrier) Leave ¶ added in v1.5.0
Leave the barrier for this key and jobID. Leave acts as an undo operation for Enter, i.e. when a previously-entered job leaves the barrier it is as if this key and jobID didn't enter the barrier. Calling Leave is idempotent.
func (*Barrier) Peek ¶ added in v1.2.0
Peek returns the previously failed jobID for the given key, if any
func (*Barrier) StateChanged ¶
StateChanged must be called at the end, after the job state change has been persisted. The only exception to this rule is when a job has failed in a retryable manner, in this scenario you should notify the barrier immediately after the failure. An ErrUnsupportedState error will be returned if the state is not supported.
type OptFn ¶ added in v1.2.0
type OptFn func(b *Barrier)
func WithConcurrencyLimit ¶ added in v1.2.0
WithConcurrencyLimit sets the maximum number of concurrent jobs for a given key
func WithDrainConcurrencyLimit ¶ added in v1.8.0
WithDrainConcurrencyLimit sets the maximum number of concurrent jobs for a given key when the limiter is enabled (after a failed job has been drained, i.e. aborted)
func WithMetadata ¶ added in v1.2.0
WithMetadata includes the provided metadata in the error messages