Documentation ¶
Overview ¶
Package rpc is a transparent RPC for state machines.
Index ¶
- Variables
- func ClockFromMsg(before am.Time, msg ClockMsg) am.Time
- func TrafficMeter(listener net.Listener, fwdTo string, counter chan<- int64, end <-chan struct{})
- type ArgsGet
- type ArgsLog
- type ArgsMut
- type ArgsPayload
- type Client
- func (c *Client) ConnectedState(e *am.Event)
- func (c *Client) ConnectingState(e *am.Event)
- func (c *Client) DisconnectedEnter(e *am.Event) bool
- func (c *Client) DisconnectedState(e *am.Event)
- func (c *Client) DisconnectingEnter(e *am.Event) bool
- func (c *Client) DisconnectingState(e *am.Event)
- func (c *Client) Get(ctx context.Context, name string) (*RespGet, error)
- func (c *Client) GetKind() Kind
- func (c *Client) HandshakeDoneState(e *am.Event)
- func (c *Client) HandshakingState(e *am.Event)
- func (c *Client) RemoteSendPayload(_ *rpc2.Client, file *ArgsPayload, _ *Empty) error
- func (c *Client) RemoteSetClock(_ *rpc2.Client, clock ClockMsg, _ *Empty) error
- func (c *Client) Start() am.Result
- func (c *Client) StartEnd(e *am.Event)
- func (c *Client) Stop(waitTillExit context.Context, dispose bool) am.Result
- type ClockMsg
- type Empty
- type ExceptionHandler
- type GetterFunc
- type Kind
- type RespGet
- type RespHandshake
- type RespResult
- type RespSync
- type Server
- func (s *Server) GetKind() Kind
- func (s *Server) HandshakeDoneEnd(e *am.Event)
- func (s *Server) RemoteAdd(_ *rpc2.Client, args *ArgsMut, resp *RespResult) error
- func (s *Server) RemoteAddNS(_ *rpc2.Client, args *ArgsMut, _ *Empty) error
- func (s *Server) RemoteBye(_ *rpc2.Client, _ *Empty, _ *Empty) error
- func (s *Server) RemoteGet(_ *rpc2.Client, name string, resp *RespGet) error
- func (s *Server) RemoteHandshake(client *rpc2.Client, _ *Empty, resp *RespHandshake) error
- func (s *Server) RemoteHandshakeAck(client *rpc2.Client, done *bool, _ *Empty) error
- func (s *Server) RemoteRemove(_ *rpc2.Client, args *ArgsMut, resp *RespResult) error
- func (s *Server) RemoteSet(_ *rpc2.Client, args *ArgsMut, resp *RespResult) error
- func (s *Server) RemoteSync(_ *rpc2.Client, sum uint64, resp *RespSync) error
- func (s *Server) RpcReadyEnd(e *am.Event)
- func (s *Server) RpcReadyState(e *am.Event)
- func (s *Server) RpcStartingState(e *am.Event)
- func (s *Server) SendPayload(ctx context.Context, file *ArgsPayload) error
- func (s *Server) Start() am.Result
- func (s *Server) Stop(dispose bool) am.Result
- type Worker
- func (w *Worker) ActiveStates() am.S
- func (w *Worker) Add(states am.S, args am.A) am.Result
- func (w *Worker) Add1(state string, args am.A) am.Result
- func (w *Worker) Add1NS(state string, args am.A) am.Result
- func (w *Worker) AddErr(err error, args am.A) am.Result
- func (w *Worker) AddErrState(state string, err error, args am.A) am.Result
- func (w *Worker) AddNS(states am.S, args am.A) am.Result
- func (w *Worker) Any(states ...am.S) bool
- func (w *Worker) Any1(states ...string) bool
- func (w *Worker) Clock(states am.S) am.Clock
- func (w *Worker) Dispose()
- func (w *Worker) Err() error
- func (w *Worker) Export() *am.Serialized
- func (w *Worker) GetStruct() am.Struct
- func (w *Worker) Has(states am.S) bool
- func (w *Worker) Has1(state string) bool
- func (w *Worker) Index(state string) int
- func (w *Worker) Inspect(states am.S) string
- func (w *Worker) Is(states am.S) bool
- func (w *Worker) Is1(state string) bool
- func (w *Worker) IsClock(clock am.Clock) bool
- func (w *Worker) IsErr() bool
- func (w *Worker) IsTime(t am.Time, states am.S) bool
- func (w *Worker) Log(msg string, args ...any)
- func (w *Worker) MustParseStates(states am.S) am.S
- func (w *Worker) NewStateCtx(state string) context.Context
- func (w *Worker) Not(states am.S) bool
- func (w *Worker) Not1(state string) bool
- func (w *Worker) Remove(states am.S, args am.A) am.Result
- func (w *Worker) Remove1(state string, args am.A) am.Result
- func (w *Worker) Set(states am.S, args am.A) am.Result
- func (w *Worker) StateNames() am.S
- func (w *Worker) String() string
- func (w *Worker) StringAll() string
- func (w *Worker) Switch(states ...string) string
- func (w *Worker) Sync() am.Time
- func (w *Worker) Tick(state string) uint64
- func (w *Worker) Time(states am.S) am.Time
- func (w *Worker) TimeSum(states am.S) uint64
- func (w *Worker) When(states am.S, ctx context.Context) <-chan struct{}
- func (w *Worker) When1(state string, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenArgs(state string, args am.A, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenDisposed() <-chan struct{}
- func (w *Worker) WhenErr(ctx context.Context) <-chan struct{}
- func (w *Worker) WhenNot(states am.S, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenNot1(state string, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenTicksEq(state string, ticks uint64, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenTime(states am.S, times am.Time, ctx context.Context) <-chan struct{}
- type WorkerTracer
Constants ¶
This section is empty.
Variables ¶
Functions ¶
Types ¶
type ArgsPayload ¶
type Client ¶
type Client struct { *ExceptionHandler Mach *am.Machine // Worker is a remote Worker instance Worker *Worker Payloads map[string]*ArgsPayload CallCount uint64 LogEnabled bool CallTimeout time.Duration // contains filtered or unexported fields }
func (*Client) ConnectedState ¶
func (*Client) ConnectingState ¶
func (*Client) DisconnectedState ¶
func (*Client) DisconnectingState ¶
func (*Client) HandshakeDoneState ¶
func (*Client) HandshakingState ¶
func (*Client) RemoteSendPayload ¶
RemoteSendPayload receives a payload from the server. Only called by the server.
func (*Client) RemoteSetClock ¶
RemoteSetClock updates the client's clock. Only called by the server.
type ExceptionHandler ¶
type ExceptionHandler struct {
*am.ExceptionHandler
}
ExceptionHandler is a shared exception handler for RPC server and client.
func (*ExceptionHandler) ExceptionEnter ¶
func (h *ExceptionHandler) ExceptionEnter(e *am.Event) bool
func (*ExceptionHandler) ExceptionState ¶
func (h *ExceptionHandler) ExceptionState(e *am.Event)
type GetterFunc ¶
type RespHandshake ¶
type RespHandshake = am.Serialized
type RespResult ¶
type Server ¶
type Server struct { *ExceptionHandler Addr string Mach *am.Machine // ClockInterval is the interval for clock updates, effectively throttling // the number of updates sent to the client within the interval window. // 0 means pushes are disabled. Setting to a very small value will make // pushes instant. ClockInterval time.Duration // Listener can be set manually before starting the server. Listener net.Listener LogEnabled bool CallCount uint64 // contains filtered or unexported fields }
func (*Server) HandshakeDoneEnd ¶
func (*Server) RemoteAddNS ¶
func (*Server) RemoteHandshake ¶
func (*Server) RemoteHandshakeAck ¶
func (*Server) RemoteRemove ¶
func (*Server) RemoteSync ¶
func (*Server) RpcReadyEnd ¶
func (*Server) RpcReadyState ¶
RpcReadyState starts a ticker to compensate for clock push denounces.
func (*Server) RpcStartingState ¶
func (*Server) SendPayload ¶
func (s *Server) SendPayload(ctx context.Context, file *ArgsPayload) error
SendPayload sends a payload to the client.
type Worker ¶
type Worker struct { ID string Ctx context.Context // TODO remote push Disposed atomic.Bool // contains filtered or unexported fields }
Worker is a subset of `pkg/machine#Machine` for RPC. Lacks the queue and other local methods. Most methods are clock-based, thus executed locally.
func (*Worker) ActiveStates ¶
ActiveStates returns a copy of the currently active states.
func (*Worker) Add ¶
Add activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.
func (*Worker) AddErr ¶
AddErr is a dedicated method to add the Exception state with the passed error and optional arguments. Like every mutation method, it will resolve relations and trigger handlers. AddErr produces a stack trace of the error, if LogStackTrace is enabled.
func (*Worker) AddErrState ¶
AddErrState adds a dedicated error state, along with the build in Exception state. Like every mutation method, it will resolve relations and trigger handlers. AddErrState produces a stack trace of the error, if LogStackTrace is enabled.
func (*Worker) AddNS ¶
AddNS is a NoSync method - an efficient way for adding states, as it doesn't wait for, nor transfers a response. Because of which it doesn't update the clock. Use Sync() to update the clock after a batch of AddNS calls.
func (*Worker) Any ¶
Any is group call to Is, returns true if any of the params return true from Is.
machine.StringAll() // ()[Foo:0 Bar:0 Baz:0] machine.Add(S{"Foo"}) // is(Foo, Bar) or is(Bar) machine.Any(S{"Foo", "Bar"}, S{"Bar"}) // false // is(Foo) or is(Bar) machine.Any(S{"Foo"}, S{"Bar"}) // true
func (*Worker) Any1 ¶
Any1 is group call to Is1(), returns true if any of the params return true from Is1().
func (*Worker) Clock ¶
Clock returns current machine's clock, a state-keyed map of ticks. If states are passed, only the ticks of the passed states are returned.
func (*Worker) Dispose ¶
func (w *Worker) Dispose()
Dispose disposes the machine and all its emitters. You can wait for the completion of the disposal with `<-mach.WhenDisposed`.
func (*Worker) Export ¶
func (w *Worker) Export() *am.Serialized
Export exports the machine state: ID, time and state names.
func (*Worker) Has1 ¶
Has1 is a shorthand for Has. It returns true if the passed state is registered in the machine.
func (*Worker) Inspect ¶
Inspect returns a multi-line string representation of the machine (states, relations, clock). states: param for ordered or partial results.
func (*Worker) Is ¶
Is checks if all the passed states are currently active.
machine.StringAll() // ()[Foo:0 Bar:0 Baz:0] machine.Add(S{"Foo"}) machine.Is(S{"Foo"}) // true machine.Is(S{"Foo", "Bar"}) // false
func (*Worker) Is1 ¶
Is1 is a shorthand method to check if a single state is currently active. See Is().
func (*Worker) IsClock ¶
IsClock checks if the machine has changed since the passed clock. Returns true if at least one state has changed.
func (*Worker) IsTime ¶
IsTime checks if the machine has changed since the passed time (list of ticks). Returns true if at least one state has changed. The states param is optional and can be used to check only a subset of states.
func (*Worker) Log ¶
Log logs an [extern] message unless LogNothing is set (default). Optionally redirects to a custom logger from SetLogger.
func (*Worker) MustParseStates ¶
MustParseStates parses the states and returns them as a list. Panics when a state is not defined. It's an usafe equivalent of VerifyStates.
func (*Worker) NewStateCtx ¶
NewStateCtx returns a new sub-context, bound to the current clock's tick of the passed state.
Context cancels when the state has been de-activated, or right away, if it isn't currently active.
State contexts are used to check state expirations and should be checked often inside goroutines. TODO reuse existing ctxs
func (*Worker) Not ¶
Not checks if **none** of the passed states are currently active.
machine.StringAll() // -> ()[A:0 B:0 C:0 D:0] machine.Add(S{"A", "B"}) // not(A) and not(C) machine.Not(S{"A", "C"}) // -> false // not(C) and not(D) machine.Not(S{"C", "D"}) // -> true
func (*Worker) Not1 ¶
Not1 is a shorthand method to check if a single state is currently inactive. See Not().
func (*Worker) Remove ¶
Remove de-activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.
func (*Worker) Remove1 ¶
Remove1 is a shorthand method to remove a single state with the passed args. See Remove().
func (*Worker) Set ¶
Set de-activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.
func (*Worker) StateNames ¶
StateNames returns a copy of all the state names.
func (*Worker) String ¶
String returns a one line representation of the currently active states, with their clock values. Inactive states are omitted. Eg: (Foo:1 Bar:3)
func (*Worker) StringAll ¶
StringAll returns a one line representation of all the states, with their clock values. Inactive states are in square brackets. Eg: (Foo:1 Bar:3)[Baz:2]
func (*Worker) Switch ¶
Switch returns the first state from the passed list that is currently active, making it useful for switch statements.
switch mach.Switch(ss.GroupPlaying...) { case "Playing": case "Paused": case "Stopped": }
func (*Worker) Sync ¶
Sync requests fresh clock values from the remote machine. Useful to call after a batch of no-sync methods, eg AddNS.
func (*Worker) Time ¶
Time returns machine's time, a list of ticks per state. Returned value includes the specified states, or all the states if nil.
func (*Worker) TimeSum ¶
TimeSum returns the sum of machine's time (ticks per state). Returned value includes the specified states, or all the states if nil. It's a very inaccurate, yet simple way to measure the machine's time. TODO handle overflow
func (*Worker) When ¶
When returns a channel that will be closed when all the passed states become active or the machine gets disposed.
ctx: optional context that will close the channel when done. Useful when listening on 2 When() channels within the same `select` to GC the 2nd one. TODO re-use channels with the same state set and context
func (*Worker) WhenArgs ¶
WhenArgs returns a channel that will be closed when the passed state becomes active with all the passed args. Args are compared using the native '=='. It's meant to be used with async Multi states, to filter out a specific completion.
func (*Worker) WhenDisposed ¶
func (w *Worker) WhenDisposed() <-chan struct{}
WhenDisposed returns a channel that will be closed when the machine is disposed. Requires bound handlers. Use Machine.Disposed in case no handlers have been bound.
func (*Worker) WhenErr ¶
WhenErr returns a channel that will be closed when the machine is in the Exception state.
ctx: optional context defaults to the machine's context.
func (*Worker) WhenNot ¶
WhenNot returns a channel that will be closed when all the passed states become inactive or the machine gets disposed.
ctx: optional context that will close the channel when done. Useful when listening on 2 WhenNot() channels within the same `select` to GC the 2nd one.
func (*Worker) WhenTicks ¶
WhenTicks waits N ticks of a single state (relative to now). Uses WhenTime underneath.
func (*Worker) WhenTicksEq ¶
WhenTicksEq waits till ticks for a single state equal the given absolute value (or more). Uses WhenTime underneath.
type WorkerTracer ¶
type WorkerTracer struct { *am.NoOpTracer // contains filtered or unexported fields }
func (*WorkerTracer) TransitionEnd ¶
func (t *WorkerTracer) TransitionEnd(_ *am.Transition)