Documentation ¶
Overview ¶
Package helpers provides utility functions for working with async state machines.
Index ¶
- Constants
- func Activations(u uint64) int
- func Add1AsyncBlock(ctx context.Context, mach am.Api, waitState string, addState string, args am.A) am.Result
- func Add1Block(ctx context.Context, mach am.Api, state string, args am.A) am.Result
- func Add1BlockCh(ctx context.Context, mach am.Api, state string, args am.A) <-chan struct{}
- func ArgsToArgs[T any](src interface{}, dest T) T
- func ArgsToLogMap(s interface{}) map[string]string
- func EnableDebugging(stdout bool)
- func ExecAndClose(fn func()) <-chan struct{}
- func FanOutIn(mach *am.Machine, name string, total, concurrency int, fn FanFn) (any, error)
- func GetTransitionStates(tx *am.Transition, index am.S) (added am.S, removed am.S, touched am.S)
- func GroupWhen1(machs []am.Api, state string, ctx context.Context) ([]<-chan struct{}, error)
- func Healthcheck(mach am.Api)
- func Implements(statesChecked, statesNeeded am.S) error
- func IndexesToStates(allStates am.S, indexes []int) am.S
- func Interval(ctx context.Context, length time.Duration, interval time.Duration, ...) error
- func IsDebug() bool
- func IsMulti(mach am.Api, state string) bool
- func IsTelemetry() bool
- func IsTestRunner() bool
- func MachDebug(mach am.Api, amDbgAddr string, logLvl am.LogLevel, stdout bool)
- func MachDebugEnv(mach am.Api)
- func RemoveMulti(mach am.Api, state string) am.HandlerFinal
- func SetLogLevel(level am.LogLevel)
- func StatesToIndexes(allStates am.S, states am.S) []int
- func Toggle(mach am.Api, state string, args am.A) am.Result
- func Wait(ctx context.Context, length time.Duration) bool
- func WaitForAll(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error
- func WaitForAny(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error
- func WaitForErrAll(ctx context.Context, timeout time.Duration, mach *am.Machine, ...) error
- func WaitForErrAny(ctx context.Context, timeout time.Duration, mach *am.Machine, ...) error
- type FanFn
- type FanHandlers
- type MutRequest
- func NewMutRequest(mach am.Api, mutType am.MutationType, states am.S, args am.A) *MutRequest
- func NewReqAdd(mach am.Api, states am.S, args am.A) *MutRequest
- func NewReqAdd1(mach am.Api, state string, args am.A) *MutRequest
- func NewReqRemove(mach am.Api, states am.S, args am.A) *MutRequest
- func NewReqRemove1(mach am.Api, state string, args am.A) *MutRequest
- func (r *MutRequest) Backoff(backoff time.Duration) *MutRequest
- func (r *MutRequest) Clone(mach am.Api, mutType am.MutationType, states am.S, args am.A) *MutRequest
- func (r *MutRequest) Delay(delay time.Duration) *MutRequest
- func (r *MutRequest) MaxDuration(maxDuration time.Duration) *MutRequest
- func (r *MutRequest) Retries(retries int) *MutRequest
- func (r *MutRequest) Run(ctx context.Context) (am.Result, error)
Constants ¶
const (
	// EnvAmHealthcheck enables a healthcheck ticker for every debugged machine.
	EnvAmHealthcheck = "AM_HEALTHCHECK"
	// EnvAmTestRunner indicates the main test runner, disables any telemetry.
	EnvAmTestRunner = "AM_TEST_RUNNER"
)
Variables ¶
This section is empty.
Functions ¶
func Activations ¶ added in v0.8.0
Activations returns the number of state activations from an amount of ticks passed.
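The relation between ticks and activations can be sketched as follows. This is a minimal illustration, assuming the clock convention that a state's tick value is odd while the state is active and even while it is inactive, so each activation and de-activation pair consumes two ticks. The name `activations` is hypothetical and the package's own implementation may differ.

```go
package main

import "fmt"

// activations estimates how many times a state was activated, assuming
// odd tick values mean "active" and even tick values mean "inactive".
func activations(ticks uint64) int {
	return int((ticks + 1) / 2)
}

func main() {
	for _, t := range []uint64{0, 1, 2, 3, 4} {
		fmt.Printf("ticks=%d activations=%d\n", t, activations(t))
	}
}
```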
func Add1AsyncBlock ¶
func Add1AsyncBlock( ctx context.Context, mach am.Api, waitState string, addState string, args am.A, ) am.Result
Add1AsyncBlock adds a state from an async op and waits for another one from the op to become active. Theoretically, it should work with any state pair, including Multi states (assuming they remove themselves).
func Add1Block ¶
Add1Block activates a state and waits until it becomes active. If it's a multi state, it also waits for it to de-activate. Returns early if a non-multi state is already active. Useful to avoid the queue.
func Add1BlockCh ¶ added in v0.8.0
Add1BlockCh is like Add1Block, but returns a channel to compose with other "when" methods.
func ArgsToArgs ¶ added in v0.8.0
func ArgsToArgs[T any](src interface{}, dest T) T
ArgsToArgs converts [A] (arguments) into an overlapping [A]. Useful for removing fields which can't be passed over RPC, and back. Both params should be pointers to struct and share at least one field.
func ArgsToLogMap ¶ added in v0.8.0
ArgsToLogMap converts an [A] (arguments) struct to a map of strings using `log` tags as keys, and their cased string values.
func EnableDebugging ¶ added in v0.8.0
func EnableDebugging(stdout bool)
EnableDebugging sets env vars for debugging tested machines with am-dbg on port 6831.
func ExecAndClose ¶ added in v0.8.0
func ExecAndClose(fn func()) <-chan struct{}
ExecAndClose closes the chan when the function ends.
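The pattern behind ExecAndClose can be sketched with the standard library alone. This sketch assumes the function runs in its own goroutine, which the description does not state; `execAndClose` is a hypothetical stand-in, not the package's code.

```go
package main

import "fmt"

// execAndClose runs fn in a new goroutine (an assumption) and closes the
// returned channel once fn has returned.
func execAndClose(fn func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()
	return done
}

func main() {
	result := 0
	done := execAndClose(func() { result = 42 })
	// The close happens after fn returns, so reading result here is safe.
	<-done
	fmt.Println(result)
}
```

A channel like this composes with the other channel-based helpers, for example as one of the inputs to WaitForAll.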
func FanOutIn ¶ added in v0.8.1
FanOutIn creates [total] number of state pairs of "Name1" and "Name1Done", as well as init and merge states ("Name", "NameDone"). [name] is treated as a namespace and can't have other states within. Retry can be achieved by adding the init state repeatedly. FanOutIn can be chained, but it should be called before any mutations or telemetry (as it changes the state struct). The returned handlers struct can be used to adjust concurrency level.
func GetTransitionStates ¶ added in v0.8.0
GetTransitionStates will extract added, removed, and touched states from transition's clock values and steps. Requires a state names index.
func GroupWhen1 ¶ added in v0.8.0
GroupWhen1 will create wait channels for the same state in a group of machines, or return am.ErrStateMissing.
func Healthcheck ¶ added in v0.8.0
Healthcheck adds a state to a machine every 5 seconds, until the context is done. This makes sure all the logs are pushed to the telemetry server.
func Implements ¶ added in v0.8.0
Implements checks if statesChecked implements statesNeeded. It's an equivalent of Machine.Has(), but for slices of state names, and with better error msgs.
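A check of this kind might look like the sketch below. It uses plain `[]string` in place of am.S, and both the helper name `implements` and the error wording are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// implements returns an error naming every state from needed that is
// absent from checked. Plain []string stands in for am.S here.
func implements(checked, needed []string) error {
	have := make(map[string]struct{}, len(checked))
	for _, s := range checked {
		have[s] = struct{}{}
	}
	var missing []string
	for _, s := range needed {
		if _, ok := have[s]; !ok {
			missing = append(missing, s)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("missing states: %s", strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	machine := []string{"Start", "Ready", "Exception"}
	fmt.Println(implements(machine, []string{"Start", "Ready"}))
	fmt.Println(implements(machine, []string{"Ready", "Healthcheck"}))
}
```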
func IndexesToStates ¶
IndexesToStates converts a list of state indexes to a list of state names, for a given machine.
func Interval ¶ added in v0.8.0
func Interval( ctx context.Context, length time.Duration, interval time.Duration, fn func() bool, ) error
Interval runs a function at a given interval, for a given duration, or until the context is done. Returns nil if the duration has passed, or err if ctx is done. The function should return false to stop the interval.
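The behaviour described for Interval can be approximated with a ticker and a timer. This is a sketch under stated assumptions, not the package's implementation: `interval` and `countCalls` are hypothetical names, and returning nil when the callback asks to stop is a guess, since the description does not say what is returned in that case.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// interval calls fn every `every` until `length` has passed, ctx is done,
// or fn returns false.
func interval(ctx context.Context, length, every time.Duration, fn func() bool) error {
	deadline := time.NewTimer(length)
	defer deadline.Stop()
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return nil
		case <-ticker.C:
			if !fn() {
				return nil // assumption: stopping early is not an error
			}
		}
	}
}

// countCalls runs interval until fn has been called limit times.
func countCalls(limit int) (int, error) {
	calls := 0
	err := interval(context.Background(), 2*time.Second, 5*time.Millisecond,
		func() bool {
			calls++
			return calls < limit
		})
	return calls, err
}

func main() {
	calls, err := countCalls(3)
	fmt.Println(calls, err)
}
```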
func IsDebug ¶ added in v0.8.0
func IsDebug() bool
IsDebug returns true if the process is in simple debug mode.
func IsTelemetry ¶ added in v0.8.0
func IsTelemetry() bool
IsTelemetry returns true if the process is in telemetry debug mode.
func IsTestRunner ¶ added in v0.8.0
func IsTestRunner() bool
func MachDebug ¶
MachDebug sets up a machine for debugging, based on the AM_DEBUG env var, passed am-dbg address, log level and stdout flag.
func MachDebugEnv ¶
MachDebugEnv sets up a machine for debugging, based on env vars only: AM_DBG_ADDR, AM_LOG, and AM_DEBUG. This function should be called right after the machine is created, to catch all the log entries.
func RemoveMulti ¶ added in v0.8.0
func RemoveMulti(mach am.Api, state string) am.HandlerFinal
RemoveMulti creates a final handler which removes a multi state from a machine. Useful to avoid FooState-Remove1-Foo repetition.
func SetLogLevel ¶ added in v0.8.0
SetLogLevel sets AM_LOG env var to the passed log level. It will affect all future state machines using MachDebugEnv.
func StatesToIndexes ¶
StatesToIndexes converts a list of state names to a list of state indexes, for a given machine.
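The two conversions (StatesToIndexes and IndexesToStates) are inverses over a machine's ordered list of state names. A minimal sketch with plain slices follows; the helper names are hypothetical, and the handling of unknown names (mapped to -1) and out-of-range indexes (skipped) is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// statesToIndexes maps each state name to its position in all.
func statesToIndexes(all, states []string) []int {
	out := make([]int, 0, len(states))
	for _, s := range states {
		idx := -1 // assumption: unknown names map to -1
		for i, name := range all {
			if name == s {
				idx = i
				break
			}
		}
		out = append(out, idx)
	}
	return out
}

// indexesToStates maps each index back to its state name, skipping
// indexes outside of all (an assumption).
func indexesToStates(all []string, indexes []int) []string {
	out := make([]string, 0, len(indexes))
	for _, i := range indexes {
		if i >= 0 && i < len(all) {
			out = append(out, all[i])
		}
	}
	return out
}

func main() {
	all := []string{"Start", "Ready", "Exception"}
	idxs := statesToIndexes(all, []string{"Exception", "Start"})
	fmt.Println(idxs, indexesToStates(all, idxs))
}
```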
func Wait ¶ added in v0.8.0
Wait waits for a duration, or until the context is done. Returns true if the duration has passed, or false if ctx is done.
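A context-aware sleep of this shape can be sketched as below. The sketch follows the `bool` return type from the signature and assumes true means the full duration elapsed and false means the context ended first; `wait` and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// wait blocks for length, or until ctx is done, whichever comes first.
func wait(ctx context.Context, length time.Duration) bool {
	timer := time.NewTimer(length)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// demo returns the result for a live context and for a cancelled one.
func demo() (elapsed bool, cancelled bool) {
	elapsed = wait(context.Background(), 10*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelled = wait(ctx, time.Second)
	return elapsed, cancelled
}

func main() {
	fmt.Println(demo())
}
```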
func WaitForAll ¶ added in v0.8.0
WaitForAll waits for a list of channels to close, or until the context is done, or until the timeout is reached. Returns nil if all channels are closed, or ErrTimeout, or ctx.Err().
It's advised to check the state ctx after this call, as it usually means expiration and not a timeout.
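Waiting for every channel in turn against a shared deadline could be sketched as follows. This is not the package's code: `waitForAll`, `closedAfter`, and `demo` are hypothetical names, and `errTimeout` is a local stand-in for the package's ErrTimeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTimeout is a local stand-in for the package's ErrTimeout.
var errTimeout = errors.New("timeout")

// waitForAll returns nil once every channel is closed, errTimeout if the
// shared timeout fires first, or ctx.Err() if the context ends first.
func waitForAll(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for _, ch := range chans {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return errTimeout
		case <-ch:
		}
	}
	return nil
}

// closedAfter returns a channel that closes after d.
func closedAfter(d time.Duration) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		time.Sleep(d)
		close(ch)
	}()
	return ch
}

// demo covers the case where all channels close in time and the case
// where one channel never closes.
func demo() (inTime error, timedOut error) {
	ctx := context.Background()
	inTime = waitForAll(ctx, time.Second,
		closedAfter(5*time.Millisecond), closedAfter(10*time.Millisecond))
	timedOut = waitForAll(ctx, 20*time.Millisecond,
		closedAfter(0), make(chan struct{}))
	return inTime, timedOut
}

func main() {
	fmt.Println(demo())
}
```

After a non-nil result, checking `ctx.Err()` on the state context distinguishes an expired context from a plain timeout.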
func WaitForAny ¶ added in v0.8.0
WaitForAny waits for any of the channels to close, or until the context is done, or until the timeout is reached. Returns nil if any channel is closed, or ErrTimeout, or ctx.Err().
It's advised to check the state ctx after this call, as it usually means expiration and not a timeout.
This function uses reflection to wait for multiple channels at once.
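Selecting over a variable number of channels is what `reflect.Select` from the standard library is for. The sketch below shows that technique; it is an illustration rather than the package's implementation, with `waitForAny` and `demo` as hypothetical names and `errTimeout` as a local stand-in for ErrTimeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"time"
)

// errTimeout is a local stand-in for the package's ErrTimeout.
var errTimeout = errors.New("timeout")

// waitForAny returns nil as soon as any channel is closed, errTimeout if
// the timeout fires first, or ctx.Err() if the context ends first.
func waitForAny(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// Case 0 is the context, case 1 the timeout, the rest are user channels.
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())},
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timer.C)},
	}
	for _, ch := range chans {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}

	chosen, _, _ := reflect.Select(cases)
	switch chosen {
	case 0:
		return ctx.Err()
	case 1:
		return errTimeout
	default:
		return nil
	}
}

// demo covers one closed channel among open ones, and only open channels.
func demo() (hit error, miss error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	closed := make(chan struct{})
	close(closed)
	open := make(chan struct{})

	hit = waitForAny(ctx, time.Second, open, closed)
	miss = waitForAny(ctx, 20*time.Millisecond, open)
	return hit, miss
}

func main() {
	fmt.Println(demo())
}
```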
func WaitForErrAll ¶ added in v0.8.0
func WaitForErrAll( ctx context.Context, timeout time.Duration, mach *am.Machine, chans ...<-chan struct{}, ) error
WaitForErrAll is like WaitForAll, but also waits on WhenErr of a passed machine. For state machines with error handling (like retry) it's recommended to measure machine time of am.Exception instead.
func WaitForErrAny ¶ added in v0.8.0
func WaitForErrAny( ctx context.Context, timeout time.Duration, mach *am.Machine, chans ...<-chan struct{}, ) error
WaitForErrAny is like WaitForAny, but also waits on WhenErr of a passed machine. For state machines with error handling (like retry) it's recommended to measure machine time of am.Exception instead.
Types ¶
type FanHandlers ¶ added in v0.8.1
type MutRequest ¶ added in v0.8.0
type MutRequest struct {
	Mach    am.Api
	MutType am.MutationType
	States  am.S
	Args    am.A

	// PolicyRetries is the max number of retries.
	PolicyRetries int
	// PolicyDelay is the delay before the first retry, then doubles.
	PolicyDelay time.Duration
	// PolicyBackoff is the max time to wait between retries.
	PolicyBackoff time.Duration
	// PolicyMaxDuration is the max time to wait for the mutation to be accepted.
	PolicyMaxDuration time.Duration
}
MutRequest is a failsafe request for a machine mutation. It supports retries, backoff, max duration, delay, and timeout policies. It will try to mutate the machine until the context is done, or the max duration is reached. Queued mutations are considered a success.
func NewMutRequest ¶ added in v0.8.0
func NewMutRequest( mach am.Api, mutType am.MutationType, states am.S, args am.A, ) *MutRequest
NewMutRequest creates a new MutRequest with defaults: 10 retries, 100ms delay, 5s backoff, and 5s max duration.
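To see how these defaults interact, the sketch below computes the waits between retries under one reading of the Policy* field comments: the delay doubles after each retry, is capped at the backoff, and the sequence stops once the retry count or the total max duration would be exceeded. The `policy` type and the `retrySchedule` and `defaults` helpers are hypothetical, the time spent on the attempts themselves is ignored, and the real Run method may apply the policies differently.

```go
package main

import (
	"fmt"
	"time"
)

// policy mirrors the Policy* fields of MutRequest; the type is hypothetical.
type policy struct {
	retries     int
	delay       time.Duration
	backoff     time.Duration
	maxDuration time.Duration
}

// retrySchedule lists the wait before each retry: the delay doubles after
// every retry, is capped at backoff, and the list ends once the retry
// count or the total max duration would be exceeded.
func retrySchedule(p policy) []time.Duration {
	var waits []time.Duration
	var total time.Duration
	wait := p.delay
	for i := 0; i < p.retries; i++ {
		if total+wait > p.maxDuration {
			break
		}
		waits = append(waits, wait)
		total += wait
		wait *= 2
		if wait > p.backoff {
			wait = p.backoff
		}
	}
	return waits
}

// defaults returns the default values listed for NewMutRequest.
func defaults() policy {
	return policy{
		retries:     10,
		delay:       100 * time.Millisecond,
		backoff:     5 * time.Second,
		maxDuration: 5 * time.Second,
	}
}

func main() {
	fmt.Println(retrySchedule(defaults()))
}
```

Under this reading, the 5s max duration is reached before all 10 default retries are used, so the max duration is the limit that applies first.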
func NewReqAdd ¶ added in v0.8.0
NewReqAdd creates a new failsafe request to add states to a machine. See MutRequest and NewMutRequest for more info.
func NewReqAdd1 ¶ added in v0.8.0
NewReqAdd1 creates a new failsafe request to add a single state to a machine. See MutRequest and NewMutRequest for more info.
func NewReqRemove ¶ added in v0.8.0
NewReqRemove creates a new failsafe request to remove states from a machine. See MutRequest and NewMutRequest for more info.
func NewReqRemove1 ¶ added in v0.8.0
NewReqRemove1 creates a new failsafe request to remove a single state from a machine. See MutRequest and NewMutRequest for more info.
func (*MutRequest) Backoff ¶ added in v0.8.0
func (r *MutRequest) Backoff(backoff time.Duration) *MutRequest
func (*MutRequest) Clone ¶ added in v0.8.0
func (r *MutRequest) Clone( mach am.Api, mutType am.MutationType, states am.S, args am.A, ) *MutRequest
func (*MutRequest) Delay ¶ added in v0.8.0
func (r *MutRequest) Delay(delay time.Duration) *MutRequest
func (*MutRequest) MaxDuration ¶ added in v0.8.0
func (r *MutRequest) MaxDuration(maxDuration time.Duration) *MutRequest
func (*MutRequest) Retries ¶ added in v0.8.0
func (r *MutRequest) Retries(retries int) *MutRequest