rpc

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: MIT Imports: 24 Imported by: 0

README

go report coverage go reference last commit release matrix chat

/pkg/rpc

cd /

[!NOTE] Asyncmachine-go is an AOP Actor Model library for distributed workflows, built on top of a lightweight state machine (nondeterministic, multi-state, clock-based, relational, optionally-accepting, and non-blocking). It has atomic transitions, RPC, logging, TUI debugger, metrics, tracing, and soon diagrams.

aRPC is a transparent RPC for state machines implemented using asyncmachine-go. It's clock-based and features many optimizations, e.g. having most of the API methods executed locally (as state changes are regularly pushed to the client). It's built on top of cenkalti/rpc2, net/rpc, and soheilhy/cmux. Check out a dedicated example, gRPC benchmark, and integration tests tutorial.

Features

  • mutation methods
  • wait methods
  • clock pushes (from worker-side mutations)
  • remote contexts
  • multiplexing
  • reconnect / fail-safety
  • worker sending payloads to the client
  • initial optimizations

Not implemented (yet):

  • WhenArgs, Err()
  • PushAllTicks
  • chunked payloads
  • TLS
  • compression
  • msgpack encoding

Each server can handle 1 client at a time, but 1 worker can have many servers attached to itself (via Tracer API). Additionally, remote workers can also have servers attached to themselves, creating a tree structure (see /examples/benchmark_state_source).

flowchart TB

    subgraph s [Server Host]
        direction TB
        Server[aRPC Server]
        Worker[Worker Mach]
    end
    subgraph c [Client Host]
        direction TB
        Consumer[Consumer]
        Client[aRPC Client]
        RemoteWorker[Remote Worker Mach]
    end

    Client -- WorkerPayload state --> Consumer
    Client --> RemoteWorker
    Worker --> Server
    Client -- Mutations --> Server
    Server -. Clock Updates .-> Client

Components

Worker

Any state machine can be exposed as an RPC worker, as long as it implements /pkg/rpc/states/WorkerStructDef. This can be done either manually, or by using state helpers (StructMerge, SAdd), or by generating a states file with am-gen. It's also required to have the states verified by Machine.VerifyStates. Worker can send data to the client via the SendPayload state.

import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

// inherit from RPC worker
ssStruct := am.StructMerge(ssrpc.WorkerStruct, am.Struct{
    "Foo": {Require: am.S{"Bar"}},
    "Bar": {},
})
ssNames := am.SAdd(ssrpc.WorkerStates.Names(), am.S{"Foo", "Bar"})

// init
worker := am.New(ctx, ssStruct, nil)
worker.VerifyStates(ssNames)

// ...

// send data to the client
worker.Add1(ssrpc.WorkerStates.SendPayload, arpc.Pass(&arpc.A{
    Name: "mypayload",
    Payload: &arpc.ArgsPayload{
        Name: "mypayload",
        Source: "worker1",
        Data: []byte{1,2,3},
    },
}))
Server

Each RPC server can handle 1 client at a time. Both client and server need the same worker states definition (structure map and ordered list of states). After the initial handshake, server will be pushing local state changes every PushInterval, while state changes made by an RPC client are delivered synchronously. Server starts listening on either Addr, Listener, or Conn. Basic ACL is possible via AllowId.

import (
    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

var addr string
var worker *am.Machine

// init
s, err := arpc.NewServer(ctx, addr, worker.ID, worker, nil)
if err != nil {
    panic(err)
}

// start
s.Start()
err = amhelp.WaitForAll(ctx, 2*time.Second,
    s.Mach.When1(ssrpc.ServerStates.RpcReady, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

// react to the client
<-worker.When1("Foo", nil)
print("Client added Foo")
worker.Add1("Bar", nil)
Client

Each RPC client can connect to 1 server and needs to know worker's states structure and order. Data send by a worker via SendPayload will be received by a Consumer machine (passed via ClientOpts.Consumer) as an Add mutation of the WorkerPayload state (see a detailed diagram). Client supports fail-safety for both connection (eg ConnRetries, ConnRetryBackoff) and calls (eg CallRetries, CallRetryBackoff).

After the client's Ready state becomes active, it exposes a remote worker at client.Worker. Remote worker implements most of Machine's methods, many of which are evaluated locally (like Is, When, NewStateCtx). See machine.Api for a full list.

import (
    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

var addr string
// worker state structure
var ssStruct am.Struct
// worker state names
var ssNames am.S

// consumer
consumer := am.New(ctx, ssrpc.ConsumerStruct, nil)

// init
c, err := arpc.NewClient(ctx, addr, "clientid", ssStruct, ssNames, &arpc.ClientOpts{
    Consumer: consumer,
})
if err != nil {
    panic(err)
}

// start
c.Start()
err := amhelp.WaitForAll(ctx, 2*time.Second,
    c.Mach.When1(ssrpc.ClientStates.Ready, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

// use the remote worker
c.Worker.Add1("Foo", nil)
<-c.Worker.When1("Bar", nil)
print("Server added Bar")
Multiplexer

Because 1 server can serve only 1 client (for simplicity), it's often required to use a port multiplexer. It's very simple to create one using NewMux and a callback function, which returns a new server instance.

import (
    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

// new server per each new client (optional)
var newServer arpc.MuxNewServer = func(num int64, _ net.Conn) (*Server, error) {
    name := fmt.Sprintf("%s-%d", t.Name(), num)
    s, err := NewServer(ctx, "", name, w, nil)
    if err != nil {
        t.Fatal(err)
    }

    return s, nil
}

// start cmux
mux, err := arpc.NewMux(ctx, t.Name(), newServer, nil)
if err != nil {
    t.Fatal(err)
}
mux.Listener = listener // or mux.Addr := ":1234"
mux.Start()
err := amhelp.WaitForAll(ctx, 2*time.Second,
    mux.Mach.When1(ssrpc.MuxStates.Ready, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

Documentation

Benchmark: aRPC vs gRPC

A simple and opinionated benchmark showing a subscribe-get-process scenario, implemented in both gRPC and aRPC. See /examples/benchmark_grpc for details and source code.

results - KiB transferred, number of calls

> task benchmark-grpc
...
BenchmarkClientArpc
    client_arpc_test.go:136: Transferred: 609 bytes
    client_arpc_test.go:137: Calls: 4
    client_arpc_test.go:138: Errors: 0
    client_arpc_test.go:136: Transferred: 1,149,424 bytes
    client_arpc_test.go:137: Calls: 10,003
    client_arpc_test.go:138: Errors: 0
BenchmarkClientArpc-8              10000            248913 ns/op           28405 B/op        766 allocs/op
BenchmarkClientGrpc
    client_grpc_test.go:117: Transferred: 1,113 bytes
    client_grpc_test.go:118: Calls: 9
    client_grpc_test.go:119: Errors: 0
    client_grpc_test.go:117: Transferred: 3,400,812 bytes
    client_grpc_test.go:118: Calls: 30,006
    client_grpc_test.go:119: Errors: 0
BenchmarkClientGrpc-8              10000            262693 ns/op           19593 B/op        391 allocs/op
BenchmarkClientLocal
BenchmarkClientLocal-8             10000               434.4 ns/op            16 B/op          1 allocs/op
PASS
ok      github.com/pancsta/asyncmachine-go/examples/benchmark_grpc      5.187s

API

aRPC implements /pkg/machine#Api, which is a large subset of /pkg/machine#Machine methods. Below the full list, with distinction which methods happen where (locally or on remote).

// Api is a subset of Machine for alternative implementations.
type Api interface {
    // ///// REMOTE

    // Mutations (remote)

    Add1(state string, args A) Result
    Add(states S, args A) Result
    Remove1(state string, args A) Result
    Remove(states S, args A) Result
    Set(states S, args A) Result
    AddErr(err error, args A) Result
    AddErrState(state string, err error, args A) Result

    // Waiting (remote)

    WhenArgs(state string, args A, ctx context.Context) <-chan struct{}

    // Getters (remote)

    Err() error

    // ///// LOCAL

    // Checking (local)

    IsErr() bool
    Is(states S) bool
    Is1(state string) bool
    Not(states S) bool
    Not1(state string) bool
    Any(states ...S) bool
    Any1(state ...string) bool
    Has(states S) bool
    Has1(state string) bool
    IsTime(time Time, states S) bool
    IsClock(clock Clock) bool

    // Waiting (local)

    When(states S, ctx context.Context) <-chan struct{}
    When1(state string, ctx context.Context) <-chan struct{}
    WhenNot(states S, ctx context.Context) <-chan struct{}
    WhenNot1(state string, ctx context.Context) <-chan struct{}
    WhenTime(
        states S, times Time, ctx context.Context) <-chan struct{}
    WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
    WhenTicksEq(state string, tick uint64, ctx context.Context) <-chan struct{}
    WhenErr(ctx context.Context) <-chan struct{}

    // Getters (local)

    StateNames() S
    ActiveStates() S
    Tick(state string) uint64
    Clock(states S) Clock
    Time(states S) Time
    TimeSum(states S) uint64
    NewStateCtx(state string) context.Context
    Export() *Serialized
    GetStruct() Struct
    Switch(groups ...S) string

    // Misc (local)

    Log(msg string, args ...any)
    Id() string
    ParentId() string
    SetLogId(val bool)
    GetLogId() bool
    SetLogger(logger Logger)
    SetLogLevel(lvl LogLevel)
    SetLoggerEmpty(lvl LogLevel)
    SetLoggerSimple(logf func(format string, args ...any), level LogLevel)
    Ctx() context.Context
    String() string
    StringAll() string
    Inspect(states S) string
    Index(state string) int
    BindHandlers(handlers any) error
    StatesVerified() bool
    Tracers() []Tracer
    DetachTracer(tracer Tracer) bool
    BindTracer(tracer Tracer)
    Dispose()
    WhenDisposed() <-chan struct{}
    IsDisposed() bool
}

Tests

aRPC passes the whole test suite of /pkg/machine for the exposed methods and provides a couple of optimization-focused tests (on top of tests for basic RPC).

Optimizations

aRPC implements several optimization strategies to achieve the results.

  • net/rpc method names as runes
  • binary format of encoding/gob
  • index-based clock
    • [0, 100, 0, 120]
  • diff-based clock updates
    • [0, 1, 0, 1]
  • debounced server-mutation clock pushes
    • [0, 5, 2, 1]
  • partial clock updates
    • [[1, 1], [3, 1]]

Status

Testing, not semantically versioned.

monorepo

Go back to the monorepo root to continue reading.

Documentation

Overview

Package rpc is a transparent RPC for state machines.

Index

Constants

View Source
const (
	// EnvAmRpcLogServer enables machine logging for RPC server.
	EnvAmRpcLogServer = "AM_RPC_LOG_SERVER"
	// EnvAmRpcLogClient enables machine logging for RPC client.
	EnvAmRpcLogClient = "AM_RPC_LOG_CLIENT"
)
View Source
const EnvAmRpcLogMux = "AM_RPC_LOG_MUX"

Variables

View Source
var (
	ErrInvalidParams = errors.New("invalid params")
	ErrInvalidResp   = errors.New("invalid response")
	ErrRpc           = errors.New("rpc")
	ErrNoAccess      = errors.New("no access")
	ErrNoConn        = errors.New("not connected")

	ErrNetwork        = errors.New("network error")
	ErrNetworkTimeout = errors.New("network timeout")
)

Functions

func AddErr added in v0.8.0

func AddErr(mach *am.Machine, msg string, err error)

AddErr detects sentinels from error msgs and calls the proper error setter.

func AddErrNetwork added in v0.8.0

func AddErrNetwork(mach *am.Machine, err error)

func AddErrNoConn added in v0.8.0

func AddErrNoConn(mach *am.Machine, err error)

func AddErrParams added in v0.8.0

func AddErrParams(mach *am.Machine, err error)

func AddErrResp added in v0.8.0

func AddErrResp(mach *am.Machine, err error)

func AddErrRpcStr added in v0.8.0

func AddErrRpcStr(mach *am.Machine, msg string)

func BindServer added in v0.8.0

func BindServer(source, target *am.Machine, rpcReady, clientConn string) error

BindServer binds RpcReady and ClientConnected with Add/Remove, to custom states.

func BindServerMulti added in v0.8.0

func BindServerMulti(
	source, target *am.Machine, rpcReady, clientConn, clientDisconn string,
) error

BindServerMulti binds RpcReady, ClientConnected, and ClientDisconnected. RpcReady is Add/Remove, other two are Add-only to passed multi states.

func BindServerRpcReady added in v0.8.0

func BindServerRpcReady(source, target *am.Machine, rpcReady string) error

BindServerRpcReady bind RpcReady using Add to a custom multi state.

func ClockFromMsg

func ClockFromMsg(before am.Time, msg ClockMsg) am.Time

func GetClientId added in v0.8.0

func GetClientId(name string) string

GetClientId returns a machine ID from a name. This ID will be used to handshake the server.

func LogArgs added in v0.8.0

func LogArgs(args am.A) map[string]string

LogArgs is an args logger for A.

func Pass added in v0.8.0

func Pass(args *A) am.A

Pass prepares am.A from A to pass to further mutations.

func TrafficMeter

func TrafficMeter(
	listener net.Listener, fwdTo string, counter chan<- int64,
	end <-chan struct{},
)

Types

type A added in v0.8.0

type A struct {
	Id        string `log:"id"`
	Name      string `log:"name"`
	MachTime  am.Time
	Payload   *ArgsPayload
	Addr      string `log:"addr"`
	Err       error
	Method    string `log:"addr"`
	StartedAt time.Time
	Client    *rpc2.Client
	Dispose   bool
}

A represents typed arguments of the RPC package.

func ParseArgs added in v0.8.0

func ParseArgs(args am.A) *A

ParseArgs extracts A from am.Event.Args.

type ArgsGet

type ArgsGet struct {
	Name string
}

type ArgsLog

type ArgsLog struct {
	Msg  string
	Args []any
}

type ArgsMut

type ArgsMut struct {
	States []int
	Args   am.A
}

ArgsMut is args for mutation methods.

type ArgsPayload

type ArgsPayload struct {
	Name string
	// Source is the machine ID that sent the payload.
	Source string
	// Data is the payload data. The Consumer has to know the type.
	Data any
	// Token is a unique random ID for the payload. Autofilled by the server.
	Token string
}

type Client

type Client struct {
	*ExceptionHandler

	Mach *am.Machine
	Name string

	// Addr is the address the Client will connect to.
	Addr string
	// Worker is a remote am.Machine instance
	Worker *Worker
	// Consumer is the optional consumer for deliveries.
	Consumer   *am.Machine
	CallCount  uint64
	LogEnabled bool
	// DisconnCooldown is the time to wait after notifying the server about
	// disconnecting before actually disconnecting. Default 10ms.
	DisconnCooldown time.Duration
	// LastMsgAt is the last received msg from the worker TODO
	LastMsgAt time.Time
	// HelloDelay between Connected and Handshaking. Default 0, useful for
	// rpc/Mux.
	HelloDelay time.Duration
	// ReconnectOn decides if the client will try to [RetryingConn] after a
	// clean [Disconnect].
	ReconnectOn bool

	// ConnTimeout is the maximum time to wait for a connection to be established.
	// Default 3s.
	ConnTimeout time.Duration
	// ConnRetries is the number of retries for a connection. Default 15.
	ConnRetries int
	// ConnRetryTimeout is the maximum time to retry a connection. Default 1m.
	ConnRetryTimeout time.Duration
	// ConnRetryDelay is the time to wait between retries. Default 100ms. If
	// ConnRetryBackoff is set, this is the initial delay, and doubles on each
	// retry.
	ConnRetryDelay time.Duration
	// ConnRetryBackoff is the maximum time to wait between retries. Default 3s.
	ConnRetryBackoff time.Duration

	// CallTimeout is the maximum time to wait for a call to complete. Default 3s.
	CallTimeout time.Duration
	// CallRetries is the number of retries for a call. Default 15.
	CallRetries int
	// CallRetryTimeout is the maximum time to retry a call. Default 1m.
	CallRetryTimeout time.Duration
	// CallRetryDelay is the time to wait between retries. Default 100ms. If
	// CallRetryBackoff is set, this is the initial delay, and doubles on each
	// retry.
	CallRetryDelay time.Duration
	// CallRetryBackoff is the maximum time to wait between retries. Default 3s.
	CallRetryBackoff time.Duration
	// contains filtered or unexported fields
}

func NewClient

func NewClient(
	ctx context.Context, workerAddr string, name string, stateStruct am.Struct,
	stateNames am.S, opts *ClientOpts,
) (*Client, error)

NewClient creates a new RPC client and exposes a remote state machine as a remote worker, with a subst of the API under Client.Worker. Optionally takes a consumer, which is a state machine with a WorkerPayload state. See states.ConsumerStates.

func (*Client) CallRetryFailedState added in v0.8.0

func (c *Client) CallRetryFailedState(e *am.Event)

func (*Client) ConnectedState

func (c *Client) ConnectedState(e *am.Event)

func (*Client) ConnectingState

func (c *Client) ConnectingState(e *am.Event)

func (*Client) DisconnectedEnter

func (c *Client) DisconnectedEnter(e *am.Event) bool

func (*Client) DisconnectedState

func (c *Client) DisconnectedState(e *am.Event)

func (*Client) DisconnectingEnter

func (c *Client) DisconnectingEnter(e *am.Event) bool

func (*Client) DisconnectingState

func (c *Client) DisconnectingState(e *am.Event)

func (*Client) ExceptionState added in v0.8.0

func (c *Client) ExceptionState(e *am.Event)

ExceptionState handles network errors and retries the connection.

func (*Client) Get

func (c *Client) Get(ctx context.Context, name string) (*RespGet, error)

Get requests predefined data from the server's getter function.

func (*Client) GetKind

func (c *Client) GetKind() Kind

GetKind returns a kind of RPC component (server / client).

func (*Client) HandshakeDoneEnter added in v0.8.0

func (c *Client) HandshakeDoneEnter(e *am.Event) bool

func (*Client) HandshakeDoneState

func (c *Client) HandshakeDoneState(e *am.Event)

func (*Client) HandshakingState

func (c *Client) HandshakingState(e *am.Event)

func (*Client) RemotePushAllTicks added in v0.8.0

func (c *Client) RemotePushAllTicks(
	_ *rpc2.Client, clocks []PushAllTicks, _ *Empty,
) error

RemotePushAllTicks log all the machine clock's ticks, so all final handlers can be executed in order. Only called by the server.

func (*Client) RemoteSendPayload

func (c *Client) RemoteSendPayload(
	_ *rpc2.Client, payload *ArgsPayload, _ *Empty,
) error

RemoteSendPayload receives a payload from the server and triggers WorkDelivered. The Consumer should bind his handlers and handle this state to receive the data.

func (*Client) RemoteSendingPayload added in v0.8.0

func (c *Client) RemoteSendingPayload(
	_ *rpc2.Client, payload *ArgsPayload, _ *Empty,
) error

RemoteSendingPayload triggers the WorkerDelivering state, which is an optional indication that the server has started a data transmission to the Client. This payload shouldn't contain the data itself, only the name and token.

func (*Client) RemoteSetClock

func (c *Client) RemoteSetClock(
	_ *rpc2.Client, clock ClockMsg, _ *Empty,
) error

RemoteSetClock updates the client's clock. Only called by the server.

func (*Client) RetryingCallEnter added in v0.8.0

func (c *Client) RetryingCallEnter(e *am.Event) bool

func (*Client) RetryingConnState added in v0.8.0

func (c *Client) RetryingConnState(e *am.Event)

RetryingConnState should be set without Connecting in the same tx

func (*Client) Start

func (c *Client) Start() am.Result

Start connects the client to the server and initializes the worker. Results in the Ready state.

func (*Client) StartEnd

func (c *Client) StartEnd(e *am.Event)

func (*Client) StartState added in v0.8.0

func (c *Client) StartState(e *am.Event)

func (*Client) Stop

func (c *Client) Stop(waitTillExit context.Context, dispose bool) am.Result

Stop disconnects the client from the server and disposes the worker.

waitTillExit: if passed, waits for the client to disconnect using the context.

func (*Client) WorkerPayloadEnter added in v0.8.0

func (c *Client) WorkerPayloadEnter(e *am.Event) bool

func (*Client) WorkerPayloadState added in v0.8.0

func (c *Client) WorkerPayloadState(e *am.Event)

type ClientOpts added in v0.8.0

type ClientOpts struct {
	// PayloadState is a state for the server to listen on, to deliver payloads
	// to the client. The client adds this state to request a payload from the
	// worker. Default: am/rpc/states/WorkerStates.SendPayload.
	Consumer *am.Machine
	// Parent is a parent state machine for a new Client state machine. See
	// [am.Opts].
	Parent am.Api
}

type ClockMsg

type ClockMsg [][2]int

func NewClockMsg

func NewClockMsg(before, after am.Time) ClockMsg

type Empty

type Empty struct{}

type Event added in v0.8.0

type Event struct {
	// Name of the event / handler
	Name string
	// Machine is the machine that the event belongs to.
	Machine am.Api
}

Event struct represents a single event of a Mutation within a Transition. One event can have 0-n handlers.

type ExceptionHandler

type ExceptionHandler struct {
	*am.ExceptionHandler
}

ExceptionHandler is a shared exception handler for RPC server and client.

func (*ExceptionHandler) ExceptionEnter

func (h *ExceptionHandler) ExceptionEnter(e *am.Event) bool

type HandlerFinal added in v0.8.0

type HandlerFinal func(e *Event)

type Kind

type Kind string
const (
	KindClient Kind = "client"
	KindServer Kind = "server"
)

type Mux added in v0.8.0

type Mux struct {
	*am.ExceptionHandler
	Mach *am.Machine

	Name        string
	Addr        string
	Listener    net.Listener
	LogEnabled  bool
	NewServerFn MuxNewServer
	// The last error returned by NewServerFn.
	NewServerErr error
	// contains filtered or unexported fields
}

Mux creates a new RPC server for each incoming connection.

func NewMux added in v0.8.0

func NewMux(
	ctx context.Context, name string, newServer MuxNewServer, opts *MuxOpts,
) (*Mux, error)

func (*Mux) ClientConnectedState added in v0.8.0

func (m *Mux) ClientConnectedState(e *am.Event)

func (*Mux) ExceptionState added in v0.8.0

func (m *Mux) ExceptionState(e *am.Event)

func (*Mux) HasClientsEnd added in v0.8.0

func (m *Mux) HasClientsEnd(e *am.Event) bool

func (*Mux) HealthcheckState added in v0.8.0

func (m *Mux) HealthcheckState(e *am.Event)

func (*Mux) NewServerErrEnter added in v0.8.0

func (m *Mux) NewServerErrEnter(e *am.Event) bool

func (*Mux) NewServerErrState added in v0.8.0

func (m *Mux) NewServerErrState(e *am.Event)

func (*Mux) Start added in v0.8.0

func (m *Mux) Start() am.Result

func (*Mux) StartEnd added in v0.8.0

func (m *Mux) StartEnd(e *am.Event)

func (*Mux) StartState added in v0.8.0

func (m *Mux) StartState(e *am.Event)

func (*Mux) Stop added in v0.8.0

func (m *Mux) Stop(dispose bool) am.Result

type MuxNewServer added in v0.8.0

type MuxNewServer func(num int, conn net.Conn) (*Server, error)

MuxNewServer is a function to create a new RPC server for each incoming connection.

type MuxOpts added in v0.8.0

type MuxOpts struct {
	// Parent is a parent state machine for a new Mux state machine. See
	// [am.Opts].
	Parent am.Api
}

type PushAllTicks added in v0.8.0

type PushAllTicks struct {
	// Mutation is 0:[am.MutationType] 1-n: called state index
	Mutation []int
	ClockMsg ClockMsg
}

type RespGet

type RespGet struct {
	Value any
}

type RespHandshake

type RespHandshake = am.Serialized

type RespResult

type RespResult struct {
	Clock  ClockMsg
	Result am.Result
}

type RespSync

type RespSync struct {
	Time am.Time
}

type SendPayloadHandlers added in v0.8.0

type SendPayloadHandlers struct {
	SendPayloadState am.HandlerFinal
}

type Server

type Server struct {
	*ExceptionHandler
	Mach *am.Machine

	// Addr is the address of the server on the network.
	Addr            string
	DeliveryTimeout time.Duration
	// PushInterval is the interval for clock updates, effectively throttling
	// the number of updates sent to the client within the interval window.
	// 0 means pushes are disabled. Setting to a very small value will make
	// pushes instant.
	PushInterval time.Duration
	// PushAllTicks will push all ticks to the client, enabling client-side final
	// handlers. TODO implement
	PushAllTicks bool
	// Listener can be set manually before starting the server.
	Listener net.Listener
	// Conn can be set manually before starting the server.
	Conn net.Conn
	// NoNewListener will prevent the server from creating a new listener if
	// one is not provided, or has been closed. Useful for cmux.
	NoNewListener bool
	LogEnabled    bool
	CallCount     uint64

	// AllowId will limit clients to a specific ID, if set.
	AllowId string
	// contains filtered or unexported fields
}

Server is an RPC server that can be bound to a worker machine and provide remote access to its states and methods.

func NewServer

func NewServer(
	ctx context.Context, addr string, name string, worker am.Api,
	opts *ServerOpts,
) (*Server, error)

NewServer creates a new RPC server, bound to a worker machine. The worker machine has to implement am/rpc/states/WorkerStatesDef interface.

func (*Server) GetKind

func (s *Server) GetKind() Kind

GetKind returns a kind of RPC component (server / client).

func (*Server) HandshakeDoneEnd

func (s *Server) HandshakeDoneEnd(e *am.Event)

func (*Server) RemoteAdd

func (s *Server) RemoteAdd(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteAddNS

func (s *Server) RemoteAddNS(
	_ *rpc2.Client, args *ArgsMut, _ *Empty,
) error

func (*Server) RemoteBye

func (s *Server) RemoteBye(
	_ *rpc2.Client, _ *Empty, _ *Empty,
) error

RemoteBye means the client says goodbye and will disconnect shortly.

func (*Server) RemoteHandshake

func (s *Server) RemoteHandshake(
	client *rpc2.Client, id *string, _ *Empty,
) error

func (*Server) RemoteHello added in v0.8.0

func (s *Server) RemoteHello(
	client *rpc2.Client, _ *Empty, resp *RespHandshake,
) error

func (*Server) RemoteRemove

func (s *Server) RemoteRemove(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteSet

func (s *Server) RemoteSet(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteSetPushAllTicks added in v0.8.0

func (s *Server) RemoteSetPushAllTicks(
	_ *rpc2.Client, val bool, _ *Empty,
) error

func (*Server) RemoteSync

func (s *Server) RemoteSync(
	_ *rpc2.Client, sum uint64, resp *RespSync,
) error

func (*Server) RpcReadyEnd

func (s *Server) RpcReadyEnd(e *am.Event)

func (*Server) RpcReadyEnter added in v0.8.0

func (s *Server) RpcReadyEnter(e *am.Event) bool

func (*Server) RpcReadyState

func (s *Server) RpcReadyState(e *am.Event)

RpcReadyState starts a ticker to compensate for clock push denounces.

func (*Server) RpcStartingEnter added in v0.8.0

func (s *Server) RpcStartingEnter(e *am.Event) bool

func (*Server) RpcStartingState

func (s *Server) RpcStartingState(e *am.Event)

func (*Server) SendPayload

func (s *Server) SendPayload(ctx context.Context, payload *ArgsPayload) error

SendPayload sends a payload to the client.

func (*Server) Start

func (s *Server) Start() am.Result

Start starts the server, optionally creating a Listener (if Addr provided). Results in either RpcReady or Exception.

func (*Server) StartEnd added in v0.8.0

func (s *Server) StartEnd(e *am.Event)

func (*Server) Stop

func (s *Server) Stop(dispose bool) am.Result

Stop stops the server, and optionally disposes resources.

type ServerOpts added in v0.8.0

type ServerOpts struct {
	// PayloadState is a state for the server to listen on, to deliver payloads
	// to the client. The client adds this state to request a payload from the
	// worker. Default: am/rpc/states/WorkerStates.SendPayload.
	PayloadState string
	// Parent is a parent state machine for a new Server state machine. See
	// [am.Opts].
	Parent am.Api
}

type Transition added in v0.8.0

type Transition struct {
	// Machine is the parent machine of this transition.
	Machine am.Api
	// TimeBefore is the machine time from before the transition.
	TimeBefore am.Time
	// TimeAfter is the machine time from after the transition. If the transition
	// has been canceled, this will be the same as TimeBefore.
	TimeAfter am.Time
}

Transition represents processing of a single mutation within a machine.

type Worker

type Worker struct {
	ID string

	// TODO remote push
	Disposed atomic.Bool
	// contains filtered or unexported fields
}

Worker is a subset of `pkg/machine#Machine` for RPC. Lacks the queue and other local methods. Most methods are clock-based, thus executed locally.

func (*Worker) ActiveStates

func (w *Worker) ActiveStates() am.S

ActiveStates returns a copy of the currently active states.

func (*Worker) Add

func (w *Worker) Add(states am.S, args am.A) am.Result

Add activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) Add1

func (w *Worker) Add1(state string, args am.A) am.Result

Add1 is a shorthand method to add a single state with the passed args.

func (*Worker) Add1NS

func (w *Worker) Add1NS(state string, args am.A) am.Result

Add1NS is a single state version of AddNS.

func (*Worker) AddErr

func (w *Worker) AddErr(err error, args am.A) am.Result

AddErr is a dedicated method to add the Exception state with the passed error and optional arguments. Like every mutation method, it will resolve relations and trigger handlers. AddErr produces a stack trace of the error, if LogStackTrace is enabled.

func (*Worker) AddErrState

func (w *Worker) AddErrState(state string, err error, args am.A) am.Result

AddErrState adds a dedicated error state, along with the build in Exception state. Like every mutation method, it will resolve relations and trigger handlers. AddErrState produces a stack trace of the error, if LogStackTrace is enabled.

func (*Worker) AddNS

func (w *Worker) AddNS(states am.S, args am.A) am.Result

AddNS is a NoSync method - an efficient way for adding states, as it doesn't wait for, nor transfers a response. Because of which it doesn't update the clock. Use Sync() to update the clock after a batch of AddNS calls.

func (*Worker) Any

func (w *Worker) Any(states ...am.S) bool

Any is group call to Is, returns true if any of the params return true from Is.

machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
// is(Foo, Bar) or is(Bar)
machine.Any(S{"Foo", "Bar"}, S{"Bar"}) // false
// is(Foo) or is(Bar)
machine.Any(S{"Foo"}, S{"Bar"}) // true

func (*Worker) Any1

func (w *Worker) Any1(states ...string) bool

Any1 is group call to Is1(), returns true if any of the params return true from Is1().

func (*Worker) BindHandlers added in v0.8.0

func (w *Worker) BindHandlers(handlers any) error

func (*Worker) BindTracer added in v0.8.0

func (w *Worker) BindTracer(tracer am.Tracer)

func (*Worker) Clock

func (w *Worker) Clock(states am.S) am.Clock

Clock returns current machine's clock, a state-keyed map of ticks. If states are passed, only the ticks of the passed states are returned.

func (*Worker) Ctx

func (w *Worker) Ctx() context.Context

func (*Worker) DetachTracer added in v0.8.0

func (w *Worker) DetachTracer(tracer am.Tracer) bool

func (*Worker) Dispose

func (w *Worker) Dispose()

Dispose disposes the machine and all its emitters. You can wait for the completion of the disposal with `<-mach.WhenDisposed`.

func (*Worker) Err

func (w *Worker) Err() error

Err returns the last error.

func (*Worker) Export

func (w *Worker) Export() *am.Serialized

Export exports the machine state: ID, time and state names.

func (*Worker) GetLogId added in v0.8.0

func (w *Worker) GetLogId() bool

func (*Worker) GetLogLevel added in v0.8.0

func (w *Worker) GetLogLevel() am.LogLevel

GetLogLevel returns the log level of the machine.

func (*Worker) GetLogger added in v0.8.0

func (w *Worker) GetLogger() *am.Logger

GetLogger returns the current custom logger function, or nil.

func (*Worker) GetStruct

func (w *Worker) GetStruct() am.Struct

func (*Worker) Has

func (w *Worker) Has(states am.S) bool

Has return true is passed states are registered in the machine.

func (*Worker) Has1

func (w *Worker) Has1(state string) bool

Has1 is a shorthand for Has. It returns true if the passed state is registered in the machine.

func (*Worker) Id added in v0.8.0

func (w *Worker) Id() string

Id returns the machine's ID.

func (*Worker) Index

func (w *Worker) Index(state string) int

Index returns the index of a state in the machine's StateNames() list.

func (*Worker) Inspect

func (w *Worker) Inspect(states am.S) string

Inspect returns a multi-line string representation of the machine (states, relations, clock). states: param for ordered or partial results.

func (*Worker) Is

func (w *Worker) Is(states am.S) bool

Is checks if all the passed states are currently active.

machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
machine.TestIs(S{"Foo"}) // true
machine.TestIs(S{"Foo", "Bar"}) // false

func (*Worker) Is1

func (w *Worker) Is1(state string) bool

Is1 is a shorthand method to check if a single state is currently active. See Is().

func (*Worker) IsClock

func (w *Worker) IsClock(clock am.Clock) bool

IsClock checks if the machine has changed since the passed clock. Returns true if at least one state has changed.

func (*Worker) IsDisposed added in v0.8.0

func (w *Worker) IsDisposed() bool

IsDisposed returns true if the machine has been disposed.

func (*Worker) IsErr

func (w *Worker) IsErr() bool

IsErr checks if the machine has the Exception state currently active.

func (*Worker) IsTime

func (w *Worker) IsTime(t am.Time, states am.S) bool

IsTime checks if the machine has changed since the passed time (list of ticks). Returns true if at least one state has changed. The states param is optional and can be used to check only a subset of states.

func (*Worker) Log

func (w *Worker) Log(msg string, args ...any)

Log logs is a remote logger.

func (*Worker) LogLvl added in v0.8.0

func (w *Worker) LogLvl(lvl am.LogLevel, msg string, args ...any)

LogLvl adds an internal log entry from the outside. It should be used only by packages extending pkg/machine. Use Log instead.

func (*Worker) MustParseStates

func (w *Worker) MustParseStates(states am.S) am.S

MustParseStates parses the states and returns them as a list. Panics when a state is not defined. It's an usafe equivalent of VerifyStates.

func (*Worker) NewStateCtx

func (w *Worker) NewStateCtx(state string) context.Context

NewStateCtx returns a new sub-context, bound to the current clock's tick of the passed state.

Context cancels when the state has been de-activated, or right away, if it isn't currently active.

State contexts are used to check state expirations and should be checked often inside goroutines. TODO log reader

func (*Worker) Not

func (w *Worker) Not(states am.S) bool

Not checks if **none** of the passed states are currently active.

machine.StringAll()
// -> ()[A:0 B:0 C:0 D:0]
machine.Add(S{"A", "B"})

// not(A) and not(C)
machine.TestNot(S{"A", "C"})
// -> false

// not(C) and not(D)
machine.TestNot(S{"C", "D"})
// -> true

func (*Worker) Not1

func (w *Worker) Not1(state string) bool

Not1 is a shorthand method to check if a single state is currently inactive. See Not().

func (*Worker) ParentId added in v0.8.0

func (w *Worker) ParentId() string

TODO

func (*Worker) Remove

func (w *Worker) Remove(states am.S, args am.A) am.Result

Remove de-activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) Remove1

func (w *Worker) Remove1(state string, args am.A) am.Result

Remove1 is a shorthand method to remove a single state with the passed args. See Remove().

func (*Worker) Set

func (w *Worker) Set(states am.S, args am.A) am.Result

Set de-activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) SetLogId added in v0.8.0

func (w *Worker) SetLogId(val bool)

func (*Worker) SetLogLevel added in v0.8.0

func (w *Worker) SetLogLevel(level am.LogLevel)

SetLogLevel sets the log level of the machine.

func (*Worker) SetLogger added in v0.8.0

func (w *Worker) SetLogger(fn am.Logger)

SetLogger sets a custom logger function.

func (*Worker) SetLoggerEmpty added in v0.8.0

func (w *Worker) SetLoggerEmpty(level am.LogLevel)

SetLoggerEmpty creates an empty logger that does nothing and sets the log level in one call. Useful when combined with am-dbg. Requires LogChanges log level to produce any output.

func (*Worker) SetLoggerSimple added in v0.8.0

func (w *Worker) SetLoggerSimple(
	logf func(format string, args ...any), level am.LogLevel,
)

SetLoggerSimple takes log.Printf and sets the log level in one call. Useful for testing. Requires LogChanges log level to produce any output.

func (*Worker) StateNames

func (w *Worker) StateNames() am.S

StateNames returns a copy of all the state names.

func (*Worker) StatesVerified added in v0.8.0

func (w *Worker) StatesVerified() bool

StatesVerified returns true if the state names have been ordered using VerifyStates.

func (*Worker) String

func (w *Worker) String() string

String returns a one line representation of the currently active states, with their clock values. Inactive states are omitted. Eg: (Foo:1 Bar:3)

func (*Worker) StringAll

func (w *Worker) StringAll() string

StringAll returns a one line representation of all the states, with their clock values. Inactive states are in square brackets. Eg: (Foo:1 Bar:3)[Baz:2]

func (*Worker) Switch

func (w *Worker) Switch(groups ...am.S) string

Switch returns the first state from the passed list that is currently active, making it useful for switch statements.

switch mach.Switch(ss.GroupPlaying) {
case "Playing":
case "Paused":
case "Stopped":
}

func (*Worker) Sync

func (w *Worker) Sync() am.Time

Sync requests fresh clock values from the remote machine. Useful to call after a batch of no-sync methods, eg AddNS.

func (*Worker) Tick

func (w *Worker) Tick(state string) uint64

Tick return the current tick for a given state.

func (*Worker) Time

func (w *Worker) Time(states am.S) am.Time

Time returns machine's time, a list of ticks per state. Returned value includes the specified states, or all the states if nil.

func (*Worker) TimeSum

func (w *Worker) TimeSum(states am.S) uint64

TimeSum returns the sum of machine's time (ticks per state). Returned value includes the specified states, or all the states if nil. It's a very inaccurate, yet simple way to measure the machine's time. TODO handle overflow

func (*Worker) Tracers added in v0.8.0

func (w *Worker) Tracers() []am.Tracer

func (*Worker) When

func (w *Worker) When(states am.S, ctx context.Context) <-chan struct{}

When returns a channel that will be closed when all the passed states become active or the machine gets disposed.

ctx: optional context that will close the channel when done. Useful when listening on 2 When() channels within the same `select` to GC the 2nd one.

func (*Worker) When1

func (w *Worker) When1(state string, ctx context.Context) <-chan struct{}

When1 is an alias to When() for a single state. See When.

func (*Worker) WhenArgs

func (w *Worker) WhenArgs(
	state string, args am.A, ctx context.Context,
) <-chan struct{}

WhenArgs returns a channel that will be closed when the passed state becomes active with all the passed args. Args are compared using the native '=='. It's meant to be used with async Multi states, to filter out a specific completion.

func (*Worker) WhenDisposed

func (w *Worker) WhenDisposed() <-chan struct{}

WhenDisposed returns a channel that will be closed when the machine is disposed. Requires bound handlers. Use Machine.Disposed in case no handlers have been bound.

func (*Worker) WhenErr

func (w *Worker) WhenErr(ctx context.Context) <-chan struct{}

WhenErr returns a channel that will be closed when the machine is in the Exception state.

ctx: optional context defaults to the machine's context.

func (*Worker) WhenNot

func (w *Worker) WhenNot(states am.S, ctx context.Context) <-chan struct{}

WhenNot returns a channel that will be closed when all the passed states become inactive or the machine gets disposed.

ctx: optional context that will close the channel when done. Useful when listening on 2 WhenNot() channels within the same `select` to GC the 2nd one.

func (*Worker) WhenNot1

func (w *Worker) WhenNot1(state string, ctx context.Context) <-chan struct{}

WhenNot1 is an alias to WhenNot() for a single state. See WhenNot.

func (*Worker) WhenTicks

func (w *Worker) WhenTicks(
	state string, ticks int, ctx context.Context,
) <-chan struct{}

WhenTicks waits N ticks of a single state (relative to now). Uses WhenTime underneath.

func (*Worker) WhenTicksEq

func (w *Worker) WhenTicksEq(
	state string, ticks uint64, ctx context.Context,
) <-chan struct{}

WhenTicksEq waits till ticks for a single state equal the given absolute value (or more). Uses WhenTime underneath.

func (*Worker) WhenTime

func (w *Worker) WhenTime(
	states am.S, times am.Time, ctx context.Context,
) <-chan struct{}

WhenTime returns a channel that will be closed when all the passed states have passed the specified time. The time is a logical clock of the state. Machine time can be sourced from the Time() method, or Clock() for a specific state. TODO log reader

type WorkerTracer

type WorkerTracer struct {
	*am.NoOpTracer
	// contains filtered or unexported fields
}

WorkerTracer is a tracer for local worker machines (event source).

func (*WorkerTracer) TransitionEnd

func (t *WorkerTracer) TransitionEnd(_ *am.Transition)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL