Documentation ¶
Index ¶
- Variables
- func EventIDFromToken(token []byte) (int64, error)
- func GenerateEventLoadToken(event *historypb.HistoryEvent) ([]byte, error)
- func MachineData[T any](n *Node) (T, error)
- func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)
- func RegisterImmediateExecutor[T Task](r *Registry, executor ImmediateExecutor[T]) error
- func RegisterTimerExecutor[T Task](r *Registry, executor TimerExecutor[T]) error
- type AccessType
- type Collection
- func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)
- func (c Collection[T]) Data(stateMachineID string) (T, error)
- func (c Collection[T]) List() []*Node
- func (c Collection[T]) Node(stateMachineID string) (*Node, error)
- func (c Collection[T]) Size() int
- func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error
- type ConcurrentTask
- type Environment
- type EventDefinition
- type ImmediateExecutor
- type Key
- type MachineType
- type Node
- func (n *Node) AddChild(key Key, data any) (*Node, error)
- func (n *Node) AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent
- func (n *Node) CheckRunning() error
- func (n *Node) Child(path []Key) (*Node, error)
- func (n *Node) ClearTransactionState()
- func (n *Node) Dirty() bool
- func (n *Node) InternalRepr() *persistencespb.StateMachineNode
- func (n *Node) LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)
- func (n *Node) Outputs() []PathAndOutputs
- func (n *Node) Path() []Key
- func (n *Node) Walk(fn func(*Node) error) error
- type NodeBackend
- type PathAndOutputs
- type Ref
- type Registry
- func (r *Registry) EventDefinition(t enumspb.EventType) (def EventDefinition, ok bool)
- func (r *Registry) ExecuteImmediateTask(ctx context.Context, env Environment, ref Ref, task Task) error
- func (r *Registry) ExecuteTimerTask(env Environment, node *Node, task Task) error
- func (r *Registry) Machine(t int32) (def StateMachineDefinition, ok bool)
- func (r *Registry) RegisterEventDefinition(def EventDefinition) error
- func (r *Registry) RegisterMachine(sm StateMachineDefinition) error
- func (r *Registry) RegisterTaskSerializer(t int32, def TaskSerializer) error
- func (r *Registry) TaskSerializer(t int32) (d TaskSerializer, ok bool)
- type StateMachine
- type StateMachineDefinition
- type Task
- type TaskKind
- type TaskKindOutbound
- type TaskKindTimer
- type TaskRegenerator
- type TaskSerializer
- type TaskType
- type TimerExecutor
- type Transition
- type TransitionOutput
Constants ¶
This section is empty.
Variables ¶
var ErrConcurrentTaskNotImplemented = errors.New("concurrent task not implemented")
ErrConcurrentTaskNotImplemented is returned by a Registry when trying to register a concurrent task that did not implement the ConcurrentTask interface.
var ErrDuplicateRegistration = errors.New("duplicate registration")
ErrDuplicateRegistration is returned by a Registry when it detects duplicate registration.
var ErrIncompatibleType = errors.New("state machine data was cast into an incompatible type")
ErrIncompatibleType is returned when trying to cast a state machine's data to a type that it is incompatible with.
var ErrInvalidTaskKind = errors.New("invalid task kind")
ErrInvalidTaskKind can be returned by a TaskSerializer if it received the wrong task kind.
var ErrInvalidTransition = errors.New("invalid transition")
ErrInvalidTransition is returned from Transition.Apply on an invalid state transition.
var ErrNotRegistered error = notRegisteredError{"not registered"}
ErrNotRegistered is returned by a Registry when trying to get a type that is not registered.
var ErrStateMachineAlreadyExists = errors.New("state machine already exists")
ErrStateMachineAlreadyExists is returned when trying to add a state machine with an ID that already exists in a Collection.
var ErrStateMachineNotFound = errors.New("state machine not found")
ErrStateMachineNotFound is returned when looking up a non-existing state machine in a Node or a Collection.
Functions ¶
func EventIDFromToken ¶
func EventIDFromToken(token []byte) (int64, error)
EventIDFromToken gets the event ID associated with an event load token.
func GenerateEventLoadToken ¶
func GenerateEventLoadToken(event *historypb.HistoryEvent) ([]byte, error)
GenerateEventLoadToken generates a token for loading a history event from an Environment. Events should typically be immutable, making this function safe to call outside of an [Environment.Access] call.
func MachineData ¶
func MachineData[T any](n *Node) (T, error)
MachineData deserializes the persistent state machine's data, casts it to type T, and returns it. Returns an error when deserialization or casting fails.
func MachineTransition ¶
func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)
MachineTransition runs the given transitionFn on the given node's machine data. It updates the state machine's metadata and marks the entry as dirty in the node's cache. If the transition fails, the changes are rolled back and no state is mutated.
func RegisterImmediateExecutor ¶ added in v1.25.0
func RegisterImmediateExecutor[T Task](r *Registry, executor ImmediateExecutor[T]) error
RegisterImmediateExecutor registers an ImmediateExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.
func RegisterTimerExecutor ¶ added in v1.25.0
func RegisterTimerExecutor[T Task](r *Registry, executor TimerExecutor[T]) error
RegisterTimerExecutor registers a TimerExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.
Types ¶
type AccessType ¶
type AccessType int
AccessType is a specifier for storage access.
const (
	// AccessRead specifies read access.
	AccessRead AccessType = iota
	// AccessWrite specifies write access.
	AccessWrite AccessType = iota
)
type Collection ¶
type Collection[T any] struct {
	// The type of machines stored in this collection.
	Type int32
	// contains filtered or unexported fields
}
A Collection of similarly typed sibling state machines.
func NewCollection ¶
func NewCollection[T any](node *Node, stateMachineType int32) Collection[T]
NewCollection creates a new Collection.
func (Collection[T]) Add ¶
func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)
Add adds a node to the collection as a child of the collection's underlying Node.
func (Collection[T]) Data ¶
func (c Collection[T]) Data(stateMachineID string) (T, error)
Data gets the data for a given state machine ID.
func (Collection[T]) List ¶
func (c Collection[T]) List() []*Node
List returns all nodes in this collection.
func (Collection[T]) Node ¶
func (c Collection[T]) Node(stateMachineID string) (*Node, error)
Node gets a Node for a given state machine ID.
func (Collection[T]) Size ¶
func (c Collection[T]) Size() int
Size returns the number of machines in this collection.
func (Collection[T]) Transition ¶
func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error
Transition transitions a machine by ID.
type ConcurrentTask ¶ added in v1.25.0
type ConcurrentTask interface {
	Task
	// Validate checks if the [ConcurrentTask] is still valid for processing in
	// either active or standby queue task executor.
	// Must return ErrStaleReference if the task is no longer valid.
	// A typical implementation may check if the state of the machine is still
	// relevant for running this task.
	Validate(node *Node) error
}
A concurrent task can run concurrently with other tasks.
type Environment ¶
type Environment interface {
	// Wall clock. Backed by the shard's time source.
	Now() time.Time
	// Access a state machine Node for the given ref.
	//
	// When using AccessRead, the accessor must guarantee not to mutate any state; accessor errors will not cause
	// mutable state unload.
	Access(ctx context.Context, ref Ref, accessType AccessType, accessor func(*Node) error) error
}
Executor environment.
type EventDefinition ¶
type EventDefinition interface {
	Type() enumspb.EventType
	// IsWorkflowTaskTrigger returns a boolean indicating whether this event type should trigger a workflow task.
	IsWorkflowTaskTrigger() bool
	// Apply a history event to the state machine. Triggered during replication and workflow reset.
	Apply(root *Node, event *historypb.HistoryEvent) error
}
EventDefinition is a definition for a history event for a given event type.
type ImmediateExecutor ¶ added in v1.25.0
ImmediateExecutor is responsible for executing immediate tasks (e.g., transfer, outbound). Implementations should be registered via [RegisterImmediateExecutor] to handle specific task types.
type Key ¶
type Key struct {
	// Type ID of the state machine.
	Type int32
	// ID of the state machine.
	ID string
}
Key is used for looking up a state machine in a Node.
type MachineType ¶
type MachineType struct {
	// Type ID that is used to minimize the persistence storage space and address a machine (see also [Key]).
	// Type IDs are expected to be immutable as they are used for looking up state machine definitions when loading data
	// from persistence.
	ID int32
	// Human readable name for this type.
	Name string
}
State machine type.
type Node ¶
type Node struct {
	// Key of this node in parent's map. Empty if node is the root.
	Key Key
	// Parent node. Nil if current node is the root.
	Parent *Node
	// contains filtered or unexported fields
}
Node is a node in a hierarchical state machine tree.
It holds a persistent representation of itself and maintains an in-memory cache of deserialized data and child nodes. Node data should not be manipulated directly; changes should only be made using MachineTransition or [Collection.Transition] to ensure the tree tracks dirty states and updates transition counts.
func NewRoot ¶
func NewRoot(registry *Registry, t int32, data any, children map[int32]*persistencespb.StateMachineMap, backend NodeBackend) (*Node, error)
NewRoot creates a new root Node. Children may be provided from persistence to rehydrate the tree. Returns ErrNotRegistered if the given type is not registered in the given registry, and may also return serialization errors.
func (*Node) AddChild ¶
func (n *Node) AddChild(key Key, data any) (*Node, error)
AddChild adds an immediate child to a node, serializing the given data. Returns ErrStateMachineAlreadyExists if a child with the given key already exists, and ErrNotRegistered if the key's type is not found in the node's state machine registry. May also return serialization errors.
func (*Node) AddHistoryEvent ¶
func (n *Node) AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent
AddHistoryEvent adds a history event to be committed at the end of the current transaction. Must be called within an [Environment.Access] function block with write access.
func (*Node) CheckRunning ¶ added in v1.25.0
func (n *Node) CheckRunning() error
CheckRunning has two modes of operation:
1. If the node is **not** attached to a workflow (not yet supported), it returns nil.
2. If the node is attached to a workflow, it verifies that the workflow execution is running, and returns ErrWorkflowCompleted or nil.
May return other errors returned from MachineData.
func (*Node) ClearTransactionState ¶
func (n *Node) ClearTransactionState()
ClearTransactionState resets all transition outputs in the tree. This should be called at the end of every transaction where the transitions are performed to avoid emitting duplicate transition outputs.
func (*Node) InternalRepr ¶ added in v1.25.0
func (n *Node) InternalRepr() *persistencespb.StateMachineNode
InternalRepr returns the internal persistence representation of this node. Meant to be used by the framework, **not** by components.
func (*Node) LoadHistoryEvent ¶
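func (n *Node) LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)
LoadHistoryEvent loads a history event by a token generated via GenerateEventLoadToken. Must be called within an [Environment.Access] function block with either read or write access.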
func (*Node) Outputs ¶
func (n *Node) Outputs() []PathAndOutputs
Outputs returns all outputs produced by transitions on this tree.
type NodeBackend ¶
type NodeBackend interface {
	// AddHistoryEvent adds a history event to be committed at the end of the current transaction.
	AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent
	// LoadHistoryEvent loads a history event by token generated via [GenerateEventLoadToken].
	LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)
	// GetCurrentVersion returns the current namespace failover version.
	GetCurrentVersion() int64
	// TransitionCount returns the current state transition count from the state transition history.
	TransitionCount() int64
}
NodeBackend is a concrete implementation to support interacting with the underlying platform. It currently has only a single implementation - workflow mutable state.
type PathAndOutputs ¶
type PathAndOutputs struct {
	Path    []Key
	Outputs []TransitionOutput
}
type Ref ¶
type Ref struct {
	WorkflowKey     definition.WorkflowKey
	StateMachineRef *persistencespb.StateMachineRef
	// If non-zero, this field represents the ID of the task this Ref came from. Used for stale task detection and
	// serves as an indicator whether this Ref can reference stale state. This should be set during task processing
	// where we can validate the task that embeds this reference against shard clock.
	TaskID int64
}
Ref is a reference to a state machine on a specific workflow. It contains the workflow key and the key of the state machine in the state machine Environment, as well as the information needed to perform staleness checks for itself or the state that it is referencing.
func (Ref) StateMachinePath ¶
StateMachinePath gets the state machine path from this reference.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maintains a mapping from state machine type to a StateMachineDefinition and task type to TaskSerializer. Registry methods are **not** protected by a lock and all registration is expected to happen in a single thread on startup for performance reasons.
func (*Registry) EventDefinition ¶
func (r *Registry) EventDefinition(t enumspb.EventType) (def EventDefinition, ok bool)
EventDefinition returns an EventDefinition for a given type and a boolean indicating whether it was found.
func (*Registry) ExecuteImmediateTask ¶ added in v1.25.0
func (r *Registry) ExecuteImmediateTask(ctx context.Context, env Environment, ref Ref, task Task) error
ExecuteImmediateTask gets an ImmediateExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.
func (*Registry) ExecuteTimerTask ¶ added in v1.25.0
func (r *Registry) ExecuteTimerTask(env Environment, node *Node, task Task) error
ExecuteTimerTask gets a TimerExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.
func (*Registry) Machine ¶
func (r *Registry) Machine(t int32) (def StateMachineDefinition, ok bool)
Machine returns a StateMachineDefinition for a given type and a boolean indicating whether it was found.
func (*Registry) RegisterEventDefinition ¶
func (r *Registry) RegisterEventDefinition(def EventDefinition) error
RegisterEventDefinition registers an EventDefinition for the given event type. Returns an ErrDuplicateRegistration if a definition for the type has already been registered.
func (*Registry) RegisterMachine ¶
func (r *Registry) RegisterMachine(sm StateMachineDefinition) error
RegisterMachine registers a StateMachineDefinition by its type. Returns an ErrDuplicateRegistration if the state machine type has already been registered.
func (*Registry) RegisterTaskSerializer ¶
func (r *Registry) RegisterTaskSerializer(t int32, def TaskSerializer) error
RegisterTaskSerializer registers a TaskSerializer for a given type. Returns an ErrDuplicateRegistration if a serializer for this task type has already been registered.
func (*Registry) TaskSerializer ¶
func (r *Registry) TaskSerializer(t int32) (d TaskSerializer, ok bool)
TaskSerializer returns a TaskSerializer for a given type and a boolean indicating whether it was found.
type StateMachine ¶
type StateMachine[S comparable] interface {
	TaskRegenerator
	State() S
	SetState(S)
}
A StateMachine is anything that can get and set a comparable state S and re-generate tasks based on current state. It is meant to be used with Transition objects to safely transition their state on a given event.
type StateMachineDefinition ¶
type StateMachineDefinition interface {
	Type() MachineType
	// Serialize a state machine into bytes.
	Serialize(any) ([]byte, error)
	// Deserialize a state machine from bytes.
	Deserialize([]byte) (any, error)
}
StateMachineDefinition provides type information and a serializer for a state machine.
type Task ¶
type Task interface {
	// Task type that must be unique per task definition.
	Type() TaskType
	// Kind of the task, see [TaskKind] for more info.
	Kind() TaskKind
	Concurrent() bool
}
A Task is generated by a state machine in order to drive execution. For example, a callback state machine in the SCHEDULED state would generate an invocation task to be eventually executed by the framework. State machine transitions and tasks are committed atomically to ensure that the system is in a consistent state.
Tasks are generated by calling the [StateMachine.Tasks] method on a state machine after it has transitioned. Tasks are executed by an executor that is registered to handle a specific task type. The framework converts this minimal task representation into [tasks.Task] instances, filling in the state machine reference, workflow key, and task ID. A TaskSerializer needs to be registered in a Registry for a given type in order to process tasks of that type.
Tasks must specify whether they can run concurrently with other tasks. A non-concurrent task is a task that correlates with a single machine transition and is considered stale if its corresponding machine has transitioned since it was generated. Non-concurrent tasks are persisted with a Ref that contains the machine transition count at the time they were generated, which is expected to match the current machine's transition count upon execution. Concurrent tasks skip this validation. If the task is concurrent, you must implement the ConcurrentTask interface.
type TaskKind ¶
type TaskKind interface {
// contains filtered or unexported methods
}
TaskKind represents the possible set of kinds for a task. Each kind is mapped to a concrete [tasks.Task] implementation and is backed by specific protobuf message; for example, TaskKindTimer maps to TimerTaskInfo. Kind also determines which queue this task is scheduled on - it is mapped to a specific tasks.Category.
type TaskKindOutbound ¶
type TaskKindOutbound struct {
	// The destination of this task, used to group tasks into a per namespace-and-destination scheduler.
	Destination string
	// contains filtered or unexported fields
}
TaskKindOutbound is a task that is scheduled on an outbound queue such as the callback queue.
type TaskKindTimer ¶
type TaskKindTimer struct {
	// A deadline for firing this task.
	// This represents a lower bound and actual execution may get delayed if the system is overloaded or for various
	// other reasons.
	Deadline time.Time
	// contains filtered or unexported fields
}
TaskKindTimer is a task that is scheduled on the timer queue.
type TaskRegenerator ¶
A TaskRegenerator is invoked to regenerate tasks post state-based replication or when refreshing all tasks for a workflow.
type TaskSerializer ¶
type TaskSerializer interface {
	Serialize(Task) ([]byte, error)
	Deserialize(data []byte, kind TaskKind) (Task, error)
}
TaskSerializer provides a serializer and deserializer for tasks of a given type.
type TaskType ¶
type TaskType struct {
	// Type ID that is used to minimize the persistence storage space and look up the registered serializer.
	// Type IDs are expected to be immutable as a serializer must be compatible with the task's persistent data.
	ID int32
	// Human readable name for this type.
	Name string
}
Task type.
type TimerExecutor ¶ added in v1.25.0
type TimerExecutor[T Task] func(env Environment, node *Node, task T) error
TimerExecutor is responsible for executing timer tasks. Implementations should be registered via [RegisterTimerExecutor] to handle specific task types. Timer tasks are collapsed into a single task which will execute all timers that have hit their deadline while holding a lock on the workflow.
type Transition ¶
type Transition[S comparable, SM StateMachine[S], E any] struct {
	// Source states that are valid for this transition.
	Sources []S
	// Destination state to transition to.
	Destination S
	// contains filtered or unexported fields
}
Transition represents a state machine transition for a machine of type SM with state S and event E.
func NewTransition ¶
func NewTransition[S comparable, SM StateMachine[S], E any](src []S, dst S, apply func(SM, E) (TransitionOutput, error)) Transition[S, SM, E]
NewTransition creates a new Transition from the given source states to a destination state for a given event. The apply function is called after verifying the transition is possible and setting the destination state.
func (Transition[S, SM, E]) Apply ¶
func (t Transition[S, SM, E]) Apply(sm SM, event E) (TransitionOutput, error)
Apply applies a transition event to the given state machine changing the state machine's state to the transition's Destination on success.
func (Transition[S, SM, E]) Possible ¶
func (t Transition[S, SM, E]) Possible(sm SM) bool
Possible returns a boolean indicating whether the transition is possible for the current state.
type TransitionOutput ¶
type TransitionOutput struct {
Tasks []Task
}
TransitionOutput is output produced for a single transition.