helpers

package
v0.9.0
Warning

This package is not in the latest version of its module.

Published: Dec 15, 2024 License: MIT Imports: 15 Imported by: 0

README

/pkg/helpers

[!NOTE] Asyncmachine-go is an AOP Actor Model library for distributed workflows, built on top of a clock-based state machine. It has atomic transitions, subscriptions, RPC, logging, TUI debugger, metrics, tracing, and soon diagrams.

/pkg/helpers is the Swiss Army knife for async state machines. It encapsulates many low-level details of /pkg/machine and provides functions that are easy to reason about. It specifically deals with machine time and multi states, which can be quite verbose to handle by hand.

Highlights:

Import

import amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
// for tests
import amhelpt "github.com/pancsta/asyncmachine-go/pkg/helpers/testing"

Synchronous Calls

Synchronous wrappers of async state machine calls, which assume a single, blocking scenario controlled with context. Multi states are handled automatically.

  • Add1Block
  • Add1AsyncBlock

Example - add state StateNameSelected and wait until it becomes active

res := amhelp.Add1Block(ctx, mach, ss.StateNameSelected, am.A{"state": state})
print(mach.Is1(ss.StateNameSelected)) // true
print(res) // am.Executed or am.Canceled, never am.Queued

Example - wait for ScrollToTx, triggered by ScrollToMutTx

res := amhelp.Add1AsyncBlock(ctx, mach, ss.ScrollToTx, ss.ScrollToMutTx, am.A{
    "state": state,
    "fwd":   true,
})

print(mach.Is1(ss.ScrollToTx)) // true
print(res) // am.Executed or am.Canceled, never am.Queued

Debugging Helpers

Activate debugging in 1 line - either for a single machine, or for the whole process.

  • EnableDebugging
  • SetLogLevel
  • MachDebugEnv
  • MachDebug
  • Healthcheck
  • IsDebug

Example - enable debug telemetry using conventional env vars.

// set env vars
amhelp.EnableDebugging(true)
// debug
amhelp.MachDebugEnv(mach)

Waiting Helpers

Waiting helpers mostly wrap the machine's when methods in select statements that also watch a state context and a timeout. The state context is checked first, and a timeout is converted into am.ErrTimeout. When used as the first call after a blocking call or a start (a common case), they reduce the amount of manual state context checking.

  • WaitForAll
  • WaitForErrAll
  • WaitForAny
  • WaitForErrAny
  • GroupWhen1
  • Wait
  • ExecAndClose

Example - wait for bootstrap RPC to become ready.

err := amhelp.WaitForAll(ctx, s.ConnTimeout,
    boot.server.Mach.When1(ssrpc.ServerStates.RpcReady, nil))
if ctx.Err() != nil {
    return // expired
}
if err != nil {
    AddErrWorker(s.Mach, err, Pass(argsOut))
    return
}

Example - wait for all clients to be ready.

var clients []*node.Client

// test helpers take t and fail the test on errors
amhelpt.WaitForAll(t, ctx, 2*time.Second,
    amhelpt.GroupWhen1(t, clients, ssC.Ready, nil)...)

Example - wait 1s with a state context.

if !amhelp.Wait(ctx, 1*time.Second) {
    return // expired
}
// wait ok

Example - wait for both Foo and Bar, or for Exception.

ctx := client.Mach.NewStateCtx("Start")
err := amhelp.WaitForErrAll(ctx, 1*time.Second, mach,
    mach.When1("Foo", nil),
    mach.When1("Bar", nil))
if ctx.Err() != nil {
    // no err
    return nil // expired
}
if err != nil {
    // either timeout happened or Exception has been activated
    return err
}

Failsafe Request Object

Failsafe mutation requests use failsafe-go policies when mutating a state machine. Useful for network communication, or when waiting on a state isn't an option, but also helps with overloaded machines and populated queues. Policies can be customized and MutRequest objects cloned.

  • NewReqAdd
  • NewReqAdd1
  • NewReqRemove
  • NewReqRemove1
  • NewMutRequest

Example - try to add state WorkerRequested with 10 retries, 100ms delay, 5s backoff, and 5s max duration.

// failsafe worker request
_, err := amhelp.NewReqAdd1(c.Mach, ssC.WorkerRequested, nil).Run(ctx)
if err != nil {
    return err
}

Test Helpers

Test helpers mostly provide assertions for stretchr/testify, but also wrappers for regular helpers which call t.Fatal on errors (to save some lines).

  • AssertIs
  • AssertIs1
  • AssertNot
  • AssertNot1
  • AssertNoErrNow
  • AssertNoErrEver
  • AssertErr
  • MachDebug
  • MachDebugEnv
  • Wait
  • WaitForAll
  • WaitForAny
  • WaitForErrAll
  • WaitForErrAny
  • GroupWhen1

Example - assert mach has ClientConnected active.

amhelpt.AssertIs1(t, mach, "ClientConnected")

Example - assert mach never had an error (not just currently).

amhelpt.AssertNoErrEver(t, mach)

Miscellaneous Utils

  • Implements
  • Activations
  • IsMulti
  • RemoveMulti
  • Toggle
  • StatesToIndexes
  • IndexesToStates
  • GetTransitionStates
  • ArgsToLogMap
  • ArgsToArgs

Documentation

Status

Testing, not semantically versioned.

Documentation

Overview

Package helpers is a set of useful functions when working with async state machines.

Index

Constants

const (
	// EnvAmHealthcheck enables a healthcheck ticker for every debugged machine.
	EnvAmHealthcheck = "AM_HEALTHCHECK"
	// EnvAmTestRunner indicates the main test runner, disables any telemetry.
	EnvAmTestRunner = "AM_TEST_RUNNER"
)

Variables

This section is empty.

Functions

func Activations added in v0.8.0

func Activations(u uint64) int

Activations returns the number of state activations from an amount of ticks passed.
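The relation between ticks and activations can be sketched as below. This is a minimal illustration, not the package's source: it assumes a state's clock starts at 0 (inactive) and grows by one on every activation and every deactivation, so odd values mean "active". The lowercase `activations` name is hypothetical.

```go
package main

import "fmt"

// activations is a hypothetical stand-in for illustration only.
// Assumption: the clock starts at 0 (inactive) and is incremented by 1
// on each activation and each deactivation, so odd ticks mean "active".
func activations(ticks uint64) int {
	// Every activation is the odd tick of an (activate, deactivate) pair.
	return int((ticks + 1) / 2)
}

func main() {
	for _, ticks := range []uint64{0, 1, 2, 3, 4} {
		fmt.Printf("ticks=%d activations=%d\n", ticks, activations(ticks))
	}
}
```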

func Add1AsyncBlock

func Add1AsyncBlock(
	ctx context.Context, mach am.Api, waitState string,
	addState string, args am.A,
) am.Result

Add1AsyncBlock adds a state from an async op and waits for another one from the op to become active. Theoretically, it should work with any state pair, including Multi states (assuming they remove themselves).

func Add1Block

func Add1Block(
	ctx context.Context, mach am.Api, state string, args am.A,
) am.Result

Add1Block activates a state and waits until it becomes active. If it's a multi state, it also waits for it to deactivate. Returns early if a non-multi state is already active. Useful to avoid the queue.

func Add1BlockCh added in v0.8.0

func Add1BlockCh(
	ctx context.Context, mach am.Api, state string, args am.A,
) <-chan struct{}

Add1BlockCh is like Add1Block, but returns a channel to compose with other "when" methods.

func ArgsToArgs added in v0.8.0

func ArgsToArgs[T any](src interface{}, dest T) T

ArgsToArgs converts [A] (arguments) into an overlapping [A]. Useful for removing fields which can't be passed over RPC, and back. Both params should be pointers to structs and share at least one field.

func ArgsToLogMap added in v0.8.0

func ArgsToLogMap(s interface{}) map[string]string

ArgsToLogMap converts an [A] (arguments) struct to a map of strings, using `log` tags as keys and the field values cast to strings.

func EnableDebugging added in v0.8.0

func EnableDebugging(stdout bool)

EnableDebugging sets env vars for debugging tested machines with am-dbg on port 6831.

func ExecAndClose added in v0.8.0

func ExecAndClose(fn func()) <-chan struct{}

ExecAndClose closes the chan when the function ends.
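A helper of this shape can be sketched with the standard library alone. The version below is an assumption about the behavior (it runs the function in a new goroutine), and `execAndClose` is a hypothetical name, not the package's implementation.

```go
package main

import "fmt"

// execAndClose is a hypothetical sketch: it runs fn in a new goroutine
// (an assumption) and closes the returned channel once fn has returned.
func execAndClose(fn func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()
	return done
}

func main() {
	result := 0
	done := execAndClose(func() { result = 42 })
	// Receiving from the closed channel happens after fn has finished,
	// so reading result here is safe.
	<-done
	fmt.Println(result)
}
```

Because the result is a plain `<-chan struct{}`, it can be passed to the waiting helpers next to channels returned by when methods.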

func FanOutIn added in v0.8.1

func FanOutIn(
	mach *am.Machine, name string, total, concurrency int,
	fn FanFn,
) (any, error)

FanOutIn creates [total] number of state pairs of "Name1" and "Name1Done", as well as init and merge states ("Name", "NameDone"). [name] is treated as a namespace and can't have other states within. Retry can be achieved by adding the init state repeatedly. FanOutIn can be chained, but it should be called before any mutations or telemetry (as it changes the state struct). The returned handlers struct can be used to adjust the concurrency level.

func GetTransitionStates added in v0.8.0

func GetTransitionStates(
	tx *am.Transition, index am.S,
) (added am.S, removed am.S, touched am.S)

GetTransitionStates will extract added, removed, and touched states from transition's clock values and steps. Requires a state names index.

func GroupWhen1 added in v0.8.0

func GroupWhen1(
	machs []am.Api, state string, ctx context.Context,
) ([]<-chan struct{}, error)

GroupWhen1 will create wait channels for the same state in a group of machines, or return an am.ErrStateMissing.

func Healthcheck added in v0.8.0

func Healthcheck(mach am.Api)

Healthcheck adds a state to a machine every 5 seconds, until the context is done. This makes sure all the logs are pushed to the telemetry server.

func Implements added in v0.8.0

func Implements(statesChecked, statesNeeded am.S) error

Implements checks if statesChecked implements statesNeeded. It's an equivalent of Machine.Has(), but for slices of state names, and with better error messages.
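The check amounts to a subset test over state names. The sketch below uses plain string slices in place of am.S, and both the `implements` name and the error text are hypothetical, chosen only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// implements is a hypothetical sketch of a subset check over state names.
// It returns an error listing every needed state that is missing.
func implements(statesChecked, statesNeeded []string) error {
	have := make(map[string]struct{}, len(statesChecked))
	for _, s := range statesChecked {
		have[s] = struct{}{}
	}
	var missing []string
	for _, s := range statesNeeded {
		if _, ok := have[s]; !ok {
			missing = append(missing, s)
		}
	}
	if len(missing) > 0 {
		// The message format is an assumption, not the library's wording.
		return fmt.Errorf("missing states: %s", strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	all := []string{"Start", "Ready", "Exception"}
	fmt.Println(implements(all, []string{"Start", "Ready"}))
	fmt.Println(implements(all, []string{"Start", "Connected"}))
}
```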

func IndexesToStates

func IndexesToStates(allStates am.S, indexes []int) am.S

IndexesToStates converts a list of state indexes to a list of state names, for a given machine.

func Interval added in v0.8.0

func Interval(
	ctx context.Context, length time.Duration, interval time.Duration,
	fn func() bool,
) error

Interval runs a function at a given interval, for a given duration, or until the context is done. Returns nil if the duration has passed, or an error if ctx is done. The function should return false to stop the interval.

func IsDebug added in v0.8.0

func IsDebug() bool

IsDebug returns true if the process is in simple debug mode.

func IsMulti

func IsMulti(mach am.Api, state string) bool

IsMulti returns true if a state is a multi state.

func IsTelemetry added in v0.8.0

func IsTelemetry() bool

IsTelemetry returns true if the process is in telemetry debug mode.

func IsTestRunner added in v0.8.0

func IsTestRunner() bool

func MachDebug

func MachDebug(
	mach am.Api, amDbgAddr string, logLvl am.LogLevel, stdout bool,
)

MachDebug sets up a machine for debugging, based on the AM_DEBUG env var, passed am-dbg address, log level and stdout flag.

func MachDebugEnv

func MachDebugEnv(mach am.Api)

MachDebugEnv sets up a machine for debugging, based on env vars only: AM_DBG_ADDR, AM_LOG, and AM_DEBUG. This function should be called right after the machine is created, to catch all the log entries.

func RemoveMulti added in v0.8.0

func RemoveMulti(mach am.Api, state string) am.HandlerFinal

RemoveMulti creates a final handler which removes a multi state from a machine. Useful to avoid FooState-Remove1-Foo repetition.

func SetLogLevel added in v0.8.0

func SetLogLevel(level am.LogLevel)

SetLogLevel sets AM_LOG env var to the passed log level. It will affect all future state machines using MachDebugEnv.

func StatesToIndexes

func StatesToIndexes(allStates am.S, states am.S) []int

StatesToIndexes converts a list of state names to a list of state indexes, for a given machine.
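The two conversions are inverses over a shared list of state names. The sketch below uses plain string slices in place of am.S. The function names are hypothetical, and the handling of unknown names (index -1) and out-of-range indexes (skipped) is an assumption, not documented behavior.

```go
package main

import "fmt"

// statesToIndexes maps each name to its position in allStates.
// Assumption: unknown names map to -1.
func statesToIndexes(allStates, states []string) []int {
	indexes := make([]int, len(states))
	for i, name := range states {
		indexes[i] = -1
		for j, known := range allStates {
			if known == name {
				indexes[i] = j
				break
			}
		}
	}
	return indexes
}

// indexesToStates maps positions back to names.
// Assumption: out-of-range indexes are skipped.
func indexesToStates(allStates []string, indexes []int) []string {
	states := make([]string, 0, len(indexes))
	for _, idx := range indexes {
		if idx < 0 || idx >= len(allStates) {
			continue
		}
		states = append(states, allStates[idx])
	}
	return states
}

func main() {
	all := []string{"Start", "Ready", "Exception"}
	idx := statesToIndexes(all, []string{"Exception", "Start"})
	fmt.Println(idx, indexesToStates(all, idx))
}
```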

func Toggle added in v0.8.0

func Toggle(mach am.Api, state string, args am.A) am.Result

Toggle adds or removes a state based on its current state.

func Wait added in v0.8.0

func Wait(ctx context.Context, length time.Duration) bool

Wait waits for a duration, or until the context is done. Returns true if the duration has passed, or false if ctx is done.

func WaitForAll added in v0.8.0

func WaitForAll(
	ctx context.Context, timeout time.Duration, chans ...<-chan struct{},
) error

WaitForAll waits for a list of channels to close, or until the context is done, or until the timeout is reached. Returns nil if all channels are closed, or ErrTimeout, or ctx.Err().

It's advised to check the state ctx after this call, as it usually means expiration and not a timeout.

func WaitForAny added in v0.8.0

func WaitForAny(
	ctx context.Context, timeout time.Duration, chans ...<-chan struct{},
) error

WaitForAny waits for any of the channels to close, or until the context is done, or until the timeout is reached. Returns nil if any channel is closed, or ErrTimeout, or ctx.Err().

It's advised to check the state ctx after this call, as it usually means expiration and not a timeout.

This function uses reflection to wait for multiple channels at once.

func WaitForErrAll added in v0.8.0

func WaitForErrAll(
	ctx context.Context, timeout time.Duration, mach *am.Machine,
	chans ...<-chan struct{},
) error

WaitForErrAll is like WaitForAll, but also waits on WhenErr of a passed machine. For state machines with error handling (like retry) it's recommended to measure machine time of am.Exception instead.

func WaitForErrAny added in v0.8.0

func WaitForErrAny(
	ctx context.Context, timeout time.Duration, mach *am.Machine,
	chans ...<-chan struct{},
) error

WaitForErrAny is like WaitForAny, but also waits on WhenErr of a passed machine. For state machines with error handling (like retry) it's recommended to measure machine time of am.Exception instead.

Types

type FanFn added in v0.8.1

type FanFn func(num int, state, stateDone string)

type FanHandlers added in v0.8.1

type FanHandlers struct {
	Concurrency int
	AnyState    am.HandlerFinal
	GroupTasks  am.S
	GroupDone   am.S
}

type MutRequest added in v0.8.0

type MutRequest struct {
	Mach    am.Api
	MutType am.MutationType
	States  am.S
	Args    am.A

	// PolicyRetries is the max number of retries.
	PolicyRetries int
	// PolicyDelay is the delay before the first retry, then doubles.
	PolicyDelay time.Duration
	// PolicyBackoff is the max time to wait between retries.
	PolicyBackoff time.Duration
	// PolicyMaxDuration is the max time to wait for the mutation to be accepted.
	PolicyMaxDuration time.Duration
}

MutRequest is a failsafe request for a machine mutation. It supports retries, backoff, max duration, delay, and timeout policies. It will try to mutate the machine until the context is done, or the max duration is reached. Queued mutations are considered a success.

func NewMutRequest added in v0.8.0

func NewMutRequest(
	mach am.Api, mutType am.MutationType, states am.S, args am.A,
) *MutRequest

NewMutRequest creates a new MutRequest with defaults - 10 retries, 100ms delay, 5s backoff, and 5s max duration.

func NewReqAdd added in v0.8.0

func NewReqAdd(mach am.Api, states am.S, args am.A) *MutRequest

NewReqAdd creates a new failsafe request to add states to a machine. See MutRequest and NewMutRequest for more info.

func NewReqAdd1 added in v0.8.0

func NewReqAdd1(mach am.Api, state string, args am.A) *MutRequest

NewReqAdd1 creates a new failsafe request to add a single state to a machine. See MutRequest and NewMutRequest for more info.

func NewReqRemove added in v0.8.0

func NewReqRemove(mach am.Api, states am.S, args am.A) *MutRequest

NewReqRemove creates a new failsafe request to remove states from a machine. See MutRequest and NewMutRequest for more info.

func NewReqRemove1 added in v0.8.0

func NewReqRemove1(mach am.Api, state string, args am.A) *MutRequest

NewReqRemove1 creates a new failsafe request to remove a single state from a machine. See MutRequest and NewMutRequest for more info.

func (*MutRequest) Backoff added in v0.8.0

func (r *MutRequest) Backoff(backoff time.Duration) *MutRequest

func (*MutRequest) Clone added in v0.8.0

func (r *MutRequest) Clone(
	mach am.Api, mutType am.MutationType, states am.S, args am.A,
) *MutRequest

func (*MutRequest) Delay added in v0.8.0

func (r *MutRequest) Delay(delay time.Duration) *MutRequest

func (*MutRequest) MaxDuration added in v0.8.0

func (r *MutRequest) MaxDuration(maxDuration time.Duration) *MutRequest

func (*MutRequest) Retries added in v0.8.0

func (r *MutRequest) Retries(retries int) *MutRequest

func (*MutRequest) Run added in v0.8.0

func (r *MutRequest) Run(ctx context.Context) (am.Result, error)

Directories

Path Synopsis
Package testing provides testing helpers for state machines using testify.