eventorder

package
v1.5.0-preview.1
Published: Dec 16, 2022 License: AGPL-3.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrUnsupportedState = errors.New("unsupported state")

Functions

This section is empty.

Types

type Barrier

type Barrier struct {
	// contains filtered or unexported fields
}

Barrier is an abstraction for applying event ordering guarantees in the router.

Events for the same key must be processed in order. When an event fails but will be retried later by the router, all subsequent events for that key must therefore be put on hold until the failed event either succeeds or fails in a terminal way.

A barrier controls the concurrency of events in two places:

1. At entrance, before the event enters the pipeline.

2. Before actually trying to send the event, because once events are accepted by the router they are processed asynchronously, through buffered channels, by separate goroutines (workers).
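The hold-back behaviour described above can be illustrated with a toy model. The sketch below is not the package's implementation: toyBarrier and its blocked, fail and resolve methods are hypothetical names, it is single-goroutine only, and it assumes that only the failed job itself may proceed while its retry is pending. It shows why a check is useful both at the entrance and again just before sending.

```go
package main

import "fmt"

// toyBarrier is a hypothetical, single-goroutine model of per-key ordering.
// It is NOT the eventorder.Barrier implementation.
type toyBarrier struct {
	failed map[string]int64 // key -> ID of the job currently awaiting a retry
}

func newToyBarrier() *toyBarrier {
	return &toyBarrier{failed: map[string]int64{}}
}

// blocked reports whether jobID must be held back for key.
// Assumption: only the failed job itself may proceed while a retry is pending.
func (b *toyBarrier) blocked(key string, jobID int64) bool {
	failedID, ok := b.failed[key]
	return ok && jobID != failedID
}

// fail records a retryable failure, raising the barrier for key.
func (b *toyBarrier) fail(key string, jobID int64) { b.failed[key] = jobID }

// resolve lifts the barrier if jobID is the job that raised it.
func (b *toyBarrier) resolve(key string, jobID int64) {
	if failedID, ok := b.failed[key]; ok && failedID == jobID {
		delete(b.failed, key)
	}
}

func main() {
	b := newToyBarrier()

	// Checkpoint 1 (entrance): nothing has failed yet, so jobs 1 and 2 pass.
	fmt.Println(b.blocked("user-1", 1), b.blocked("user-1", 2)) // false false

	// Job 1 fails in a retryable way while job 2 is already queued for a worker.
	b.fail("user-1", 1)

	// Checkpoint 2 (before sending): job 2 must now wait, job 1 may retry.
	fmt.Println(b.blocked("user-1", 2), b.blocked("user-1", 1)) // true false

	// Other keys are unaffected.
	fmt.Println(b.blocked("user-2", 7)) // false

	// Once job 1 succeeds or fails terminally, job 2 may continue.
	b.resolve("user-1", 1)
	fmt.Println(b.blocked("user-1", 2)) // false
}
```

Without the second check, job 2 in this toy run would already be past the entrance when job 1 fails and would be sent out of order.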

func NewBarrier

func NewBarrier(fns ...OptFn) *Barrier

NewBarrier creates a new, properly initialized Barrier.

func (*Barrier) Enter

func (b *Barrier) Enter(key string, jobID int64) (accepted bool, previousFailedJobID *int64)

Enter attempts to enter the barrier for this key and jobID. It returns true if there is not already a barrier for this key; otherwise it returns false, along with the previous failed jobID if that job is the cause of the barrier. A barrier may also exist for a key when the previous job has failed in an unrecoverable manner and the concurrency limiter is enabled.
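Because previousFailedJobID is a pointer, callers need a nil check before dereferencing it. The sketch below relies only on the documented Enter signature. The enterer interface, the fakeBarrier stub (with its canned behaviour) and describeEnter are hypothetical helpers, introduced so the example runs without the real package.

```go
package main

import "fmt"

// enterer mirrors the documented Enter signature.
type enterer interface {
	Enter(key string, jobID int64) (accepted bool, previousFailedJobID *int64)
}

// fakeBarrier is a hypothetical stub with canned behaviour, not the real Barrier.
type fakeBarrier struct {
	failedByKey map[string]int64
}

func (f fakeBarrier) Enter(key string, jobID int64) (bool, *int64) {
	failedID, ok := f.failedByKey[key]
	if !ok {
		return true, nil // no barrier for this key
	}
	return jobID == failedID, &failedID
}

// describeEnter shows how a caller might interpret both return values.
func describeEnter(b enterer, key string, jobID int64) string {
	accepted, prev := b.Enter(key, jobID)
	switch {
	case accepted:
		return fmt.Sprintf("job %d accepted", jobID)
	case prev != nil:
		return fmt.Sprintf("job %d held back by failed job %d", jobID, *prev)
	default:
		// Rejected without a failed job, e.g. because of the concurrency limiter.
		return fmt.Sprintf("job %d held back (no failed job reported)", jobID)
	}
}

func main() {
	b := fakeBarrier{failedByKey: map[string]int64{"user-1": 10}}
	fmt.Println(describeEnter(b, "user-1", 11)) // job 11 held back by failed job 10
	fmt.Println(describeEnter(b, "user-1", 10)) // job 10 accepted
	fmt.Println(describeEnter(b, "user-2", 3))  // job 3 accepted
}
```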

func (*Barrier) Leave added in v1.5.0

func (b *Barrier) Leave(key string, jobID int64)

Leave the barrier for this key and jobID. Leave acts as an undo operation for Enter, i.e. when a previously-entered job leaves the barrier it is as if this key and jobID didn't enter the barrier. Calling Leave is idempotent.

func (*Barrier) Peek added in v1.2.0

func (b *Barrier) Peek(key string) (previousFailedJobID *int64)

Peek returns the previously failed jobID for the given key, if any.

func (*Barrier) Size

func (b *Barrier) Size() int

Size returns the number of active barriers.

func (*Barrier) StateChanged

func (b *Barrier) StateChanged(key string, jobID int64, state string) error

StateChanged must be called at the end, after the job state change has been persisted. The only exception to this rule is a job that has failed in a retryable manner: in this scenario, notify the barrier immediately after the failure. ErrUnsupportedState is returned if the state is not supported.

func (*Barrier) String

func (b *Barrier) String() string

String returns a string representation of the barrier.

func (*Barrier) Sync

func (b *Barrier) Sync() int

Sync applies any enqueued commands to the barrier's state. It should be called at the beginning of every new iteration of the main loop.
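One way to picture "enqueued commands" is a queue of deferred state changes that workers append to and the main loop drains at the start of each iteration. The sketch below is a toy model under that assumption. toyQueue, enqueue and applyPending are hypothetical names, and the meaning given to the returned int (the number of commands applied) is an assumption, since the documentation above does not say what Sync's result represents.

```go
package main

import (
	"fmt"
	"sync"
)

// toyQueue is a hypothetical model of deferred commands, not the real Barrier.
type toyQueue struct {
	mu      sync.Mutex
	pending []func(state map[string]int64)
	state   map[string]int64
}

func newToyQueue() *toyQueue {
	return &toyQueue{state: map[string]int64{}}
}

// enqueue may be called from worker goroutines; it does not touch state.
func (q *toyQueue) enqueue(cmd func(state map[string]int64)) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, cmd)
}

// applyPending runs all pending commands in FIFO order and reports how many
// were applied (an assumed meaning for the int returned by Sync).
func (q *toyQueue) applyPending() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := len(q.pending)
	for _, cmd := range q.pending {
		cmd(q.state)
	}
	q.pending = nil
	return n
}

func main() {
	q := newToyQueue()
	q.enqueue(func(s map[string]int64) { s["user-1"] = 1 })
	q.enqueue(func(s map[string]int64) { delete(s, "user-1") })
	q.enqueue(func(s map[string]int64) { s["user-2"] = 5 })

	// Main loop: apply queued commands first, then work with a stable state.
	for i := 0; i < 2; i++ {
		applied := q.applyPending()
		fmt.Println(applied, len(q.state))
	}
	// Prints "3 1" on the first iteration and "0 1" on the second.
}
```

Deferring changes this way means the state seen during one loop iteration does not shift underneath it while workers report results.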

func (*Barrier) Wait

func (b *Barrier) Wait(key string, jobID int64) (wait bool, previousFailedJobID *int64)

Wait returns true if the job for this key should not continue but should instead wait (transition to a waiting state).

type OptFn added in v1.2.0

type OptFn func(b *Barrier)

func WithConcurrencyLimit added in v1.2.0

func WithConcurrencyLimit(abortConcurrencyLimit int) OptFn

WithConcurrencyLimit sets the maximum number of concurrent jobs for a given key when the limiter is enabled

func WithMetadata added in v1.2.0

func WithMetadata(metadata map[string]string) OptFn

WithMetadata includes the provided metadata in the error messages
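OptFn, WithConcurrencyLimit and WithMetadata follow the functional options pattern: each option is a function that mutates the Barrier, and NewBarrier applies them in order. Since Barrier's fields are unexported, the sketch below reproduces the pattern with hypothetical stand-ins (optBarrier, optFn, withConcurrencyLimit, withMetadata, newOptBarrier). The field names, the "destinationID" metadata key and the zero default are assumptions and say nothing about the real package's internals or defaults.

```go
package main

import "fmt"

// optBarrier is a hypothetical stand-in for Barrier with visible fields.
type optBarrier struct {
	concurrencyLimit int
	metadata         map[string]string
}

// optFn mirrors the shape of the documented OptFn type.
type optFn func(b *optBarrier)

func withConcurrencyLimit(limit int) optFn {
	return func(b *optBarrier) { b.concurrencyLimit = limit }
}

func withMetadata(metadata map[string]string) optFn {
	return func(b *optBarrier) { b.metadata = metadata }
}

// newOptBarrier sets defaults first, then applies each option in order,
// so later options override earlier ones.
func newOptBarrier(fns ...optFn) *optBarrier {
	b := &optBarrier{metadata: map[string]string{}}
	for _, fn := range fns {
		fn(b)
	}
	return b
}

func main() {
	b := newOptBarrier(
		withConcurrencyLimit(4),
		withMetadata(map[string]string{"destinationID": "dest-1"}),
	)
	fmt.Println(b.concurrencyLimit, b.metadata["destinationID"]) // 4 dest-1

	// Options are optional: with none given, the toy defaults remain.
	fmt.Println(newOptBarrier().concurrencyLimit) // 0
}
```

With the documented signatures, the equivalent call would take the form NewBarrier(WithConcurrencyLimit(4), WithMetadata(m)), where m is a map[string]string.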
