rpc25519

v1.9.4
Published: Jan 17, 2025 | License: BSD-3-Clause, ISC | Imports: 59 | Imported by: 1

README

rpc25519: ed25519 based RPC for Go/golang

  • Recent News (2025 January 11): v1.6.0 adds compression.

By default S2 compression is used. See the link below for details.

"S2 is designed to have high throughput on content that cannot be compressed. This is important, so you don't have to worry about spending CPU cycles on already compressed data." -- Klaus Post

https://pkg.go.dev/github.com/klauspost/compress/s2

Also available are the LZ4 (lz4) and the Zstandard (zstd) compression algorithms by Yann Collet. Zstandard compression is available at levels 01, 03, 07, and 11; these go from fastest (01) to most compressed (11), trading time for space.

The sender decides on one of these supported compression algorithms, and the reader decodes what it gets, in a "reader-makes-right" pattern. This allows you to benchmark different compression approaches to uploading your data quickly, without restarting the server. The server will note which compressor the client last used, and will use that same compression in its responses.
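The "reader-makes-right" idea can be sketched with the standard library alone. The following is a minimal illustration, not rpc25519's wire format: the one-byte tag, the `encode`/`decode` helpers, and the use of gzip and flate as stand-ins for S2, LZ4, and Zstandard are all assumptions made for the example.

```go
package main

import (
	"bytes"
	"compress/flate"
	"compress/gzip"
	"fmt"
	"io"
)

// Hypothetical one-byte tags naming the compressor the sender chose.
const (
	tagNone  byte = 0
	tagGzip  byte = 1
	tagFlate byte = 2
)

// encode compresses payload with the sender's chosen algorithm and
// prefixes the result with the tag, so the reader knows how to decode it.
func encode(tag byte, payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	buf.WriteByte(tag)
	var w io.WriteCloser
	switch tag {
	case tagNone:
		buf.Write(payload)
		return buf.Bytes(), nil
	case tagGzip:
		w = gzip.NewWriter(&buf)
	case tagFlate:
		fw, err := flate.NewWriter(&buf, flate.BestSpeed)
		if err != nil {
			return nil, err
		}
		w = fw
	default:
		return nil, fmt.Errorf("unknown compressor tag %d", tag)
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode inspects the tag and picks the matching decompressor:
// the reader "makes right" whatever the sender chose.
func decode(frame []byte) (byte, []byte, error) {
	if len(frame) == 0 {
		return 0, nil, fmt.Errorf("empty frame")
	}
	tag := frame[0]
	body := bytes.NewReader(frame[1:])
	var r io.Reader
	switch tag {
	case tagNone:
		r = body
	case tagGzip:
		gr, err := gzip.NewReader(body)
		if err != nil {
			return tag, nil, err
		}
		defer gr.Close()
		r = gr
	case tagFlate:
		fr := flate.NewReader(body)
		defer fr.Close()
		r = fr
	default:
		return tag, nil, fmt.Errorf("unknown compressor tag %d", tag)
	}
	payload, err := io.ReadAll(r)
	return tag, payload, err
}

func main() {
	msg := bytes.Repeat([]byte("compress me "), 100)
	for _, tag := range []byte{tagNone, tagGzip, tagFlate} {
		frame, err := encode(tag, msg)
		if err != nil {
			panic(err)
		}
		gotTag, got, err := decode(frame)
		if err != nil {
			panic(err)
		}
		fmt.Printf("tag=%d frame=%d bytes roundtrip=%v\n", gotTag, len(frame), bytes.Equal(got, msg))
	}
}
```

Because the reader learns the algorithm from each frame, a sender can switch compressors between messages without any coordination, which is what makes benchmarking without a server restart possible.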

some quick compression benchmarks (2025 January 11)

To roughly compare the compression algorithms, I prepared a 1.5 GB tar file of mixed source code text and binary assets. It was a raw tar of the filesystem, and not compressed with gzip or bzip2 after tarring it up. If given random data, all these compressors perform about the same. With a mix of compressible and incompressible, however, we can see some performance differences.

Total file size: 1_584_015_360 bytes.

Network: an isolated LAN transfer, to take the variability of WAN links out of the picture. The bandwidth was computed from the elapsed time of the one-way upload (lower elapsed time, and thus higher bandwidth, is better).

compressor   bandwidth       total elapsed time for one-way transfer
----------   ------------    ---------------------------------------
   s2      163.412400 MB/sec; total time for upload: 9.2 seconds
  lz4      157.343690 MB/sec; total time for upload: 9.6 seconds
zstd:01    142.088387 MB/sec; total time for upload: 10.6 seconds
zstd:03    130.871089 MB/sec; total time for upload: 11.6 seconds
zstd:07    121.209097 MB/sec; total time for upload: 12.5 seconds
zstd:11     27.766271 MB/sec; total time for upload: 54.4 seconds
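As a cross-check on the table, the elapsed times can be recomputed from the file size and the bandwidth column. The sketch below assumes that "MB" here means 2^20 bytes; that unit is inferred from the numbers rather than stated, and under it each recomputed time lands within about a tenth of a second of the listed one. The helper name `elapsedSeconds` is made up for the example.

```go
package main

import "fmt"

const (
	fileBytes = 1_584_015_360 // total file size from the text
	mib       = 1 << 20       // assumed meaning of "MB" in the table
)

// elapsedSeconds returns how long a one-way transfer of n bytes takes
// at the given bandwidth, expressed in units of 2^20 bytes per second.
func elapsedSeconds(n int64, mibPerSec float64) float64 {
	return float64(n) / mib / mibPerSec
}

func main() {
	rows := []struct {
		name string
		bw   float64
	}{
		{"s2", 163.412400},
		{"lz4", 157.343690},
		{"zstd:01", 142.088387},
		{"zstd:03", 130.871089},
		{"zstd:07", 121.209097},
		{"zstd:11", 27.766271},
	}
	for _, r := range rows {
		fmt.Printf("%-8s %6.2f seconds\n", r.name, elapsedSeconds(fileBytes, r.bw))
	}
}
```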

The default compressor in rpc25519, at the moment, is S2. It is the fastest of the bunch. Use the -press flag in the cli / srv demo commands to set the compression in your own benchmarks.

Users implementing application-specific compression strategies can opt out of compression, if desired. To disable all rpc25519 system-level compression on a per-Message basis, simply set the HDR flag NoSystemCompression to true. This may be useful if you already know your data is incompressible. The system will not apply any compression algorithm to such messages.

  • Recent News (2025 January 04): (Happy New Year!)

For bulk uploads and downloads, v1.3.0 has streaming support. A stream can have any number of Messages in its sequence, and they can be handled by a single server function.

See example.go and the cli_test.go tests 045, 055, and 065.

The structs Downloader, Uploader, and Bistreamer (both up and downloads) provide support.

  • Recent News (2024 December 28): remote cancellation support with context.Context

We recently implemented (in v1.2.0) remote call cancellation based on the standard library's context.Context.

The details:

If the client closes the cancelJobCh that was supplied to a previous SendAndGetReply() call, a cancellation request will be sent to the server.

On the server, the request.HDR.Ctx will be set, having been created by context.WithCancel; so the request.HDR.Ctx.Done() channel can be honored.

Similarly, for traditional net/rpc calls, the Client.Go() and Client.Call() methods will respect the optional octx context and transmit a cancellation request to the server. On the server, the net/rpc methods registered must (naturally) be using the first-parameter ctx form of registration in order to be cancellation aware.

Any client-side blocked calls will return on cancellation. The msg.LocalErr for the []byte oriented Message API will be set. For net/rpc API users, the Client.Go() returned call will have the call.Error set. Users of Client.Call() will see the returned error set.

In all cases, the error will be ErrCancelReqSent if the call was in flight on the server, but will be ErrDone if the original call had not been transmitted yet.

Note that context-based cancellation requires the cooperation of the registered server-side call implementations. Go does not support goroutine cancellation, so the remote methods must implement and honor context awareness in order for the remote cancellation message to have effect.

overview

Motivation: I needed a small, simple, and compact RPC system with modern, strong cryptography for goq. To that end, rpc25519 uses only ed25519 keys for its public-key cryptography. A pre-shared-key layer can also be configured for post-quantum security.

I am delighted to report that this package also supports QUIC as a transport. QUIC is very fast even though it is always encrypted. This is due to its 0-RTT design and the mature quic-go implementation of the protocol. QUIC allows a local client and server in the same process to share a UDP port. This feature can be useful for conserving ports and connecting across networks.

After tuning and hardening, the UDP/QUIC versus TCP/TLS decision is not really difficult if the client is new every time. In our measurements, TLS (over TCP) has both better connection latency and better throughput than QUIC (over UDP). QUIC does not get to take advantage of its optimization for 0-RTT re-connection under these circumstances (when the client is new). If you have clients that frequently re-connect after losing network connectivity, then measure QUIC versus TLS/TCP in your application. Otherwise, for performance, prefer TLS/TCP over QUIC. The latency of TLS is better and, moreover, the throughput of TLS can be much better (4-5x greater). If client port re-use and conservation is needed, then QUIC may be your only choice.

The rpc25519 package docs are here.

Benchmarks versus other rpc systems are here: https://github.com/glycerine/rpcx-benchmark

  • A note on the relaxed semantic versioning scheme in use:

Our versioning scheme, for reasons too long to recount here, is atypical.

We offer only the following with respect to breakage:

Within a minor version v1.X, we will not make breaking API changes.

However, when we increment to v1.(X+1), then there may be breaking changes. Stay with v1.X.y where y is the largest number to avoid breaks. That said, we strive to keep them minimal.

  • Only use tagged releases for testing/production, as the main master branch often has the latest work-in-progress development with red tests and partially implemented solutions.

  • getting started

# to install/get started: 
#   *warning: ~/go/bin/{srv,cli,selfy,greenpack} are written
#

 git clone https://github.com/glycerine/greenpack ## pre-req
 cd greenpack; make; cd ..
 git clone https://github.com/glycerine/rpc25519
 cd rpc25519;  make
 
 # make test keys and CA: saved to ~/.config/rpc25519/certs
 # and ~/.config/rpc25519/my-keep-private-dir
 ./selfy -k client -nopass; ./selfy -k node -nopass 
 ./selfy -gensym psk.binary ## saved to my-keep-private-dir/psk.binary
 make run && make runq  ## verify TLS over TCP and QUIC
 

For getting started, see the small example programs here: https://github.com/glycerine/rpc25519/tree/master/cmd . These illustrate client (cli), server (srv), and QUIC port sharing by a client and a server (samesame). The tests in srv_test.go and cli_test.go also make great starting points. Use, e.g. cli -h or srv -h to see the flag guidance.

overview

rpc25519 is a Remote Procedure Call (RPC) system with two APIs.

We offer both a traditional net/rpc style API, and a generic []byte oriented API for carrying user-typed or self-describing []byte payloads (in Message.JobSerz).

As of v1.1.0, the net/rpc API has been updated to use greenpack encoding rather than gob encoding, to provide a self-describing, evolvable serialization format. Greenpack allows fields to be added or deprecated over time and is multi-language compatible. We re-used net/rpc's client-facing API layer, and wired it into/on top of our native []byte slice Message transport infrastructure. (The LICENSE file reflects this code re-use.) Instead of taking any struct, arguments and responses must now have greenpack generated methods. Typically this means adding //go:generate greenpack to the files that define the structs that will go over the wire, and running go generate.

rpc25519 was built originally for the distributed job management use-case, and so uses TLS/QUIC directly. It does not use http, except perhaps in the very first contact: like net/rpc, there is limited support for http CONNECT based hijacking; see the cfg.HTTPConnectRequired flag. Nonetheless, this protocol remains distinct from http. In particular, note that the connection hijacking does not work with (https/http2/http3) encrypted protocols.

The generic byte-slice API is designed to work smoothly with our greenpack serialization format that requires no extra IDL file. See the https://github.com/glycerine/rpc25519/blob/master/hdr.go#L18 file herein, for example.

Using the rpc25519.Message based API:

  func ExampleOneWayFunc(req *Message) { ... }
  func ExampleTwoWayFunc(req *Message, reply *Message) error { ... }

The central Message struct itself is simple.

  type Message struct {

   // HDR contains header information. See hdr.go.
   HDR HDR `zid:"0"`

   // JobSerz is the "body" of the message.
   // The user provides and interprets this.
   JobSerz []byte `zid:"1"`

   // JobErrs returns error information from 
   // user-defined callback functions. If a 
   // TwoWayFunc returns a non-nil error, its
   // err.Error() will be set here.
   JobErrs string `zid:"2"`

   // LocalErr is not serialized on the wire.
   // It communicates only local (client/server side) 
   // API information. For example, Server.SendMessage() or
   // Client.SendAndGetReply() can read it after
   // DoneCh has been received on.
   //
   // Callback functions should convey 
   // errors in JobErrs (by returning an error); 
   // or in-band within JobSerz.
   LocalErr error `msg:"-"`

   // DoneCh will receive this Message itself when the call completes.
   // It must be buffered, with at least capacity 1.
   // NewMessage() automatically allocates DoneCh correctly and
   // should always be used when creating a new Message.
   DoneCh chan *Message `msg:"-"`
}
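To make the roles of JobSerz, JobErrs, and DoneCh concrete, here is a small self-contained sketch. The `Message` type below is a local stand-in carrying only three of the fields shown above, and `newMessage` and `dispatch` are hypothetical helpers; the real NewMessage() and the server's dispatch logic live in rpc25519 and will differ in detail.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// Message is a local stand-in with only the fields this sketch needs.
type Message struct {
	JobSerz []byte
	JobErrs string
	DoneCh  chan *Message
}

// newMessage follows the documented rule that DoneCh is buffered
// with capacity at least 1.
func newMessage() *Message {
	return &Message{DoneCh: make(chan *Message, 1)}
}

// upperFunc has the two-way callback shape: read req, fill in reply.
func upperFunc(req *Message, reply *Message) error {
	if len(req.JobSerz) == 0 {
		return errors.New("empty request")
	}
	reply.JobSerz = bytes.ToUpper(req.JobSerz)
	return nil
}

// dispatch is a hypothetical stand-in for the server side: it runs the
// callback, copies a non-nil error into JobErrs, and completes the call.
func dispatch(fn func(req, reply *Message) error, req *Message) {
	reply := newMessage()
	if err := fn(req, reply); err != nil {
		reply.JobErrs = err.Error()
	}
	// With capacity 1 this send cannot block, even though nobody is
	// receiving on DoneCh yet. An unbuffered channel would block
	// right here until a receiver showed up.
	req.DoneCh <- reply
}

func main() {
	req := newMessage()
	req.JobSerz = []byte("hello")
	dispatch(upperFunc, req)
	reply := <-req.DoneCh
	fmt.Printf("JobSerz=%q JobErrs=%q\n", reply.JobSerz, reply.JobErrs)
}
```

The buffering requirement exists so that whoever completes the call never has to wait for the caller to be ready to receive.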

Using the net/rpc API:

  • Server.Register() registers structs with callback methods on them. For a struct called Service, this method would be identified and registered:
  func (s *Service) NoContext(args *Args, reply *Reply) error { ... }

See the net/rpc docs for full guidance on using that API.

  • Extended method types:

Callback methods in the net/rpc style traditionally look like the NoContext method above. We also allow a ctx context.Context as an additional first parameter. This is an extension to what net/rpc provides. The ctx will have an "HDR" value set on it giving a pointer to the rpc25519.HDR header from the incoming Message.

func (s *Service) GetsContext(ctx context.Context, args *Args, reply *Reply) error {
   if hdr, ok := rpc25519.HDRFromContext(ctx); ok {
        fmt.Printf("GetsContext called with HDR = '%v'; "+
           "HDR.Nc.RemoteAddr() gives '%v'; HDR.Nc.LocalAddr() gives '%v'\n", 
           hdr.String(), hdr.Nc.RemoteAddr(), hdr.Nc.LocalAddr())
   } else {
      fmt.Println("HDR not found")
   }
   ...
   return nil
}

The net/rpc API is implemented as a layer on top of the rpc25519.Message based API. Both can be used concurrently if desired.

In the Message API, server push is available. Use Client.GetReadIncomingCh or Client.GetReads on the client side to receive server initiated messages. To push from the server (in a callback func), see Server.SendMessage. A live application example of server push is here, in the ServerCallbackMgr.pushToClient() method.

See the full source for my distributed job-queuing server goq as an example application that uses almost all features of this, the rpc25519 package.

In the following we'll look at choice of transport, why public-key certs are preferred, and how to use the included selfy tool to easily generate self-signed certificates.

TLS-v1.3 over TCP

Three transports are available: TLS-v1.3 over TCP, plain TCP, and QUIC which uses TLS-v1.3 over UDP.

How to KYC or Know Your Clients

How do we identify our clients in an RPC situation?

We have clients and servers in RPC. For that matter, how do we authenticate the server too?

The identity of both client and server, either end of a connection, is established with a private-key/public-key pair in which the remote party proves possession of a private key and we can confirm that the associated public key has been signed by our certificate authority. Public keys signed in this way are more commonly called certificates or "certs".

An email address for convenience in identifying jobs can be listed in the certificate.

This is a much saner approach than tying a work-load identity to a specific machine, domain name, or machine port. Access to the private key corresponding to our cert should convey identity during a TLS handshake. The later part of the handshake verifies that the key was signed by our CA. This suffices. We may also want to reject based on IP address to block off clearly irrelevant traffic, both for DDoS mitigation and to keep our CPU cycles low, but these are second-order optimizations. The cryptographic proof is the central identifying factor.

Such a requirement maintains the strongest security but still allows elastic clouds to grow and shrink and migrate the workloads of both clients and servers. It allows for sane development and deployment on any hardware a developer can access with ssh, without compromising on encryption on either end.

TOFU or not

By creating (touch will do) a known_client_keys file in the directory where the server is running, you activate the key tracking system on the server.

Similarly, by touching the known_server_keys file in the directory where the client is running, you tell the client to record the server certificates it has seen.

Without these files, no history of seen certs is recorded. While certs are still checked (assuming you have left SkipVerifyKeys at the default false value in the config), they are not remembered.

In a typical use case, touch the file to record the certs in use, then make the known keys file read-only to block out any new certs.

This works because clients are accepted or rejected depending on the writability of the known_client_keys file, and the presence of their certificates in that file.

Trust on first use, or TOFU, is used if the known_client_keys file is writable. When the file is writable and a new client arrives, their public key and email (if available) are stored. This is similar to how the classic ~/.ssh/known_hosts file works, but with a twist.

The twist is that once the desired identities have been recorded, the file can be used to reject any new or unknown certs from new or unknown clients.

If the file has been made unwritable (say with chmod -w known_client_keys), then we are "locked down". In this case, TOFU is not allowed and only existing keys will be accepted. Any others whose public keys are not in the known_client_keys file will be dropped during the TLS handshake.

These unknown clients are usually just attackers that should in fact be rejected soundly. In case of misconfiguration however, all clients should be prepared to timeout as, if they are not on the list, they will be shunned and shown no further attention or packets. Without a timeout, they may well hang indefinitely waiting for network activity.
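The accept/record/reject rule described in this section fits in a few lines. The sketch below models the known keys file as an in-memory set and its writability as a boolean; the names `decide`, `verdict`, and the three outcomes are hypothetical, and the function assumes the client's cert has already passed CA verification.

```go
package main

import "fmt"

type verdict int

const (
	reject          verdict = iota // unknown key while locked down
	acceptKnown                    // key was already on file
	acceptAndRecord                // trust on first use: key newly recorded
)

// decide applies the TOFU rule to a client whose cert has already
// passed CA verification. known models the contents of the known keys
// file; fileWritable models whether that file can be written.
func decide(known map[string]bool, fileWritable bool, pubKey string) verdict {
	switch {
	case known[pubKey]:
		return acceptKnown
	case fileWritable:
		known[pubKey] = true // remember this key for next time
		return acceptAndRecord
	default:
		return reject
	}
}

func main() {
	known := map[string]bool{}

	// Writable file: the first client is trusted and recorded.
	fmt.Println(decide(known, true, "alice-key") == acceptAndRecord)

	// After chmod -w: known clients still get in, new ones do not.
	fmt.Println(decide(known, false, "alice-key") == acceptKnown)
	fmt.Println(decide(known, false, "mallory-key") == reject)
}
```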

encryption details

Modern Ed25519 keys are used with TLS-1.3. The TLS_CHACHA20_POLY1305_SHA256 cipher suite is the only one configured. This is similar to the crypto suite used in WireGuard (Tailscale), the only difference being that WireGuard uses Blake2s as a hash function (apparently faster than SHA-256 when hardware SHA extensions/SHA-NI instructions are not available). The ChaCha20, Poly1305, and Curve25519 parts are the same.

Config.PreSharedKeyPath allows specifying a 32 byte pre-shared key file for further security. An additional, independently keyed layer of ChaCha20-Poly1305 stream cipher/AEAD will be applied by mixing that key into the shared secret from an ephemeral Elliptic Curve Diffie-Hellman handshake. The same pre-shared-key must be pre-installed on both client and server.

The pre-shared-key traffic is "tunnelled" or runs inside the outer encryption layer. Thus a different symmetric encryption scheme could be wired in without much difficulty.

The pre-shared-key file format is just raw random binary bytes. See the srv_test.go Test011_PreSharedKey_over_TCP test in https://github.com/glycerine/rpc25519/blob/master/srv_test.go#L297 for an example of using NewChaCha20CryptoRandKey() to generate a key programmatically. Or just use

selfy -gensym my_pre_shared_key.binary

on the command line. For safety, selfy -gensym will not over-write an existing file. If you want to change keys, mv the old key out of the way first.

security posture for both extremes

The strength of security is controlled by the Config options to NewServer() and NewClient(). This section was written before we added the second symmetric encryption by pre-shared key option. All comments below about lack of security (e.g. in TCPonly_no_TLS = true mode) should be read modulo the pre-shared-key stuff: assume there is no 2nd layer.

See the cli.go file and the Config struct there; also copied below.

By default security is very strong, requiring TLS-1.3 and valid signed client certs, but allowing TOFU for previously unseen clients who come bearing valid certs; those signed by our CA.

This allows one to test one's initial setup with a minimum of fuss.

Further hardening (into a virtual fortress) can then be accomplished by making read-only the set of already seen clients; with chmod -w known_client_keys.

With this done, only those clients who we have already seen will be permitted in; and these are clients bearing proper certs signed by our CA private key; this will be verified during the TLS handshake. Any others will be rejected.

On the other extreme, setting TCPonly_no_TLS to true means we will use only TCP, no TLS at all, and everything will be transmitted in clear text.

In the middle of the road, setting Config.SkipVerifyKeys to true means the server will act like an HTTPS web server: any client with any key will be allowed to talk to the server, but TLS-1.3 will encrypt the traffic. Whenever TLS is used, it is always TLS-1.3 and set to the tls.TLS_CHACHA20_POLY1305_SHA256 cipher suite. Thus this setting is of no use for authenticating clients; it should almost never be used since it opens up your server to the world. But for web servers, this may be desirable.

Note that both sides of the connection (client and server) must agree to -skip-verify (Config.SkipVerifyKeys = true); otherwise the signing authority (CA) must agree. Under the default, SkipVerifyKeys = false, by signing only your own keys, with your own CA, that you keep private from the world, you maintain very tight control over access to your server.

We only create and only accept Ed25519 keys (with SkipVerifyKeys off/false, the default). (See selfy below, or the included gen.sh script to create keys).

type Config struct {

   // ServerAddr host:port where the server should listen.
   ServerAddr string

   // optional. Can be used to suggest that the
   // client use a specific host:port. NB: For QUIC, by default, the client and
   // server will share the same port if they are in the same process.
   // In that case this setting will definitely be ignored.
   ClientHostPort string

   // Who the client should contact
   ClientDialToHostPort string

   // TCPonly_no_TLS false means TLS-1.3 secured. 
   // So true here means do TCP only; with no encryption.
   TCPonly_no_TLS bool

   // UseQUIC cannot be true if TCPonly_no_TLS is true.
   UseQUIC bool

   // path to certs/ like certificate
   // directory on the live filesystem.
   CertPath string

   // SkipVerifyKeys true allows any incoming
   // key to be signed by
   // any CA; it does not have to be ours. Obviously
   // this discards almost all access control; it
   // should rarely be used unless communication
   // with any random agent/hacker/public person
   // is desired.
   SkipVerifyKeys bool

   // default "client" means use certs/client.crt and certs/client.key
   ClientKeyPairName string 
   
   // default "node" means use certs/node.crt and certs/node.key
   ServerKeyPairName string 

   // PreSharedKeyPath locates an optional pre-shared
   // binary that  must be 32 bytes long.
   // If supplied, this key will be used in a symmetric 
   // encryption layer inside the outer TLS encryption.
   PreSharedKeyPath string

   // These are timeouts for connection and transport tuning.
   // The defaults of 0 mean wait forever.
   ConnectTimeout time.Duration
   ReadTimeout    time.Duration
   WriteTimeout   time.Duration

   ...
   
   // This is not a Config option, but creating
   // the known_{server,client}_keys file on the client/server is
   // typically the last security measure in hardening.
   //
   // If known_client_keys exists in the server's directory,
   // then we will read from it.
   // Likewise, if known_server_keys exists in
   // the client's directory, then we will read from it.
   //
   // If the known keys file is read-only: Read-only
   // means we are in lockdown mode and no unknown
   // client certs will be accepted, even if they
   // have been properly signed by our CA.
   //
   // If the known keys file is writable then we are in
   // Trust On First Use mode, and new remote parties
   // are recorded in the file if their certs are valid (signed
   // by us/our CA).
   //
   // Note if the known_client_keys is read-only, it
   // had better not be empty or nobody will be
   // able to contact us. The server will notice
   // this and crash since why bother being up.
   
}

The selfy tool: create new keys quickly; view certificates

Certificates are great, but making them has traditionally been a massive pain. That is why I wrote selfy.

The selfy command is an easy way to create private keys, certificates, and self-signed certificate authorities. It is vastly more usable than the mountain of complexity that is openssl. It is more limited in scope. In our opinion, this is a good thing.

If you are in a hurry to get started, the most basic use of selfy is to create a new ed25519 key-pair:

$ selfy -k name_of_your_identity -e your@email.here.org

With other tools you would need to already have a CA (Certificate Authority).

But we've got your back. If you lack a CA in the default directory (see -p below), then, for your convenience, a self-signed CA will be auto-generated for you. It will then be used to sign the new cert. Hence the above command is all that a typical developer needs to get started. Yay(!)

If you want to first create a CA manually, you can do that with the -ca flag. The -p flag will let you put it somewhere other than the default directory.

selfy -ca # make a new self-signed Certificate Authority

Your newly created cert can be viewed with the selfy -v flag, just give it the path to your .crt file.

$ selfy -v certs/name_of_your_identity.crt

By default, the CA is stored in the ./my-keep-private-dir/ca.{crt,key}, while the -k named identifying cert is stored in certs/name.crt. The corresponding private key is stored in certs/name.key.
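The two steps selfy performs (make a self-signed CA, then sign a new key pair with it) can be sketched with crypto/x509. This is an illustration of the general technique, not selfy's source: the helper names, serial numbers, one-day validity, and the "example CA" subject are all made up, and no files are written or password-protected.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// makeCA creates a self-signed ed25519 certificate authority.
func makeCA() (*x509.Certificate, ed25519.PrivateKey, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "example CA"},
		NotBefore:             time.Now().Add(-time.Minute),
		NotAfter:              time.Now().Add(24 * time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	// Template and parent are the same: the CA signs itself.
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, pub, priv)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, priv, err
}

// makeLeaf creates a new ed25519 key pair and a cert for it, signed by
// the CA, issued to "localhost" and carrying the given email address.
func makeLeaf(ca *x509.Certificate, caKey ed25519.PrivateKey, email string) (*x509.Certificate, ed25519.PrivateKey, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:   big.NewInt(2),
		Subject:        pkix.Name{CommonName: "localhost"},
		DNSNames:       []string{"localhost"},
		EmailAddresses: []string{email},
		NotBefore:      time.Now().Add(-time.Minute),
		NotAfter:       time.Now().Add(24 * time.Hour),
		KeyUsage:       x509.KeyUsageDigitalSignature,
		ExtKeyUsage:    []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, ca, pub, caKey)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, priv, err
}

// verifyLeaf checks that leaf chains to ca and is valid for "localhost".
func verifyLeaf(ca, leaf *x509.Certificate) error {
	pool := x509.NewCertPool()
	pool.AddCert(ca)
	_, err := leaf.Verify(x509.VerifyOptions{Roots: pool, DNSName: "localhost"})
	return err
}

func main() {
	ca, caKey, err := makeCA()
	if err != nil {
		panic(err)
	}
	leaf, _, err := makeLeaf(ca, caKey, "your@email.here.org")
	if err != nil {
		panic(err)
	}
	fmt.Println("emails:", leaf.EmailAddresses, "verify error:", verifyLeaf(ca, leaf))
}
```

Seeing how many fields even this minimal version needs gives some sense of why a purpose-built tool is more pleasant than assembling the same thing by hand.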

Update: we have added pass-phrase protection to the private keys by default. In order to forgo this protection and use the original behavior, supply the selfy -nopass flag. A long salt and the Argon2id key-derivation-function are used to provide time and memory-hard protection against ASIC brute-force cracking attempts (see https://en.wikipedia.org/wiki/Argon2 https://datatracker.ietf.org/doc/html/rfc9106 ).

$ selfy -h

Usage of selfy:
  -ca
        create a new self-signed certificate authority (1st of 2 
        steps in making new certs). Written to the -p directory 
        (see selfy -p flag, where -p is for private).
        The CA uses ed25519 keys too.
        
  -e string
        email to write into the certificate (who to contact 
        about this job) (strongly encouraged when making 
        new certs! defaults to name@host if not given)
    
  -gensym string
        generate a new 32-byte symmetric encryption 
        key with crypto/rand, and save it under 
        this filename in the -p directory.
    
  -k string
        2nd of 2 steps: -k {key_name} ; create a new ed25519 
        key pair (key and cert), and save it to this name. 
        The pair will be saved under the -o directory; 
        we strongly suggest you also use the 
        -e your_email@actually.org flag to 
        describe the job and/or provide the owner's email 
        to contact when needed. A CA will be auto-generated 
        if none is found in the -p directory, which has 
        a default name which warns the user to protect it.

  -nopass
        by default we request a password and use 
        it with Argon2id to encrypt private key files (CA & identity). 
        Setting -nopass means we generate an un-encrypted 
        private key; this is not recommended.

  -o string
        directory to save newly created certs into. (default "certs")
        
  -p string
        directory to find the CA in. If doing -ca, we will save 
        the newly created Certificate Authority (CA) private 
        key to this directory. If doing -k to create a new key, 
        we'll look for the CA here. (default "my-keep-private-dir")  
        
  -v string
        path to cert to view. Similar output as the openssl 
        command: 'openssl x509 -in certs/client.crt  -text -noout', 
        which you could use instead; just replace certs/client.crt 
        with the path to your cert.

  -verify string
        verify this path is a certificate signed by the private key 
        corresponding to the -p {my-keep-private-dir}/ca.crt public key

The openssl commands in the included gen.sh script do the same things as selfy does, but it is more painful to incorporate an email because you have to modify the openssl-san.cnf file to do so each time.

generating keys with emails inside

See the selfy -e flag above for details.

Delightfully, email addresses can be stored in certificates!

This provides a fantastically convenient way to identify the job and/or the owner of the job. Importantly, if you are supporting work loads from many people, this can be critical in telling who needs to know when something goes wrong with a job.

A localhost by any other name would not smell as sweet

The server name will always be 'localhost' on selfy generated certificates, and this is critical to allowing our processes to run on any machine. By doing so, we leverage the SNI technology[1], to break the troublesome adhesion of IP address and certificate. SNI was developed to let a single IP address host many different web sites. We want multiple IP addresses to run many different jobs, and to be able to migrate jobs between hosts. Brilliantly, SNI lets us move our clients and servers between machines without having to re-issue the certificates. The certs are always issued to 'localhost' and clients always request ServerName 'localhost' in their tls.Config.ServerName field.

This is much more like the convenience and usability of ssh. To me ssh has always been so much easier to deal with than certificates. rpc25519 aims for usability on par with ssh. Leveraging SNI is one way we get there.

[1] https://en.wikipedia.org/wiki/Server_Name_Indication


Author: Jason E. Aten, Ph.D.

License: See the LICENSE file for the 3-clause BSD-style license; the same as Go.

Documentation

Constants

const (
	CallIDReadMap    = 0
	CallIDErrorMap   = 1
	ToPeerIDReadMap  = 2
	ToPeerIDErrorMap = 3
)

whichmap meanings for UnregisterChannel

const (
	// Defaults used by HandleHTTP
	DefaultRPCPath = "/_goRPC_"
)
const DefaultUseCompressAlgo = "s2" // see magic7.go
const DefaultUseCompression = true
const (
	UserMaxPayload = 1_200_000 // users should chunk to this size, to be safe.

)
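Chunking a large payload to stay under this limit is straightforward. In the sketch below, `chunk` is a hypothetical helper and the limit is copied into a local constant so the example runs without importing the package.

```go
package main

import "fmt"

// userMaxPayload mirrors the UserMaxPayload constant above.
const userMaxPayload = 1_200_000

// chunk splits data into consecutive pieces of at most max bytes each.
// The pieces share memory with data; nothing is copied.
// It returns nil if max is not positive.
func chunk(data []byte, max int) [][]byte {
	if max <= 0 {
		return nil
	}
	var parts [][]byte
	for len(data) > max {
		parts = append(parts, data[:max])
		data = data[max:]
	}
	if len(data) > 0 {
		parts = append(parts, data)
	}
	return parts
}

func main() {
	payload := make([]byte, 2_500_000)
	for i, p := range chunk(payload, userMaxPayload) {
		fmt.Printf("part %d: %d bytes\n", i, len(p))
	}
}
```

Each piece could then travel as the JobSerz of its own Message, for instance as the successive parts of an upload stream.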

Variables

var DebugVerboseCompress bool //= true
var ErrAlreadyDone = fmt.Errorf("Uploader has already been marked done. No more sending is allowed.")
var ErrCancelReqSent = fmt.Errorf("cancellation request sent")
var ErrContextCancelled = fmt.Errorf("context cancelled")
var ErrDone = fmt.Errorf("done channel closed")
var ErrHaltRequested = fmt.Errorf("halt requested")
var ErrHandshakeQUIC = fmt.Errorf("quic handshake failure")
var ErrMagicWrong = fmt.Errorf("error: magic bytes not found at start of message")
var ErrNetConnectionNotFound = fmt.Errorf("error in SendMessage: net.Conn not found")
var ErrNetRpcShutdown = errors.New("connection is shut down")

ErrNetRpcShutdown is from net/rpc, and still distinct from ErrShutdown to help locate when and where the error was generated. It indicates the system, or at least the network connection or stream, is closed or shutting down.

var ErrNotFound = fmt.Errorf("known_tls_hosts file not found")
var ErrShutdown2 = fmt.Errorf("shutting down")
var ErrTimeout = fmt.Errorf("time-out waiting for call to complete")
var ErrTooLarge = fmt.Errorf("error: length of payload JobSerz is over maxMessage - 1024(for header) = %v bytes, which is the limit.", maxMessage-1024)
var ErrTooLong = fmt.Errorf("message message too long: over 1MB; encrypted client vs an un-encrypted server?")
var ErrWrongCallTypeForSendMessage = fmt.Errorf("error in SendMessage: msg.HDR.Typ must be CallOneWay or CallUploadBegin; or greater in number")

Functions

func ContextWithHDR added in v1.2.7

func ContextWithHDR(ctx context.Context, hdr *HDR) context.Context

ContextWithHDR returns a new Context that carries value hdr.

func EchoBistreamFunc added in v1.5.0

func EchoBistreamFunc(
	srv *Server,
	ctx context.Context,
	req *Message,
	uploadsFromClientCh <-chan *Message,
	sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error,
	lastReply *Message,
) (err error)

EchoBistreamFunc echoes back anything it receives from the client. srv uses it to test bistreaming.

func ErrShutdown

func ErrShutdown() error

func GetCertsDir added in v1.7.0

func GetCertsDir() (path string)

GetCertsDir tells us where to generate/look for certificates and key pairs, including a node's private keys. It also creates the directory if it does not exist, and panics if it cannot.

Use $HOME/.config/rpc25519/certs to store keys now, so we can find them in one location. (Actually use $XDG_CONFIG_HOME/rpc25519/certs if XDG_CONFIG_HOME is set, but that is less common).

If we cannot find either of those, we use the current working directory.

We will panic if we cannot make this essential directory.

func GetPrivateCertificateAuthDir added in v1.7.0

func GetPrivateCertificateAuthDir() (path string)

GetPrivateCertificateAuthDir says where to store the CA master private key, which should typically not be distributed with the working node key-pairs. It also creates the directory if it does not exist, and panics if it cannot.

Use $HOME/.config/rpc25519/certs to store keys now, so we can find them in one location. (Actually use $XDG_CONFIG_HOME/rpc25519/certs/my-keep-private-dir if XDG_CONFIG_HOME is set, but that is less common).

If we cannot find either of those, we use the current working directory.

We will panic if we cannot make this essential directory.

func GoroNumber added in v1.1.43

func GoroNumber() int

GoroNumber returns the calling goroutine's number.
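One common way to obtain a goroutine's number is to parse the header line that runtime.Stack writes. The sketch below shows that technique; whether GoroNumber is implemented this way is an assumption, and goroNumber is a hypothetical name.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"strconv"
)

// goroNumber parses the "goroutine N [running]:" header that
// runtime.Stack writes for the calling goroutine. It returns -1
// if the header cannot be parsed.
func goroNumber() int {
	buf := make([]byte, 64)
	n := runtime.Stack(buf, false)
	fields := bytes.Fields(buf[:n])
	if len(fields) < 2 {
		return -1
	}
	id, err := strconv.Atoi(string(fields[1]))
	if err != nil {
		return -1
	}
	return id
}

func main() {
	fmt.Println("main goroutine:", goroNumber())
	done := make(chan int)
	go func() { done <- goroNumber() }()
	fmt.Println("child goroutine:", <-done)
}
```

Goroutine numbers are mainly useful for debugging and log correlation; the Go runtime deliberately offers no public API for them.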

func IsNil added in v1.1.32

func IsNil(face interface{}) bool

IsNil uses reflect to return true iff the face contains a nil pointer, map, array, slice, or channel.
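A check like this matters because an interface holding a typed nil pointer does not compare equal to nil. The following sketch illustrates the idea with the reflect package; isNil is a hypothetical re-implementation that covers only kinds for which reflect.Value.IsNil is defined, and may differ from the package's IsNil in detail.

```go
package main

import (
	"fmt"
	"reflect"
)

// isNil reports whether face is nil, or holds a nil value of a kind
// that can be nil (pointer, map, slice, channel, func).
func isNil(face interface{}) bool {
	if face == nil {
		return true
	}
	v := reflect.ValueOf(face)
	switch v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Chan, reflect.Func:
		return v.IsNil()
	}
	return false
}

func main() {
	var p *int
	var face interface{} = p
	// The interface itself is non-nil, but the pointer inside it is nil.
	fmt.Println(face == nil, isNil(face)) // prints: false true
}
```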

func MinimalBistreamFunc added in v1.4.3

func MinimalBistreamFunc(
	srv *Server,
	ctx context.Context,
	req *Message,
	uploadsFromClientCh <-chan *Message,
	sendDownloadPartToClient func(ctx context.Context, by []byte, last bool) error,
	lastReply *Message,
) (err error)

MinimalBistreamFunc stands in contrast to the fleshed-out ServeBistream example above. It attempts to illustrate only the bare minimum needed to implement a BistreamFunc, so the user can glimpse its conceptual elegance.

It provides a starter template for writing your own.

Typically you might want to make it a method on a struct that provides it with other application dependent state and helper methods. But that's not minimal.

func NewCallID added in v1.3.0

func NewCallID() string

func NewChaCha20CryptoRandKey added in v1.0.114

func NewChaCha20CryptoRandKey() []byte

func ParsePeerURL added in v1.9.0

func ParsePeerURL(peerURL string) (netAddr, serviceName, peerID, circuitID string, err error)

func SelfyNewKey added in v1.0.3

func SelfyNewKey(createKeyPairNamed, odir string) error

SelfyNewKey is only for testing, not production. It is used by the tests to check that certs are signed by the expected CA.

SelfyNewKey will generate a self-signed certificate authority, a new ed25519 key pair, sign the public key to create a cert, and write these four new files to disk. The directories odir/my-keep-private-dir and odir/certs will be created, based on the odir argument. For a given createKeyPairNamed name, we will create odir/certs/name.crt and odir/certs/name.key files. The odir/certs/name.key and my-keep-private-dir/ca.key files contain private keys and should be kept confidential. The `selfy` command in this package can be used to produce the same keys but with password protection, which is recommended.
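The core of that procedure can be sketched with the standard library alone. The sketch below keeps everything in memory and omits the file writing and password protection; newCA and signNode are hypothetical helpers, and the certificate fields chosen (serial numbers, validity period, key usages) are illustrative assumptions rather than the values SelfyNewKey uses.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// newCA creates a self-signed ed25519 certificate authority in memory.
func newCA() (*x509.Certificate, ed25519.PrivateKey, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "example-ca"},
		NotBefore:             time.Now().Add(-time.Minute),
		NotAfter:              time.Now().Add(24 * time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	// Template and parent are the same: the CA signs its own certificate.
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, pub, priv)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, priv, err
}

// signNode creates a fresh ed25519 key pair and a certificate for its
// public key, signed by the CA.
func signNode(name string, ca *x509.Certificate, caKey ed25519.PrivateKey) (*x509.Certificate, ed25519.PrivateKey, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(2),
		Subject:      pkix.Name{CommonName: name},
		NotBefore:    time.Now().Add(-time.Minute),
		NotAfter:     time.Now().Add(24 * time.Hour),
		KeyUsage:     x509.KeyUsageDigitalSignature,
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, ca, pub, caKey)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, priv, err
}

func main() {
	ca, caKey, err := newCA()
	if err != nil {
		panic(err)
	}
	node, _, err := signNode("node", ca, caKey)
	if err != nil {
		panic(err)
	}
	// This is the kind of check the tests rely on: was the cert signed by the expected CA?
	fmt.Println("signed by CA:", node.CheckSignatureFrom(ca) == nil)
}
```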

Types

type Args added in v1.1.0

type Args struct {
	A int `zid:"0"`
	B int `zid:"1"`
}

Args in example.go is part of the tests.

func (*Args) DecodeMsg added in v1.1.0

func (z *Args) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (Args) EncodeMsg added in v1.1.0

func (z Args) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Args) MarshalMsg added in v1.1.0

func (z Args) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Args) Msgsize added in v1.1.0

func (z Args) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Args) UnmarshalMsg added in v1.1.0

func (z *Args) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Args) UnmarshalMsgWithCfg added in v1.1.0

func (z *Args) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type Arith added in v1.1.0

type Arith int

Arith in example.go is part of the tests.

func (*Arith) Add added in v1.1.0

func (t *Arith) Add(args Args, reply *Reply) error

Arith.Add in example.go is part of the tests.

func (*Arith) DecodeMsg added in v1.1.0

func (z *Arith) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (*Arith) Div added in v1.1.0

func (t *Arith) Div(args Args, reply *Reply) error

Arith.Div in example.go is part of the tests.

func (Arith) EncodeMsg added in v1.1.0

func (z Arith) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Arith) Error added in v1.1.0

func (t *Arith) Error(args *Args, reply *Reply) error

Arith.Error in example.go is part of the tests.

func (Arith) MarshalMsg added in v1.1.0

func (z Arith) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Arith) Msgsize added in v1.1.0

func (z Arith) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Arith) Mul added in v1.1.0

func (t *Arith) Mul(args *Args, reply *Reply) error

Arith.Mul in example.go is part of the tests.

func (*Arith) Scan added in v1.1.0

func (t *Arith) Scan(args string, reply *Reply) (err error)

Arith.Scan in example.go is part of the tests.

func (*Arith) SleepMilli added in v1.1.0

func (t *Arith) SleepMilli(args *Args, reply *Reply) error

Arith.SleepMilli in example.go is part of the tests.

func (*Arith) String added in v1.1.0

func (t *Arith) String(args *Args, reply *string) error

Arith.String in example.go is part of the tests.

func (*Arith) UnmarshalMsg added in v1.1.0

func (z *Arith) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Arith) UnmarshalMsgWithCfg added in v1.1.0

func (z *Arith) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type BenchmarkMessage added in v1.1.30

type BenchmarkMessage struct {
	Field1   string   `zid:"0"`
	Field9   string   `zid:"1"`
	Field18  string   `zid:"2"`
	Field80  bool     `zid:"3"`
	Field81  bool     `zid:"4"`
	Field2   int32    `zid:"5"`
	Field3   int32    `zid:"6"`
	Field280 int32    `zid:"7"`
	Field6   int32    `zid:"8"`
	Field22  int64    `zid:"9"`
	Field4   string   `zid:"10"`
	Field5   []uint64 `zid:"11"`
	Field59  bool     `zid:"12"`
	Field7   string   `zid:"13"`
	Field16  int32    `zid:"14"`
	Field130 int32    `zid:"15"`
	Field12  bool     `zid:"16"`
	Field17  bool     `zid:"17"`
	Field13  bool     `zid:"18"`
	Field14  bool     `zid:"19"`
	Field104 int32    `zid:"20"`
	Field100 int32    `zid:"21"`
	Field101 int32    `zid:"22"`
	Field102 string   `zid:"23"`
	Field103 string   `zid:"24"`
	Field29  int32    `zid:"25"`
	Field30  bool     `zid:"26"`
	Field60  int32    `zid:"27"`
	Field271 int32    `zid:"28"`
	Field272 int32    `zid:"29"`
	Field150 int32    `zid:"30"`
	Field23  int32    `zid:"31"`
	Field24  bool     `zid:"32"`
	Field25  int32    `zid:"33"`
	Field78  bool     `zid:"34"`
	Field67  int32    `zid:"35"`
	Field68  int32    `zid:"36"`
	Field128 int32    `zid:"37"`
	Field129 string   `zid:"38"`
	Field131 int32    `zid:"39"`
}

BenchmarkMessage in example.go is part of the tests and benchmarks.

func (*BenchmarkMessage) DecodeMsg added in v1.1.30

func (z *BenchmarkMessage) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (*BenchmarkMessage) EncodeMsg added in v1.1.30

func (z *BenchmarkMessage) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*BenchmarkMessage) MarshalMsg added in v1.1.30

func (z *BenchmarkMessage) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*BenchmarkMessage) Msgsize added in v1.1.30

func (z *BenchmarkMessage) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*BenchmarkMessage) UnmarshalMsg added in v1.1.30

func (z *BenchmarkMessage) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*BenchmarkMessage) UnmarshalMsgWithCfg added in v1.1.30

func (z *BenchmarkMessage) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type BistreamFunc added in v1.3.0

type BistreamFunc func(
	srv *Server,
	ctx context.Context,
	req *Message,
	uploadsFromClientCh <-chan *Message,
	sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error,
	lastReply *Message,
) (err error)

BistreamFunc aims to allow the user to implement server operations with full generality; it provides for uploads and downloads to the originating client, and for communication with other clients. Use Server.RegisterBistreamFunc() to register your BistreamFunc under a name. The BistreamFunc and its siblings the ServerSendsDownloadFunc and the UploadReaderFunc are only available for the Message based API; not in the net/rpc API.

On the client side, the Client.RequestBistreaming() call is used to create a Bistreamer that will call the BistreamFunc by its registered name (the name that Server.RegisterBistreamFunc() was called with).

In a BistreamFunc on the server, the full generality of interleaving upload and download handling is available. The initial Message in req will also be the first Message in the req.HDR.UploadsCh which receives all upload messages from the client.

Note that it may be more convenient for the user to use an UploadReaderFunc or ServerSendsDownloadFunc if the full generality of the BistreamFunc is not needed. For simplicity, the Server.RegisterServerSendsDownloadFunc() is used to register your ServerSendsDownloadFunc. Server.RegisterUploadReaderFunc() is used to register your UploadReaderFunc. Note in particular that the UploadReaderFunc is not persistent but rather receives a callback per Message received from the Client.Uploader. This may simplify the implementation of your server-side upload function. Note that persistent state between messages is still available by registering a method on your struct; see the ServerSideUploadFunc struct in example.go for example.

A BistreamFunc, in contrast, is not a callback-per-message, but rather persists and would typically only exit if ctx.Done() is received, or if it wishes to finish the operation (say on an error, or by noting that a CallUploadEnd type Message has been received) so as to save goroutine resources on the server. The BistreamFunc is started on the server when the CallRequestBistreaming Message.HDR.Typ is received with the ServiceName matching the registered name. Each live instance of the BistreamFunc is identified by the req.HDR.CallID set by the originating client. All download messages sent will have this same CallID on them (for the client to match).

When the BistreamFunc finishes (returns), a final message of type CallRPCReply will be sent back to the client. This is the lastReply *Message provided in the BistreamFunc. The BistreamFunc should fill in this lastReply with any final JobSerz payload it wishes to send; this is optional. On the client side, the Client.RequestBistreaming() is used to start bi-streaming. It returns a Bistreamer. This Bistreamer has a ReadCh that will receive this final message (as well as all other download messages). See the cli_test.go Test065_bidirectional_download_and_upload for example use.

A BistreamFunc is run on its own goroutine. It can start new goroutines, if it wishes, but this is not required. An additional (new) goroutine may be useful to reduce the latency of message handling while simultaneously reading from req.HDR.UploadsCh for uploads and writing to downloads with sendDownloadPartToClient(), as both of these are blocking, synchronous, operations. If you do so, be sure to handle goroutine cancellation and cleanup if the ctx provided is cancelled.

The sendDownloadPartToClient() helper function is used to write download Messages. It properly assigns the HDR.StreamPart sequence numbers and sets HDR.Typ to one of CallDownloadBegin, CallDownloadMore, or CallDownloadEnd. The BistreamFunc should call sendDownloadPartToClient() with last=true to signal the end of the download, in which case HDR.Typ CallDownloadEnd will be set on the sent Message.

To provide back-pressure by default, the sendDownloadPartToClient() call is synchronous and will return only when the message is sent. If you wish to continue to process uploads while sending a download part, your BistreamFunc can call the provided sendDownloadPartToClient() in a goroutine that you start for this purpose. The sendDownloadPartToClient() call is goroutine safe, as it uses its own internal sync.Mutex to ensure only one send is in progress at a time.
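To make the described behavior concrete, here is a small sketch of a mutex-guarded sender that numbers the parts of a download and stamps each with a begin, more, or end type. It is an illustration, not the package's code: message and partSender are hypothetical stand-ins, the numeric type values are copied from the CallType constants listed on this page, and starting the part numbers at zero is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Values mirror CallDownloadBegin, CallDownloadMore and CallDownloadEnd.
const (
	downloadBegin = 109
	downloadMore  = 110
	downloadEnd   = 111
)

// message is a hypothetical stand-in for the package's Message.
type message struct {
	Typ        int
	StreamPart int64
	Payload    []byte
}

// partSender serializes sends and numbers the parts of one download.
type partSender struct {
	mu   sync.Mutex
	next int64
	out  func(*message) error // hypothetical transport write
}

// send stamps msg with its type and sequence number, then writes it.
// It returns only after the write completes, which gives back-pressure,
// and the mutex makes it safe to call from several goroutines.
func (s *partSender) send(msg *message, last bool) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch {
	case last:
		msg.Typ = downloadEnd
	case s.next == 0:
		msg.Typ = downloadBegin
	default:
		msg.Typ = downloadMore
	}
	msg.StreamPart = s.next
	s.next++
	return s.out(msg)
}

func main() {
	s := &partSender{out: func(m *message) error {
		fmt.Printf("typ=%d part=%d payload=%s\n", m.Typ, m.StreamPart, m.Payload)
		return nil
	}}
	for i, p := range []string{"a", "b", "c"} {
		if err := s.send(&message{Payload: []byte(p)}, i == 2); err != nil {
			fmt.Println("send failed:", err)
			return
		}
	}
}
```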

A BistreamFunc by default communicates download messages to its originating client. However other clients can also be sent messages. The Server.SendOneWayMessage() and Server.SendMessage() operations on the Server can be used for this purpose.

Visit the example.go implementation of ServeBistreamState.ServeBistream() to see it in action.

type Bistreamer added in v1.3.0

type Bistreamer struct {
	ReadDownloadsCh <-chan *Message
	WriteCh         chan<- *Message
	ErrorCh         <-chan *Message
	// contains filtered or unexported fields
}

Bistreamer is the client side handle to talking with a server func that does bistreaming: the client can stream to the server func, and the server func can, symmetrically, stream to the client. The basics of TCP are finally available to users.

func (*Bistreamer) Begin added in v1.5.0

func (b *Bistreamer) Begin(ctx context.Context, req *Message) (err error)

func (*Bistreamer) CallID added in v1.3.0

func (s *Bistreamer) CallID() string

func (*Bistreamer) Close added in v1.5.0

func (b *Bistreamer) Close()

Close cancels any outstanding call.

func (*Bistreamer) Name added in v1.3.0

func (s *Bistreamer) Name() string

func (*Bistreamer) Seqno added in v1.3.0

func (s *Bistreamer) Seqno() uint64

func (*Bistreamer) UploadMore added in v1.3.0

func (s *Bistreamer) UploadMore(ctx context.Context, msg *Message, last bool) (err error)

type BuiltinTypes added in v1.1.0

type BuiltinTypes struct {
	Placeholder int `zid:"0"` // greenpack refuses to serialize an empty struct.
}

BuiltinTypes in example.go is part of the tests.

func (BuiltinTypes) Array added in v1.1.0

func (BuiltinTypes) Array(args *Args, reply *[2]int) error

BuiltinTypes.Array in example.go is part of the tests.

func (*BuiltinTypes) DecodeMsg added in v1.1.0

func (z *BuiltinTypes) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (BuiltinTypes) EncodeMsg added in v1.1.0

func (z BuiltinTypes) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (BuiltinTypes) Map added in v1.1.0

func (BuiltinTypes) Map(args *Args, reply *map[int]int) error

BuiltinTypes.Map in example.go is part of the tests.

func (BuiltinTypes) MarshalMsg added in v1.1.0

func (z BuiltinTypes) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (BuiltinTypes) Msgsize added in v1.1.0

func (z BuiltinTypes) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (BuiltinTypes) Slice added in v1.1.0

func (BuiltinTypes) Slice(args *Args, reply *[]int) error

BuiltinTypes.Slice in example.go is part of the tests.

func (*BuiltinTypes) UnmarshalMsg added in v1.1.0

func (z *BuiltinTypes) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*BuiltinTypes) UnmarshalMsgWithCfg added in v1.1.0

func (z *BuiltinTypes) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

func (BuiltinTypes) WantsContext added in v1.1.0

func (BuiltinTypes) WantsContext(ctx context.Context, args *Args, reply *[2]int) error

BuiltinTypes.WantsContext in example.go is part of the tests. Here, mimic Array's reply.

type Call added in v1.0.42

type Call struct {
	ServiceMethod string     // The name of the service and method to call.
	Args          Green      // The argument to the function (*struct).
	Reply         Green      // The reply from the function (*struct).
	Error         error      // After completion, the error status.
	Done          chan *Call // Receives *Call when Go is complete.
}

Call represents an active net/rpc RPC.

type CallType added in v1.1.29

type CallType int
const (
	CallNone CallType = 0

	CallRPC    CallType = 1
	CallNetRPC CallType = 2

	CallRequestBistreaming CallType = 3

	// All type numbers >= 100 are one-way calls.
	CallOneWay         CallType = 100
	CallRPCReply       CallType = 101
	CallKeepAlive      CallType = 102
	CallCancelPrevious CallType = 103
	CallError          CallType = 104 // we could not complete a request

	// client sends a stream to the server, in an Upload:
	CallUploadBegin CallType = 105 // one of these; and
	CallUploadMore  CallType = 106 // possibly many of these; and
	CallUploadEnd   CallType = 107 // just one of these to finish.

	// the opposite: when client wants to get a stream
	// from the server.
	CallRequestDownload CallType = 108

	// The server responds to CallRequestDownload with
	CallDownloadBegin CallType = 109 // one of these to start;
	CallDownloadMore  CallType = 110 // possibly many of these;
	CallDownloadEnd   CallType = 111 // and one of these to finish.

	// try to keep all peer traffic isolated
	// and only using these:
	CallPeerStart          CallType = 112
	CallPeerStartCircuit   CallType = 113
	CallPeerTraffic        CallType = 114
	CallPeerError          CallType = 115
	CallPeerFromIsShutdown CallType = 116
	CallPeerEndCircuit     CallType = 117
)
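The comment in the const block says every type number at or above 100 is a one-way call. A tiny sketch of that rule, using a hypothetical local callType and isOneWay helper rather than the package's own types:

```go
package main

import "fmt"

// callType is a hypothetical local copy of a few CallType values.
type callType int

const (
	callRPC       callType = 1
	callOneWay    callType = 100
	callUploadEnd callType = 107
)

// isOneWay applies the documented rule: type numbers >= 100 are one-way.
func isOneWay(t callType) bool {
	return t >= callOneWay
}

func main() {
	for _, t := range []callType{callRPC, callOneWay, callUploadEnd} {
		fmt.Printf("type %d one-way: %v\n", t, isOneWay(t))
	}
}
```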

func (*CallType) DecodeMsg added in v1.1.29

func (z *CallType) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (CallType) EncodeMsg added in v1.1.29

func (z CallType) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (CallType) MarshalMsg added in v1.1.29

func (z CallType) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (CallType) Msgsize added in v1.1.29

func (z CallType) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (CallType) String added in v1.3.0

func (ct CallType) String() string

func (*CallType) UnmarshalMsg added in v1.1.29

func (z *CallType) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*CallType) UnmarshalMsgWithCfg added in v1.1.29

func (z *CallType) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type Circuit added in v1.7.0

type Circuit struct {
	LpbFrom *LocalPeer
	RpbTo   *RemotePeer

	LocalPeerID  string
	RemotePeerID string

	LocalServiceName  string
	RemoteServiceName string

	CircuitID string // aka Message.HDR.CallID
	Ctx       context.Context
	Canc      context.CancelFunc

	Name   string
	Reads  chan *Fragment // users should treat as read-only.
	Errors chan *Fragment // ditto.

	Halt *idem.Halter
}

Circuit is a handle to the two-way, asynchronous, communication channel between two Peers.

It is returned from RemotePeer.NewCircuit(), or from LocalPeer.NewCircuitToPeerURL().

~~~

Peer/Circuit/Fragment Design overview

Motivated by filesystem syncing, we envision a system that can both stream efficiently and utilize the same code on the client as on the server.

Syncing a filesystem needs efficient stream transmission. The total data far exceeds what will fit in any single message, and updates may be continuous or lumpy. We don't want to wait for one "call" to finish its round trip. We just want to send data when we have it. Hence the API is based on one-way messages and is asynchronous in that the methods and channels involved do not wait for network round trips to complete.

Once established, a circuit between peers is designed to persist until deliberately closed. A circuit can then handle any number of Fragments of data during its lifetime.

To organize communications, a peer can maintain multiple circuits, either with the same peer or with any number of other peers. We can then easily handle any arbitrary network topology.

Even between just two peers, multiple persistent channels facilitate code organization. One could use a channel per file being synced, for instance. Multiple large files being scanned and their diffs streamed at once, in parallel, becomes practical.

By using lightweight goroutines and channels, circuit persistence is inexpensive and supports any number of data streams with markedly different lifetimes and update rates, over long periods.

Symmetry of code deployment is also a natural requirement. This is the git model. When syncing two repositories, the operations needed are the same on both sides, no matter who initiated or whether a push or pull was requested. Hence we want a way to register the same functionality on the client as on the server. This is not available in a typical RPC package.

Peer/Circuit/Fragment API essentials (utility methods omitted for compactness)

A) To establish circuits with new peers, use

  1. NewCircuitToPeerURL() for initiating a new circuit to a new peer.
  2. <-newPeerCh to receive new initiations; then use the IncomingCircuit() method to get the Circuit.

B) To create additional circuits with an already connected peer:

  1. NewCircuit adds a new circuit with an existing RemotePeer, no URL needed.
  2. They get notified on <-newPeerCh too. (verify)

C) To communicate over a Circuit:

  1. get regular messages (called Fragments) from <-Circuit.Reads
  2. get error messages from <-Circuit.Errors
  3. send messages with SendOneWay(). It never blocks.
  4. Close() the circuit and the peer's ctx will be cancelled. (verify)
type Circuit struct {
	Reads  <-chan *Fragment
	Errors <-chan *Fragment
    Close() // when done
}
type LocalPeer interface {
	NewCircuitToPeerURL(peerURL string, frag *Fragment,
         errWriteDur *time.Duration) (ckt *Circuit, ctx context.Context, err error)
}
type RemotePeer interface {
	IncomingCircuit() (ckt *Circuit, ctx context.Context, err error) // gets the first.
	NewCircuit()      (ckt *Circuit, ctx context.Context, err error) // make 2nd, 3rd...
	SendOneWay(ckt *Circuit, frag *Fragment, errWriteDur *time.Duration) error
}

type PeerServiceFunc func(myPeer LocalPeer, ctx0 context.Context, newPeerCh <-chan RemotePeer) error

type Fragment struct {
           // system metadata
	  FromPeerID string
	    ToPeerID string
	   CircuitID string
	      Serial int64
	         Typ CallType
	 ServiceName string

           // user supplied data
          FragOp int
	 FragSubject string
	    FragPart int64
	        Args map[string]string
	     Payload []byte
	         Err string
}

D) bootstrapping: registering your Peer implementation and starting
 them up (from outside the PeerServiceFunc callback). The PeerAPI
 is available via Client.PeerAPI or Server.PeerAPI.
 The same facilities are available to peers running on either.

1. register:

   PeerAPI.RegisterPeerServiceFunc(peerServiceName string, peer PeerServiceFunc) error

2. start a previously registered PeerServiceFunc locally or remotely:

       PeerAPI.StartLocalPeer(
                   ctx context.Context,
       peerServiceName string) (lp *LocalPeer, err error)

   Starting a remote peer must also specify the host:port remoteAddr
   of the remote client/server. The user can call the RemoteAddr() and
   LocalAddr() methods on the Client/Server to obtain these.

       PeerAPI.StartRemotePeer(
                    ctx context.Context,
        peerServiceName string,
             remoteAddr string, // host:port
               waitUpTo time.Duration,
                           ) (remotePeerURL, remotePeerID string, err error)

    The returned URLs can be used in myPeer.NewCircuitToPeerURL() calls
    inside the PeerServiceFunc.

~~~

func (*Circuit) Close added in v1.7.0

func (h *Circuit) Close()

Close must be called on a Circuit to release resources when you are done with it.

func (*Circuit) ConvertFragmentToMessage added in v1.9.0

func (ckt *Circuit) ConvertFragmentToMessage(frag *Fragment) (msg *Message)

ConvertFragmentToMessage creates outgoing messages from the LocalPeer over the Circuit. If frag.{ToPeerID,FromPeerID,CircuitID} are not set on frag, they will be filled in from the ckt.

func (*Circuit) ConvertMessageToFragment added in v1.9.0

func (ckt *Circuit) ConvertMessageToFragment(msg *Message) (frag *Fragment)

ConvertMessageToFragment converts an incoming Message into a Fragment.

func (*Circuit) ID2 added in v1.7.0

func (ckt *Circuit) ID2() (LocalPeerID, RemotePeerID string)

ID2 supplies the local and remote PeerIDs.

func (*Circuit) IsClosed added in v1.9.0

func (ckt *Circuit) IsClosed() bool

func (*Circuit) LocalCircuitURL added in v1.7.0

func (ckt *Circuit) LocalCircuitURL() string

CircuitURL format: tcp://x.x.x.x:port/peerServiceName/peerID/circuitID where peerID and circuitID (the same as our CallID type) are base64 URL encoded. The IDs do not include the '/' character, and thus are "URL safe".

(CircuitID is the CallID in the Message.HDR)
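Because the IDs never contain '/', a URL in this format can be split on path separators. The sketch below does so with net/url; parseCircuitURL is a hypothetical helper, and details such as returning the scheme as part of netAddr and tolerating missing trailing segments are assumptions that may not match the package's ParsePeerURL.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseCircuitURL splits tcp://host:port/peerServiceName/peerID/circuitID
// into its parts. Missing trailing segments come back as empty strings.
func parseCircuitURL(s string) (netAddr, service, peerID, circuitID string, err error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", "", "", "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", "", "", "", fmt.Errorf("missing scheme or host in %q", s)
	}
	netAddr = u.Scheme + "://" + u.Host
	parts := strings.Split(strings.Trim(u.Path, "/"), "/")
	if len(parts) > 3 {
		return "", "", "", "", fmt.Errorf("too many path segments in %q", s)
	}
	for len(parts) < 3 {
		parts = append(parts, "")
	}
	return netAddr, parts[0], parts[1], parts[2], nil
}

func main() {
	addr, svc, peer, ckt, err := parseCircuitURL("tcp://127.0.0.1:8080/sync/pid_1/cid-2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(addr, svc, peer, ckt)
}
```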

func (*Circuit) RemoteCircuitURL added in v1.7.0

func (ckt *Circuit) RemoteCircuitURL() string

type Client

type Client struct {
	PeerAPI *peerAPI // must be Exported to users!
	// contains filtered or unexported fields
}

A Client starts requests, and (might) wait for responses.

func NewClient

func NewClient(name string, config *Config) (c *Client, err error)

NewClient creates a new client. Call Start() to begin a connection. The name setting allows users to track multiple instances of Clients, and the Client.Name() method will retrieve it.

func (*Client) Call added in v1.0.42

func (c *Client) Call(serviceMethod string, args, reply Green, octx context.Context) error

Call implements the net/rpc Client.Call() API; its docs:

Call invokes the named function, waits for it to complete, and returns its error status.

Added: octx is an optional context for cancelling the job. It can be nil.

func (*Client) Close

func (c *Client) Close() error

Close shuts down the Client.

func (*Client) Err

func (c *Client) Err() error

Err returns any Client stored error.

func (*Client) GetErrorChForCallID added in v1.5.1

func (c *Client) GetErrorChForCallID(callID string) (ch chan *Message)

func (*Client) GetErrorsForCallID added in v1.5.1

func (c *Client) GetErrorsForCallID(ch chan *Message, callID string)

func (*Client) GetErrorsForToPeerID added in v1.7.0

func (s *Client) GetErrorsForToPeerID(ch chan *Message, objID string)

func (*Client) GetOneRead

func (c *Client) GetOneRead(seqno uint64, ch *loquet.Chan[Message])

GetOneRead responds on ch with the first incoming message whose Seqno matches seqno, then auto unregisters itself after that single send on ch.

func (*Client) GetReadIncomingCh

func (c *Client) GetReadIncomingCh() (ch chan *Message)

GetReadIncomingCh creates and returns a buffered channel that reads incoming messages that are server-pushed (not associated with a round-trip rpc call request/response pair).

func (*Client) GetReadIncomingChForCallID added in v1.3.0

func (c *Client) GetReadIncomingChForCallID(callID string) (ch chan *Message)

GetReadIncomingChForCallID creates and returns a buffered channel that reads incoming messages that are server-pushed (not associated with a round-trip rpc call request/response pair). It filters for those with callID.

func (*Client) GetReads

func (c *Client) GetReads(ch chan *Message)

GetReads registers to get any received messages on ch. It is similar to GetReadIncomingCh but for when ch already exists and you do not want a new one.

func (*Client) GetReadsForCallID added in v1.3.0

func (c *Client) GetReadsForCallID(ch chan *Message, callID string)

GetReadsForCallID registers to get any received messages on ch. It is similar to GetReadIncomingChForCallID but for when ch already exists and you do not want a new one. It filters for CallID.

func (*Client) GetReadsForToPeerID added in v1.7.0

func (s *Client) GetReadsForToPeerID(ch chan *Message, objID string)

func (*Client) Go added in v1.0.42

func (c *Client) Go(serviceMethod string, args Green, reply Green, done chan *Call, octx context.Context) *Call

Go implements the net/rpc Client.Go() API; its docs:

Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.

octx is an optional context, for early cancelling of a job. It can be nil.
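The rule for the done argument follows the standard library's net/rpc convention. A minimal sketch of that convention, using a hypothetical call type and helpers (prepareDone, finish) and an arbitrary buffer size for the allocated channel:

```go
package main

import "fmt"

// call is a hypothetical stand-in for the package's Call.
type call struct {
	Error error
	Done  chan *call
}

// prepareDone applies the rule for the done argument: allocate a buffered
// channel when nil, and refuse an unbuffered one.
func prepareDone(done chan *call) chan *call {
	if done == nil {
		return make(chan *call, 10)
	}
	if cap(done) == 0 {
		panic("done channel is unbuffered")
	}
	return done
}

// finish signals completion without ever blocking the goroutine that
// completes the call; this is why the channel must be buffered.
func (c *call) finish() {
	select {
	case c.Done <- c:
	default: // buffer full: the caller did not size the channel adequately
	}
}

func main() {
	c := &call{Done: prepareDone(nil)}
	c.finish()
	fmt.Println("got same call back:", <-c.Done == c)
}
```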

func (*Client) IsDown added in v1.0.138

func (c *Client) IsDown() (down bool)

func (*Client) LocalAddr

func (c *Client) LocalAddr() string

LocalAddr retrieves the local host/port that the Client is calling from.

func (*Client) Name added in v1.0.39

func (c *Client) Name() string

Name reports the name the Client was created with.

func (*Client) NewBistreamer added in v1.5.0

func (c *Client) NewBistreamer(bistreamerName string) (b *Bistreamer, err error)

NewBistreamer creates a new Bistreamer but does no communication yet. Just for setting up reader/writer goroutines before everything starts.

func (*Client) NewDownloader added in v1.6.0

func (c *Client) NewDownloader(ctx context.Context, streamerName string) (downloader *Downloader, err error)

func (*Client) OneWaySend

func (c *Client) OneWaySend(msg *Message, cancelJobCh <-chan struct{}) (err error)

OneWaySend sends a message without expecting or waiting for a response. The cancelJobCh is optional, and can be nil. If msg.HDR.CallID is set, we will preserve it.

func (*Client) RemoteAddr added in v1.7.0

func (c *Client) RemoteAddr() string

RemoteAddr retrieves the remote host/port for the Server that the Client is connected to.

func (*Client) RequestBistreaming added in v1.3.0

func (c *Client) RequestBistreaming(ctx context.Context, bistreamerName string, req *Message) (b *Bistreamer, err error)

func (*Client) RequestDownload added in v1.3.0

func (c *Client) RequestDownload(ctx context.Context, streamerName, path string) (downloader *Downloader, err error)

func (*Client) SendAndGetReply

func (c *Client) SendAndGetReply(req *Message, cancelJobCh <-chan struct{}) (reply *Message, err error)

SendAndGetReply starts a round-trip RPC call. We will wait for a response before returning. The cancelJobCh is optional; it can be nil. A context.Done() like channel can be supplied there to cancel the job before a reply comes back.

UPDATE: a DEFAULT timeout is in force now. Because server failure or blink (down then up) can leave us stalled forever, we put in a default timeout of 10 seconds, if not otherwise specified. If you expect your call to take more than a few seconds, you should set the timeout directly with SendAndGetReplyWithTimeout() or pass in a cancelJobCh here to manage it. Otherwise, to handle the common case when we expect very fast replies, if cancelJobCh is nil, we will cancel the job if it has not finished after 10 seconds.

func (*Client) SendAndGetReplyWithCtx added in v1.2.4

func (c *Client) SendAndGetReplyWithCtx(ctx context.Context, req *Message) (reply *Message, err error)

SendAndGetReplyWithCtx is like SendAndGetReply(), with the additional feature that it will send a remote cancellation request when the ctx is cancelled or if the ctx.Deadline() time (if set) is surpassed. Note that none of the Values inside ctx, if set, will be transmitted to the remote call, because the context.Context API provides no method to enumerate them. Such values are likely not serializable in any case.

A similar deadline effect can be achieved just by setting the req.HDR.Deadline field in a SendAndGetReply() call. This may also be more efficient on the client, because the client need not wait for the remote cancellation response to be sent and received. However this is a little racy: the server could succeed and be in the process of replying when the client hits the deadline. In this case the client might retry a call that actually did finish, and end up doing the call twice. This is also always a hazard with servers crashing before they can finish responding. Ideally server APIs are idempotent to guard against this.

If the req.HDR.Deadline is already set (not zero), then we do not touch it. If it is zero and ctx has a deadline, we set it as the req.HDR.Deadline. We leave it to the user to coordinate/update these two ways of setting a deadline, knowing that the req.HDR.Deadline will win, if set.
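The precedence rule between the header deadline and the context deadline can be written down as a small pure function. The helper name effectiveDeadline is hypothetical and only restates the rule from the paragraph above; it is not taken from the package source.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// effectiveDeadline mirrors the documented rule: a non-zero header
// deadline is left untouched; otherwise the ctx deadline, if any, is used.
func effectiveDeadline(ctx context.Context, hdrDeadline time.Time) time.Time {
	if !hdrDeadline.IsZero() {
		return hdrDeadline
	}
	if d, ok := ctx.Deadline(); ok {
		return d
	}
	return time.Time{} // neither is set
}

// demo checks the three cases: header wins, ctx is used, nothing is set.
func demo() (hdrWins, ctxUsed, noneSet bool) {
	now := time.Now()
	hdr := now.Add(5 * time.Second)
	ctxDL := now.Add(30 * time.Second)
	ctx, cancel := context.WithDeadline(context.Background(), ctxDL)
	defer cancel()

	hdrWins = effectiveDeadline(ctx, hdr).Equal(hdr)
	ctxUsed = effectiveDeadline(ctx, time.Time{}).Equal(ctxDL)
	noneSet = effectiveDeadline(context.Background(), time.Time{}).IsZero()
	return
}

func main() {
	fmt.Println(demo())
}
```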

func (*Client) SendAndGetReplyWithTimeout

func (c *Client) SendAndGetReplyWithTimeout(timeout time.Duration, req *Message) (reply *Message, err error)

SendAndGetReplyWithTimeout expires the call after timeout.

func (*Client) SendOneWayMessage added in v1.7.0

func (cli *Client) SendOneWayMessage(ctx context.Context, msg *Message, errWriteDur time.Duration) error

for symmetry: see srv.go for details, under the same func name.

SendOneWayMessage only sets msg.HDR.From to its correct value.

func (*Client) Start added in v1.1.15

func (c *Client) Start() error

Start dials the server. That is, Start attempts to connect to config.ClientDialToHostPort. The err will come back with any problems encountered.

func (*Client) UngetOneRead added in v1.5.3

func (c *Client) UngetOneRead(seqno uint64, ch *loquet.Chan[Message])

func (*Client) UngetReads

func (c *Client) UngetReads(ch chan *Message)

UngetReads reverses what GetReads does: un-register and have ch be deaf from now on. Idempotent: if ch is already gone, no foul is reported.

func (*Client) UnregisterChannel added in v1.7.0

func (s *Client) UnregisterChannel(ID string, whichmap int)

func (*Client) UploadBegin added in v1.3.0

func (c *Client) UploadBegin(
	ctx context.Context,
	serviceName string,
	msg *Message,

) (strm *Uploader, err error)

UploadBegin sends the msg to the server to execute with the func that has registered with RegisterUploaderReadererFunc(). At the moment there can only be one such func registered at a time. UploadBegin() will contact it, and Uploader.UploadMore() will, as its name suggests, send another Message.

We maintain FIFO arrival of Messages at the server as follows (despite having each server side func callback executing in a goroutine).

1. Since the client side uses the same channel into the send loop for both UploadBegin and UploadMore, these calls will be properly ordered into the send loop on the client/sending side.

2. The TCP/QUIC stream maintains FIFO order of its messages as it delivers them to the server.

3. On the server, in the TCP/QUIC read loop, we queue messages in FIFO order into a large buffered channel before we spin up a goroutine once at UploadBegin time to handle all the subsequent messages in the order they were queued.

This also yields an efficient design. While normal OneWayFunc and TwoWayFunc messages each start their own new goroutine to avoid long-running functions starving the server's read loop, an UploadReadFunc only utilizes a single new goroutine to process all messages sent in a stream.
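Step 3 above, a buffered queue drained by one goroutine, is what preserves ordering on the server side. The following is a minimal sketch of that pattern using only the standard library; the part type and consumeStream function are hypothetical stand-ins, not the package's types.

```go
package main

import (
	"fmt"
	"sync"
)

// part is a hypothetical stand-in for one Message of an upload stream.
type part struct {
	callID string
	seq    int
}

// consumeStream queues n parts in arrival order and lets one goroutine
// drain the queue, returning the order in which parts were handled.
func consumeStream(n int) []int {
	queue := make(chan part, 1024) // large buffered channel, as in step 3
	var handled []int
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // the single goroutine started once per stream
		defer wg.Done()
		for p := range queue {
			handled = append(handled, p.seq)
		}
	}()

	for i := 0; i < n; i++ { // the read loop enqueues in FIFO order
		queue <- part{callID: "example-call", seq: i}
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	fmt.Println(consumeStream(5))
}
```

Had each part been handed to its own goroutine instead, the handling order would depend on the scheduler; a single consumer on a channel cannot reorder.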

type ClientCodec added in v1.0.42

type ClientCodec interface {
	WriteRequest(*Request, Green) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(Green) error

	Close() error
}

ClientCodec is part of the net/rpc API. Its docs:

A ClientCodec implements writing of RPC requests and reading of RPC responses for the client side of an RPC session. The client calls [ClientCodec.WriteRequest] to write a request to the connection and calls [ClientCodec.ReadResponseHeader] and [ClientCodec.ReadResponseBody] in pairs to read responses. The client calls [ClientCodec.Close] when finished with the connection. ReadResponseBody may be called with a nil argument to force the body of the response to be read and then discarded. See NewClient's comment for information about concurrent access.

type Config

type Config struct {

	// ServerAddr host:port where the server should listen.
	ServerAddr string

	// optional. Can be used to suggest that the
	// client use a specific host:port. NB: For QUIC, by default, the client and
	// server will share the same port if they are in the same process.
	// In that case this setting will definitely be ignored.
	ClientHostPort string

	// Who the client should contact
	ClientDialToHostPort string

	// TCP false means TLS-1.3 secured. true here means do TCP only; with no encryption.
	TCPonly_no_TLS bool

	// UseQUIC cannot be true if TCPonly_no_TLS is true.
	UseQUIC bool

	// NoSharePortQUIC defaults false so sharing is allowed.
	// If true, then we do not share same UDP port between a QUIC
	// client and server (in the same process). Used
	// for testing client shutdown paths too.
	NoSharePortQUIC bool

	// path to certs/ like certificate
	// directory on the live filesystem.
	// defaults to GetCertsDir(); see config.go.
	CertPath string

	// SkipVerifyKeys true allows any incoming
	// key to be signed by
	// any CA; it does not have to be ours. Obviously
	// this discards almost all access control; it
	// should rarely be used unless communication
	// with any random agent/hacker/public person
	// is desired.
	SkipVerifyKeys bool

	ClientKeyPairName string // default "client" means use certs/client.crt and certs/client.key
	ServerKeyPairName string // default "node" means use certs/node.crt and certs/node.key

	// PreSharedKeyPath locates an optional pre-shared
	// key. It must be 32 bytes (or more). Ideally
	// it should be generated from crypto/rand.
	// The `selfy -gensym outpath` command will
	// do this, writing 32 cryptographically random
	// bytes to output.
	PreSharedKeyPath string

	// These are timeouts for connection and transport tuning.
	// The defaults of 0 mean wait forever.
	//
	// Generally we want our send loops to wait forever because
	// if we cut off a send mid-message, it is hard to recover;
	// we don't pass back up the stack how much of the broken
	// message was sent, so the only thing we can do then is tear
	// down the connection pair and re-connect. It is much
	// better to just dedicate the sendLoops to writing for as
	// long as it takes than to set a WriteTimeout.
	ConnectTimeout time.Duration
	ReadTimeout    time.Duration
	WriteTimeout   time.Duration

	ServerSendKeepAlive time.Duration
	ClientSendKeepAlive time.Duration

	// CompressAlgo choices are in magic7.go;
	// The current choices are "" (default compression, "s2" at the moment),
	// or: "s2" (Klaus Post's faster SnappyV2, good for incompressibles);
	// "lz4", (a very fast compressor; see https://lz4.org/);
	// "zstd:01" (fastest setting for Zstandard, very little compression);
	// "zstd:03", (the Zstandard 'default' level; slower but more compression);
	// "zstd:07", (even more compression, even slower);
	// "zstd:11", (slowest version of Zstandard, the most compression).
	//
	// Note! empty string means we use DefaultUseCompressAlgo
	// (at the top of cli.go), which is currently "s2".
	// To turn off compression, you must use the
	// CompressionOff setting.
	CompressAlgo   string
	CompressionOff bool

	// Initially speak HTTP and only
	// accept CONNECT requests that
	// we turn into our protocol.
	// Only works with TCPonly_no_TLS true also,
	// at the moment. Also adds on another
	// round trip.
	HTTPConnectRequired bool
	// contains filtered or unexported fields
}

Config is the same struct type for both NewClient and NewServer setup.

Config says who to contact (for a client), or where to listen (for a server and/or client); and sets how strong a security posture we adopt.
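The CompressAlgo and CompressionOff comments in the struct describe a small decision rule: an empty string selects the default, and only CompressionOff disables compression. A sketch of that rule follows; resolveCompression and knownAlgos are hypothetical helpers written from the field comments, and the package's own validation may differ.

```go
package main

import "fmt"

// defaultAlgo stands in for DefaultUseCompressAlgo, documented as "s2".
const defaultAlgo = "s2"

// knownAlgos lists the CompressAlgo strings named in the Config comments.
var knownAlgos = map[string]bool{
	"s2":      true,
	"lz4":     true,
	"zstd:01": true,
	"zstd:03": true,
	"zstd:07": true,
	"zstd:11": true,
}

// resolveCompression is a hypothetical helper: it returns the algorithm
// a sender would use, or "" when compression is switched off.
func resolveCompression(algo string, off bool) (string, error) {
	if off {
		return "", nil // only CompressionOff disables compression
	}
	if algo == "" {
		return defaultAlgo, nil // empty string selects the default
	}
	if !knownAlgos[algo] {
		return "", fmt.Errorf("unknown CompressAlgo %q", algo)
	}
	return algo, nil
}

func main() {
	for _, a := range []string{"", "lz4", "zstd:07", "gzip"} {
		got, err := resolveCompression(a, false)
		fmt.Printf("%q -> %q, err=%v\n", a, got, err)
	}
}
```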

Copying a Config is fine, but it should be a simple shallow copy to preserve the shared *sharedTransport struct. See/use the Config.Clone() method if in doubt.

nitty gritty details/dev note: the `shared` pointer here is the basis of port (and file handle) reuse where a single process can maintain a server and multiple clients in a "star" pattern. This only works with QUIC of course, and is one of the main reasons to use QUIC.

The shared pointer is reference counted and the underlying net.UDPConn is only closed when the last instance in use is Close()-ed.
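Reference counting of a shared resource, closed only when the last user releases it, can be sketched as below. The sharedConn type is a hypothetical stand-in for the unexported shared transport; the real struct and its locking are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedConn is a hypothetical stand-in for a shared transport.
type sharedConn struct {
	mu     sync.Mutex
	refs   int
	closed bool
}

// acquire registers one more user of the shared connection.
func (s *sharedConn) acquire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs++
}

// release drops one reference and reports whether this call
// was the one that closed the underlying resource.
func (s *sharedConn) release() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.refs == 0 {
		return false // nothing left to release
	}
	s.refs--
	if s.refs == 0 && !s.closed {
		s.closed = true // real code would close the net.UDPConn here
		return true
	}
	return false
}

func main() {
	s := &sharedConn{}
	s.acquire() // server
	s.acquire() // client 1
	s.acquire() // client 2
	fmt.Println(s.release(), s.release(), s.release())
}
```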

func NewConfig added in v1.0.14

func NewConfig() *Config

NewConfig should be used to create Config for use in NewClient or NewServer setup.

func (*Config) Clone added in v1.0.72

func (cfg *Config) Clone() *Config

Clone returns a copy of cfg. This is a shallow copy to enable shared transport between a QUIC client and a QUIC server on the same port.

type Downloader added in v1.3.0

type Downloader struct {
	ReadDownloadsCh <-chan *Message
	ErrorCh         <-chan *Message
	// contains filtered or unexported fields
}

Downloader is used when the client receives a stream from the server. It is returned by RequestDownload() or NewDownloader().

func (*Downloader) BeginDownload added in v1.6.0

func (b *Downloader) BeginDownload(ctx context.Context, path string) (err error)

func (*Downloader) CallID added in v1.3.0

func (s *Downloader) CallID() string

func (*Downloader) Close added in v1.6.0

func (s *Downloader) Close()

Close cancels any outstanding call.

func (*Downloader) Name added in v1.3.0

func (s *Downloader) Name() string

func (*Downloader) Seqno added in v1.3.0

func (s *Downloader) Seqno() uint64

type Embed added in v1.1.0

type Embed struct {
	Simple `zid:"0"`
}

Embed in example.go is part of the tests.

func (*Embed) DecodeMsg added in v1.1.0

func (z *Embed) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (Embed) EncodeMsg added in v1.1.0

func (z Embed) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Embed) MarshalMsg added in v1.1.0

func (z Embed) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Embed) Msgsize added in v1.1.0

func (z Embed) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Embed) UnmarshalMsg added in v1.1.0

func (z *Embed) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Embed) UnmarshalMsgWithCfg added in v1.1.0

func (z *Embed) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type Fragment added in v1.7.0

type Fragment struct {
	// system metadata
	FromPeerID  string   `zid:"0"` // who sent us this Fragment.
	ToPeerID    string   `zid:"1"`
	CircuitID   string   `zid:"2"` // maps to Message.HDR.CallID.
	Serial      int64    `zid:"3"`
	Typ         CallType `zid:"4"` // one of the CallPeer CallTypes of hdr.go
	ServiceName string   `zid:"5"` // the registered PeerServiceName.

	// user supplied data
	FragOp      int               `zid:"6"`
	FragSubject string            `zid:"7"`
	FragPart    int64             `zid:"8"`
	Args        map[string]string `zid:"9"` // nil by default; make() it if you need it.
	Payload     []byte            `zid:"10"`
	Err         string            `zid:"11"` // distinguished field for error messages.
}

Fragments are sent to, and read from, a Circuit by implementers of PeerServiceFunc. They are a simplified version of the underlying Message infrastructure.

Note the first three fields are set by the sending machinery; any user settings will be overridden for FromPeerID, ToPeerID, and CircuitID.
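The split between system metadata and user data can be illustrated with a stand-in struct. The fragment type and stampForSend function here are hypothetical; they only mimic the documented effect that FromPeerID, ToPeerID, and CircuitID are overwritten on send while user fields pass through unchanged.

```go
package main

import "fmt"

// fragment is a hypothetical stand-in mirroring a few Fragment fields.
type fragment struct {
	FromPeerID  string
	ToPeerID    string
	CircuitID   string
	FragOp      int
	FragSubject string
	Payload     []byte
}

// stampForSend overwrites the three system fields, as the sending
// machinery is documented to do, and leaves user data alone.
func stampForSend(f fragment, from, to, circuitID string) fragment {
	f.FromPeerID = from
	f.ToPeerID = to
	f.CircuitID = circuitID
	return f
}

func main() {
	user := fragment{
		FromPeerID:  "set-by-user", // will be overwritten
		FragOp:      1,
		FragSubject: "chunk",
		Payload:     []byte("hello"),
	}
	sent := stampForSend(user, "peerA", "peerB", "circuit-1")
	fmt.Printf("%+v\n", sent)
}
```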

func NewFragment added in v1.7.0

func NewFragment() *Fragment

func (*Fragment) DecodeMsg added in v1.7.0

func (z *Fragment) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (*Fragment) EncodeMsg added in v1.7.0

func (z *Fragment) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Fragment) MarshalMsg added in v1.7.0

func (z *Fragment) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Fragment) Msgsize added in v1.7.0

func (z *Fragment) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Fragment) String added in v1.7.0

func (f *Fragment) String() string

func (*Fragment) ToMessage added in v1.7.0

func (frag *Fragment) ToMessage() (msg *Message)

func (*Fragment) UnmarshalMsg added in v1.7.0

func (z *Fragment) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Fragment) UnmarshalMsgWithCfg added in v1.7.0

func (z *Fragment) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type Green added in v1.1.0

type Green interface {
	// avoid EncodeMsg using until the
	// pointer dedup greenpack issue can be fixed.
	// Also it makes our greenpack use msgpack
	// extensions that are far from universal;
	// so we may just leave it out.
	//msgp.Encodable
	msgp.Decodable
	msgp.Marshaler
	msgp.Unmarshaler
}

type HDR added in v1.0.42

type HDR struct {

	// Nc is supplied to reveal the LocalAddr() or RemoteAddr() end points.
	// Do not read from, or write to, this connection;
	// that will cause the RPC connection to fail.
	Nc net.Conn `msg:"-"`

	Created time.Time `zid:"0"` // HDR creation time stamp.
	From    string    `zid:"1"` // originator host:port address.
	To      string    `zid:"2"` // destination host:port address.

	ServiceName string `zid:"11"` // registered name to call.

	// arguments/parameters for the call. should be short to keep the HDR small.
	// big stuff should be serialized in JobSerz.
	Args map[string]string `zid:"12"`

	Subject string   `zid:"3"` // in net/rpc, the "Service.Method" ServiceName
	Seqno   uint64   `zid:"4"` // user (client) set sequence number for each call (same on response).
	Typ     CallType `zid:"5"` // see constants above.
	CallID  string   `zid:"6"` // 20 bytes pseudo random base-64 coded string (same on response).
	Serial  int64    `zid:"7"` // system serial number

	LocalRecvTm time.Time `zid:"8"`

	// allow standard []byte oriented message to cancel too.
	Ctx context.Context `msg:"-"`

	// Deadline is optional, but if it is set on the client,
	// the server side context.Context will honor it.
	Deadline time.Time `zid:"9"` // if non-zero, set this deadline in the remote Ctx

	// The CallID will be identical on
	// all parts of the same stream.
	StreamPart int64 `zid:"10"`

	// NoSystemCompression turns off any usual
	// compression that the rpc25519 system
	// applies, for just sending this one Message.
	//
	// Not normally needed (or a good idea),
	// this flag is for efficiency when the
	// user has implemented their own custom compression
	// scheme for the JobSerz data payload.
	//
	// By checking this flag, the system can
	// avoid wasting time attempting
	// to compress a second time; since the
	// user has, hereby, marked this Message
	// as incompressible.
	//
	// Not matched in reply compression;
	// this flag will not affect the usual
	// compression-matching in responses.
	// For those purposes, it is ignored.
	NoSystemCompression bool `zid:"13"`

	// ToPeerID and FromPeerID help maintain stateful sub-calls
	// allowing client/server symmetry when
	// implementing stateful protocols like
	// the rsync-like protocol herein.
	// Also known as PeerID in the Fragment/Peer/Circuit API.
	ToPeerID   string `zid:"14"`
	FromPeerID string `zid:"15"`
	FragOp     int    `zid:"16"`
	// contains filtered or unexported fields
}

HDR provides header information and details about the transport. It is the first thing in every Message. It is public so that clients can understand the context of their calls. Traditional `net/rpc` API users can use the `ctx context.Context` first argument form of callback methods and get an *HDR with HDRFromContext() as in the README.md introduction. Reproduced here:

func (s *Service) GetsContext(ctx context.Context, args *Args, reply *Reply) error {
    if hdr, ok := HDRFromContext(ctx); ok {
       fmt.Printf("GetsContext called with HDR = '%v'; "+
          "HDR.Nc.RemoteAddr() gives '%v'; HDR.Nc.LocalAddr() gives '%v'\n",
          hdr.String(), hdr.Nc.RemoteAddr(), hdr.Nc.LocalAddr())
    }
    return nil
}

func HDRFromBytes added in v1.0.42

func HDRFromBytes(jsonData []byte) (*HDR, error)

func HDRFromContext added in v1.2.7

func HDRFromContext(ctx context.Context) (*HDR, bool)

HDRFromContext returns the *HDR stored in ctx, if any. The bool is false when ctx carries no HDR.
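Retrieving a typed value from a context usually follows the unexported-key pattern from the standard library's context documentation. The sketch below shows that pattern with a hypothetical hdr stand-in type; the key type and helper names are assumptions, and the package's own key is unexported and may be implemented differently.

```go
package main

import (
	"context"
	"fmt"
)

// hdr is a hypothetical stand-in for the package's HDR type.
type hdr struct {
	From        string
	To          string
	ServiceName string
}

// ctxKey is unexported so no other package can collide with the key.
type ctxKey struct{}

// withHDR returns a child context carrying h.
func withHDR(ctx context.Context, h *hdr) context.Context {
	return context.WithValue(ctx, ctxKey{}, h)
}

// hdrFromContext returns the *hdr stored in ctx, if any.
func hdrFromContext(ctx context.Context) (*hdr, bool) {
	h, ok := ctx.Value(ctxKey{}).(*hdr)
	return h, ok
}

// demo stores a header, reads it back, and probes an empty context.
func demo() (service string, found, missing bool) {
	ctx := withHDR(context.Background(), &hdr{
		From:        "127.0.0.1:5000",
		To:          "127.0.0.1:8443",
		ServiceName: "Arith.Add",
	})
	h, ok := hdrFromContext(ctx)
	_, ok2 := hdrFromContext(context.Background())
	return h.ServiceName, ok, !ok2
}

func main() {
	fmt.Println(demo())
}
```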

func HDRFromGreenpack added in v1.0.42

func HDRFromGreenpack(header []byte) (*HDR, error)

HDRFromGreenpack will unmarshal the header into the returned struct. The [greenpack format](https://github.com/glycerine/greenpack) is expected.

func NewHDR added in v1.0.42

func NewHDR(from, to, serviceName string, typ CallType, streamPart int64) (m *HDR)

NewHDR creates a new HDR header.

func Unbytes

func Unbytes(jsonData []byte) *HDR

Unbytes reverses Bytes.

func (*HDR) AsGreenpack added in v1.0.42

func (hdr *HDR) AsGreenpack(scratch []byte) (o []byte, err error)

AsGreenpack will marshal hdr into the o output bytes. The scratch bytes can be nil or reused and returned to avoid allocation. The [greenpack format](https://github.com/glycerine/greenpack) is used.

func (*HDR) Bytes added in v1.0.42

func (m *HDR) Bytes() []byte

Bytes serializes to compact JSON formatted bytes.

func (*HDR) Compact added in v1.0.42

func (m *HDR) Compact() string

Compact is all on one line.

func (*HDR) DecodeMsg added in v1.0.42

func (z *HDR) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (*HDR) EncodeMsg added in v1.0.42

func (z *HDR) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*HDR) Equal added in v1.0.42

func (a *HDR) Equal(b *HDR) bool

Equal compares two *HDR structs field by field for structural equality

func (*HDR) JSON added in v1.0.42

func (m *HDR) JSON() []byte

JSON serializes to JSON.

func (*HDR) MarshalMsg added in v1.0.42

func (z *HDR) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*HDR) Msgsize added in v1.0.42

func (z *HDR) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*HDR) Pretty added in v1.0.42

func (m *HDR) Pretty() string

Pretty shows in pretty-printed JSON format.

func (*HDR) String added in v1.0.42

func (m *HDR) String() string

func (*HDR) UnmarshalMsg added in v1.0.42

func (z *HDR) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*HDR) UnmarshalMsgWithCfg added in v1.0.42

func (z *HDR) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type Hello added in v1.1.30

type Hello struct {
	Placeholder int `zid:"0"` // must have public field or greenpack will ignore it.
}

Hello in example.go is part of the tests.

func (*Hello) DecodeMsg added in v1.1.30

func (z *Hello) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (Hello) EncodeMsg added in v1.1.30

func (z Hello) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Hello) MarshalMsg added in v1.1.30

func (z Hello) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Hello) Msgsize added in v1.1.30

func (z Hello) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Hello) UnmarshalMsg added in v1.1.30

func (z *Hello) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Hello) UnmarshalMsgWithCfg added in v1.1.30

func (z *Hello) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type InvalidRequest added in v1.1.0

type InvalidRequest struct {
	Placeholder int `zid:"0"`
}

InvalidRequest is used instead of struct{} since greenpack needs one member element.

func (*InvalidRequest) DecodeMsg added in v1.1.0

func (z *InvalidRequest) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (InvalidRequest) EncodeMsg added in v1.1.0

func (z InvalidRequest) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (InvalidRequest) MarshalMsg added in v1.1.0

func (z InvalidRequest) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (InvalidRequest) Msgsize added in v1.1.0

func (z InvalidRequest) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*InvalidRequest) UnmarshalMsg added in v1.1.0

func (z *InvalidRequest) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*InvalidRequest) UnmarshalMsgWithCfg added in v1.1.0

func (z *InvalidRequest) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type LocalPeer added in v1.7.0

type LocalPeer struct {
	Halt            *idem.Halter
	NetAddr         string
	PeerServiceName string
	PeerAPI         *peerAPI
	Ctx             context.Context
	Canc            context.CancelFunc
	PeerID          string // renamed from PeerID to avoid conflict with method
	U               UniversalCliSrv
	NewPeerCh       chan *RemotePeer
	ReadsIn         chan *Message
	ErrorsIn        chan *Message

	Remotes               *Mutexmap[string, *RemotePeer]
	HandleChansNewCircuit chan *Circuit
	HandleCircuitClose    chan *Circuit
	QueryCh               chan *QueryLocalPeerPump

	// should we shut ourselves down when no more peers?
	AutoShutdownWhenNoMorePeers    bool
	AutoShutdownWhenNoMoreCircuits bool
}

LocalPeer is the backing behind each local instantiation of a PeerServiceFunc. Local peers do reads on ch, get notified of new connections on newPeerChan, and create new outgoing connections with

func (*LocalPeer) Close added in v1.9.0

func (s *LocalPeer) Close()

func (*LocalPeer) GetPeerID added in v1.9.0

func (s *LocalPeer) GetPeerID() string

func (*LocalPeer) ID added in v1.8.0

func (s *LocalPeer) ID() string

func (*LocalPeer) IsClosed added in v1.9.0

func (lpb *LocalPeer) IsClosed() bool

func (*LocalPeer) NewCircuitToPeerURL added in v1.7.0

func (s *LocalPeer) NewCircuitToPeerURL(
	circuitName string,
	peerURL string,
	frag *Fragment,
	errWriteDur time.Duration,
) (ckt *Circuit, ctx context.Context, err error)

NewCircuitToPeerURL sets up a persistent communication path called a Circuit. The frag can be nil, or set to send it immediately.

func (*LocalPeer) OpenCircuitCount added in v1.9.0

func (lpb *LocalPeer) OpenCircuitCount() int

OpenCircuitCount returns the number of open circuits or -1 if this was unavailable because of shutdown. Inherently this count is a point in time snapshot and may be stale by the time it is actually returned.
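One common way to get such a snapshot from a goroutine that owns the state is to send it a query struct and wait on a ready channel, falling back to -1 if the owner has shut down. The sketch below assumes that design; the pump and query types are hypothetical and only loosely modeled on the QueryCh and QueryLocalPeerPump names that appear in this documentation.

```go
package main

import "fmt"

// query is a hypothetical stand-in for QueryLocalPeerPump.
type query struct {
	openCircuitCount int
	ready            chan struct{}
}

// pump owns the circuit count; only its goroutine reads it.
type pump struct {
	queryCh chan *query
	halt    chan struct{}
	done    chan struct{}
}

// startPump launches the owning goroutine with a fixed count.
func startPump(open int) *pump {
	p := &pump{
		queryCh: make(chan *query),
		halt:    make(chan struct{}),
		done:    make(chan struct{}),
	}
	go func() {
		defer close(p.done)
		for {
			select {
			case q := <-p.queryCh:
				q.openCircuitCount = open
				close(q.ready) // signals that the answer is filled in
			case <-p.halt:
				return
			}
		}
	}()
	return p
}

// openCircuitCount asks the pump for a snapshot, or returns -1
// when the pump has already shut down.
func (p *pump) openCircuitCount() int {
	q := &query{ready: make(chan struct{})}
	select {
	case p.queryCh <- q:
		<-q.ready
		return q.openCircuitCount
	case <-p.done:
		return -1
	}
}

// stop halts the pump and waits for its goroutine to exit.
func (p *pump) stop() {
	close(p.halt)
	<-p.done
}

func main() {
	p := startPump(3)
	fmt.Println(p.openCircuitCount())
	p.stop()
	fmt.Println(p.openCircuitCount())
}
```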

func (*LocalPeer) SendOneWay added in v1.9.0

func (s *LocalPeer) SendOneWay(ckt *Circuit, frag *Fragment, errWriteDur time.Duration) error

SendOneWay sends a Fragment on the given Circuit.

func (*LocalPeer) ServiceName added in v1.8.0

func (s *LocalPeer) ServiceName() string

ServiceName is the string used when we were registered/invoked.

func (*LocalPeer) TellRemoteWeShutdown added in v1.9.0

func (pb *LocalPeer) TellRemoteWeShutdown(rem *RemotePeer)

func (*LocalPeer) URL added in v1.8.0

func (s *LocalPeer) URL() string

URL gives the network address, the service name, and the PeerID in a URL-safe string, suitable for contacting the peer, e.g. tcp://x.x.x.x:port/peerServiceName/peerID
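The three-part layout shown in the example can be built and taken apart with net/url. Both helpers below are hypothetical and assume exactly two path segments after the host; the package may accept other shapes.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// peerURL joins the parts in the documented layout.
func peerURL(scheme, netAddr, serviceName, peerID string) string {
	return scheme + "://" + netAddr + "/" + serviceName + "/" + peerID
}

// splitPeerURL is a hypothetical parser for that layout.
func splitPeerURL(s string) (netAddr, serviceName, peerID string, err error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", "", "", err
	}
	parts := strings.Split(strings.Trim(u.Path, "/"), "/")
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", "", fmt.Errorf("want scheme://host:port/service/peerID, got %q", s)
	}
	return u.Host, parts[0], parts[1], nil
}

func main() {
	s := peerURL("tcp", "10.0.0.5:8443", "echo", "abc123")
	addr, svc, id, err := splitPeerURL(s)
	fmt.Println(s, addr, svc, id, err)
}
```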

type Message

type Message struct {

	// HDR contains header information.
	HDR HDR `zid:"0"`

	// JobSerz is the "body" of the message.
	// The user provides and interprets this.
	JobSerz []byte `zid:"1"`

	// JobErrs returns error information from the server-registered
	// user-defined callback functions.
	JobErrs string `zid:"2"`

	// LocalErr is not serialized on the wire by the server.
	// It communicates only local (client/server side) information.
	//
	// Callback functions convey
	// errors in JobErrs (by returning an error);
	// or in-band within JobSerz.
	LocalErr error `msg:"-"`

	// DoneCh.WhenClosed will be closed on the client when the one-way is
	// sent or the round-trip call completes.
	// NewMessage() automatically allocates DoneCh correctly and
	// should be used when creating a new Message (on the client to send).
	DoneCh *loquet.Chan[Message] `msg:"-"`
	// contains filtered or unexported fields
}

Message transports JobSerz []byte slices for the user, who can de-serialize them as they wish. The HDR header field provides transport details.

func MessageFromGreenpack added in v1.0.28

func MessageFromGreenpack(by []byte) (*Message, error)

MessageFromGreenpack unmarshals the by slice into a Message and returns it. The [greenpack format](https://github.com/glycerine/greenpack) is expected.

func NewMessage

func NewMessage() *Message

NewMessage allocates a new Message with a DoneCh properly created.

func NewMessageFromBytes

func NewMessageFromBytes(by []byte) (msg *Message)

NewMessageFromBytes calls NewMessage() and sets by as the JobSerz field.

func (*Message) AsGreenpack added in v1.0.29

func (m *Message) AsGreenpack(scratch []byte) (o []byte, err error)

AsGreenpack marshals m into o. The scratch workspace can be nil or reused to avoid allocation. The [greenpack format](https://github.com/glycerine/greenpack) is used. The m.JobSerz payload must be <= maxMessage-1024, or we will return ErrTooLarge without trying to serialize it.

func (*Message) AsJSON added in v1.1.54

func (m *Message) AsJSON(scratch []byte) (o []byte, err error)

AsJSON returns JSON bytes via msgp.CopyToJSON() or msgp.UnmarshalAsJSON()

func (*Message) DecodeMsg added in v1.0.26

func (z *Message) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (*Message) EncodeMsg added in v1.0.26

func (z *Message) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Message) MarshalMsg added in v1.0.26

func (z *Message) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Message) Msgsize added in v1.0.26

func (z *Message) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Message) String

func (msg *Message) String() string

String returns a string representation of msg.

func (*Message) UnmarshalMsg added in v1.0.26

func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Message) UnmarshalMsgWithCfg added in v1.0.26

func (z *Message) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type MustBeCancelled added in v1.2.0

type MustBeCancelled struct {
	// as greenpack efficiently does nothing without any member elements.
	Placeholder int `zid:"0"`
}

The MustBeCancelled struct in example.go is part of the tests. See cli_test.go Test040 for details.

func NewMustBeCancelled added in v1.2.0

func NewMustBeCancelled() *MustBeCancelled

NewMustBeCancelled in example.go is part of the tests. See cli_test.go Test040 for details.

func (*MustBeCancelled) DecodeMsg added in v1.2.0

func (z *MustBeCancelled) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (MustBeCancelled) EncodeMsg added in v1.2.0

func (z MustBeCancelled) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (MustBeCancelled) MarshalMsg added in v1.2.0

func (z MustBeCancelled) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*MustBeCancelled) MessageAPI_HangUntilCancel added in v1.2.0

func (s *MustBeCancelled) MessageAPI_HangUntilCancel(req, reply *Message) error

MessageAPI_HangUntilCancel in example.go is part of the tests. See cli_test.go Test040 for details.

func (MustBeCancelled) Msgsize added in v1.2.0

func (z MustBeCancelled) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*MustBeCancelled) UnmarshalMsg added in v1.2.0

func (z *MustBeCancelled) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*MustBeCancelled) UnmarshalMsgWithCfg added in v1.2.0

func (z *MustBeCancelled) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

func (*MustBeCancelled) WillHangUntilCancel added in v1.2.0

func (s *MustBeCancelled) WillHangUntilCancel(ctx context.Context, args *Args, reply *Reply) error

WillHangUntilCancel in example.go is part of the tests. See cli_test.go Test040 for details.

type Mutexmap added in v1.9.0

type Mutexmap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewMutexmap added in v1.9.0

func NewMutexmap[K comparable, V any]() *Mutexmap[K, V]

func (*Mutexmap[K, V]) Clear added in v1.9.0

func (m *Mutexmap[K, V]) Clear()

func (*Mutexmap[K, V]) Del added in v1.9.0

func (m *Mutexmap[K, V]) Del(key K)

func (*Mutexmap[K, V]) Get added in v1.9.0

func (m *Mutexmap[K, V]) Get(key K) (val V, ok bool)

func (*Mutexmap[K, V]) GetN added in v1.9.0

func (m *Mutexmap[K, V]) GetN() (n int)

GetN returns the number of keys in the map.

func (*Mutexmap[K, V]) GetValNDel added in v1.9.0

func (m *Mutexmap[K, V]) GetValNDel(key K) (val V, n int, ok bool)

GetValNDel returns the value for key and deletes it; n gives the count of items left in the map after deleting key.
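The value of a combined get-and-delete is that lookup, removal, and the remaining count all happen under one lock. A minimal generic sketch follows; lockedMap is a hypothetical type written for illustration and is not the package's Mutexmap implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical sketch of a mutex-protected generic map.
type lockedMap[K comparable, V any] struct {
	mu sync.Mutex
	m  map[K]V
}

func newLockedMap[K comparable, V any]() *lockedMap[K, V] {
	return &lockedMap[K, V]{m: make(map[K]V)}
}

// Set stores val under key.
func (l *lockedMap[K, V]) Set(key K, val V) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.m[key] = val
}

// GetN returns the number of keys in the map.
func (l *lockedMap[K, V]) GetN() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return len(l.m)
}

// GetValNDel looks up key, deletes it, and reports how many entries
// remain, all under one lock so the three steps are atomic.
func (l *lockedMap[K, V]) GetValNDel(key K) (val V, n int, ok bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	val, ok = l.m[key]
	delete(l.m, key)
	n = len(l.m)
	return
}

func main() {
	m := newLockedMap[string, int]()
	m.Set("a", 1)
	m.Set("b", 2)
	val, n, ok := m.GetValNDel("a")
	fmt.Println(val, n, ok, m.GetN())
}
```

A caller that instead chained separate Get, Del, and GetN calls could observe a count that another goroutine changed in between.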

func (*Mutexmap[K, V]) GetValSlice added in v1.9.0

func (m *Mutexmap[K, V]) GetValSlice() (slc []V)

func (*Mutexmap[K, V]) Set added in v1.9.0

func (m *Mutexmap[K, V]) Set(key K, val V)

type NetConnWrapper added in v1.0.5

type NetConnWrapper struct {
	quic.Stream
	quic.Connection
}

NetConnWrapper is exported so that clients like `goq` and others that want to inspect the context of their calls can do so.

func (*NetConnWrapper) Close added in v1.1.33

func (w *NetConnWrapper) Close() error

type OneWayFunc added in v1.0.39

type OneWayFunc func(req *Message)

OneWayFunc is the simpler sibling of TwoWayFunc. A OneWayFunc will not return anything to the sender.

As with TwoWayFunc, req.JobSerz ([]byte) contains the job payload.

type PeerImpl added in v1.7.0

type PeerImpl struct {
	KnownPeers           []string
	StartCount           atomic.Int64 `msg:"-"`
	DoEchoToThisPeerURL  chan string
	ReportEchoTestCanSee chan string
}

PeerImpl is used in testing and also demonstrates how a user can implement a Peer. Users write a PeerServiceFunc, here called Start(), which can be a method to keep state in a struct, as PeerImpl does.

func (*PeerImpl) Start added in v1.7.0

func (me *PeerImpl) Start(

	myPeer *LocalPeer,

	ctx0 context.Context,

	newPeerCh <-chan *RemotePeer,

) error

Start is an example of PeerServiceFunc in action.

type PeerServiceFunc added in v1.7.0

type PeerServiceFunc func(

	myPeer *LocalPeer,

	ctx0 context.Context,

	newPeerCh <-chan *RemotePeer,

) error

PeerServiceFunc is implemented by user's peer services, and registered on a Client or a Server under a specific peerServiceName by using the PeerAPI.RegisterPeerServiceFunc() call.
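The general shape of such a service function is an event loop that selects over the new-peer channel and the context. The sketch below uses hypothetical stand-in types (localPeer, remotePeer, echoService) rather than the package's own, and keeps state in a struct in the way PeerImpl is described as doing.

```go
package main

import (
	"context"
	"fmt"
)

// localPeer and remotePeer are hypothetical stand-ins for the
// package's LocalPeer and RemotePeer types.
type localPeer struct{ PeerID string }
type remotePeer struct{ PeerID string }

// echoService keeps per-service state, so Start is a method.
type echoService struct {
	seen chan string // reports each remote peer ID as it arrives
}

// Start has the same parameter order as a PeerServiceFunc.
// It runs until ctx0 is cancelled.
func (s *echoService) Start(myPeer *localPeer, ctx0 context.Context, newPeerCh <-chan *remotePeer) error {
	for {
		select {
		case rp := <-newPeerCh:
			s.seen <- rp.PeerID
		case <-ctx0.Done():
			return ctx0.Err()
		}
	}
}

// demo feeds two remote peers to the service and then cancels it.
func demo() ([]string, error) {
	svc := &echoService{seen: make(chan string)}
	ctx, cancel := context.WithCancel(context.Background())
	newPeerCh := make(chan *remotePeer)
	errCh := make(chan error, 1)
	go func() {
		errCh <- svc.Start(&localPeer{PeerID: "local"}, ctx, newPeerCh)
	}()

	var ids []string
	for _, id := range []string{"peerA", "peerB"} {
		newPeerCh <- &remotePeer{PeerID: id}
		ids = append(ids, <-svc.seen)
	}
	cancel()
	return ids, <-errCh
}

func main() {
	fmt.Println(demo())
}
```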

type PerCallID_FileToDiskState added in v1.5.0

type PerCallID_FileToDiskState struct {
	CallID string
	T0     time.Time

	OverrideFilename string // if set, use instead of "readFile" in Args.
	FnameTmp         string
	FnameFinal       string
	Randomness       string

	Fd        *os.File
	BytesWrit int64

	Blake3hash *myblake3.Blake3

	PartsSeen map[int64]bool
	SeenCount int
}

func NewPerCallID_FileToDiskState added in v1.5.0

func NewPerCallID_FileToDiskState(callID string) *PerCallID_FileToDiskState

func (*PerCallID_FileToDiskState) WriteOneMsgToFile added in v1.5.0

func (s *PerCallID_FileToDiskState) WriteOneMsgToFile(req *Message, suffix string, last bool) (err error)

type QueryLocalPeerPump added in v1.9.0

type QueryLocalPeerPump struct {
	OpenCircuitCount int
	Ready            chan struct{}
}

QueryLocalPeerPump asks the LocalPeer about its OpenCircuitCount.

func NewQueryLocalPeerPump added in v1.9.0

func NewQueryLocalPeerPump() *QueryLocalPeerPump

NewQueryLocalPeerPump creates a new QueryLocalPeerPump to enquire about the number of open Circuits.

func (*QueryLocalPeerPump) DecodeMsg added in v1.9.0

func (z *QueryLocalPeerPump) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (QueryLocalPeerPump) EncodeMsg added in v1.9.0

func (z QueryLocalPeerPump) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (QueryLocalPeerPump) MarshalMsg added in v1.9.0

func (z QueryLocalPeerPump) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (QueryLocalPeerPump) Msgsize added in v1.9.0

func (z QueryLocalPeerPump) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*QueryLocalPeerPump) UnmarshalMsg added in v1.9.0

func (z *QueryLocalPeerPump) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*QueryLocalPeerPump) UnmarshalMsgWithCfg added in v1.9.0

func (z *QueryLocalPeerPump) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type RemotePeer added in v1.7.0

type RemotePeer struct {
	LocalPeer         *LocalPeer
	PeerID            string
	NetAddr           string
	RemoteServiceName string
	PeerURL           string
	IncomingCkt       *Circuit
}

RemotePeer is the user-facing interface for communicating with network-remote Peers. Peers exchange Fragments over Circuits, and generally implement finite-state-machine behavior more complex than can be efficiently modeled with simple call-and-response RPC.

In particular, we support infinite streams of Fragments in order to convey large files and filesystem (r)sync operations.

RemotePeer is a proxy. It is the local representation of a remote peer.

RemotePeer is passed over the newPeerCh channel to a PeerServiceFunc.

The adjective "remote" means we hold a handle/proxy to the actual remote Peer living on a remote node.

Locally, a RemotePeer is always a child of a LocalPeer. A RemotePeer can be requested by calling NewCircuit, or received on newPeerCh from a remote peer who called NewCircuitToPeerURL.

func (*RemotePeer) GetPeerID added in v1.9.0

func (rpb *RemotePeer) GetPeerID() string

func (*RemotePeer) ID added in v1.9.0

func (rpb *RemotePeer) ID() string

func (*RemotePeer) IncomingCircuit added in v1.7.0

func (rpb *RemotePeer) IncomingCircuit() (ckt *Circuit, ctx context.Context, err error)

IncomingCircuit is the first one that arrives with an incoming remote peer connection.

func (*RemotePeer) NewCircuit added in v1.7.0

func (rpb *RemotePeer) NewCircuit(circuitName string) (ckt *Circuit, ctx2 context.Context, err error)

NewCircuit generates a Circuit between two Peers, and tells the SendOneWay machinery how to reply to you. It makes a new CircuitID (CallID), and manages it for you. It gives you two channels to get normal and error replies on. Using this Circuit, you can make as many one way calls as you like to the remote Peer. The returned ctx will be cancelled in case of broken/shutdown connection or this application shutting down.

You must call Close() on the ckt when you are done with it.

The circuitName is a convenience and debugging aid. The CircuitID (a.k.a. CallID in Message) determines delivery.

When select{}-ing on ckt.Reads and ckt.Errors, always also select on ctx.Done() in order to shut down gracefully.

Allow cID to specify the Call/CircuitID if desired, or empty to get a new one.
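The advice about selecting on ckt.Reads, ckt.Errors, and ctx.Done() together can be sketched as follows. Circuit and Fragment here are local stand-ins with just enough fields to compile; the Payload field and the helper names are hypothetical, and the real types carry more than this.

```go
package main

import (
	"context"
	"fmt"
)

// Fragment and Circuit are stand-ins for the rpc25519 types; only the two
// reply channels mentioned in the text are modeled.
type Fragment struct{ Payload string }

type Circuit struct {
	Reads  chan *Fragment
	Errors chan *Fragment
}

// readUntilDone collects normal and error replies until ctx is cancelled.
// Without the ctx.Done() case this loop would block forever once the
// connection is gone.
func readUntilDone(ctx context.Context, ckt *Circuit) (reads, errs []string) {
	for {
		select {
		case f := <-ckt.Reads:
			reads = append(reads, f.Payload)
		case f := <-ckt.Errors:
			errs = append(errs, f.Payload)
		case <-ctx.Done():
			return reads, errs
		}
	}
}

// runCircuitDemo feeds a few fragments into the loop and then cancels.
func runCircuitDemo() (reads, errs []string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ckt := &Circuit{Reads: make(chan *Fragment), Errors: make(chan *Fragment)}
	done := make(chan struct{})
	go func() {
		defer close(done)
		reads, errs = readUntilDone(ctx, ckt)
	}()
	ckt.Reads <- &Fragment{Payload: "hello"}
	ckt.Errors <- &Fragment{Payload: "remote failure"}
	ckt.Reads <- &Fragment{Payload: "world"}
	cancel() // simulates a broken connection or application shutdown
	<-done
	return reads, errs
}

func main() {
	reads, errs := runCircuitDemo()
	fmt.Println(reads, errs)
}
```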

func (*RemotePeer) SendOneWay added in v1.9.0

func (s *RemotePeer) SendOneWay(
	ckt *Circuit, frag *Fragment, errWriteDur time.Duration) error

SendOneWay sends a Fragment on the given Circuit.

type Reply added in v1.1.0

type Reply struct {
	C int `zid:"0"`
}

Reply in example.go is part of the tests.

func (*Reply) DecodeMsg added in v1.1.0

func (z *Reply) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (Reply) EncodeMsg added in v1.1.0

func (z Reply) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Reply) MarshalMsg added in v1.1.0

func (z Reply) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Reply) Msgsize added in v1.1.0

func (z Reply) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Reply) UnmarshalMsg added in v1.1.0

func (z *Reply) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Reply) UnmarshalMsgWithCfg added in v1.1.0

func (z *Reply) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type Request added in v1.0.42

type Request struct {
	ServiceMethod string `zid:"0"` // format: "Service.Method"
	Seq           uint64 `zid:"1"` // sequence number chosen by client
	// contains filtered or unexported fields
}

Request is part of the net/rpc API. Its docs:

Request is a header written before every RPC call. It is used internally but documented here as an aid to debugging, such as when analyzing network traffic.

func (*Request) DecodeMsg added in v1.0.42

func (z *Request) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (Request) EncodeMsg added in v1.0.42

func (z Request) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Request) MarshalMsg added in v1.0.42

func (z Request) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Request) Msgsize added in v1.0.42

func (z Request) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Request) UnmarshalMsg added in v1.0.42

func (z *Request) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Request) UnmarshalMsgWithCfg added in v1.0.42

func (z *Request) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type Response added in v1.0.42

type Response struct {
	ServiceMethod string `zid:"0"` // echoes that of the Request
	Seq           uint64 `zid:"1"` // echoes that of the request
	Error         string `zid:"2"` // error, if any.
	// contains filtered or unexported fields
}

Response is part of the net/rpc API. Its docs:

Response is a header written before every RPC return. It is used internally but documented here as an aid to debugging, such as when analyzing network traffic.

func (*Response) DecodeMsg added in v1.0.42

func (z *Response) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (Response) EncodeMsg added in v1.0.42

func (z Response) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (Response) MarshalMsg added in v1.0.42

func (z Response) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Response) Msgsize added in v1.0.42

func (z Response) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Response) UnmarshalMsg added in v1.0.42

func (z *Response) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Response) UnmarshalMsgWithCfg added in v1.0.42

func (z *Response) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type ServeBistreamState added in v1.4.7

type ServeBistreamState struct{}

ServeBistreamState is used by Test065_bidirectional_streaming test.

func (*ServeBistreamState) ServeBistream added in v1.4.7

func (bi *ServeBistreamState) ServeBistream(
	srv *Server,
	ctx context.Context,
	req *Message,
	uploadsFromClientCh <-chan *Message,
	sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error,
	lastReply *Message,
) (err error)

ServeBistream is an example of a BistreamFunc, a server side registered function for bi-streaming (doing both upload and download simultaneously). See cli_test.go Test065_bidirectional_download_and_upload for a test that uses this method.

type Server

type Server struct {
	PeerAPI *peerAPI // must be Exported to users!

	// RemoteConnectedCh sends the remote host:port address
	// when the server gets a new client,
	// See srv_test.go Test004_server_push for example,
	// where it is used to avoid a race/panic.
	RemoteConnectedCh chan *ServerClient
	// contains filtered or unexported fields
}

Servers read and respond to requests. Two APIs are available.

Using the rpc25519.Message based API:

Register1Func() and Register2Func() register callbacks.

Using the net/rpc API:

Server.Register() registers structs with callback methods on them.

The net/rpc API is implemented as a layer on top of the rpc25519.Message based API. Both can be used concurrently if desired.

func NewServer

func NewServer(name string, config *Config) *Server

NewServer will keep its own copy of config. If config is nil, the server will make its own upon Start().

func (*Server) Close

func (s *Server) Close() error

Close asks the Server to shut down.

func (*Server) GetErrorsForCallID added in v1.7.0

func (s *Server) GetErrorsForCallID(ch chan *Message, callID string)

func (*Server) GetErrorsForToPeerID added in v1.7.0

func (s *Server) GetErrorsForToPeerID(ch chan *Message, objID string)

func (*Server) GetReadsForCallID added in v1.7.0

func (s *Server) GetReadsForCallID(ch chan *Message, callID string)

GetReadsForCallID registers to get any received messages on ch. It is similar to GetReadIncomingCh but for when ch already exists and you do not want a new one. It filters for CallID.

func (*Server) GetReadsForToPeerID added in v1.7.0

func (s *Server) GetReadsForToPeerID(ch chan *Message, objID string)

func (*Server) LocalAddr added in v1.7.0

func (s *Server) LocalAddr() string

LocalAddr retrieves the local host/port that the Client is calling from.

func (*Server) Register added in v1.0.42

func (s *Server) Register(rcvr msgp.Encodable) error

Register implements the net/rpc Server.Register() API. Its docs:

Register publishes in the server the set of methods of the receiver value that satisfy the following conditions:

  • exported method of exported type
  • two arguments, both of exported type
  • the second argument is a pointer
  • one return value, of type error

It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error using package log. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.
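The conditions quoted above come from the standard library's net/rpc, so they can be tried out there directly. The sketch below uses only net/rpc (not rpc25519, whose Register additionally requires the receiver to be a msgp.Encodable); the Arith, Args, and Reply types and the callAdd helper are made up for illustration.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Args and Reply are exported types, as the registration rules require.
type Args struct{ A, B int }
type Reply struct{ C int }

// Arith is an exported receiver type with one suitable method.
type Arith struct{}

// Add is exported, takes two arguments of exported type (the second a
// pointer), and returns a single error value.
func (*Arith) Add(args *Args, reply *Reply) error {
	reply.C = args.A + args.B
	return nil
}

// callAdd registers Arith on a fresh net/rpc server and calls it over an
// in-memory connection using the "Type.Method" naming convention.
func callAdd(a, b int) (int, error) {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		return 0, err
	}
	cliConn, srvConn := net.Pipe()
	go srv.ServeConn(srvConn)

	client := rpc.NewClient(cliConn)
	defer client.Close()

	var reply Reply
	err := client.Call("Arith.Add", &Args{A: a, B: b}, &reply)
	return reply.C, err
}

func main() {
	sum, err := callAdd(2, 3)
	fmt.Println(sum, err)
}
```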

rpc25519 addendum:

Callback methods in the `net/rpc` style traditionally look like the first `NoContext` example below. We now allow a context.Context as an additional first parameter. The ctx will have a pointer to the `rpc25519.HDR` header from the incoming Message set on it. Call rpc25519.HDRFromContext() to retrieve it.

func (s *Service) NoContext(args *Args, reply *Reply) error

* new:

func (s *Service) GetsContext(ctx context.Context, args *Args, reply *Reply) error {
	// hdr is the HDR of the incoming Message, if one was set on ctx.
	if hdr, ok := rpc25519.HDRFromContext(ctx); ok {
		fmt.Printf("GetsContext called with HDR = '%v'; "+
			"HDR.Nc.RemoteAddr() gives '%v'; HDR.Nc.LocalAddr() gives '%v'\n",
			hdr.String(), hdr.Nc.RemoteAddr(), hdr.Nc.LocalAddr())
	} else {
		fmt.Println("HDR not found")
	}
	return nil
}
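For readers unfamiliar with carrying request metadata on a context.Context, the general technique can be sketched with the standard library alone. This is not rpc25519's implementation: the HDR stand-in, its fields, and the withHDR and hdrFromContext helpers are assumptions that only show the usual context.WithValue pattern behind an accessor like HDRFromContext.

```go
package main

import (
	"context"
	"fmt"
)

// HDR is a stand-in for rpc25519.HDR with two hypothetical fields.
type HDR struct {
	CallID string
	From   string
}

// hdrKey is an unexported key type, so no other package can collide with it.
type hdrKey struct{}

// withHDR returns a child context that carries h.
func withHDR(ctx context.Context, h *HDR) context.Context {
	return context.WithValue(ctx, hdrKey{}, h)
}

// hdrFromContext reports whether ctx carries an HDR and returns it if so.
func hdrFromContext(ctx context.Context) (*HDR, bool) {
	h, ok := ctx.Value(hdrKey{}).(*HDR)
	return h, ok
}

// describe is what a callback might do with the header.
func describe(ctx context.Context) string {
	if h, ok := hdrFromContext(ctx); ok {
		return "call " + h.CallID + " from " + h.From
	}
	return "HDR not found"
}

// demo returns the description with and without a header attached.
func demo() (with, without string) {
	ctx := withHDR(context.Background(), &HDR{CallID: "abc", From: "tcp://127.0.0.1:9000"})
	return describe(ctx), describe(context.Background())
}

func main() {
	with, without := demo()
	fmt.Println(with)
	fmt.Println(without)
}
```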

func (*Server) Register1Func added in v1.0.39

func (s *Server) Register1Func(serviceName string, callme1 OneWayFunc)

Register1Func tells the server about a func or method that will not reply. See the OneWayFunc definition.

func (*Server) Register2Func added in v1.0.39

func (s *Server) Register2Func(serviceName string, callme2 TwoWayFunc)

Register2Func tells the server about a func or method that will have a returned Message value. See the TwoWayFunc definition.

func (*Server) RegisterBistreamFunc added in v1.3.5

func (s *Server) RegisterBistreamFunc(name string, callme BistreamFunc)

func (*Server) RegisterName added in v1.0.42

func (s *Server) RegisterName(name string, rcvr msgp.Encodable) error

RegisterName is like [Register] but uses the provided name for the type instead of the receiver's concrete type.

func (*Server) RegisterServerSendsDownloadFunc added in v1.3.0

func (s *Server) RegisterServerSendsDownloadFunc(name string, callme ServerSendsDownloadFunc)

func (*Server) RegisterUploadReaderFunc added in v1.3.0

func (s *Server) RegisterUploadReaderFunc(name string, callmeUploadReader UploadReaderFunc)

RegisterUploadReaderFunc tells the server about a func or method to handle uploads. See the UploadReaderFunc definition.

func (*Server) RemoteAddr added in v1.7.0

func (s *Server) RemoteAddr() string

RemoteAddr returns "" on the Server, because it is not well defined (which client? there can be many!) We still want to satisfy the UniversalCliSrv interface and let the Client help Peer users. Hence this stub.

func (*Server) SendMessage

func (s *Server) SendMessage(callID, subject, destAddr string, data []byte, seqno uint64,
	errWriteDur *time.Duration) error

SendMessage can be used on the server to push data to one of the connected clients.

The Message msg should have msg.JobSerz set, as well as the HDR fields Subject, CallID, and Seqno. NewCallID() can be used to generate a random CallID (one that does not conflict with any prior CallID) when the message is not matching a previous call.

If the HDR.To destination address is not already connected to the server, the ErrNetConnectionNotFound error will be returned.

errWriteDur is how long we pause waiting for the writing goroutine to send the message or give us a fast error reply. Early discovery of client disconnect can allow us to try other (worker) clients, rather than wait for pings or other slow error paths.

The errWriteDur can be set to a few seconds if this would save the caller a minute or two of waiting to discover the send is unlikely to succeed; or to time.Duration(0) if they want no pause after writing Message to the connection. The default is 30 msec. It is a guess and aims at balance: allowing enough time to get an error back from quic-go if we are going to discover "Application error 0x0 (remote)" right away, and not wanting to stall the caller too much.
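The trade-off described for errWriteDur is a bounded wait for an early error. A minimal sketch of that pattern with the standard library follows; it is not the library's code. The errCh channel, the helper names, and the assumption that a nil errWriteDur selects the 30 msec default are all illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// defaultErrWriteDur mirrors the 30 msec default mentioned in the text.
const defaultErrWriteDur = 30 * time.Millisecond

// effectiveDur assumes (hypothetically) that a nil pointer means "use the default".
func effectiveDur(errWriteDur *time.Duration) time.Duration {
	if errWriteDur == nil {
		return defaultErrWriteDur
	}
	return *errWriteDur
}

// waitForFastError waits up to the effective duration for the writer to
// report an error on errCh. A nil result means no error was seen in time,
// not that the write is known to have succeeded.
func waitForFastError(errCh <-chan error, errWriteDur *time.Duration) error {
	dur := effectiveDur(errWriteDur)
	if dur <= 0 {
		// No pause requested: only pick up an error that is already waiting.
		select {
		case err := <-errCh:
			return err
		default:
			return nil
		}
	}
	timer := time.NewTimer(dur)
	defer timer.Stop()
	select {
	case err := <-errCh:
		return err
	case <-timer.C:
		return nil
	}
}

// demo shows an early error being caught and a zero duration not blocking.
func demo() (fast, quiet error) {
	failing := make(chan error, 1)
	failing <- errors.New("connection closed")
	twoSec := 2 * time.Second
	fast = waitForFastError(failing, &twoSec) // returns at once with the error

	silent := make(chan error)
	zero := time.Duration(0)
	quiet = waitForFastError(silent, &zero) // returns at once with nil
	return fast, quiet
}

func main() {
	fast, quiet := demo()
	fmt.Println(fast, quiet)
}
```

A longer duration makes it more likely that a dead client is noticed before the caller moves on, at the cost of stalling every healthy send by up to that amount.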

goq uses this srv.SendMessage(), cli.OneWaySend(), cli.SendAndGetReply(), cli.GetReadIncomingCh(), cli.LocalAddr(), and cli.Close().

func (*Server) SendOneWayMessage added in v1.3.0

func (s *Server) SendOneWayMessage(ctx context.Context, msg *Message, errWriteDur time.Duration) error

SendOneWayMessage is the same as SendMessage above except that it takes a fully prepared msg to avoid API churn when new HDR fields are added/needed. msg.HDR.Type must be >= CallOneWay (100), to try and catch mis-use of this when the user actually wants a round-trip call.

SendOneWayMessage only sets msg.HDR.From to its correct value.

func (*Server) ServeHTTP added in v1.1.33

func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP implements an http.Handler that answers RPC requests.

func (*Server) Start

func (s *Server) Start() (serverAddr net.Addr, err error)

Start has the Server begin receiving and processing RPC calls. The Config.ServerAddr tells us what host:port to bind and listen on.

func (*Server) UnregisterChannel added in v1.7.0

func (s *Server) UnregisterChannel(ID string, whichmap int)

type ServerClient added in v1.0.135

type ServerClient struct {
	Remote string
	GoneCh chan struct{}
}

type ServerCodec added in v1.0.42

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(Green) error
	WriteResponse(*Response, Green) error

	// Close can be called multiple times and must be idempotent.
	Close() error
}

ServerCodec is part of the net/rpc API. Its docs:

A ServerCodec implements reading of RPC requests and writing of RPC responses for the server side of an RPC session. The server calls [ServerCodec.ReadRequestHeader] and [ServerCodec.ReadRequestBody] in pairs to read requests from the connection, and it calls [ServerCodec.WriteResponse] to write a response back. The server calls [ServerCodec.Close] when finished with the connection. ReadRequestBody may be called with a nil argument to force the body of the request to be read and discarded. See NewClient's comment for information about concurrent access.

type ServerError added in v1.0.42

type ServerError string

ServerError represents an error that has been returned from the remote side of the RPC connection.

func (ServerError) Error added in v1.0.42

func (e ServerError) Error() string

type ServerSendsDownloadFunc added in v1.3.0

type ServerSendsDownloadFunc func(
	srv *Server,
	ctx context.Context,
	req *Message,
	sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error,
	lastReply *Message,
) (err error)

ServerSendsDownloadFunc is used to send a stream to the client on the streamToClientChan. Use Server.RegisterServerSendsDownloadFunc() to register it.

type ServerSendsDownloadState added in v1.3.0

type ServerSendsDownloadState struct{}

func NewServerSendsDownloadState added in v1.6.0

func NewServerSendsDownloadState() *ServerSendsDownloadState

func (*ServerSendsDownloadState) ServerSendsDownload added in v1.3.0

func (ssss *ServerSendsDownloadState) ServerSendsDownload(
	srv *Server,
	ctx context.Context,
	req *Message,
	sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error,
	lastReply *Message,
) (err error)

ServerSendsDownload is used by cmd/srv/server.go; so when

srv -serve

serves downloads to

cli -download path

ServerSendsDownload has type ServerSendsDownloadFunc, and gets registered on the server with srv.RegisterServerSendsDownloadFunc().

type ServerSendsDownloadStateTest added in v1.6.0

type ServerSendsDownloadStateTest struct{}

func NewServerSendsDownloadStateTest added in v1.6.0

func NewServerSendsDownloadStateTest() *ServerSendsDownloadStateTest

func (*ServerSendsDownloadStateTest) ServerSendsDownloadTest added in v1.6.0

func (ssss *ServerSendsDownloadStateTest) ServerSendsDownloadTest(srv *Server, ctx context.Context, req *Message, sendStreamPart func(ctx context.Context, msg *Message, last bool) error, lastReply *Message) (err error)

ServerSendsDownloadTest is used by Test055_streaming_server_to_client. It demonstrates how a registered server func can stream to the client. ServerSendsDownloadTest has type ServerSendsDownloadFunc, and gets registered on the server with srv.RegisterServerSendsDownloadFunc().

type ServerSideUploadState added in v1.4.7

type ServerSideUploadState struct {
	// contains filtered or unexported fields
}

ServerSideUploadState is used by Test045_streaming_client_to_server (upload) in cli_test.go and cmd/srv/server.go to demonstrate streaming a large (or infinite) file in small parts, from client to server, all while keeping FIFO message order.

func NewServerSideUploadState added in v1.4.7

func NewServerSideUploadState() *ServerSideUploadState

NewServerSideUploadState returns a new ServerSideUploadState. This is part of the cli_test.go Test045 mechanics.

func (*ServerSideUploadState) ReceiveFileInParts added in v1.4.7

func (st *ServerSideUploadState) ReceiveFileInParts(ctx context.Context, req *Message, lastReply *Message, deadCallID string) (err error)

ReceiveFileInParts is used by Test045_streaming_client_to_server in cli_test.go to demonstrate streaming from client to server.

See the cmd/cli/client.go and cmd/srv/server.go and their (cli -sendfile) and (srv -readfile) flags for a full implementation of an scp-like utility that uses this method.

ReceiveFileInParts is an UploadReaderFunc and is registered on the Server with the Server.RegisterUploadReaderFunc() call.

Notice that since we get a callback-per-message, and these messages can be from different clients, we must track the state of each of them in their own PerCallIDUploadState. This is a counterpoint in the design space: compare the Bistream and Download versions that are invoked once per call initiation. We have to manage the state of all clients, whereas they have to handle channels.

What happens if the client connection goes down? We still have state here. We'd like it to get cleaned up if the connection is lost. We use the deadCallID for that. If it is set, then clean up after that CallID and return. ctx, req, and lastReply will be nil, so no other work is possible.

type Simple added in v1.1.0

type Simple int

Simple in example.go is part of the tests.

func (*Simple) DecodeMsg added in v1.1.0

func (z *Simple) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (Simple) EncodeMsg added in v1.1.0

func (z Simple) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Simple) Exported added in v1.1.0

func (t *Simple) Exported(args Args, reply *Reply) error

Simple.Exported in example.go is part of the tests.

func (Simple) MarshalMsg added in v1.1.0

func (z Simple) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (Simple) Msgsize added in v1.1.0

func (z Simple) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Simple) UnmarshalMsg added in v1.1.0

func (z *Simple) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Simple) UnmarshalMsgWithCfg added in v1.1.0

func (z *Simple) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

type TwoWayFunc added in v1.0.39

type TwoWayFunc func(req *Message, reply *Message) error

TwoWayFunc is the user's own function that they register with the server for remote procedure calls.

The user's Func may not want to return anything. In that case they should register a OneWayFunc instead.

req.JobSerz []byte contains the job payload.

Implementers of TwoWayFunc should assign their return []byte to reply.JobSerz. reply.JobSerz can also be left nil, of course.

Any errors can be returned on reply.JobErrs; this is optional. Note that JobErrs is a string value rather than an error.

The system will overwrite the reply.HDR.{To,From} fields when sending the reply, so the user should not bother setting those. The one exception to this rule is the reply.HDR.Subject string, which can be set by the user to return user-defined information. The reply will still be matched to the request on the HDR.Seqno, so a change of HDR.Subject will not change which goroutine receives the reply.
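To make the conventions above concrete, here is a minimal sketch of a handler with the TwoWayFunc shape. Message and HDR are cut-down local stand-ins holding only the fields named in the text; the upperCase handler and the call helper are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
)

// HDR and Message are stand-ins for the rpc25519 types, reduced to the
// fields discussed above.
type HDR struct{ Subject string }

type Message struct {
	HDR     HDR
	JobSerz []byte
	JobErrs string
}

// TwoWayFunc repeats the signature documented above.
type TwoWayFunc func(req *Message, reply *Message) error

// upperCase reads the payload from req.JobSerz and puts its result on
// reply.JobSerz. A problem with the job is reported on reply.JobErrs,
// which is a string rather than an error.
func upperCase(req *Message, reply *Message) error {
	if len(req.JobSerz) == 0 {
		reply.JobErrs = "empty payload"
		return nil
	}
	reply.JobSerz = bytes.ToUpper(req.JobSerz)
	// Subject is the one HDR field the text says a handler may set.
	reply.HDR.Subject = "uppercased"
	return nil
}

// call invokes f the way a server might: a request in, a blank reply out.
func call(f TwoWayFunc, payload string) (out, jobErrs, subject string, err error) {
	req := &Message{JobSerz: []byte(payload)}
	reply := &Message{}
	err = f(req, reply)
	return string(reply.JobSerz), reply.JobErrs, reply.HDR.Subject, err
}

func main() {
	fmt.Println(call(upperCase, "hello"))
	fmt.Println(call(upperCase, ""))
}
```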

type UniversalCliSrv added in v1.7.0

type UniversalCliSrv interface {
	SendOneWayMessage(ctx context.Context, msg *Message, errWriteDur time.Duration) error

	GetReadsForCallID(ch chan *Message, callID string)
	GetErrorsForCallID(ch chan *Message, callID string)

	// for Peer/Object systems; ToPeerID get priority over CallID
	// to allow such systems to implement custom message
	// types. An example is the Fragment/Peer/Circuit system.
	// (This priority is implemented in notifies.handleReply_to_CallID_ToPeerID).
	GetReadsForToPeerID(ch chan *Message, objID string)
	GetErrorsForToPeerID(ch chan *Message, objID string)

	UnregisterChannel(ID string, whichmap int)
	LocalAddr() string
	RemoteAddr() string // client provides, server gives ""
}

UniversalCliSrv allows protocol objects to exist and run on either Client or Server. They can be implemented to just use this interface.
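The idea of a protocol object that runs unchanged on either side can be sketched with a small interface. The oneWaySender interface below is a hypothetical, cut-down subset of UniversalCliSrv, Message is a stand-in with made-up fields, and fakeEndpoint plays the role of a Client or a Server.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message is a stand-in for rpc25519.Message with hypothetical fields.
type Message struct {
	From, Body string
}

// oneWaySender is a hypothetical, cut-down subset of UniversalCliSrv.
type oneWaySender interface {
	SendOneWayMessage(ctx context.Context, msg *Message, errWriteDur time.Duration) error
	LocalAddr() string
	RemoteAddr() string
}

// fakeEndpoint records what was sent; it stands in for a Client or Server.
type fakeEndpoint struct {
	local, remote string
	sent          []*Message
}

func (e *fakeEndpoint) SendOneWayMessage(ctx context.Context, msg *Message, errWriteDur time.Duration) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	e.sent = append(e.sent, msg)
	return nil
}
func (e *fakeEndpoint) LocalAddr() string  { return e.local }
func (e *fakeEndpoint) RemoteAddr() string { return e.remote }

// pinger is a protocol object that only knows about the interface, so it
// does not care which side of the connection it runs on.
type pinger struct{ node oneWaySender }

func (p *pinger) Ping(ctx context.Context) error {
	msg := &Message{From: p.node.LocalAddr(), Body: "ping"}
	return p.node.SendOneWayMessage(ctx, msg, 0)
}

// pingFrom builds an endpoint, pings through it, and reports what was sent.
func pingFrom(local, remote string) (string, error) {
	ep := &fakeEndpoint{local: local, remote: remote}
	p := &pinger{node: ep}
	if err := p.Ping(context.Background()); err != nil {
		return "", err
	}
	return ep.sent[0].From + " " + ep.sent[0].Body, nil
}

func main() {
	fmt.Println(pingFrom("127.0.0.1:5000", "127.0.0.1:8443")) // client-like endpoint
	fmt.Println(pingFrom("127.0.0.1:8443", ""))               // server-like endpoint: RemoteAddr is ""
}
```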

type UploadReaderFunc added in v1.3.0

type UploadReaderFunc func(ctx context.Context, req *Message, lastReply *Message, deadCallID string) error

An UploadReaderFunc receives messages from a Client's upload. It corresponds to the client-side Uploader, created by Client.UploadBegin().

For a quick example, see the ReceiveFileInParts() implementation in the example.go file. It is a method on the ServerSideUploadState struct that holds state between the callbacks to ReceiveFileInParts().

An UploadReaderFunc is like a OneWayFunc, but it generally should also be a method or closure to capture the state it needs, as it will receive multiple req *Message up-calls from the same client Stream. It should return a non-nil error to tell the client to stop sending. A nil return means we are fine and want to continue to receive more Messages from the same Stream. The req.HDR.CallID can be used to identify distinct Streams, and the req.HDR.StreamPart will convey their order, which will start at 0 and count up.

The lastReply argument will be nil until the Client calls Uploader.UploadMore() with the last argument set to true. The user/client is telling the UploadReaderFunc not to expect any further messages. The UploadReaderFunc can then fill in the lastReply message with any finishing detail, and it will be sent back to the client.

Note that even when lastReply is not nil, req may still have the tail content of the stream, and so generally req should be processed before considering if this is the last message and a final lastReply should also be filled out.

For cleanup/avoiding memory leaks: If deadCallID is not the empty string, then the connection for this CallID has died and we should cleanup its resources. ctx, req, and lastReply will all be nil in this case.
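Putting the rules above together, a minimal sketch of a method with the UploadReaderFunc shape might look like this. Message and HDR are local stand-ins (the int64 type for StreamPart is an assumption), and the receiver type, its map of per-CallID state, and the runUpload helper are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// HDR and Message are stand-ins for the rpc25519 types.
type HDR struct {
	CallID     string
	StreamPart int64
}

type Message struct {
	HDR     HDR
	JobSerz []byte
}

// streamState is what we remember about one upload between callbacks.
type streamState struct {
	nextPart int64
	bytes    int
}

// receiver tracks every in-flight upload by CallID.
type receiver struct {
	mu      sync.Mutex
	streams map[string]*streamState
}

// Receive has the UploadReaderFunc shape.
func (r *receiver) Receive(ctx context.Context, req *Message, lastReply *Message, deadCallID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if deadCallID != "" {
		// Connection died: ctx, req and lastReply are nil, so only clean up.
		delete(r.streams, deadCallID)
		return nil
	}
	st := r.streams[req.HDR.CallID]
	if st == nil {
		st = &streamState{}
		r.streams[req.HDR.CallID] = st
	}
	if req.HDR.StreamPart != st.nextPart {
		return errors.New("stream part out of order") // tells the client to stop
	}
	// Process req first: it may carry the tail of the stream.
	st.nextPart++
	st.bytes += len(req.JobSerz)
	if lastReply != nil {
		lastReply.JobSerz = []byte(fmt.Sprintf("received %d bytes in %d parts", st.bytes, st.nextPart))
		delete(r.streams, req.HDR.CallID)
	}
	return nil
}

// runUpload interleaves two uploads, finishes one, and kills the other.
func runUpload() (summary string, open int, err error) {
	r := &receiver{streams: map[string]*streamState{}}
	ctx := context.Background()
	part := func(id string, n int64, data string) *Message {
		return &Message{HDR: HDR{CallID: id, StreamPart: n}, JobSerz: []byte(data)}
	}
	if err = r.Receive(ctx, part("call-1", 0, "hel"), nil, ""); err != nil {
		return
	}
	if err = r.Receive(ctx, part("call-2", 0, "xx"), nil, ""); err != nil {
		return
	}
	last := &Message{}
	if err = r.Receive(ctx, part("call-1", 1, "lo"), last, ""); err != nil {
		return
	}
	err = r.Receive(nil, nil, nil, "call-2") // call-2's connection went away
	return string(last.JobSerz), len(r.streams), err
}

func main() {
	fmt.Println(runUpload())
}
```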

type Uploader added in v1.3.0

type Uploader struct {
	ReadCh <-chan *Message

	ErrorCh <-chan *Message
	// contains filtered or unexported fields
}

Uploader helps the client to make a series of non-blocking (one-way) calls to a remote server's UploadReaderFunc which must have been already registered on the server.

func (*Uploader) CallID added in v1.3.0

func (s *Uploader) CallID() string

func (*Uploader) UploadMore added in v1.3.0

func (s *Uploader) UploadMore(ctx context.Context, msg *Message, last bool) (err error)

Directories

Path Synopsis
_attic
cmd
cli
srv
