hsm

package
v1.25.0-119.0 Latest Latest
This package is not in the latest version of its module.
Published: Aug 10, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrConcurrentTaskNotImplemented = errors.New("concurrent task not implemented")

ErrConcurrentTaskNotImplemented is returned by a Registry when trying to register a concurrent task that did not implement the ConcurrentTask interface.

var ErrDuplicateRegistration = errors.New("duplicate registration")

ErrDuplicateRegistration is returned by a Registry when it detects duplicate registration.

var ErrIncompatibleType = errors.New("state machine data was cast into an incompatible type")

ErrIncompatibleType is returned when trying to cast a state machine's data to a type that it is incompatible with.

var ErrInitialTransitionMismatch = errors.New("node initial failover version or transition count mismatch")

ErrInitialTransitionMismatch is returned when the initial failover version or transition count of a node does not match the incoming node upon sync.

var ErrInvalidTaskKind = errors.New("invalid task kind")

ErrInvalidTaskKind can be returned by a TaskSerializer if it received the wrong task kind.

var ErrInvalidTransition = errors.New("invalid transition")

ErrInvalidTransition is returned from Transition.Apply on an invalid state transition.

var ErrNotCherryPickable = errors.New("event not cherry pickable")

ErrNotCherryPickable should be returned by CherryPick if an event should not be cherry picked for whatever reason.

var ErrNotRegistered error = notRegisteredError{"not registered"}

ErrNotRegistered is returned by a Registry when trying to get a type that is not registered.

var ErrSerializationFailed = errors.New("serialization failed")

var ErrStateMachineAlreadyExists = errors.New("state machine already exists")

ErrStateMachineAlreadyExists is returned when trying to add a state machine with an ID that already exists in a Collection.

var ErrStateMachineNotFound = errors.New("state machine not found")

ErrStateMachineNotFound is returned when looking up a non-existing state machine in a Node or a Collection.

Functions

func EventIDFromToken

func EventIDFromToken(token []byte) (int64, error)

EventIDFromToken gets the event ID associated with an event load token.

func GenerateEventLoadToken

func GenerateEventLoadToken(event *historypb.HistoryEvent) ([]byte, error)

GenerateEventLoadToken generates a token for loading a history event from an Environment. Events should typically be immutable, which makes this function safe to call outside of an [Environment.Access] call.

func MachineData

func MachineData[T any](n *Node) (T, error)

MachineData deserializes the persistent state machine's data, casts it to type T, and returns it. Returns an error when deserialization or casting fails.

func MachineTransition

func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, error)) (retErr error)

MachineTransition runs the given transitionFn on a machine's data for the given key. It updates the state machine's metadata and marks the entry as dirty in the node's cache. If the transition fails, the changes are rolled back and no state is mutated.
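The all-or-nothing behavior described for MachineTransition can be pictured with a small self-contained model. The sketch below does not use the hsm package: entry, transition, counter, and increment are hypothetical names, and copy-then-commit is only one assumed way to get rollback semantics, not a description of how hsm implements them.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical stand-in for a node's cached machine data.
// It is not the hsm package's Node type.
type entry[T any] struct {
	data            T
	transitionCount int64
	dirty           bool
}

// transition runs fn on a copy of the data and commits the result only when
// fn succeeds, so a failed transition leaves the entry untouched.
func transition[T any](e *entry[T], fn func(T) (T, error)) error {
	next, err := fn(e.data) // T is passed by value, so fn works on a copy
	if err != nil {
		return err // nothing was committed
	}
	e.data = next
	e.transitionCount++
	e.dirty = true
	return nil
}

// counter is a hypothetical machine state.
type counter struct{ n int }

var errTooLarge = errors.New("counter too large")

// increment is a transition function that refuses to go past 2.
func increment(c counter) (counter, error) {
	if c.n >= 2 {
		return c, errTooLarge
	}
	c.n++
	return c, nil
}

func main() {
	e := &entry[counter]{}
	for i := 0; i < 3; i++ {
		err := transition(e, increment)
		fmt.Println(e.data.n, e.transitionCount, e.dirty, err)
	}
}
```

In this model the third call fails and the value and transition count stay where the second call left them.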

func RegisterImmediateExecutor added in v1.25.0

func RegisterImmediateExecutor[T Task](r *Registry, executor ImmediateExecutor[T]) error

RegisterImmediateExecutor registers an ImmediateExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.

func RegisterRemoteMethod added in v1.25.0

func RegisterRemoteMethod(r *Registry, method RemoteMethod, executor RemoteExecutor) error

RegisterRemoteMethod registers a RemoteExecutor for the given remote method definition. Returns an ErrDuplicateRegistration if an executor for the method has already been registered.

func RegisterTimerExecutor added in v1.25.0

func RegisterTimerExecutor[T Task](r *Registry, executor TimerExecutor[T]) error

RegisterTimerExecutor registers a TimerExecutor for the given task type. Returns an ErrDuplicateRegistration if an executor for the type has already been registered.
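The register-once, look-up-by-task-type pattern shared by these registration functions can be sketched without the hsm package. Everything below (task, registry, register, execute, pingTask, pongTask, and the two error values) is a hypothetical local stand-in; the real Registry keeps its fields unexported and may work differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors named in this package.
var (
	errDuplicate     = errors.New("duplicate registration")
	errNotRegistered = errors.New("not registered")
)

// task is a reduced, hypothetical version of a task: it only reports its type.
type task interface{ Type() string }

// registry maps a task type name to a type-erased executor.
type registry struct {
	executors map[string]func(task) error
}

func newRegistry() *registry {
	return &registry{executors: make(map[string]func(task) error)}
}

// register stores a typed executor under the name reported by T's zero value
// and rejects a second registration for the same name.
func register[T task](r *registry, exec func(T) error) error {
	var zero T
	name := zero.Type()
	if _, exists := r.executors[name]; exists {
		return errDuplicate
	}
	r.executors[name] = func(t task) error {
		typed, ok := t.(T)
		if !ok {
			return fmt.Errorf("task %q has unexpected Go type %T", name, t)
		}
		return exec(typed)
	}
	return nil
}

// execute looks up the executor for the task's type and invokes it.
func (r *registry) execute(t task) error {
	exec, ok := r.executors[t.Type()]
	if !ok {
		return errNotRegistered
	}
	return exec(t)
}

type pingTask struct{ target string }

func (pingTask) Type() string { return "example.Ping" }

type pongTask struct{}

func (pongTask) Type() string { return "example.Pong" }

func handlePing(t pingTask) error {
	fmt.Println("ping", t.target)
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(register(r, handlePing))          // first registration succeeds
	fmt.Println(register(r, handlePing))          // second one is a duplicate
	fmt.Println(r.execute(pingTask{target: "a"})) // dispatches to handlePing
	fmt.Println(r.execute(pongTask{}))            // no executor registered
}
```

Like the Registry described on this page, the sketch takes no lock, so it assumes all registration happens on one goroutine at startup.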

Types

type AccessType

type AccessType int

AccessType is a specifier for storage access.

const (
	// AccessRead specifies read access.
	AccessRead AccessType = iota
	// AccessWrite specifies write access.
	AccessWrite AccessType = iota
)

type Collection

type Collection[T any] struct {
	// The type of machines stored in this collection.
	Type string
	// contains filtered or unexported fields
}

A Collection of similarly typed sibling state machines.

func NewCollection

func NewCollection[T any](node *Node, stateMachineType string) Collection[T]

NewCollection creates a new Collection.

func (Collection[T]) Add

func (c Collection[T]) Add(stateMachineID string, data T) (*Node, error)

Add adds a node to the collection as a child of the collection's underlying Node.

func (Collection[T]) Data

func (c Collection[T]) Data(stateMachineID string) (T, error)

Data gets the data for a given state machine ID.

func (Collection[T]) List

func (c Collection[T]) List() []*Node

List returns all nodes in this collection.

func (Collection[T]) Node

func (c Collection[T]) Node(stateMachineID string) (*Node, error)

Node gets a Node for a given state machine ID.

func (Collection[T]) Size

func (c Collection[T]) Size() int

Size returns the number of machines in this collection.

func (Collection[T]) Transition

func (c Collection[T]) Transition(stateMachineID string, transitionFn func(T) (TransitionOutput, error)) error

Transition transitions a machine by ID.

type ConcurrentTask added in v1.25.0

type ConcurrentTask interface {
	Task
	// Validate checks if the [ConcurrentTask] is still valid for processing in
	// either active or standby queue task executor.
	// Must return ErrStaleReference if the task is no longer valid.
	// A typical implementation may check if the state of the machine is still
	// relevant for running this task.
	Validate(node *Node) error
}

A concurrent task can run concurrently with other tasks.

type Environment

type Environment interface {
	// Wall clock. Backed by the shard's time source.
	Now() time.Time
	// Access a state machine Node for the given ref.
	//
	// When using AccessRead, the accessor must guarantee not to mutate any state. Accessor errors will not cause
	// mutable state unload.
	Access(ctx context.Context, ref Ref, accessType AccessType, accessor func(*Node) error) error
}

Executor environment.

type EventDefinition

type EventDefinition interface {
	Type() enumspb.EventType
	// IsWorkflowTaskTrigger returns a boolean indicating whether this event type should trigger a workflow task.
	IsWorkflowTaskTrigger() bool
	// Apply a history event to the state machine. Triggered during replication and workflow reset.
	Apply(root *Node, event *historypb.HistoryEvent) error
	// Cherry pick (a.k.a "reapply") an event from a different history branch.
	// Implementations should apply the event to the machine state and return nil in case the event is cherry-pickable.
	// Command events should never be cherry picked as we rely on the workflow to reschedule them.
	// Return [ErrNotCherryPickable], [ErrStateMachineNotFound], or [ErrInvalidTransition] to skip cherry picking. Any
	// other error is considered fatal and will abort the cherry pick process.
	CherryPick(root *Node, event *historypb.HistoryEvent) error
}

EventDefinition is a definition for a history event for a given event type.
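The CherryPick contract separates errors that mean "skip this event" from errors that abort the whole process. The sketch below models that split with local stand-ins: the three error values mirror the names documented above but are declared locally, and shouldSkip, cherryPickAll, and samplePick are hypothetical helpers, not part of hsm.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the three "skip" errors named in the CherryPick comment.
var (
	errNotCherryPickable    = errors.New("event not cherry pickable")
	errStateMachineNotFound = errors.New("state machine not found")
	errInvalidTransition    = errors.New("invalid transition")
)

// shouldSkip reports whether err means the event should simply be skipped.
func shouldSkip(err error) bool {
	return errors.Is(err, errNotCherryPickable) ||
		errors.Is(err, errStateMachineNotFound) ||
		errors.Is(err, errInvalidTransition)
}

// cherryPickAll calls pick for each event. Skippable errors are ignored, any
// other error aborts the loop. It returns how many events were applied.
func cherryPickAll(events []string, pick func(string) error) (int, error) {
	applied := 0
	for _, ev := range events {
		err := pick(ev)
		switch {
		case err == nil:
			applied++
		case shouldSkip(err):
			continue // move on to the next event
		default:
			return applied, fmt.Errorf("cherry pick %q: %w", ev, err)
		}
	}
	return applied, nil
}

// samplePick is a hypothetical CherryPick implementation keyed on event name.
func samplePick(ev string) error {
	switch ev {
	case "command":
		// Wrapped errors are still recognized through errors.Is.
		return fmt.Errorf("command event: %w", errNotCherryPickable)
	case "corrupt":
		return errors.New("storage unavailable")
	default:
		return nil
	}
}

func main() {
	fmt.Println(cherryPickAll([]string{"signal", "command", "update"}, samplePick))
	fmt.Println(cherryPickAll([]string{"signal", "corrupt", "update"}, samplePick))
}
```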

type ImmediateExecutor added in v1.25.0

type ImmediateExecutor[T Task] func(ctx context.Context, env Environment, ref Ref, task T) error

ImmediateExecutor is responsible for executing immediate tasks (e.g., transfer, outbound). Implementations should be registered via [RegisterImmediateExecutor] to handle specific task types.

type Key

type Key struct {
	// Type of the state machine.
	Type string
	// ID of the state machine.
	ID string
}

Key is used for looking up a state machine in a Node.

type Node

type Node struct {
	// Key of this node in parent's map. Empty if node is the root.
	Key Key
	// Parent node. Nil if current node is the root.
	Parent *Node
	// contains filtered or unexported fields
}

Node is a node in a hierarchical state machine tree.

It holds a persistent representation of itself and maintains an in-memory cache of deserialized data and child nodes. Node data should not be manipulated directly; use MachineTransition or [Collection.Transition] so that the tree tracks dirty states and updates transition counts.

func NewRoot

func NewRoot(registry *Registry, t string, data any, children map[string]*persistencespb.StateMachineMap, backend NodeBackend) (*Node, error)

NewRoot creates a new root Node. Children may be provided from persistence to rehydrate the tree. Returns ErrNotRegistered if the key's type is not registered in the given registry, or a serialization error.

func (*Node) AddChild

func (n *Node) AddChild(key Key, data any) (*Node, error)

AddChild adds an immediate child to a node, serializing the given data. Returns ErrStateMachineAlreadyExists if a child with the given key already exists, ErrNotRegistered if the key's type is not found in the node's state machine registry, or a serialization error.

func (*Node) AddHistoryEvent

func (n *Node) AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent

AddHistoryEvent adds a history event to be committed at the end of the current transaction. Must be called within an [Environment.Access] function block with write access.

func (*Node) CheckRunning added in v1.25.0

func (n *Node) CheckRunning() error

CheckRunning has two modes of operation:

1. If the node is **not** attached to a workflow (not yet supported), it returns nil.
2. If the node is attached to a workflow, it verifies that the workflow execution is running, and returns ErrWorkflowCompleted or nil.

May return other errors returned from MachineData.

func (*Node) Child

func (n *Node) Child(path []Key) (*Node, error)

Child recursively gets a child for the given path.

func (*Node) ClearTransactionState

func (n *Node) ClearTransactionState()

ClearTransactionState resets all transition outputs in the tree. This should be called at the end of every transaction where the transitions are performed to avoid emitting duplicate transition outputs.

func (*Node) CompareState added in v1.25.0

func (n *Node) CompareState(incomingNode *Node) (int, error)

CompareState compares the current node state with the incoming node state. Returns 0 if the states are equal, a positive number if the current state is considered newer, a negative number if the incoming state is considered newer. Meant to be used by the framework, **not** by components. TODO: remove once transition history is enabled.

func (*Node) Dirty

func (n *Node) Dirty() bool

Dirty returns true if any of the tree's state machines have transitioned.

func (*Node) InternalRepr added in v1.25.0

func (n *Node) InternalRepr() *persistencespb.StateMachineNode

InternalRepr returns the internal persistence representation of this node. Meant to be used by the framework, **not** by components.

func (*Node) LoadHistoryEvent

func (n *Node) LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)

LoadHistoryEvent loads a history event by a token generated via GenerateEventLoadToken. Must be called within an [Environment.Access] function block with either read or write access.

func (*Node) Outputs

func (n *Node) Outputs() []PathAndOutputs

Outputs returns all outputs produced by transitions on this tree.

func (*Node) Path

func (n *Node) Path() []Key

func (*Node) Sync added in v1.25.0

func (n *Node) Sync(incomingNode *Node) error

Sync updates the state of the current node to that of the incoming node. Meant to be used by the framework, **not** by components.

func (*Node) Walk added in v1.25.0

func (n *Node) Walk(fn func(*Node) error) error

Walk applies the given function to all nodes rooted at the current node. Returns after successfully applying the function to all nodes or on the first error.

type NodeBackend

type NodeBackend interface {
	// AddHistoryEvent adds a history event to be committed at the end of the current transaction.
	AddHistoryEvent(t enumspb.EventType, setAttributes func(*historypb.HistoryEvent)) *historypb.HistoryEvent
	// LoadHistoryEvent loads a history event by token generated via [GenerateEventLoadToken].
	LoadHistoryEvent(ctx context.Context, token []byte) (*historypb.HistoryEvent, error)
	// GetCurrentVersion returns the current namespace failover version.
	GetCurrentVersion() int64
	// NextTransitionCount returns the current state transition count from the state transition history.
	NextTransitionCount() int64
}

NodeBackend is a concrete implementation to support interacting with the underlying platform. It currently has a single implementation: workflow mutable state.

type PathAndOutputs

type PathAndOutputs struct {
	Path    []Key
	Outputs []TransitionOutputWithCount
}

type Ref

type Ref struct {
	WorkflowKey     definition.WorkflowKey
	StateMachineRef *persistencespb.StateMachineRef
	// If non-zero, this field represents the ID of the task this Ref came from. Used for stale task detection and
	// serves as an indicator whether this Ref can reference stale state. This should be set during task processing
	// where we can validate the task that embeds this reference against shard clock.
	TaskID int64
}

Ref is a reference to a state machine on a specific workflow. It contains the workflow key and the key of the state machine in the state machine Environment, as well as the information needed to perform staleness checks for itself or the state that it references.

func (Ref) StateMachinePath

func (r Ref) StateMachinePath() []Key

StateMachinePath gets the state machine path from this reference.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maintains a mapping from state machine type to a StateMachineDefinition and task type to TaskSerializer. Registry methods are **not** protected by a lock and all registration is expected to happen in a single thread on startup for performance reasons.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new Registry.

func (*Registry) EventDefinition

func (r *Registry) EventDefinition(t enumspb.EventType) (def EventDefinition, ok bool)

EventDefinition returns an EventDefinition for a given type and a boolean indicating whether it was found.

func (*Registry) ExecuteImmediateTask added in v1.25.0

func (r *Registry) ExecuteImmediateTask(
	ctx context.Context,
	env Environment,
	ref Ref,
	task Task,
) error

ExecuteImmediateTask gets an ImmediateExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.

func (*Registry) ExecuteRemoteMethod added in v1.25.0

func (r *Registry) ExecuteRemoteMethod(
	ctx context.Context,
	env Environment,
	ref Ref,
	methodName string,
	serializedInput []byte,
) ([]byte, error)

ExecuteRemoteMethod gets a RemoteExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given method name.

func (*Registry) ExecuteTimerTask added in v1.25.0

func (r *Registry) ExecuteTimerTask(
	env Environment,
	node *Node,
	task Task,
) error

ExecuteTimerTask gets a TimerExecutor from the registry and invokes it. Returns ErrNotRegistered if an executor is not registered for the given task's type.

func (*Registry) Machine

func (r *Registry) Machine(t string) (def StateMachineDefinition, ok bool)

Machine returns a StateMachineDefinition for a given type and a boolean indicating whether it was found.

func (*Registry) RegisterEventDefinition

func (r *Registry) RegisterEventDefinition(def EventDefinition) error

RegisterEventDefinition registers an EventDefinition for the given event type. Returns an ErrDuplicateRegistration if a definition for the type has already been registered.

func (*Registry) RegisterMachine

func (r *Registry) RegisterMachine(sm StateMachineDefinition) error

RegisterMachine registers a StateMachineDefinition by its type. Returns an ErrDuplicateRegistration if the state machine type has already been registered.

func (*Registry) RegisterTaskSerializer

func (r *Registry) RegisterTaskSerializer(t string, def TaskSerializer) error

RegisterTaskSerializer registers a TaskSerializer for a given type. Returns an ErrDuplicateRegistration if a serializer for this task type has already been registered.

func (*Registry) TaskSerializer

func (r *Registry) TaskSerializer(t string) (d TaskSerializer, ok bool)

TaskSerializer returns a TaskSerializer for a given type and a boolean indicating whether it was found.

type RemoteExecutor added in v1.25.0

type RemoteExecutor func(ctx context.Context, env Environment, ref Ref, input any) (any, error)

RemoteExecutor is responsible for executing remote methods. Implementations should be registered via [RegisterRemoteMethod] to handle specific methods.

type RemoteMethod added in v1.25.0

type RemoteMethod interface {
	// Name of the remote method. Must be unique per state machine.
	Name() string
	// SerializeOutput serializes output of the invocation to a byte array that is suitable for transport.
	SerializeOutput(output any) ([]byte, error)
	// DeserializeInput deserializes input from bytes that is then passed to the handler.
	DeserializeInput(data []byte) (any, error)
}

RemoteMethod can be defined for each state machine to handle external requests, like RPCs, but as part of the HSM framework. See RemoteExecutor for how to define the handler for remote methods.

type StateMachine

type StateMachine[S comparable] interface {
	TaskRegenerator
	State() S
	SetState(S)
}

A StateMachine is anything that can get and set a comparable state S and re-generate tasks based on current state. It is meant to be used with Transition objects to safely transition their state on a given event.

type StateMachineDefinition

type StateMachineDefinition interface {
	Type() string
	// Serialize a state machine into bytes.
	Serialize(any) ([]byte, error)
	// Deserialize a state machine from bytes.
	Deserialize([]byte) (any, error)
	// CompareState compares two state objects. It should return 0 if the states are equal, a positive number if the
	// first state is considered newer, a negative number if the second state is considered newer.
	// TODO: Remove this method and implementations once transition history is fully implemented. For now, we have to
	// rely on each component to tell the framework which state is newer and if sync state can overwrite the states in
	// the standby cluster.
	CompareState(any, any) (int, error)
}

StateMachineDefinition provides type information and a serializer for a state machine.
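To make the method set concrete, the sketch below implements a locally declared interface with the same four methods listed for StateMachineDefinition. It is an illustration under assumptions: the definition interface is redeclared here so the example compiles on its own, callback and callbackDefinition are hypothetical, JSON is used purely for convenience, and ordering states by an attempt counter is an invented comparison rule.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// definition mirrors the method set listed for StateMachineDefinition. It is
// declared locally so the sketch does not depend on the hsm package.
type definition interface {
	Type() string
	Serialize(any) ([]byte, error)
	Deserialize([]byte) (any, error)
	CompareState(any, any) (int, error)
}

// callback is a hypothetical machine state with an increasing attempt counter.
type callback struct {
	Attempt int `json:"attempt"`
}

type callbackDefinition struct{}

func (callbackDefinition) Type() string { return "example.Callback" }

// Serialize encodes a callback as JSON and rejects any other Go type.
func (callbackDefinition) Serialize(state any) ([]byte, error) {
	cb, ok := state.(callback)
	if !ok {
		return nil, fmt.Errorf("unexpected state type %T", state)
	}
	return json.Marshal(cb)
}

// Deserialize decodes JSON produced by Serialize back into a callback.
func (callbackDefinition) Deserialize(data []byte) (any, error) {
	var cb callback
	if err := json.Unmarshal(data, &cb); err != nil {
		return nil, err
	}
	return cb, nil
}

// CompareState returns 0 for equal states, a positive number if the first
// state is newer, and a negative number if the second state is newer.
func (callbackDefinition) CompareState(a, b any) (int, error) {
	first, ok1 := a.(callback)
	second, ok2 := b.(callback)
	if !ok1 || !ok2 {
		return 0, fmt.Errorf("cannot compare %T with %T", a, b)
	}
	return first.Attempt - second.Attempt, nil
}

// Compile-time check that callbackDefinition satisfies the local interface.
var _ definition = callbackDefinition{}

func main() {
	def := callbackDefinition{}
	data, _ := def.Serialize(callback{Attempt: 3})
	state, _ := def.Deserialize(data)
	cmp, _ := def.CompareState(state, callback{Attempt: 1})
	fmt.Println(string(data), state, cmp > 0)
}
```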

type Task

type Task interface {
	// Task type that must be unique per task definition.
	Type() string
	// Kind of the task, see [TaskKind] for more info.
	Kind() TaskKind
	Concurrent() bool
}

A Task is generated by a state machine in order to drive execution. For example, a callback state machine in the SCHEDULED state would generate an invocation task to be eventually executed by the framework. State machine transitions and tasks are committed atomically to ensure that the system is in a consistent state.

Tasks are generated by calling the [StateMachine.Tasks] method on a state machine after it has transitioned. Tasks are executed by an executor that is registered to handle a specific task type. The framework converts this minimal task representation into [tasks.Task] instances, filling in the state machine reference, workflow key, and task ID. A TaskSerializer needs to be registered in a Registry for a given type in order to process tasks of that type.

Tasks must specify whether they can run concurrently with other tasks. A non-concurrent task is a task that correlates with a single machine transition and is considered stale if its corresponding machine has transitioned since it was generated. Non-concurrent tasks are persisted with a Ref that contains the machine transition count at the time they were generated, which is expected to match the current machine's transition count upon execution. Concurrent tasks skip this validation. If the task is concurrent, you must implement the ConcurrentTask interface.
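The staleness rule for non-concurrent tasks comes down to comparing two transition counts, while concurrent tasks defer to their own validation. The sketch below is a minimal model of that rule: machine, taskRef, checkTask, and errStale are hypothetical names, and the real framework performs this check against a persisted Ref rather than these structs.

```go
package main

import (
	"errors"
	"fmt"
)

// errStale is a local stand-in for a stale-reference error.
var errStale = errors.New("stale task")

// machine is a hypothetical state machine that counts its transitions.
type machine struct {
	transitionCount int64
	state           string
}

// taskRef records what was known when the task was generated.
type taskRef struct {
	concurrent  bool
	generatedAt int64 // machine transition count at generation time
}

// checkTask applies the rule from the text: a non-concurrent task is valid
// only if the machine has not transitioned since the task was generated. A
// concurrent task skips that check and runs its own validation instead.
func checkTask(m machine, ref taskRef, validate func(machine) error) error {
	if ref.concurrent {
		return validate(m)
	}
	if ref.generatedAt != m.transitionCount {
		return errStale
	}
	return nil
}

// stillScheduled is a sample validator for a concurrent task: the task only
// makes sense while the machine remains in the "scheduled" state.
func stillScheduled(m machine) error {
	if m.state != "scheduled" {
		return errStale
	}
	return nil
}

func main() {
	m := machine{transitionCount: 5, state: "scheduled"}

	fmt.Println(checkTask(m, taskRef{generatedAt: 5}, stillScheduled)) // count matches
	fmt.Println(checkTask(m, taskRef{generatedAt: 4}, stillScheduled)) // machine moved on
	fmt.Println(checkTask(m, taskRef{concurrent: true, generatedAt: 4}, stillScheduled))
}
```

The last call shows why concurrent tasks need their own validation: the count recorded at generation time is out of date, yet the task is still meaningful because the machine is in the state the task cares about.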

type TaskKind

type TaskKind interface {
	// contains filtered or unexported methods
}

TaskKind represents the possible set of kinds for a task. Each kind is mapped to a concrete [tasks.Task] implementation and is backed by a specific protobuf message; for example, TaskKindTimer maps to TimerTaskInfo. Kind also determines which queue this task is scheduled on: it is mapped to a specific tasks.Category.

type TaskKindOutbound

type TaskKindOutbound struct {

	// The destination of this task, used to group tasks into a per namespace-and-destination scheduler.
	Destination string
	// contains filtered or unexported fields
}

TaskKindOutbound is a task that is scheduled on an outbound queue such as the callback queue.

type TaskKindTimer

type TaskKindTimer struct {

	// A deadline for firing this task.
	// This represents a lower bound and actual execution may get delayed if the system is overloaded or for various
	// other reasons.
	Deadline time.Time
	// contains filtered or unexported fields
}

TaskKindTimer is a task that is scheduled on the timer queue.

type TaskRegenerator

type TaskRegenerator interface {
	RegenerateTasks(*Node) ([]Task, error)
}

A TaskRegenerator is invoked to regenerate tasks post state-based replication or when refreshing all tasks for a workflow.

type TaskSerializer

type TaskSerializer interface {
	Serialize(Task) ([]byte, error)
	Deserialize(data []byte, kind TaskKind) (Task, error)
}

TaskSerializer provides a serializer and deserializer for tasks of a given type.

type TimerExecutor added in v1.25.0

type TimerExecutor[T Task] func(env Environment, node *Node, task T) error

TimerExecutor is responsible for executing timer tasks. Implementations should be registered via [RegisterTimerExecutor] to handle specific task types. Timer tasks are collapsed into a single task, which executes all timers that have hit their deadline while holding a lock on the workflow.

type Transition

type Transition[S comparable, SM StateMachine[S], E any] struct {
	// Source states that are valid for this transition.
	Sources []S
	// Destination state to transition to.
	Destination S
	// contains filtered or unexported fields
}

Transition represents a state machine transition for a machine of type SM with state S and event E.

func NewTransition

func NewTransition[S comparable, SM StateMachine[S], E any](src []S, dst S, apply func(SM, E) (TransitionOutput, error)) Transition[S, SM, E]

NewTransition creates a new Transition from the given source states to a destination state for a given event. The apply function is called after verifying the transition is possible and setting the destination state.

func (Transition[S, SM, E]) Apply

func (t Transition[S, SM, E]) Apply(sm SM, event E) (TransitionOutput, error)

Apply applies a transition event to the given state machine changing the state machine's state to the transition's Destination on success.

func (Transition[S, SM, E]) Possible

func (t Transition[S, SM, E]) Possible(sm SM) bool

Possible returns a boolean indicating whether the transition is possible for the current state.
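The check-sources, set-destination, then-apply sequence described for Transition can be modeled in a few lines of generic Go. This is a sketch under assumptions, not the package's implementation: stateMachine is a reduced local interface without task regeneration, transition, possible, and run are lower-case stand-ins, the apply function returns only an error instead of a TransitionOutput, and job and finish are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var errInvalidTransition = errors.New("invalid transition")

// stateMachine is a reduced local version of the StateMachine interface.
type stateMachine[S comparable] interface {
	State() S
	SetState(S)
}

// transition describes a move from any of the source states to destination.
type transition[S comparable, SM stateMachine[S], E any] struct {
	sources     []S
	destination S
	apply       func(SM, E) error
}

// possible reports whether the machine is currently in one of the sources.
func (t transition[S, SM, E]) possible(sm SM) bool {
	for _, src := range t.sources {
		if sm.State() == src {
			return true
		}
	}
	return false
}

// run verifies the transition is possible, sets the destination state, and
// then calls apply with the event.
func (t transition[S, SM, E]) run(sm SM, event E) error {
	if !t.possible(sm) {
		return errInvalidTransition
	}
	sm.SetState(t.destination)
	return t.apply(sm, event)
}

type state int

const (
	scheduled state = iota
	running
	done
)

// job is a hypothetical machine that records a result when it finishes.
type job struct {
	state  state
	result string
}

func (j *job) State() state     { return j.state }
func (j *job) SetState(s state) { j.state = s }

// finish moves a scheduled or running job to done and stores the result.
var finish = transition[state, *job, string]{
	sources:     []state{scheduled, running},
	destination: done,
	apply: func(j *job, result string) error {
		j.result = result
		return nil
	},
}

func main() {
	j := &job{state: running}
	fmt.Println(finish.run(j, "ok"), j.state == done, j.result)
	fmt.Println(finish.run(j, "again")) // done is not a valid source state
}
```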

type TransitionOutput

type TransitionOutput struct {
	Tasks []Task
}

TransitionOutput is output produced for a single transition.

type TransitionOutputWithCount added in v1.25.0

type TransitionOutputWithCount struct {
	TransitionOutput
	TransitionCount int64
}
