Documentation ¶
Overview ¶
Package eventbox batches incoming events for a single Datastore entity for processing.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var TombstonesDelay = 5 * time.Minute
TombstonesDelay is exposed to mitigate frequent errors in CV e2e tests when tasks are run in parallel with fake clock.
Functions ¶
Types ¶
type PostProcessFn ¶
PostProcessFn should be executed after event processing completes.
func ProcessBatch ¶
func ProcessBatch(ctx context.Context, r Recipient, p Processor, maxEvents int) (_ []PostProcessFn, err error)
ProcessBatch reliably processes outstanding events, while transactionally modifying state and performing arbitrary side effects.
Returns:
- a slice of non-nil post process functions which SHOULD be executed immediately after calling this function. Those are generally extra work that needs to be done as the result of state modification.
- error while processing events. Tags the error with common.DSContentionTag if entity's EVersion has changed or there is contention on Datastore entities involved in a transaction.
type Processor ¶
type Processor interface { // LoadState is called to load the state before a transaction. LoadState(context.Context) (State, EVersion, error) // PrepareMutation is called before a transaction to compute transitions based // on a batch of events. // // The events in a batch are an arbitrary subset of all outstanding events. // Because loading of events isn't synchronized with event senders, // a recipient of events may see them in different order than the origination // order, even if events were produced by a single sender. // // All actions that must be done atomically with updating state must be // encapsulated inside Transition.SideEffectFn callback. // // Garbage events will be deleted non-transactionally before executing // transactional transitions. These events may still be processed by a // concurrent invocation of a Processor. The garbage events slice may re-use // the given events slice. The garbage will be deleted even if PrepareMutation returns // non-nil error. // // For correctness, two concurrent invocation of a Processor must choose the // same events to be deleted as garbage. Consider scenario of 2 events A and B // deemed semantically the same and 2 concurrent Processor invocations: // P1: let me delete A and hope to transactionally process B. // P2: ............ B and ............................... A. // Then, it's a real possibility that A and B are both deleted AND no neither // P1 nor P2 commits a transaction, thus forever forgetting about A and B. PrepareMutation(context.Context, Events, State) (transitions []Transition, garbage Events, err error) // FetchEVersion is called at the beginning of a transaction. // // The returned EVersion is compared against the one associated with a state // loaded via GetState. If different, the transaction is aborted and new state // isn't saved. FetchEVersion(ctx context.Context) (EVersion, error) // SaveState is called in a transaction to save the state if it has changed. // // The passed eversion is incremented value of eversion of what GetState // returned before. SaveState(context.Context, State, EVersion) error }
Processor defines safe way to process events in a batch.
type Recipient ¶
type Recipient struct { // Key is the Datastore key of the recipient. // // The corresponding entity doesn't have to exist. Key *datastore.Key // MonitoringString is the value for the metric field "recipient". // // There should be very few distinct values. MonitoringString string }
Recipient is the recipient of the events.
type SideEffectFn ¶
SideEffectFn performs side effects with a Datastore transaction context. See Transition.SideEffectFn doc.
func Chain ¶
func Chain(fs ...SideEffectFn) SideEffectFn
Chain combines several SideEffectFn.
NOTE: modifies incoming ... slice.
type Transition ¶
type Transition struct { // SideEffectFn is called in a transaction to atomically with the state change // execute any side effects of a state transition. // // Typical use is notifying other CV components via TQ tasks. // Can be nil, meaning there no side effects to execute. // // TODO(tandrii): introduce error tag to indicate that failure was clean and // should be treated as if Transition wasn't started, s.t. progress of all // transitions before can be saved. SideEffectFn SideEffectFn // Events to consume with this transition. Events Events // TransitionTo is a state to transition to. // // It's allowed to transition to the exact same state. TransitionTo State // PostProcessFn is the function to be called by the eventbox user after // event processing completes. // // Note that it will be called outside of the transaction of all state // transitions, so the operation inside this function is not expected // to be atomic with this state transition. PostProcessFn PostProcessFn }
Transition is a state transition.