eventorder

v1.1.0

This package is not in the latest version of its module.

Published: Sep 12, 2022 License: AGPL-3.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrUnsupportedState = errors.New("unsupported state")

Functions

This section is empty.

Types

type Barrier

type Barrier struct {
	// contains filtered or unexported fields
}

Barrier is an abstraction for applying event ordering guarantees in the router.

Events for the same userID must be processed in order. When an event fails but will be retried later by the router, all subsequent events for that userID are put on hold until the failed event either succeeds or fails in a terminal way.

A barrier controls the concurrency of events in two places:

1. At entrance, before the event enters the pipeline.

2. Before actually trying to send the event, because once events have been accepted by the router they are processed asynchronously, through buffered channels, by separate goroutines (workers).
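
The ordering rule described above can be illustrated with a toy model. This is a minimal sketch and not the package's implementation: toyGate and its methods are hypothetical names, it runs on a single goroutine, and it ignores both the asynchronous pipeline and the concurrency limiter.

```go
package main

import "fmt"

// toyGate is a hypothetical stand-in for the idea behind Barrier: it
// remembers, per user, the job that failed and is waiting to be retried.
type toyGate struct {
	blocked map[string]int64 // userID -> jobID of the failed job
}

func newToyGate() *toyGate {
	return &toyGate{blocked: make(map[string]int64)}
}

// allow reports whether a job may proceed. While a user is blocked, only the
// failed job itself (the retry) gets through; later jobs are held back.
func (g *toyGate) allow(userID string, jobID int64) bool {
	failedID, isBlocked := g.blocked[userID]
	return !isBlocked || jobID == failedID
}

// markFailed raises the block for a user after a retryable failure.
func (g *toyGate) markFailed(userID string, jobID int64) {
	g.blocked[userID] = jobID
}

// markDone lifts the block once the failed job reaches a final outcome.
func (g *toyGate) markDone(userID string, jobID int64) {
	if failedID, ok := g.blocked[userID]; ok && failedID == jobID {
		delete(g.blocked, userID)
	}
}

func main() {
	g := newToyGate()
	fmt.Println(g.allow("user-1", 1)) // true: nothing is blocking user-1
	g.markFailed("user-1", 1)
	fmt.Println(g.allow("user-1", 2)) // false: job 2 must wait for job 1
	fmt.Println(g.allow("user-2", 3)) // true: other users are unaffected
	fmt.Println(g.allow("user-1", 1)) // true: the retry itself may pass
	g.markDone("user-1", 1)
	fmt.Println(g.allow("user-1", 2)) // true: the block has been lifted
}
```

In this model the same check would be made twice for every job, once at the entrance and once just before sending, which corresponds to the two places listed above.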

func NewBarrier

func NewBarrier(abortConcurrencyLimit int) *Barrier

NewBarrier creates a new, properly initialized Barrier.

func (*Barrier) Enter

func (b *Barrier) Enter(userID string, jobID int64) (accepted bool, previousFailedJobID *int64)

Enter attempts to enter the barrier for this userID and jobID. It returns true if no barrier exists for this userID. Otherwise it returns false, along with the previous failed jobID if that job is the cause of the barrier. A barrier may also exist for a user when the previous job has failed in an unrecoverable manner and the concurrency limiter is enabled.
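
A caller has to handle three outcomes of Enter: accepted, rejected with a failed jobID, and rejected with a nil pointer. The sketch below shows one way to do that, assuming only the documented signature. The enterer interface, the fakeBarrier stub and describeEntry are hypothetical helpers introduced so the example compiles without the real package.

```go
package main

import "fmt"

// enterer captures only the documented Enter signature.
type enterer interface {
	Enter(userID string, jobID int64) (accepted bool, previousFailedJobID *int64)
}

// fakeBarrier is a hypothetical stub that returns canned answers.
type fakeBarrier struct {
	accepted bool
	failedID *int64
}

func (f fakeBarrier) Enter(string, int64) (bool, *int64) {
	return f.accepted, f.failedID
}

// describeEntry turns the two return values into a readable decision.
func describeEntry(b enterer, userID string, jobID int64) string {
	accepted, prev := b.Enter(userID, jobID)
	switch {
	case accepted:
		return "accepted"
	case prev != nil:
		return fmt.Sprintf("held back behind failed job %d", *prev)
	default:
		// Rejected without a failed job: per the documentation this can
		// happen after an unrecoverable failure with the limiter enabled.
		return "held back, no failed job reported"
	}
}

func main() {
	failed := int64(41)
	fmt.Println(describeEntry(fakeBarrier{accepted: true}, "user-1", 42))
	fmt.Println(describeEntry(fakeBarrier{failedID: &failed}, "user-1", 42))
	fmt.Println(describeEntry(fakeBarrier{}, "user-1", 42))
}
```

The point of the switch is that previousFailedJobID must be checked for nil before it is dereferenced, since a rejection does not always come with a failed job.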

func (*Barrier) Size

func (b *Barrier) Size() int

Size returns the number of active user barriers.

func (*Barrier) StateChanged

func (b *Barrier) StateChanged(userID string, jobID int64, state string) error

StateChanged must be called at the end, after the job state change has been persisted. The only exception to this rule is when a job has failed in a retryable manner; in that scenario you should notify the barrier immediately after the failure. An ErrUnsupportedState error is returned if the state is not supported.
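
The error returned by StateChanged can be compared against the sentinel with errors.Is, which works whether or not the error is wrapped. The following is a sketch under stated assumptions: ErrUnsupportedState is redeclared locally to keep the example self-contained (real code would refer to the package's variable), stubBarrier and notify are hypothetical, and the state names are assumptions because this page does not list the supported states.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnsupportedState mirrors the documented sentinel error so that the
// sketch compiles on its own.
var ErrUnsupportedState = errors.New("unsupported state")

// stateNotifier captures only the documented StateChanged signature.
type stateNotifier interface {
	StateChanged(userID string, jobID int64, state string) error
}

// stubBarrier is a hypothetical stand-in. The state names it accepts are an
// assumption made for this example.
type stubBarrier struct{}

func (stubBarrier) StateChanged(_ string, _ int64, state string) error {
	switch state {
	case "succeeded", "failed", "aborted":
		return nil
	default:
		return fmt.Errorf("state %q: %w", state, ErrUnsupportedState)
	}
}

// notify reports a state change and classifies the outcome.
func notify(b stateNotifier, userID string, jobID int64, state string) string {
	err := b.StateChanged(userID, jobID, state)
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrUnsupportedState):
		return "unsupported state"
	default:
		return "unexpected error"
	}
}

func main() {
	fmt.Println(notify(stubBarrier{}, "user-1", 1, "failed"))  // ok
	fmt.Println(notify(stubBarrier{}, "user-1", 1, "unknown")) // unsupported state
}
```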

func (*Barrier) String

func (b *Barrier) String() string

String returns a string representation of the barrier.

func (*Barrier) Sync

func (b *Barrier) Sync() int

Sync applies any enqueued commands to the barrier's state. It should be called at the beginning of every new iteration of the main loop.
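
The idea of enqueuing commands and applying them in a batch can be sketched as follows. This is an illustration of the general pattern rather than the package's code: queuedGate and its methods are hypothetical, and the sketch assumes that the returned int is the number of commands applied, which this page does not state for Sync.

```go
package main

import (
	"fmt"
	"sync"
)

// queuedGate is a hypothetical illustration of deferred state changes:
// writers enqueue commands, and only applyPending mutates the visible state.
type queuedGate struct {
	mu      sync.Mutex
	pending []func(blocked map[string]int64)
	blocked map[string]int64 // userID -> failed jobID
}

func newQueuedGate() *queuedGate {
	return &queuedGate{blocked: make(map[string]int64)}
}

// enqueueFailed records a failure without touching the visible state yet.
func (g *queuedGate) enqueueFailed(userID string, jobID int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.pending = append(g.pending, func(b map[string]int64) { b[userID] = jobID })
}

// applyPending runs all pending commands in order and returns how many ran.
func (g *queuedGate) applyPending() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	n := len(g.pending)
	for _, cmd := range g.pending {
		cmd(g.blocked)
	}
	g.pending = nil
	return n
}

// size returns the number of users currently blocked.
func (g *queuedGate) size() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return len(g.blocked)
}

func main() {
	g := newQueuedGate()
	g.enqueueFailed("user-1", 1)
	g.enqueueFailed("user-2", 7)
	fmt.Println(g.size())         // 0: nothing is visible before applying
	fmt.Println(g.applyPending()) // 2: both commands applied
	fmt.Println(g.size())         // 2
	fmt.Println(g.applyPending()) // 0: the queue is empty
}
```

Applying the queue once per loop iteration means the state seen during an iteration does not change halfway through it, which is one plausible reason for calling Sync at the start of each iteration.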

func (*Barrier) Wait

func (b *Barrier) Wait(userID string, jobID int64) (wait bool, previousFailedJobID *int64)

Wait returns true if the job for this userID should not continue but should instead transition to a waiting state.
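
A worker would typically consult Wait just before sending each job it reads from its channel. The sketch below assumes only the documented signature. The waiter interface, the job type, the blockedUsers stub and the worker function are hypothetical, and the stub asks every job of a listed user to wait, which is simpler than what a real barrier does.

```go
package main

import "fmt"

// waiter captures only the documented Wait signature.
type waiter interface {
	Wait(userID string, jobID int64) (wait bool, previousFailedJobID *int64)
}

type job struct {
	userID string
	jobID  int64
}

// blockedUsers is a hypothetical stub: it asks every job of a listed user to
// wait and reports the failed job ID stored for that user.
type blockedUsers map[string]int64

func (b blockedUsers) Wait(userID string, _ int64) (bool, *int64) {
	failedID, ok := b[userID]
	if !ok {
		return false, nil
	}
	return true, &failedID
}

// worker drains the channel and splits jobs into those it would send and
// those it would move to a waiting state.
func worker(b waiter, jobs <-chan job) (sent, waiting []int64) {
	for j := range jobs {
		if wait, _ := b.Wait(j.userID, j.jobID); wait {
			waiting = append(waiting, j.jobID)
			continue
		}
		sent = append(sent, j.jobID)
	}
	return sent, waiting
}

func main() {
	jobs := make(chan job, 3) // buffered, as between the router and a worker
	jobs <- job{"user-1", 2}
	jobs <- job{"user-2", 3}
	jobs <- job{"user-1", 4}
	close(jobs)

	sent, waiting := worker(blockedUsers{"user-1": 1}, jobs)
	fmt.Println(sent, waiting) // [3] [2 4]
}
```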
