hsm

package
v1.25.0-114.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDuplicateRegistration = errors.New("duplicate registration")

ErrDuplicateRegistration is returned by a Registry when it detects duplicate registration.

View Source
var ErrIncompatibleType = errors.New("state machine data was cast into an incompatible type")

ErrIncompatibleType is returned when trying to cast a state machine's data to a type that it is incompatible with.

View Source
var ErrInvalidTaskKind = errors.New("invalid task kind")

ErrInvalidTaskKind can be returned by a TaskSerializer if it received the wrong task kind.

View Source
var ErrInvalidTransition = errors.New("invalid transition")

ErrInvalidTransition is returned from Transition.Apply on an invalid state transition.

View Source
var ErrNotRegistered error = notRegisteredError{"not registered"}

ErrNotRegistered is returned by a Registry when trying to get a type that is not registered.

View Source
var ErrStateMachineAlreadyExists = errors.New("state machine already exists")

ErrStateMachineAlreadyExists is returned when trying to add a state machine with an ID that already exists in a Collection.

View Source
var ErrStateMachineNotFound = errors.New("state machine not found")

ErrStateMachineNotFound is returned when looking up a non-existing state machine in a Node or a Collection.

Functions

func EventIDFromToken

func EventIDFromToken(token []byte) (int64, error)

EventIDFromToken gets the event ID associated with an event load token.

func GenerateEventLoadToken

func GenerateEventLoadToken(event *historypb.HistoryEvent) ([]byte, error)

GenerateEventLoadToken generates a token for loading a history event from an Environment. Events should typically be immutable making this function safe to call outside of an [Environment.Access] call.

func MachineData

func MachineData[T any](n *Node) (T, error)

MachineData deserializes the persistent state machine's data, casts it to type T, and returns it. Returns an error when deserialization or casting fails.

func MachineTransition

func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)

MachineTransition runs the given transitionFn on a machine's data for the given key. It updates the state machine's metadata and marks the entry as dirty in the node's cache. If the transition fails, the changes are rolled back and no state is mutated.

func RegisterImmediateExecutors

func RegisterImmediateExecutors[T Task](
	r *Registry,
	t int32,
	activeExecutor ImmediateExecutor[T],
	standbyExecutor ImmediateExecutor[T],
) error

RegisterImmediateExecutors registers an active and a standby ImmediateExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.

func RegisterTimerExecutors

func RegisterTimerExecutors[T Task](
	r *Registry,
	t int32,
	activeExecutor TimerExecutor[T],
	standbyExecutor TimerExecutor[T],
) error

RegisterExecutors registers an active and a standby ImmediateExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.

Types

type AccessType

type AccessType int

AccessType is a specifier for storage access.

const (
	// AccessRead specifies read access.
	AccessRead AccessType = iota
	// AccessWrite specifies write access.
	AccessWrite AccessType = iota
)

type Collection

type Collection[T any] struct {
	// The type of machines stored in this collection.
	Type int32
	// contains filtered or unexported fields
}

A Collection of similarly typed sibling state machines.

func NewCollection

func NewCollection[T any](node *Node, stateMachineType int32) Collection[T]

NewCollection creates a new Collection.

func (Collection[T]) Add

func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)

Add adds a node to the collection as a child of the collection's underlying Node.

func (Collection[T]) Data

func (c Collection[T]) Data(stateMachineID string) (T, error)

Data gets the data for a given state machine ID.

func (Collection[T]) List

func (c Collection[T]) List() []*Node

List returns all nodes in this collection.

func (Collection[T]) Node

func (c Collection[T]) Node(stateMachineID string) (*Node, error)

Node gets an Node for a given state machine ID.

func (Collection[T]) Size

func (c Collection[T]) Size() int

Size returns the number of machines in this collection.

func (Collection[T]) Transition

func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error

Transition transitions a machine by ID.

type Environment

type Environment interface {
	// Wall clock. Backed by a the shard's time source.
	Now() time.Time
	// Access a state machine Node for the given ref.
	//
	// When using AccessRead, the accessor must guarantee not to mutate any state, accessor errors will not cause
	// mutable state unload.
	Access(ctx context.Context, ref Ref, accessType AccessType, accessor func(*Node) error) error
}

Executor environment.

type EventDefinition

type EventDefinition interface {
	Type() enumspb.EventType
	// IsWorkflowTaskTrigger returns a boolean indicating whether this event type should trigger a workflow task.
	IsWorkflowTaskTrigger() bool
}

EventDefinition is a definition for a history event for a given event type.

type ImmediateExecutor added in v1.25.0

type ImmediateExecutor[T Task] func(ctx context.Context, env Environment, ref Ref, task T) error

ImmediateExecutor is responsible for executing immediate tasks (e.g: transfer, outbound). Implementations should be registered via RegisterImmediateExecutors to handle specific task types.

type Key

type Key struct {
	// Type ID of the state machine.
	Type int32
	// ID of the state machine.
	ID string
}

Key is used for looking up a state machine in a Node.

type MachineType

type MachineType struct {
	// Type ID that is used to minimize the persistence storage space and address a machine (see also [Key]).
	// Type IDs are expected to be immutable as they are used for looking up state machine definitions when loading data
	// from persistence.
	ID int32
	// Human readable name for this type.
	Name string
}

State machine type.

type Node

type Node struct {
	// Key of this node in parent's map. Empty if node is the root.
	Key Key
	// Parent node. Nil if current node is the root.
	Parent *Node
	// contains filtered or unexported fields
}

Node is a node in a hierarchical state machine tree.

It holds a persistent representation of itself and maintains an in-memory cache of deserialized data and child nodes. Node data should not be manipulated directly and should only be done using MachineTransition or [Collection.Transtion] to ensure the tree tracks dirty states and update transition counts.

func NewRoot

func NewRoot(registry *Registry, t int32, data any, children map[int32]*persistencespb.StateMachineMap, backend NodeBackend) (*Node, error)

NewRoot creates a new root Node. Children may be provided from persistence to rehydrate the tree. Returns ErrNotRegistered if the key's type is not registered in the given registry or serialization errors.

func (*Node) AddChild

func (n *Node) AddChild(key Key, data any) (*Node, error)

AddChild adds an immediate child to a node, serializing the given data. Returns ErrStateMachineAlreadyExists if a child with the given key already exists, ErrNotRegistered if the key's type is not found in the node's state machine registry and serialization errors.

func (*Node) AddHistoryEvent

func (n *Node) AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent

AddHistoryEvent adds a history event to be committed at the end of the current transaction. Must be called within an [Environment.Access] function block with write access.

func (*Node) Child

func (n *Node) Child(path []Key) (*Node, error)

Child recursively gets a child for the given path.

func (*Node) ClearTransactionState

func (n *Node) ClearTransactionState()

ClearTransactionState resets all transition outputs in the tree. This should be called at the end of every transaction where the transitions are performed to avoid emitting duplicate transition outputs.

func (*Node) Dirty

func (n *Node) Dirty() bool

Dirty returns true if any of the tree's state machines have transitioned.

func (*Node) LoadHistoryEvent

func (n *Node) LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)

Load a history event by token generated via GenerateEventLoadToken. Must be called within an [Environment.Access] function block with either read or write access.

func (*Node) Outputs

func (n *Node) Outputs() []PathAndOutputs

Outputs returns all outputs produced by transitions on this tree.

func (*Node) Path

func (n *Node) Path() []Key

func (*Node) TransitionCount

func (n *Node) TransitionCount() int64

TransitionCount returns the transition count for the state machine contained in this node.

func (*Node) Walk added in v1.25.0

func (n *Node) Walk(fn func(*Node) error) error

Walk applies the given function to all nodes rooted at the current node. Returns after successfully applying the function to all nodes or first error.

type NodeBackend

type NodeBackend interface {
	// AddHistoryEvent adds a history event to be committed at the end of the current transaction.
	AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent
	// Load a history event by token generated via [GenerateEventLoadToken].
	LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)
}

NodeBackend is a concrete implementation to support interacting with the underlying platform. It currently has only a single implementation - workflow mutable state.

type PathAndOutputs

type PathAndOutputs struct {
	Path    []Key
	Outputs []TransitionOutput
}

type Ref

type Ref struct {
	WorkflowKey     definition.WorkflowKey
	StateMachineRef *persistencespb.StateMachineRef
}

Ref is a reference to a statemachine on a specific workflow. It contains the workflow key and the key of the statemachine in the state machine [Store] as well as the namespace failover version and transition count that is expected to match on the referenced state machine.

func (Ref) StateMachinePath

func (r Ref) StateMachinePath() []Key

StateMachinePath gets the state machine path for from this reference.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maintains a mapping from state machine type to a StateMachineDefinition and task type to TaskSerializer. Registry methods are **not** protected by a lock and all registration is expected to happen in a single thread on startup for performance reasons.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new Registry.

func (*Registry) EventDefinition

func (r *Registry) EventDefinition(t enumspb.EventType) (def EventDefinition, ok bool)

EventDefinition returns an EventDefinition for a given type and a boolean indicating whether it was found.

func (*Registry) ExecuteActiveImmediateTask

func (r *Registry) ExecuteActiveImmediateTask(
	ctx context.Context,
	env Environment,
	ref Ref,
	task Task,
) error

ExecuteActiveImmediateTask gets an ImmediateExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.

func (*Registry) ExecuteActiveTimerTask

func (r *Registry) ExecuteActiveTimerTask(
	env Environment,
	node *Node,
	task Task,
) error

ExecuteActiveTimerTask gets a TimerExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.

func (*Registry) ExecuteStandbyImmediateTask

func (r *Registry) ExecuteStandbyImmediateTask(
	ctx context.Context,
	env Environment,
	ref Ref,
	task Task,
) error

ExecuteStandbyImmediateTask gets an ImmediateExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.

func (*Registry) ExecuteStandbyTimerTask

func (r *Registry) ExecuteStandbyTimerTask(
	env Environment,
	node *Node,
	task Task,
) error

ExecuteStandbyTimerTask gets a TimerExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.

func (*Registry) Machine

func (r *Registry) Machine(t int32) (def StateMachineDefinition, ok bool)

Machine returns a StateMachineDefinition for a given type and a boolean indicating whether it was found.

func (*Registry) RegisterEventDefinition

func (r *Registry) RegisterEventDefinition(def EventDefinition) error

RegisterEventDefinition registers an EventDefinition for the given event type. Returns an ErrDuplicateRegistration if a definition for the type has already been registered.

func (*Registry) RegisterMachine

func (r *Registry) RegisterMachine(sm StateMachineDefinition) error

RegisterMachine registers a StateMachineDefinition by its type. Returns an ErrDuplicateRegistration if the state machine type has already been registered.

func (*Registry) RegisterTaskSerializer

func (r *Registry) RegisterTaskSerializer(t int32, def TaskSerializer) error

RegisterTaskSerializer registers a TaskSerializer for a given type. Returns an ErrDuplicateRegistration if a serializer for this task type has already been registered.

func (*Registry) TaskSerializer

func (r *Registry) TaskSerializer(t int32) (d TaskSerializer, ok bool)

TaskSerializer returns a TaskSerializer for a given type and a boolean indicating whether it was found.

type StateMachine

type StateMachine[S comparable] interface {
	TaskRegenerator
	State() S
	SetState(S)
}

A StateMachine is anything that can get and set a comparable state S and re-generate tasks based on current state. It is meant to be used with Transition objects to safely transition their state on a given event.

type StateMachineDefinition

type StateMachineDefinition interface {
	Type() MachineType
	// Serialize a state machine into bytes.
	Serialize(any) ([]byte, error)
	// Deserialize a state machine from bytes.
	Deserialize([]byte) (any, error)
}

StateMachineDefinition provides type information and a serializer for a state machine.

type Task

type Task interface {
	// Task type that must be unique per task definition.
	Type() TaskType
	// Kind of the task, see [TaskKind] for more info.
	Kind() TaskKind
	Concurrent() bool
}

A Task is generated by a state machine in order to drive execution. For example, a callback state machine in the SCHEDULED state, would generate an invocation task to be eventually executed by the framework. State machine transitions and tasks are committed atomically to ensure that the system is in a consistent state.

Tasks are generated by calling the [StateMachine.Tasks] method on a state machine after it has transitioned. Tasks are executed by an executor that is registered to handle a specific task type. The framework converts this minimal task representation into [tasks.Task] instances, filling in the state machine reference, workflow key, and task ID. A TaskSerializer need to be registered in a Registry for a given type in order to process tasks of that type.

Tasks must specify whether they can run concurrently with other tasks. A non-concurrent task is a task that correlates with a single machine transition and is considered stale if its corresponding machine has transitioned since it was generated. Non-concurrent tasks are persisted with a Ref that contains the machine transition count at the time they was generated, which is expected to match the current machine's transition count upon execution. Concurrent tasks skip this validation.

type TaskKind

type TaskKind interface {
	// contains filtered or unexported methods
}

TaskKind represents the possible set of kinds for a task. Each kind is mapped to a concrete [tasks.Task] implementation and is backed by specific protobuf message; for example, TaskKindTimer maps to TimerTaskInfo. Kind also determines which queue this task is scheduled on - it is mapped to a specific tasks.Category.

type TaskKindOutbound

type TaskKindOutbound struct {

	// The destination of this task, used to group tasks into a per namespace-and-destination scheduler.
	Destination string
	// contains filtered or unexported fields
}

TaskKindOutbound is a task that is scheduled on an outbound queue such as the callback queue.

type TaskKindTimer

type TaskKindTimer struct {

	// A deadline for firing this task.
	// This represents a lower bound and actual execution may get delayed if the system is overloaded or for various
	// other reasons.
	Deadline time.Time
	// contains filtered or unexported fields
}

TaskKindTimer is a task that is scheduled on the timer queue.

type TaskRegenerator

type TaskRegenerator interface {
	RegenerateTasks(*Node) ([]Task, error)
}

A TaskRegenerator is invoked to regenerate tasks post state-based replication or when refreshing all tasks for a workflow.

type TaskSerializer

type TaskSerializer interface {
	Serialize(Task) ([]byte, error)
	Deserialize(data []byte, kind TaskKind) (Task, error)
}

TaskSerializer provides type information and a serializer for a state machine.

type TaskType

type TaskType struct {
	// Type ID that is used to minimize the persistence storage space and look up the regisered serializer.
	// Type IDs are expected to be immutable as a serializer must be compatible with the task's persistent data.
	ID int32
	// Human readable name for this type.
	Name string
}

Task type.

type TimerExecutor added in v1.25.0

type TimerExecutor[T Task] func(env Environment, node *Node, task T) error

TimerExecutor is responsible for executing timer tasks. Implementations should be registered via RegisterTimerExecutors to handle specific task types. Timers tasks are collapsed into a single task which will execute all timers that have hit their deadline while holding a lock on the workflow.

type Transition

type Transition[S comparable, SM StateMachine[S], E any] struct {
	// Source states that are valid for this transition.
	Sources []S
	// Destination state to transition to.
	Destination S
	// contains filtered or unexported fields
}

Transition represents a state machine transition for a machine of type SM with state S and event E.

func NewTransition

func NewTransition[S comparable, SM StateMachine[S], E any](src []S, dst S, apply func(SM, E) (TransitionOutput, error)) Transition[S, SM, E]

NewTransition creates a new Transition from the given source states to a destination state for a given event. The apply function is called after verifying the transition is possible and setting the destination state.

func (Transition[S, SM, E]) Apply

func (t Transition[S, SM, E]) Apply(sm SM, event E) (TransitionOutput, error)

Apply applies a transition event to the given state machine changing the state machine's state to the transition's Destination on success.

func (Transition[S, SM, E]) Possible

func (t Transition[S, SM, E]) Possible(sm SM) bool

Possible returns a boolean indicating whether the transition is possible for the current state.

type TransitionOutput

type TransitionOutput struct {
	Tasks []Task
}

TransitionOutput is output produced for a single transition.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL