Documentation ¶
Index ¶
- Variables
- func EventIDFromToken(token []byte) (int64, error)
- func GenerateEventLoadToken(event *historypb.HistoryEvent) ([]byte, error)
- func MachineData[T any](n *Node) (T, error)
- func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)
- func RegisterImmediateExecutor[T Task](r *Registry, executor ImmediateExecutor[T]) error
- func RegisterRemoteMethod(r *Registry, method RemoteMethod, executor RemoteExecutor) error
- func RegisterTimerExecutor[T Task](r *Registry, executor TimerExecutor[T]) error
- func ValidateNotTransitioned(ref *persistencespb.StateMachineRef, node *Node) error
- type AccessType
- type Collection
- func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)
- func (c Collection[T]) Data(stateMachineID string) (T, error)
- func (c Collection[T]) List() []*Node
- func (c Collection[T]) Node(stateMachineID string) (*Node, error)
- func (c Collection[T]) Size() int
- func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error
- type Environment
- type EventDefinition
- type ImmediateExecutor
- type Key
- type Node
- func (n *Node) AddChild(key Key, data any) (*Node, error)
- func (n *Node) AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent
- func (n *Node) CheckRunning() error
- func (n *Node) Child(path []Key) (*Node, error)
- func (n *Node) ClearTransactionState()
- func (n *Node) CompareState(incomingNode *Node) (int, error)
- func (n *Node) Dirty() bool
- func (n *Node) InternalRepr() *persistencespb.StateMachineNode
- func (n *Node) LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)
- func (n *Node) Outputs() []PathAndOutputs
- func (n *Node) Path() []Key
- func (n *Node) Sync(incomingNode *Node) error
- func (n *Node) Walk(fn func(*Node) error) error
- type NodeBackend
- type PathAndOutputs
- type Ref
- type Registry
- func (r *Registry) EventDefinition(t enumspb.EventType) (def EventDefinition, ok bool)
- func (r *Registry) ExecuteImmediateTask(ctx context.Context, env Environment, ref Ref, task Task) error
- func (r *Registry) ExecuteRemoteMethod(ctx context.Context, env Environment, ref Ref, methodName string, ...) ([]byte, error)
- func (r *Registry) ExecuteTimerTask(env Environment, node *Node, task Task) error
- func (r *Registry) Machine(t string) (def StateMachineDefinition, ok bool)
- func (r *Registry) RegisterEventDefinition(def EventDefinition) error
- func (r *Registry) RegisterMachine(sm StateMachineDefinition) error
- func (r *Registry) RegisterTaskSerializer(t string, def TaskSerializer) error
- func (r *Registry) TaskSerializer(t string) (d TaskSerializer, ok bool)
- type RemoteExecutor
- type RemoteMethod
- type StateMachine
- type StateMachineDefinition
- type Task
- type TaskAttributes
- type TaskRegenerator
- type TaskSerializer
- type TimerExecutor
- type Transition
- type TransitionOutput
- type TransitionOutputWithCount
Constants ¶
This section is empty.
Variables ¶
var ErrDuplicateRegistration = errors.New("duplicate registration")
ErrDuplicateRegistration is returned by a Registry when it detects duplicate registration.
var ErrIncompatibleType = errors.New("state machine data was cast into an incompatible type")
ErrIncompatibleType is returned when trying to cast a state machine's data to a type that it is incompatible with.
var ErrInitialTransitionMismatch = errors.New("node initial failover version or transition count mismatch")
ErrInitialTransitionMismatch is returned when the initial failover version or transition count of a node does not match the incoming node upon sync.
var ErrInvalidTransition = errors.New("invalid transition")
ErrInvalidTransition is returned from Transition.Apply on an invalid state transition.
var ErrNotCherryPickable = errors.New("event not cherry pickable")
ErrNotCherryPickable should be returned by CherryPick if an event should not be cherry picked for whatever reason.
var ErrNotRegistered error = notRegisteredError{"not registered"}
ErrNotRegistered is returned by a Registry when trying to get a type that is not registered.
var ErrSerializationFailed = errors.New("serialization failed")
var ErrStateMachineAlreadyExists = errors.New("state machine already exists")
ErrStateMachineAlreadyExists is returned when trying to add a state machine with an ID that already exists in a Collection.
var ErrStateMachineNotFound = errors.New("state machine not found")
ErrStateMachineNotFound is returned when looking up a non-existing state machine in a Node or a Collection.
var Immediate = time.Time{}
Immediate is a sentinel value to express that a task must be executed immediately instead of delayed to a certain deadline.
Functions ¶
func EventIDFromToken ¶
EventIDFromToken gets the event ID associated with an event load token.
func GenerateEventLoadToken ¶
func GenerateEventLoadToken(event *historypb.HistoryEvent) ([]byte, error)
GenerateEventLoadToken generates a token for loading a history event from an Environment. Events should typically be immutable making this function safe to call outside of an [Environment.Access] call.
func MachineData ¶
MachineData deserializes the persistent state machine's data, casts it to type T, and returns it. Returns an error when deserialization or casting fails.
func MachineTransition ¶
func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)
MachineTransition runs the given transitionFn on a machine's data for the given key. It updates the state machine's metadata and marks the entry as dirty in the node's cache. If the transition fails, the changes are rolled back and no state is mutated.
func RegisterImmediateExecutor ¶ added in v1.25.0
func RegisterImmediateExecutor[T Task](r *Registry, executor ImmediateExecutor[T]) error
RegisterImmediateExecutor registers an ImmediateExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.
func RegisterRemoteMethod ¶ added in v1.25.0
func RegisterRemoteMethod(r *Registry, method RemoteMethod, executor RemoteExecutor) error
RegisterRemoteMethod registers an RemoteExecutor for the given remote method definition. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.
func RegisterTimerExecutor ¶ added in v1.25.0
func RegisterTimerExecutor[T Task](r *Registry, executor TimerExecutor[T]) error
RegisterTimerExecutor registers a TimerExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.
func ValidateNotTransitioned ¶ added in v1.26.2
func ValidateNotTransitioned(ref *persistencespb.StateMachineRef, node *Node) error
ValidateNotTransitioned returns a consts.ErrStaleReference if the machine has transitioned since the task was generated.
Types ¶
type AccessType ¶
type AccessType int
AccessType is a specifier for storage access.
const ( // AccessRead specifies read access. AccessRead AccessType = iota // AccessWrite specifies write access. AccessWrite AccessType = iota )
type Collection ¶
type Collection[T any] struct { // The type of machines stored in this collection. Type string // contains filtered or unexported fields }
A Collection of similarly typed sibling state machines.
func NewCollection ¶
func NewCollection[T any](node *Node, stateMachineType string) Collection[T]
NewCollection creates a new Collection.
func (Collection[T]) Add ¶
func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)
Add adds a node to the collection as a child of the collection's underlying Node.
func (Collection[T]) Data ¶
func (c Collection[T]) Data(stateMachineID string) (T, error)
Data gets the data for a given state machine ID.
func (Collection[T]) List ¶
func (c Collection[T]) List() []*Node
List returns all nodes in this collection.
func (Collection[T]) Node ¶
func (c Collection[T]) Node(stateMachineID string) (*Node, error)
Node gets an Node for a given state machine ID.
func (Collection[T]) Size ¶
func (c Collection[T]) Size() int
Size returns the number of machines in this collection.
func (Collection[T]) Transition ¶
func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error
Transition transitions a machine by ID.
type Environment ¶
type Environment interface { // Wall clock. Backed by a the shard's time source. Now() time.Time // Access a state machine Node for the given ref. // // When using AccessRead, the accessor must guarantee not to mutate any state, accessor errors will not cause // mutable state unload. Access(ctx context.Context, ref Ref, accessType AccessType, accessor func(*Node) error) error }
Executor environment.
type EventDefinition ¶
type EventDefinition interface { Type() enumspb.EventType // IsWorkflowTaskTrigger returns a boolean indicating whether this event type should trigger a workflow task. IsWorkflowTaskTrigger() bool // Apply a history event to the state machine. Triggered during replication and workflow reset. Apply(root *Node, event *historypb.HistoryEvent) error // Cherry pick (a.k.a "reapply") an event from a different history branch. // Implementations should apply the event to the machine state and return nil in case the event is cherry-pickable. // Command events should never be cherry picked as we rely on the workflow to reschedule them. // Return [ErrNotCherryPickable], [ErrStateMachineNotFound], or [ErrInvalidTransition] to skip cherry picking. Any // other error is considered fatal and will abort the cherry pick process. CherryPick(root *Node, event *historypb.HistoryEvent, resetReapplyExcludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error }
EventDefinition is a definition for a history event for a given event type.
type ImmediateExecutor ¶ added in v1.25.0
ImmediateExecutor is responsible for executing immediate tasks (e.g: transfer, outbound). Implementations should be registered via [RegisterImmediateExecutors] to handle specific task types.
type Node ¶
type Node struct { // Key of this node in parent's map. Empty if node is the root. Key Key // Parent node. Nil if current node is the root. Parent *Node // contains filtered or unexported fields }
Node is a node in a hierarchical state machine tree.
It holds a persistent representation of itself and maintains an in-memory cache of deserialized data and child nodes. Node data should not be manipulated directly and should only be done using MachineTransition or [Collection.Transtion] to ensure the tree tracks dirty states and update transition counts.
func NewRoot ¶
func NewRoot(registry *Registry, t string, data any, children map[string]*persistencespb.StateMachineMap, backend NodeBackend) (*Node, error)
NewRoot creates a new root Node. Children may be provided from persistence to rehydrate the tree. Returns ErrNotRegistered if the key's type is not registered in the given registry or serialization errors.
func (*Node) AddChild ¶
AddChild adds an immediate child to a node, serializing the given data. Returns ErrStateMachineAlreadyExists if a child with the given key already exists, ErrNotRegistered if the key's type is not found in the node's state machine registry and serialization errors.
func (*Node) AddHistoryEvent ¶
func (n *Node) AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent
AddHistoryEvent adds a history event to be committed at the end of the current transaction. Must be called within an [Environment.Access] function block with write access.
func (*Node) CheckRunning ¶ added in v1.25.0
CheckRunning has two modes of operation: 1. If the node is **not** attached to a workflow (not yet supported), it returns nil. 2. If the node is attached to a workflow, it verifies that the workflow execution is running, and returns ErrWorkflowCompleted or nil.
May return other errors returned from MachineData.
func (*Node) ClearTransactionState ¶
func (n *Node) ClearTransactionState()
ClearTransactionState resets all transition outputs in the tree. This should be called at the end of every transaction where the transitions are performed to avoid emitting duplicate transition outputs.
func (*Node) CompareState ¶ added in v1.25.0
CompareState compare current node state with the incoming node state. Returns 0 if the states are equal, a positive number if the current state is considered newer, a negative number if the incoming state is considered newer. Meant to be used by the framework, **not** by components. TODO: remove once transition history is enabled.
func (*Node) InternalRepr ¶ added in v1.25.0
func (n *Node) InternalRepr() *persistencespb.StateMachineNode
InternalRepr returns the internal persistence representation of this node. Meant to be used by the framework, **not** by components.
func (*Node) LoadHistoryEvent ¶
Load a history event by token generated via GenerateEventLoadToken. Must be called within an [Environment.Access] function block with either read or write access.
func (*Node) Outputs ¶
func (n *Node) Outputs() []PathAndOutputs
Outputs returns all outputs produced by transitions on this tree.
type NodeBackend ¶
type NodeBackend interface { // AddHistoryEvent adds a history event to be committed at the end of the current transaction. AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent // LoadHistoryEvent loads a history event by token generated via [GenerateEventLoadToken]. LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error) // GetCurrentVersion returns the current namespace failover version. GetCurrentVersion() int64 // NextTransitionCount returns the current state transition count from the state transition history. NextTransitionCount() int64 }
NodeBackend is a concrete implementation to support interacting with the underlying platform. It currently has only a single implementation - workflow mutable state.
type PathAndOutputs ¶
type PathAndOutputs struct { Path []Key Outputs []TransitionOutputWithCount }
type Ref ¶
type Ref struct { WorkflowKey definition.WorkflowKey StateMachineRef *persistencespb.StateMachineRef // If non-zero, this field represents the ID of the task this Ref came from. Used for stale task detection and // serves as an indicator whether this Ref can reference stale state. This should be set during task processing // where we can validate the task that embeds this reference against shard clock. TaskID int64 // An optional function to validate the ref is not stale. // For tasks, this is copied from the task's Validate() implementation. Validate func(ref *persistencespb.StateMachineRef, node *Node) error }
Ref is a reference to a statemachine on a specific workflow. It contains the workflow key and the key of the statemachine in the state machine Environment as well as the information to perform staleness checks for itself or the state that it is referencing.
func (Ref) StateMachinePath ¶
StateMachinePath gets the state machine path for from this reference.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maintains a mapping from state machine type to a StateMachineDefinition and task type to TaskSerializer. Registry methods are **not** protected by a lock and all registration is expected to happen in a single thread on startup for performance reasons.
func (*Registry) EventDefinition ¶
func (r *Registry) EventDefinition(t enumspb.EventType) (def EventDefinition, ok bool)
EventDefinition returns an EventDefinition for a given type and a boolean indicating whether it was found.
func (*Registry) ExecuteImmediateTask ¶ added in v1.25.0
func (r *Registry) ExecuteImmediateTask( ctx context.Context, env Environment, ref Ref, task Task, ) error
ExecuteImmediateTask gets an ImmediateExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.
func (*Registry) ExecuteRemoteMethod ¶ added in v1.25.0
func (r *Registry) ExecuteRemoteMethod( ctx context.Context, env Environment, ref Ref, methodName string, serializedInput []byte, ) ([]byte, error)
ExecuteRemoteMethod gets an RemoteExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.
func (*Registry) ExecuteTimerTask ¶ added in v1.25.0
func (r *Registry) ExecuteTimerTask( env Environment, node *Node, task Task, ) error
ExecuteTimerTask gets a TimerExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.
func (*Registry) Machine ¶
func (r *Registry) Machine(t string) (def StateMachineDefinition, ok bool)
Machine returns a StateMachineDefinition for a given type and a boolean indicating whether it was found.
func (*Registry) RegisterEventDefinition ¶
func (r *Registry) RegisterEventDefinition(def EventDefinition) error
RegisterEventDefinition registers an EventDefinition for the given event type. Returns an ErrDuplicateRegistration if a definition for the type has already been registered.
func (*Registry) RegisterMachine ¶
func (r *Registry) RegisterMachine(sm StateMachineDefinition) error
RegisterMachine registers a StateMachineDefinition by its type. Returns an ErrDuplicateRegistration if the state machine type has already been registered.
func (*Registry) RegisterTaskSerializer ¶
func (r *Registry) RegisterTaskSerializer(t string, def TaskSerializer) error
RegisterTaskSerializer registers a TaskSerializer for a given type. Returns an ErrDuplicateRegistration if a serializer for this task type has already been registered.
func (*Registry) TaskSerializer ¶
func (r *Registry) TaskSerializer(t string) (d TaskSerializer, ok bool)
TaskSerializer returns a TaskSerializer for a given type and a boolean indicating whether it was found.
type RemoteExecutor ¶ added in v1.25.0
RemoteExecutor is responsible for executing remote methods. // Implementations should be registered via [RegisterRemoteExecutors] to handle specific methods.
type RemoteMethod ¶ added in v1.25.0
type RemoteMethod interface { // Name of the remote method. Must be unique per state machine. Name() string // SerializeOutput serializes output of the invocation to a byte array that is suitable for transport. SerializeOutput(output any) ([]byte, error) // DeserializeInput deserializes input from bytes that is then passed to the handler. DeserializeInput(data []byte) (any, error) }
RemoteMethod can be defined for each state machine to handle external request, like RPCs, but as part of the HSM framework. See RemoteExecutor for how to define the handler for remote methods.
type StateMachine ¶
type StateMachine[S comparable] interface { TaskRegenerator State() S SetState(S) }
A StateMachine is anything that can get and set a comparable state S and re-generate tasks based on current state. It is meant to be used with Transition objects to safely transition their state on a given event.
type StateMachineDefinition ¶
type StateMachineDefinition interface { Type() string // Serialize a state machine into bytes. Serialize(any) ([]byte, error) // Deserialize a state machine from bytes. Deserialize([]byte) (any, error) // CompareState compares two state objects. It should return 0 if the states are equal, a positive number if the // first state is considered newer, a negative number if the second state is considered newer. // TODO: Remove this method and implementations once transition history is fully implemented. For now, we have to // rely on each component to tell the framework which state is newer and if sync state can overwrite the states in // the standby cluster. CompareState(any, any) (int, error) }
StateMachineDefinition provides type information and a serializer for a state machine.
type Task ¶
type Task interface { // Task type that must be unique per task definition. Type() string // A deadline for firing this task. // This represents a lower bound and actual execution may get delayed if the system is overloaded or for various // other reasons. // Return [Immediate] to schedule this task immediately. Deadline() time.Time // The destination of this task, used to group tasks into a per namespace-and-destination scheduler. // Tasks may return an empty string if the task is delivered internally within the system. // If a destination is set, a task will be scheduled on the outbound queue. // Currently Destination and Deadline are mutually exclusive. Destination() string // Validate checks if the task is still valid for processing for the current node state. // Implementors may return [ErrStaleReference] or [consts.ErrWorkflowCompleted] if the task is no longer valid. // A typical implementation may use [node.CheckRunning], [ValidateNotTransitioned], or check if the state of the // machine is still relevant for running this task. Validate(ref *persistencespb.StateMachineRef, node *Node) error }
A Task is generated by a state machine in order to drive execution. For example, a callback state machine in the SCHEDULED state, would generate an invocation task to be eventually executed by the framework. State machine transitions and tasks are committed atomically to ensure that the system is in a consistent state.
Tasks are generated by calling the [StateMachine.Tasks] method on a state machine after it has transitioned. Tasks are executed by an executor that is registered to handle a specific task type. The framework converts this minimal task representation into [tasks.Task] instances, filling in the state machine reference, workflow key, and task ID. A TaskSerializer need to be registered in a Registry for a given type in order to process tasks of that type.
type TaskAttributes ¶ added in v1.26.2
TaskAttributes are common attributes for a Task.
type TaskRegenerator ¶
A TaskRegenerator is invoked to regenerate tasks post state-based replication or when refreshing all tasks for a workflow.
type TaskSerializer ¶
type TaskSerializer interface { Serialize(Task) ([]byte, error) Deserialize(data []byte, attrs TaskAttributes) (Task, error) }
TaskSerializer provides type information and a serializer for a state machine.
type TimerExecutor ¶ added in v1.25.0
type TimerExecutor[T Task] func(env Environment, node *Node, task T) error
TimerExecutor is responsible for executing timer tasks. Implementations should be registered via [RegisterTimerExecutors] to handle specific task types. Timers tasks are collapsed into a single task which will execute all timers that have hit their deadline while holding a lock on the workflow.
type Transition ¶
type Transition[S comparable, SM StateMachine[S], E any] struct { // Source states that are valid for this transition. Sources []S // Destination state to transition to. Destination S // contains filtered or unexported fields }
Transition represents a state machine transition for a machine of type SM with state S and event E.
func NewTransition ¶
func NewTransition[S comparable, SM StateMachine[S], E any](src []S, dst S, apply func(SM, E) (TransitionOutput, error)) Transition[S, SM, E]
NewTransition creates a new Transition from the given source states to a destination state for a given event. The apply function is called after verifying the transition is possible and setting the destination state.
func (Transition[S, SM, E]) Apply ¶
func (t Transition[S, SM, E]) Apply(sm SM, event E) (TransitionOutput, error)
Apply applies a transition event to the given state machine changing the state machine's state to the transition's Destination on success.
func (Transition[S, SM, E]) Possible ¶
func (t Transition[S, SM, E]) Possible(sm SM) bool
Possible returns a boolean indicating whether the transition is possible for the current state.
type TransitionOutput ¶
type TransitionOutput struct {
Tasks []Task
}
TransitionOutput is output produced for a single transition.
type TransitionOutputWithCount ¶ added in v1.25.0
type TransitionOutputWithCount struct { TransitionOutput TransitionCount int64 }