eventbox

package
v0.0.0-...-16534be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package eventbox batches incoming events for a single Datastore entity for processing.

Index

Constants

This section is empty.

Variables

View Source
var TombstonesDelay = 5 * time.Minute

TombstonesDelay is exposed to mitigate frequent errors in CV e2e tests when tasks are run in parallel with fake clock.

Functions

func Emit

func Emit(ctx context.Context, value []byte, to Recipient) error

Emit emits a new event with provided value and auto-generated unique ID.

Types

type EVersion

type EVersion int64

EVersion is recipient entity version.

type Event

type Event dsset.Item

Event is an incoming event.

type Events

type Events []Event

Events are incoming events.

func List

func List(ctx context.Context, r Recipient) (Events, error)

List returns unprocessed events. For use in tests only.

type PostProcessFn

type PostProcessFn func(context.Context) error

PostProcessFn should be executed after event processing completes.

func ProcessBatch

func ProcessBatch(ctx context.Context, r Recipient, p Processor, maxEvents int) (_ []PostProcessFn, err error)

ProcessBatch reliably processes outstanding events, while transactionally modifying state and performing arbitrary side effects.

Returns:

  • a slice of non-nil post process functions which SHOULD be executed immediately after calling this function. Those are generally extra work that needs to be done as the result of state modification.
  • error while processing events. Tags the error with common.DSContentionTag if entity's EVersion has changed or there is contention on Datastore entities involved in a transaction.

type Processor

type Processor interface {
	// LoadState is called to load the state before a transaction.
	LoadState(context.Context) (State, EVersion, error)
	// PrepareMutation is called before a transaction to compute transitions based
	// on a batch of events.
	//
	// The events in a batch are an arbitrary subset of all outstanding events.
	// Because loading of events isn't synchronized with event senders,
	// a recipient of events may see them in different order than the origination
	// order, even if events were produced by a single sender.
	//
	// All actions that must be done atomically with updating state must be
	// encapsulated inside Transition.SideEffectFn callback.
	//
	// Garbage events will be deleted non-transactionally before executing
	// transactional transitions. These events may still be processed by a
	// concurrent invocation of a Processor. The garbage events slice may re-use
	// the given events slice. The garbage will be deleted even if PrepareMutation returns
	// non-nil error.
	//
	// For correctness, two concurrent invocation of a Processor must choose the
	// same events to be deleted as garbage. Consider scenario of 2 events A and B
	// deemed semantically the same and 2 concurrent Processor invocations:
	//   P1: let me delete A and hope to transactionally process B.
	//   P2:  ............ B and ............................... A.
	// Then, it's a real possibility that A and B are both deleted AND no neither
	// P1 nor P2 commits a transaction, thus forever forgetting about A and B.
	PrepareMutation(context.Context, Events, State) (transitions []Transition, garbage Events, err error)
	// FetchEVersion is called at the beginning of a transaction.
	//
	// The returned EVersion is compared against the one associated with a state
	// loaded via GetState. If different, the transaction is aborted and new state
	// isn't saved.
	FetchEVersion(ctx context.Context) (EVersion, error)
	// SaveState is called in a transaction to save the state if it has changed.
	//
	// The passed eversion is incremented value of eversion of what GetState
	// returned before.
	SaveState(context.Context, State, EVersion) error
}

Processor defines safe way to process events in a batch.

type Recipient

type Recipient struct {
	// Key is the Datastore key of the recipient.
	//
	// The corresponding entity doesn't have to exist.
	Key *datastore.Key
	// MonitoringString is the value for the metric field "recipient".
	//
	// There should be very few distinct values.
	MonitoringString string
}

Recipient is the recipient of the events.

type SideEffectFn

type SideEffectFn func(context.Context) error

SideEffectFn performs side effects with a Datastore transaction context. See Transition.SideEffectFn doc.

func Chain

func Chain(fs ...SideEffectFn) SideEffectFn

Chain combines several SideEffectFn.

NOTE: modifies incoming ... slice.

type State

type State any

State is an arbitrary object.

Use a pointer to an actual state.

type Transition

type Transition struct {
	// SideEffectFn is called in a transaction to atomically with the state change
	// execute any side effects of a state transition.
	//
	// Typical use is notifying other CV components via TQ tasks.
	// Can be nil, meaning there no side effects to execute.
	//
	// TODO(tandrii): introduce error tag to indicate that failure was clean and
	// should be treated as if Transition wasn't started, s.t. progress of all
	// transitions before can be saved.
	SideEffectFn SideEffectFn
	// Events to consume with this transition.
	Events Events
	// TransitionTo is a state to transition to.
	//
	// It's allowed to transition to the exact same state.
	TransitionTo State
	// PostProcessFn is the function to be called by the eventbox user after
	// event processing completes.
	//
	// Note that it will be called outside of the transaction of all state
	// transitions, so the operation inside this function is not expected
	// to be atomic with this state transition.
	PostProcessFn PostProcessFn
}

Transition is a state transition.

Directories

Path Synopsis
Package dsset implements a particular flavor of Datastore-on-Firestore backed set.
Package dsset implements a particular flavor of Datastore-on-Firestore backed set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL