rpc

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: MIT Imports: 21 Imported by: 0

README

asyncmachine RPC

-> go back to monorepo /

[!NOTE] asyncmachine can transform blocking APIs into controllable state machines with ease. It shares similarities with Ergo's actor model, and focuses on distributed workflows like Temporal. It's lightweight and most features are optional.

aRPC is a transparent RPC for state machines implemented using asyncmachine-go. It's clock-based and features many optimizations, e.g. having most of the API methods executed locally (as the state information is encoded as clock values). It's build on top of cenkalti/rpc2 and net/rpc. There are gRPC benchmark and video demo tutorial available.

Features

  • mutations
  • wait methods
  • clock pushes (from worker-side mutations)
  • remote contexts
  • initial optimizations
  • payload file upload
  • server getters

Not implemented (yet):

  • WhenArgs, Err()
  • client getters
  • chunked encoding
  • TLS
  • reconnect / failsafe
  • compression
  • multiplexing
  • msgpack encoding

Usage

import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssCli "github.com/pancsta/asyncmachine-go/pkg/rpc/states/client"
    ssSrv "github.com/pancsta/asyncmachine-go/pkg/rpc/states/server"
)
Server
// init
s, err := NewServer(ctx, addr, worker.ID, worker, nil)
if err != nil {
    panic(err)
}

// start
s.Start()
<-s.Mach.When1("RpcReady", nil)

// react to the client
<-worker.When1("Foo", nil)
print("Client added Foo")
worker.Add1("Bar", nil)
Client
// init
c, err := NewClient(ctx, addr, "clientid", ss.States, ss.Names)
if err != nil {
    panic(err)
}

// start
c.Start()
<-c.Mach.When1("Ready", nil)

// use the remote worker
c.Worker.Add1("Foo", nil)
<-c.Worker.When1("Bar", nil)
print("Server added Bar")

Documentation

Benchmark: aRPC vs gRPC

A simple and opinionated benchmark showing a subscribe-get-process scenario, implemented in both gRPC and aRPC. See /examples/benchmark_grpc/README.md for details and source code.

results - KiB transferred, number of calls

> task benchmark-grpc
...
BenchmarkClientArpc
    client_arpc_test.go:136: Transferred: 609 bytes
    client_arpc_test.go:137: Calls: 4
    client_arpc_test.go:138: Errors: 0
    client_arpc_test.go:136: Transferred: 1,149,424 bytes
    client_arpc_test.go:137: Calls: 10,003
    client_arpc_test.go:138: Errors: 0
BenchmarkClientArpc-8              10000            248913 ns/op           28405 B/op        766 allocs/op
BenchmarkClientGrpc
    client_grpc_test.go:117: Transferred: 1,113 bytes
    client_grpc_test.go:118: Calls: 9
    client_grpc_test.go:119: Errors: 0
    client_grpc_test.go:117: Transferred: 3,400,812 bytes
    client_grpc_test.go:118: Calls: 30,006
    client_grpc_test.go:119: Errors: 0
BenchmarkClientGrpc-8              10000            262693 ns/op           19593 B/op        391 allocs/op
BenchmarkClientLocal
BenchmarkClientLocal-8             10000               434.4 ns/op            16 B/op          1 allocs/op
PASS
ok      github.com/pancsta/asyncmachine-go/examples/benchmark_grpc      5.187s

API

aRPC implements MachineApi, which is a large subset of Machine methods. Below the full list, with distinction which methods happen where (locally or on remote).

// MachineApi is a subset of `pkg/machine#Machine` for alternative
// implementations.
type MachineApi interface {

    // ///// REMOTE

    // Mutations (remote)

    Add1(state string, args am.A) am.Result
    Add(states am.S, args am.A) am.Result
    Remove1(state string, args am.A) am.Result
    Remove(states am.S, args am.A) am.Result
    Set(states am.S, args am.A) am.Result
    AddErr(err error, args am.A) am.Result
    AddErrState(state string, err error, args am.A) am.Result

    // Waiting (remote)

    WhenArgs(state string, args am.A, ctx context.Context) <-chan struct{}

    // Getters (remote)

    Err() error

    // Misc (remote)

    Log(msg string, args ...any)

    // ///// LOCAL

    // Checking (local)

    IsErr() bool
    Is(states am.S) bool
    Is1(state string) bool
    Not(states am.S) bool
    Not1(state string) bool
    Any(states ...am.S) bool
    Any1(state ...string) bool
    Has(states am.S) bool
    Has1(state string) bool
    IsTime(time am.Time, states am.S) bool
    IsClock(clock am.Clock) bool

    // Waiting (local)

    When(states am.S, ctx context.Context) <-chan struct{}
    When1(state string, ctx context.Context) <-chan struct{}
    WhenNot(states am.S, ctx context.Context) <-chan struct{}
    WhenNot1(state string, ctx context.Context) <-chan struct{}
    WhenTime(
        states am.S, times am.Time, ctx context.Context) <-chan struct{}
    WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
    WhenTicksEq(state string, tick uint64, ctx context.Context) <-chan struct{}
    WhenErr(ctx context.Context) <-chan struct{}

    // Getters (local)

    StateNames() am.S
    ActiveStates() am.S
    Tick(state string) uint64
    Clock(states am.S) am.Clock
    Time(states am.S) am.Time
    TimeSum(states am.S) uint64
    NewStateCtx(state string) context.Context
    Export() *am.Serialized
    GetStruct() am.Struct

    // Misc (local)

    String() string
    StringAll() string
    Inspect(states am.S) string
    Index(state string) int
    Dispose()
    WhenDisposed() <-chan struct{}
}

Tests

aRPC passes the whole test suite of /pkg/machine for the exposed methods and provides a couple of optimization-focused tests, on top of tests for basic RPC.

Optimizations

aRPC implements several optimization strategies to achieve the results.

  • net/rpc method names as runes
  • binary format of encoding/gob
  • index-based clock
    • [0, 100, 0, 120]
  • diff-based clock updates
    • [0, 1, 0, 1]
  • debounced server-mutation clock pushes
    • [0, 5, 2, 1]
  • partial clock updates
    • [[1, 1], [3, 1]]

monorepo

Go back to the monorepo root to continue reading.

Documentation

Overview

Package rpc is a transparent RPC for state machines.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidParams = errors.New("invalid params")
	ErrInvalidResp   = errors.New("invalid response")
	ErrRpc           = errors.New("rpc")

	ErrNetwork        = errors.New("network error")
	ErrNetworkTimeout = errors.New("network timeout")
)

Functions

func ClockFromMsg

func ClockFromMsg(before am.Time, msg ClockMsg) am.Time

func TrafficMeter

func TrafficMeter(
	listener net.Listener, fwdTo string, counter chan<- int64,
	end <-chan struct{},
)

Types

type ArgsGet

type ArgsGet struct {
	Name string
}

type ArgsLog

type ArgsLog struct {
	Msg  string
	Args []any
}

type ArgsMut

type ArgsMut struct {
	States []int
	Args   am.A
}

ArgsMut is args for mutation methods.

type ArgsPayload

type ArgsPayload struct {
	Name string
	Data []byte
}

type Client

type Client struct {
	*ExceptionHandler

	Mach *am.Machine
	// Worker is a remote Worker instance
	Worker      *Worker
	Payloads    map[string]*ArgsPayload
	CallCount   uint64
	LogEnabled  bool
	CallTimeout time.Duration
	// contains filtered or unexported fields
}

func NewClient

func NewClient(
	ctx context.Context, workerAddr string, id string, stateStruct am.Struct,
	stateNames am.S,
) (*Client, error)

func (*Client) ConnectedState

func (c *Client) ConnectedState(e *am.Event)

func (*Client) ConnectingState

func (c *Client) ConnectingState(e *am.Event)

func (*Client) DisconnectedEnter

func (c *Client) DisconnectedEnter(e *am.Event) bool

func (*Client) DisconnectedState

func (c *Client) DisconnectedState(e *am.Event)

func (*Client) DisconnectingEnter

func (c *Client) DisconnectingEnter(e *am.Event) bool

func (*Client) DisconnectingState

func (c *Client) DisconnectingState(e *am.Event)

func (*Client) Get

func (c *Client) Get(ctx context.Context, name string) (*RespGet, error)

Get requests predefined data from the server's getter function.

func (*Client) GetKind

func (c *Client) GetKind() Kind

GetKind returns a kind of RPC component (server / client).

func (*Client) HandshakeDoneState

func (c *Client) HandshakeDoneState(e *am.Event)

func (*Client) HandshakingState

func (c *Client) HandshakingState(e *am.Event)

func (*Client) RemoteSendPayload

func (c *Client) RemoteSendPayload(
	_ *rpc2.Client, file *ArgsPayload, _ *Empty,
) error

RemoteSendPayload receives a payload from the server. Only called by the server.

func (*Client) RemoteSetClock

func (c *Client) RemoteSetClock(
	_ *rpc2.Client, clock ClockMsg, _ *Empty,
) error

RemoteSetClock updates the client's clock. Only called by the server.

func (*Client) Start

func (c *Client) Start() am.Result

Start connects the client to the server and initializes the worker. Results in the Ready state.

func (*Client) StartEnd

func (c *Client) StartEnd(e *am.Event)

func (*Client) Stop

func (c *Client) Stop(waitTillExit context.Context, dispose bool) am.Result

Stop disconnects the client from the server and disposes the worker. waitTillExit: if passed, waits for the client to disconnect using the context.

type ClockMsg

type ClockMsg [][2]int

func NewClockMsg

func NewClockMsg(before, after am.Time) ClockMsg

type Empty

type Empty struct{}

type ExceptionHandler

type ExceptionHandler struct {
	*am.ExceptionHandler
}

ExceptionHandler is a shared exception handler for RPC server and client.

func (*ExceptionHandler) ExceptionEnter

func (h *ExceptionHandler) ExceptionEnter(e *am.Event) bool

func (*ExceptionHandler) ExceptionState

func (h *ExceptionHandler) ExceptionState(e *am.Event)

type GetterFunc

type GetterFunc func(string) any

type Kind

type Kind string
const (
	KindClient Kind = "client"
	KindServer Kind = "server"
)

type RespGet

type RespGet struct {
	Value any
}

type RespHandshake

type RespHandshake = am.Serialized

type RespResult

type RespResult struct {
	Clock  ClockMsg
	Result am.Result
}

type RespSync

type RespSync struct {
	Time am.Time
}

type Server

type Server struct {
	*ExceptionHandler

	Addr string
	Mach *am.Machine
	// ClockInterval is the interval for clock updates, effectively throttling
	// the number of updates sent to the client within the interval window.
	// 0 means pushes are disabled. Setting to a very small value will make
	// pushes instant.
	ClockInterval time.Duration
	// Listener can be set manually before starting the server.
	Listener   net.Listener
	LogEnabled bool
	CallCount  uint64
	// contains filtered or unexported fields
}

func NewServer

func NewServer(
	ctx context.Context, addr string, id string, worker *am.Machine,
	getter GetterFunc,
) (*Server, error)

func (*Server) GetKind

func (s *Server) GetKind() Kind

GetKind returns a kind of RPC component (server / client).

func (*Server) HandshakeDoneEnd

func (s *Server) HandshakeDoneEnd(e *am.Event)

func (*Server) RemoteAdd

func (s *Server) RemoteAdd(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteAddNS

func (s *Server) RemoteAddNS(
	_ *rpc2.Client, args *ArgsMut, _ *Empty,
) error

func (*Server) RemoteBye

func (s *Server) RemoteBye(
	_ *rpc2.Client, _ *Empty, _ *Empty,
) error

func (*Server) RemoteGet

func (s *Server) RemoteGet(
	_ *rpc2.Client, name string, resp *RespGet,
) error

func (*Server) RemoteHandshake

func (s *Server) RemoteHandshake(
	client *rpc2.Client, _ *Empty, resp *RespHandshake,
) error

func (*Server) RemoteHandshakeAck

func (s *Server) RemoteHandshakeAck(
	client *rpc2.Client, done *bool, _ *Empty,
) error

func (*Server) RemoteRemove

func (s *Server) RemoteRemove(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteSet

func (s *Server) RemoteSet(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteSync

func (s *Server) RemoteSync(
	_ *rpc2.Client, sum uint64, resp *RespSync,
) error

func (*Server) RpcReadyEnd

func (s *Server) RpcReadyEnd(e *am.Event)

func (*Server) RpcReadyState

func (s *Server) RpcReadyState(e *am.Event)

RpcReadyState starts a ticker to compensate for clock push denounces.

func (*Server) RpcStartingState

func (s *Server) RpcStartingState(e *am.Event)

func (*Server) SendPayload

func (s *Server) SendPayload(ctx context.Context, file *ArgsPayload) error

SendPayload sends a payload to the client.

func (*Server) Start

func (s *Server) Start() am.Result

Start starts the server, optionally creates a listener if not provided and results in the Ready state.

func (*Server) Stop

func (s *Server) Stop(dispose bool) am.Result

Stop stops the server, and optionally disposes resources.

type Worker

type Worker struct {
	ID  string
	Ctx context.Context
	// TODO remote push
	Disposed atomic.Bool
	// contains filtered or unexported fields
}

Worker is a subset of `pkg/machine#Machine` for RPC. Lacks the queue and other local methods. Most methods are clock-based, thus executed locally.

func (*Worker) ActiveStates

func (w *Worker) ActiveStates() am.S

ActiveStates returns a copy of the currently active states.

func (*Worker) Add

func (w *Worker) Add(states am.S, args am.A) am.Result

Add activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) Add1

func (w *Worker) Add1(state string, args am.A) am.Result

Add1 is a shorthand method to add a single state with the passed args.

func (*Worker) Add1NS

func (w *Worker) Add1NS(state string, args am.A) am.Result

Add1NS is a single state version of AddNS.

func (*Worker) AddErr

func (w *Worker) AddErr(err error, args am.A) am.Result

AddErr is a dedicated method to add the Exception state with the passed error and optional arguments. Like every mutation method, it will resolve relations and trigger handlers. AddErr produces a stack trace of the error, if LogStackTrace is enabled.

func (*Worker) AddErrState

func (w *Worker) AddErrState(state string, err error, args am.A) am.Result

AddErrState adds a dedicated error state, along with the build in Exception state. Like every mutation method, it will resolve relations and trigger handlers. AddErrState produces a stack trace of the error, if LogStackTrace is enabled.

func (*Worker) AddNS

func (w *Worker) AddNS(states am.S, args am.A) am.Result

AddNS is a NoSync method - an efficient way for adding states, as it doesn't wait for, nor transfers a response. Because of which it doesn't update the clock. Use Sync() to update the clock after a batch of AddNS calls.

func (*Worker) Any

func (w *Worker) Any(states ...am.S) bool

Any is group call to Is, returns true if any of the params return true from Is.

machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
// is(Foo, Bar) or is(Bar)
machine.Any(S{"Foo", "Bar"}, S{"Bar"}) // false
// is(Foo) or is(Bar)
machine.Any(S{"Foo"}, S{"Bar"}) // true

func (*Worker) Any1

func (w *Worker) Any1(states ...string) bool

Any1 is group call to Is1(), returns true if any of the params return true from Is1().

func (*Worker) Clock

func (w *Worker) Clock(states am.S) am.Clock

Clock returns current machine's clock, a state-keyed map of ticks. If states are passed, only the ticks of the passed states are returned.

func (*Worker) Dispose

func (w *Worker) Dispose()

Dispose disposes the machine and all its emitters. You can wait for the completion of the disposal with `<-mach.WhenDisposed`.

func (*Worker) Err

func (w *Worker) Err() error

Err returns the last error.

func (*Worker) Export

func (w *Worker) Export() *am.Serialized

Export exports the machine state: ID, time and state names.

func (*Worker) GetStruct

func (w *Worker) GetStruct() am.Struct

func (*Worker) Has

func (w *Worker) Has(states am.S) bool

Has return true is passed states are registered in the machine.

func (*Worker) Has1

func (w *Worker) Has1(state string) bool

Has1 is a shorthand for Has. It returns true if the passed state is registered in the machine.

func (*Worker) Index

func (w *Worker) Index(state string) int

Index returns the index of a state in the machine's StateNames() list.

func (*Worker) Inspect

func (w *Worker) Inspect(states am.S) string

Inspect returns a multi-line string representation of the machine (states, relations, clock). states: param for ordered or partial results.

func (*Worker) Is

func (w *Worker) Is(states am.S) bool

Is checks if all the passed states are currently active.

machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
machine.Is(S{"Foo"}) // true
machine.Is(S{"Foo", "Bar"}) // false

func (*Worker) Is1

func (w *Worker) Is1(state string) bool

Is1 is a shorthand method to check if a single state is currently active. See Is().

func (*Worker) IsClock

func (w *Worker) IsClock(clock am.Clock) bool

IsClock checks if the machine has changed since the passed clock. Returns true if at least one state has changed.

func (*Worker) IsErr

func (w *Worker) IsErr() bool

IsErr checks if the machine has the Exception state currently active.

func (*Worker) IsTime

func (w *Worker) IsTime(t am.Time, states am.S) bool

IsTime checks if the machine has changed since the passed time (list of ticks). Returns true if at least one state has changed. The states param is optional and can be used to check only a subset of states.

func (*Worker) Log

func (w *Worker) Log(msg string, args ...any)

Log logs an [extern] message unless LogNothing is set (default). Optionally redirects to a custom logger from SetLogger.

func (*Worker) MustParseStates

func (w *Worker) MustParseStates(states am.S) am.S

MustParseStates parses the states and returns them as a list. Panics when a state is not defined. It's an usafe equivalent of VerifyStates.

func (*Worker) NewStateCtx

func (w *Worker) NewStateCtx(state string) context.Context

NewStateCtx returns a new sub-context, bound to the current clock's tick of the passed state.

Context cancels when the state has been de-activated, or right away, if it isn't currently active.

State contexts are used to check state expirations and should be checked often inside goroutines. TODO reuse existing ctxs

func (*Worker) Not

func (w *Worker) Not(states am.S) bool

Not checks if **none** of the passed states are currently active.

machine.StringAll()
// -> ()[A:0 B:0 C:0 D:0]
machine.Add(S{"A", "B"})

// not(A) and not(C)
machine.Not(S{"A", "C"})
// -> false

// not(C) and not(D)
machine.Not(S{"C", "D"})
// -> true

func (*Worker) Not1

func (w *Worker) Not1(state string) bool

Not1 is a shorthand method to check if a single state is currently inactive. See Not().

func (*Worker) Remove

func (w *Worker) Remove(states am.S, args am.A) am.Result

Remove de-activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) Remove1

func (w *Worker) Remove1(state string, args am.A) am.Result

Remove1 is a shorthand method to remove a single state with the passed args. See Remove().

func (*Worker) Set

func (w *Worker) Set(states am.S, args am.A) am.Result

Set de-activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) StateNames

func (w *Worker) StateNames() am.S

StateNames returns a copy of all the state names.

func (*Worker) String

func (w *Worker) String() string

String returns a one line representation of the currently active states, with their clock values. Inactive states are omitted. Eg: (Foo:1 Bar:3)

func (*Worker) StringAll

func (w *Worker) StringAll() string

StringAll returns a one line representation of all the states, with their clock values. Inactive states are in square brackets. Eg: (Foo:1 Bar:3)[Baz:2]

func (*Worker) Switch

func (w *Worker) Switch(states ...string) string

Switch returns the first state from the passed list that is currently active, making it useful for switch statements.

switch mach.Switch(ss.GroupPlaying...) {
case "Playing":
case "Paused":
case "Stopped":
}

func (*Worker) Sync

func (w *Worker) Sync() am.Time

Sync requests fresh clock values from the remote machine. Useful to call after a batch of no-sync methods, eg AddNS.

func (*Worker) Tick

func (w *Worker) Tick(state string) uint64

Tick return the current tick for a given state.

func (*Worker) Time

func (w *Worker) Time(states am.S) am.Time

Time returns machine's time, a list of ticks per state. Returned value includes the specified states, or all the states if nil.

func (*Worker) TimeSum

func (w *Worker) TimeSum(states am.S) uint64

TimeSum returns the sum of machine's time (ticks per state). Returned value includes the specified states, or all the states if nil. It's a very inaccurate, yet simple way to measure the machine's time. TODO handle overflow

func (*Worker) When

func (w *Worker) When(states am.S, ctx context.Context) <-chan struct{}

When returns a channel that will be closed when all the passed states become active or the machine gets disposed.

ctx: optional context that will close the channel when done. Useful when listening on 2 When() channels within the same `select` to GC the 2nd one. TODO re-use channels with the same state set and context

func (*Worker) When1

func (w *Worker) When1(state string, ctx context.Context) <-chan struct{}

When1 is an alias to When() for a single state. See When.

func (*Worker) WhenArgs

func (w *Worker) WhenArgs(
	state string, args am.A, ctx context.Context,
) <-chan struct{}

WhenArgs returns a channel that will be closed when the passed state becomes active with all the passed args. Args are compared using the native '=='. It's meant to be used with async Multi states, to filter out a specific completion.

func (*Worker) WhenDisposed

func (w *Worker) WhenDisposed() <-chan struct{}

WhenDisposed returns a channel that will be closed when the machine is disposed. Requires bound handlers. Use Machine.Disposed in case no handlers have been bound.

func (*Worker) WhenErr

func (w *Worker) WhenErr(ctx context.Context) <-chan struct{}

WhenErr returns a channel that will be closed when the machine is in the Exception state.

ctx: optional context defaults to the machine's context.

func (*Worker) WhenNot

func (w *Worker) WhenNot(states am.S, ctx context.Context) <-chan struct{}

WhenNot returns a channel that will be closed when all the passed states become inactive or the machine gets disposed.

ctx: optional context that will close the channel when done. Useful when listening on 2 WhenNot() channels within the same `select` to GC the 2nd one.

func (*Worker) WhenNot1

func (w *Worker) WhenNot1(state string, ctx context.Context) <-chan struct{}

WhenNot1 is an alias to WhenNot() for a single state. See WhenNot.

func (*Worker) WhenTicks

func (w *Worker) WhenTicks(
	state string, ticks int, ctx context.Context,
) <-chan struct{}

WhenTicks waits N ticks of a single state (relative to now). Uses WhenTime underneath.

func (*Worker) WhenTicksEq

func (w *Worker) WhenTicksEq(
	state string, ticks uint64, ctx context.Context,
) <-chan struct{}

WhenTicksEq waits till ticks for a single state equal the given absolute value (or more). Uses WhenTime underneath.

func (*Worker) WhenTime

func (w *Worker) WhenTime(
	states am.S, times am.Time, ctx context.Context,
) <-chan struct{}

WhenTime returns a channel that will be closed when all the passed states have passed the specified time. The time is a logical clock of the state. Machine time can be sourced from the Time() method, or Clock() for a specific state.

type WorkerTracer

type WorkerTracer struct {
	*am.NoOpTracer
	// contains filtered or unexported fields
}

func (*WorkerTracer) TransitionEnd

func (t *WorkerTracer) TransitionEnd(_ *am.Transition)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL