Documentation ¶
Index ¶
- Constants
- Variables
- func ContextWithHDR(ctx context.Context, hdr *HDR) context.Context
- func EchoBistreamFunc(srv *Server, ctx context.Context, req *Message, ...) (err error)
- func GoroNumber() int
- func IsNil(face interface{}) bool
- func MinimalBistreamFunc(srv *Server, ctx context.Context, req *Message, ...) (err error)
- func NewCallID() string
- func NewChaCha20CryptoRandKey() []byte
- func SelfyNewKey(createKeyPairNamed, odir string) error
- type Args
- func (z *Args) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Args) EncodeMsg(en *msgp.Writer) (err error)
- func (z Args) MarshalMsg(b []byte) (o []byte, err error)
- func (z Args) Msgsize() (s int)
- func (z *Args) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Args) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type Arith
- func (t *Arith) Add(args Args, reply *Reply) error
- func (z *Arith) DecodeMsg(dc *msgp.Reader) (err error)
- func (t *Arith) Div(args Args, reply *Reply) error
- func (z Arith) EncodeMsg(en *msgp.Writer) (err error)
- func (t *Arith) Error(args *Args, reply *Reply) error
- func (z Arith) MarshalMsg(b []byte) (o []byte, err error)
- func (z Arith) Msgsize() (s int)
- func (t *Arith) Mul(args *Args, reply *Reply) error
- func (t *Arith) Scan(args string, reply *Reply) (err error)
- func (t *Arith) SleepMilli(args *Args, reply *Reply) error
- func (t *Arith) String(args *Args, reply *string) error
- func (z *Arith) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Arith) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type BenchmarkMessage
- func (z *BenchmarkMessage) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *BenchmarkMessage) EncodeMsg(en *msgp.Writer) (err error)
- func (z *BenchmarkMessage) MarshalMsg(b []byte) (o []byte, err error)
- func (z *BenchmarkMessage) Msgsize() (s int)
- func (z *BenchmarkMessage) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *BenchmarkMessage) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type BistreamFunc
- type Bistreamer
- func (b *Bistreamer) Begin(ctx context.Context, req *Message) (err error)
- func (s *Bistreamer) CallID() string
- func (b *Bistreamer) Close()
- func (s *Bistreamer) Name() string
- func (s *Bistreamer) Seqno() uint64
- func (s *Bistreamer) UploadMore(ctx context.Context, msg *Message, last bool) (err error)
- type BuiltinTypes
- func (BuiltinTypes) Array(args *Args, reply *[2]int) error
- func (z *BuiltinTypes) DecodeMsg(dc *msgp.Reader) (err error)
- func (z BuiltinTypes) EncodeMsg(en *msgp.Writer) (err error)
- func (BuiltinTypes) Map(args *Args, reply *map[int]int) error
- func (z BuiltinTypes) MarshalMsg(b []byte) (o []byte, err error)
- func (z BuiltinTypes) Msgsize() (s int)
- func (BuiltinTypes) Slice(args *Args, reply *[]int) error
- func (z *BuiltinTypes) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *BuiltinTypes) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- func (BuiltinTypes) WantsContext(ctx context.Context, args *Args, reply *[2]int) error
- type Call
- type CallType
- func (z *CallType) DecodeMsg(dc *msgp.Reader) (err error)
- func (z CallType) EncodeMsg(en *msgp.Writer) (err error)
- func (z CallType) MarshalMsg(b []byte) (o []byte, err error)
- func (z CallType) Msgsize() (s int)
- func (ct CallType) String() string
- func (z *CallType) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *CallType) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type Client
- func (c *Client) Call(serviceMethod string, args, reply Green, octx context.Context) error
- func (c *Client) Close() error
- func (c *Client) Err() error
- func (c *Client) GetErrorChForCallID(callID string) (ch chan *Message)
- func (c *Client) GetErrorsForCallID(ch chan *Message, callID string)
- func (c *Client) GetOneRead(seqno uint64, ch *loquet.Chan[Message])
- func (c *Client) GetReadIncomingCh() (ch chan *Message)
- func (c *Client) GetReadIncomingChForCallID(callID string) (ch chan *Message)
- func (c *Client) GetReads(ch chan *Message)
- func (c *Client) GetReadsForCallID(ch chan *Message, callID string)
- func (c *Client) Go(serviceMethod string, args Green, reply Green, done chan *Call, ...) *Call
- func (c *Client) IsDown() (down bool)
- func (c *Client) LocalAddr() string
- func (c *Client) Name() string
- func (c *Client) NewBistreamer(bistreamerName string) (b *Bistreamer, err error)
- func (c *Client) NewDownloader(ctx context.Context, streamerName string) (downloader *Downloader, err error)
- func (c *Client) OneWaySend(msg *Message, cancelJobCh <-chan struct{}) (err error)
- func (c *Client) RequestBistreaming(ctx context.Context, bistreamerName string, req *Message) (b *Bistreamer, err error)
- func (c *Client) RequestDownload(ctx context.Context, streamerName, path string) (downloader *Downloader, err error)
- func (cli *Client) RsyncClientSide()
- func (c *Client) SendAndGetReply(req *Message, cancelJobCh <-chan struct{}) (reply *Message, err error)
- func (c *Client) SendAndGetReplyWithCtx(ctx context.Context, req *Message) (reply *Message, err error)
- func (c *Client) SendAndGetReplyWithTimeout(timeout time.Duration, req *Message) (reply *Message, err error)
- func (c *Client) Start() error
- func (c *Client) UngetOneRead(seqno uint64, ch *loquet.Chan[Message])
- func (c *Client) UngetReads(ch chan *Message)
- func (c *Client) UploadBegin(ctx context.Context, serviceName string, msg *Message) (strm *Uploader, err error)
- type ClientCodec
- type Config
- type Downloader
- type Embed
- func (z *Embed) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Embed) EncodeMsg(en *msgp.Writer) (err error)
- func (z Embed) MarshalMsg(b []byte) (o []byte, err error)
- func (z Embed) Msgsize() (s int)
- func (z *Embed) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Embed) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type Green
- type HDR
- func (hdr *HDR) AsGreenpack(scratch []byte) (o []byte, err error)
- func (m *HDR) Bytes() []byte
- func (m *HDR) Compact() string
- func (z *HDR) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *HDR) EncodeMsg(en *msgp.Writer) (err error)
- func (a *HDR) Equal(b *HDR) bool
- func (m *HDR) JSON() []byte
- func (z *HDR) MarshalMsg(b []byte) (o []byte, err error)
- func (z *HDR) Msgsize() (s int)
- func (m *HDR) Pretty() string
- func (m *HDR) String() string
- func (z *HDR) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *HDR) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type Hello
- func (z *Hello) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Hello) EncodeMsg(en *msgp.Writer) (err error)
- func (z Hello) MarshalMsg(b []byte) (o []byte, err error)
- func (z Hello) Msgsize() (s int)
- func (z *Hello) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Hello) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type InvalidRequest
- func (z *InvalidRequest) DecodeMsg(dc *msgp.Reader) (err error)
- func (z InvalidRequest) EncodeMsg(en *msgp.Writer) (err error)
- func (z InvalidRequest) MarshalMsg(b []byte) (o []byte, err error)
- func (z InvalidRequest) Msgsize() (s int)
- func (z *InvalidRequest) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *InvalidRequest) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type MatchHashPair
- func (z *MatchHashPair) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *MatchHashPair) EncodeMsg(en *msgp.Writer) (err error)
- func (z *MatchHashPair) MarshalMsg(b []byte) (o []byte, err error)
- func (z *MatchHashPair) Msgsize() (s int)
- func (z *MatchHashPair) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *MatchHashPair) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type Message
- func (m *Message) AsGreenpack(scratch []byte) (o []byte, err error)
- func (m *Message) AsJSON(scratch []byte) (o []byte, err error)
- func (z *Message) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *Message) EncodeMsg(en *msgp.Writer) (err error)
- func (z *Message) MarshalMsg(b []byte) (o []byte, err error)
- func (z *Message) Msgsize() (s int)
- func (msg *Message) String() string
- func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Message) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type MustBeCancelled
- func (z *MustBeCancelled) DecodeMsg(dc *msgp.Reader) (err error)
- func (z MustBeCancelled) EncodeMsg(en *msgp.Writer) (err error)
- func (z MustBeCancelled) MarshalMsg(b []byte) (o []byte, err error)
- func (s *MustBeCancelled) MessageAPI_HangUntilCancel(req, reply *Message) error
- func (z MustBeCancelled) Msgsize() (s int)
- func (z *MustBeCancelled) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *MustBeCancelled) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- func (s *MustBeCancelled) WillHangUntilCancel(ctx context.Context, args *Args, reply *Reply) error
- type NetConnWrapper
- type OneWayFunc
- type PerCallID_FileToDiskState
- type Reply
- func (z *Reply) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Reply) EncodeMsg(en *msgp.Writer) (err error)
- func (z Reply) MarshalMsg(b []byte) (o []byte, err error)
- func (z Reply) Msgsize() (s int)
- func (z *Reply) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Reply) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type Request
- func (z *Request) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Request) EncodeMsg(en *msgp.Writer) (err error)
- func (z Request) MarshalMsg(b []byte) (o []byte, err error)
- func (z Request) Msgsize() (s int)
- func (z *Request) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Request) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type Response
- func (z *Response) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Response) EncodeMsg(en *msgp.Writer) (err error)
- func (z Response) MarshalMsg(b []byte) (o []byte, err error)
- func (z Response) Msgsize() (s int)
- func (z *Response) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Response) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncChunk
- func (z *RsyncChunk) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *RsyncChunk) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncChunk) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncChunk) Msgsize() (s int)
- func (z *RsyncChunk) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncChunk) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncDiff
- func (z *RsyncDiff) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *RsyncDiff) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncDiff) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncDiff) Msgsize() (s int)
- func (d *RsyncDiff) String() string
- func (z *RsyncDiff) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncDiff) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncHashes
- func (z *RsyncHashes) DecodeMsg(dc *msgp.Reader) (err error)
- func (a *RsyncHashes) Diff(b *RsyncHashes) (d *RsyncDiff)
- func (z *RsyncHashes) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncHashes) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncHashes) Msgsize() (s int)
- func (h *RsyncHashes) String() string
- func (z *RsyncHashes) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncHashes) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncStep0_ClientRequestsRead
- func (z *RsyncStep0_ClientRequestsRead) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *RsyncStep0_ClientRequestsRead) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncStep0_ClientRequestsRead) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncStep0_ClientRequestsRead) Msgsize() (s int)
- func (z *RsyncStep0_ClientRequestsRead) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncStep0_ClientRequestsRead) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncStep1_SenderOverview
- func (z *RsyncStep1_SenderOverview) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *RsyncStep1_SenderOverview) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncStep1_SenderOverview) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncStep1_SenderOverview) Msgsize() (s int)
- func (z *RsyncStep1_SenderOverview) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncStep1_SenderOverview) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncStep2_AckOverview
- func (z *RsyncStep2_AckOverview) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *RsyncStep2_AckOverview) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncStep2_AckOverview) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncStep2_AckOverview) Msgsize() (s int)
- func (z *RsyncStep2_AckOverview) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncStep2_AckOverview) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncStep3_SenderProvidesDeltas
- func (z *RsyncStep3_SenderProvidesDeltas) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *RsyncStep3_SenderProvidesDeltas) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncStep3_SenderProvidesDeltas) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncStep3_SenderProvidesDeltas) Msgsize() (s int)
- func (z *RsyncStep3_SenderProvidesDeltas) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncStep3_SenderProvidesDeltas) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type RsyncStep4_ReaderAcksDeltasFin
- func (z *RsyncStep4_ReaderAcksDeltasFin) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *RsyncStep4_ReaderAcksDeltasFin) EncodeMsg(en *msgp.Writer) (err error)
- func (z *RsyncStep4_ReaderAcksDeltasFin) MarshalMsg(b []byte) (o []byte, err error)
- func (z *RsyncStep4_ReaderAcksDeltasFin) Msgsize() (s int)
- func (z *RsyncStep4_ReaderAcksDeltasFin) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *RsyncStep4_ReaderAcksDeltasFin) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type ServeBistreamState
- type Server
- func (s *Server) Close() error
- func (s *Server) Register(rcvr msgp.Encodable) error
- func (s *Server) Register1Func(serviceName string, callme1 OneWayFunc)
- func (s *Server) Register2Func(serviceName string, callme2 TwoWayFunc)
- func (s *Server) RegisterBistreamFunc(name string, callme BistreamFunc)
- func (s *Server) RegisterName(name string, rcvr msgp.Encodable) error
- func (s *Server) RegisterServerSendsDownloadFunc(name string, callme ServerSendsDownloadFunc)
- func (s *Server) RegisterUploadReaderFunc(name string, callmeUploadReader UploadReaderFunc)
- func (Server *Server) RsyncServerSide(srv *Server, ctx context.Context, req *Message, ...) (err error)
- func (s *Server) SendMessage(callID, subject, destAddr string, data []byte, seqno uint64, ...) error
- func (s *Server) SendOneWayMessage(ctx context.Context, msg *Message, errWriteDur *time.Duration) error
- func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (s *Server) Start() (serverAddr net.Addr, err error)
- type ServerClient
- type ServerCodec
- type ServerError
- type ServerSendsDownloadFunc
- type ServerSendsDownloadState
- type ServerSendsDownloadStateTest
- type ServerSideUploadState
- type Simple
- func (z *Simple) DecodeMsg(dc *msgp.Reader) (err error)
- func (z Simple) EncodeMsg(en *msgp.Writer) (err error)
- func (t *Simple) Exported(args Args, reply *Reply) error
- func (z Simple) MarshalMsg(b []byte) (o []byte, err error)
- func (z Simple) Msgsize() (s int)
- func (z *Simple) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Simple) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- type TwoWayFunc
- type UploadReaderFunc
- type Uploader
Constants ¶
const (
// Defaults used by HandleHTTP
DefaultRPCPath = "/_goRPC_"
)
const DefaultUseCompressAlgo = "s2" // see magic7.go
const DefaultUseCompression = true
const (
UserMaxPayload = 67106000 // users should chunk to this size, to be safe.
)
Variables ¶
var DebugVerboseCompress bool
var ErrAlreadyDone = fmt.Errorf("Uploader has already been marked done. No more sending is allowed.")
var ErrCancelReqSent = fmt.Errorf("cancellation request sent")
var ErrContextCancelled = fmt.Errorf("context cancelled")
var ErrDone = fmt.Errorf("done channel closed")
var ErrHandshakeQUIC = fmt.Errorf("quic handshake failure")
var ErrMagicWrong = fmt.Errorf("error: magic bytes not found at start of message")
var ErrNetConnectionNotFound = fmt.Errorf("error in SendMessage: net.Conn not found")
var ErrNetRpcShutdown = errors.New("connection is shut down")
ErrNetRpcShutdown is from net/rpc, and still distinct from ErrShutdown to help locate when and where the error was generated. It indicates the system, or at least the network connection or stream, is closed or shutting down.
var ErrNotFound = fmt.Errorf("known_tls_hosts file not found")
var ErrShutdown = fmt.Errorf("shutting down")
var ErrTimeout = fmt.Errorf("time-out waiting for call to complete")
var ErrTooLarge = fmt.Errorf("error: length of payload JobSerz is over maxMessage - 1024(for header) = %v bytes, which is the limit.", maxMessage-1024)
var ErrTooLong = fmt.Errorf("message message too long: over 64MB; encrypted client vs an un-encrypted server?")
var ErrWrongCallTypeForSendMessage = fmt.Errorf("error in SendMessage: msg.HDR.Typ must be CallOneWay or CallUploadBegin; or greater in number")
Functions ¶
func ContextWithHDR ¶ added in v1.2.7
ContextWithHDR returns a new Context that carries value hdr.
func EchoBistreamFunc ¶ added in v1.5.0
func EchoBistreamFunc( srv *Server, ctx context.Context, req *Message, uploadsFromClientCh <-chan *Message, sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error, lastReply *Message, ) (err error)
Echo anything we get from the client back. srv uses to test bistreaming.
func GoroNumber ¶ added in v1.1.43
func GoroNumber() int
GoroNumber returns the calling goroutine's number.
func IsNil ¶ added in v1.1.32
func IsNil(face interface{}) bool
IsNil uses reflect to to return true iff the face contains a nil pointer, map, array, slice, or channel.
func MinimalBistreamFunc ¶ added in v1.4.3
func MinimalBistreamFunc( srv *Server, ctx context.Context, req *Message, uploadsFromClientCh <-chan *Message, sendDownloadPartToClient func(ctx context.Context, by []byte, last bool) error, lastReply *Message, ) (err error)
MinimalBistreamFunc stands in contract to the fleshed on ServeBistream example above. It attempts to illustrate only the bare minimal needed to implement a BistreamFunc, so the user can glimpse its conceptual elegance.
It provides a starter template for writing your own.
Typically you might want to make it a method on a struct that provides it with other application dependent state and helper methods. But that's not minimal.
func NewChaCha20CryptoRandKey ¶ added in v1.0.114
func NewChaCha20CryptoRandKey() []byte
func SelfyNewKey ¶ added in v1.0.3
SelfyNewKey is only for testing, not production. It is used by the tests to check that certs are signed by the expected CA.
SelfyNewKey will generate a self-signed certificate authority, a new ed25519 key pair, sign the public key to create a cert, and write these four new files to disk. The directories odir/my-keep-private-dir and odir/certs will be created, based on the odir argument. For a given createKeyPairNamed name, we will create odir/certs/name.crt and odir/certs/name.key files. The odir/certs/name.key and my-keep-private-dir/ca.key files contain private keys and should be kept confidential. The `selfy` command in this package can be used to produce the same keys but with password protection, which is recommended.
Types ¶
type Args ¶ added in v1.1.0
Args in example.go is part of the tests.
func (*Args) DecodeMsg ¶ added in v1.1.0
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Args) MarshalMsg ¶ added in v1.1.0
MarshalMsg implements msgp.Marshaler
func (Args) Msgsize ¶ added in v1.1.0
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Args) UnmarshalMsg ¶ added in v1.1.0
UnmarshalMsg implements msgp.Unmarshaler
func (*Args) UnmarshalMsgWithCfg ¶ added in v1.1.0
type Arith ¶ added in v1.1.0
type Arith int
Arith in example.go is part of the tests.
func (*Arith) DecodeMsg ¶ added in v1.1.0
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Arith) MarshalMsg ¶ added in v1.1.0
MarshalMsg implements msgp.Marshaler
func (Arith) Msgsize ¶ added in v1.1.0
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Arith) SleepMilli ¶ added in v1.1.0
Arith.SleepMilli in example.go is part of the tests.
func (*Arith) UnmarshalMsg ¶ added in v1.1.0
UnmarshalMsg implements msgp.Unmarshaler
func (*Arith) UnmarshalMsgWithCfg ¶ added in v1.1.0
type BenchmarkMessage ¶ added in v1.1.30
type BenchmarkMessage struct { Field1 string `zid:"0"` Field9 string `zid:"1"` Field18 string `zid:"2"` Field80 bool `zid:"3"` Field81 bool `zid:"4"` Field2 int32 `zid:"5"` Field3 int32 `zid:"6"` Field280 int32 `zid:"7"` Field6 int32 `zid:"8"` Field22 int64 `zid:"9"` Field4 string `zid:"10"` Field5 []uint64 `zid:"11"` Field59 bool `zid:"12"` Field7 string `zid:"13"` Field16 int32 `zid:"14"` Field130 int32 `zid:"15"` Field12 bool `zid:"16"` Field17 bool `zid:"17"` Field13 bool `zid:"18"` Field14 bool `zid:"19"` Field104 int32 `zid:"20"` Field100 int32 `zid:"21"` Field101 int32 `zid:"22"` Field102 string `zid:"23"` Field103 string `zid:"24"` Field29 int32 `zid:"25"` Field30 bool `zid:"26"` Field60 int32 `zid:"27"` Field271 int32 `zid:"28"` Field272 int32 `zid:"29"` Field150 int32 `zid:"30"` Field23 int32 `zid:"31"` Field24 bool `zid:"32"` Field25 int32 `zid:"33"` Field78 bool `zid:"34"` Field67 int32 `zid:"35"` Field68 int32 `zid:"36"` Field128 int32 `zid:"37"` Field129 string `zid:"38"` Field131 int32 `zid:"39"` }
BenchmarkMessage in example.go is part of the tests and benchmarks.
func (*BenchmarkMessage) DecodeMsg ¶ added in v1.1.30
func (z *BenchmarkMessage) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*BenchmarkMessage) EncodeMsg ¶ added in v1.1.30
func (z *BenchmarkMessage) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*BenchmarkMessage) MarshalMsg ¶ added in v1.1.30
func (z *BenchmarkMessage) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*BenchmarkMessage) Msgsize ¶ added in v1.1.30
func (z *BenchmarkMessage) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*BenchmarkMessage) UnmarshalMsg ¶ added in v1.1.30
func (z *BenchmarkMessage) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*BenchmarkMessage) UnmarshalMsgWithCfg ¶ added in v1.1.30
func (z *BenchmarkMessage) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type BistreamFunc ¶ added in v1.3.0
type BistreamFunc func( srv *Server, ctx context.Context, req *Message, uploadsFromClientCh <-chan *Message, sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error, lastReply *Message, ) (err error)
BistreamFunc aims to allow the user to implement server operations with full generality; it provies for uploads and downloads to the originating client, and for communication with other clients. Use Server.RegisterBistreamFunc() to register your BistreamFunc under a name. The BistreamFunc and its siblings the ServerSendsDownloadFunc and the UploadReaderFunc are only available for the Message based API; not in the net/rpc API.
On the client side, the Client.RequestBistreaming() call is used to create a Bistreamer that will call the BistreamFunc by its registered name (the name that Server.RegisterBistreamFunc() was called with).
In a BistreamFunc on the server, the full generality of interleaving upload and download handling is available. The initial Message in req will also be the first Message in the req.HDR.UploadsCh which receives all upload messages from the client.
To note, it may be more convenient for the user to use an UploadReaderFunc or ServerSendsDownloadFunc if the full generality of the BistreamFunc is not needed. For simplicity, the Server.RegisterServerSendsDownloadFunc() is used to register your ServerSendsDownloadFunc. Server.RegisterUploadReaderFunc() is used to register you UploadReaderFunc. Note in particular that the UploadReaderFunc is not persistent but rather receives a callback per Message received from the Client.Uploader. This may simplify the implementation of your server-side upload function. Note that persistent state between messages is still available by registering a method on your struct; see the ServerSideUploadFunc struct in example.go for example.
BistreamFunc, in contrast, are not a callback-per-message, but rather persist and would typically only exit if ctx.Done() is received, or if it wishes to finish the operation (say on an error, or by noting that a CallUploadEnd type Message has been received) so as to save goroutine resources on the server. The BistreamFunc is started on the server when the CallRequestBistreaming Message.HDR.Typ is received with the ServiceName matching the registered name. Each live instance of the BistreamFunc is identified by the req.HDR.CallID set by the originating client. All download messages sent will have this same CallID on them (for the client to match).
When the BistreamFunc finishes (returns), a final message will of type CallRPCReply will be sent back to the client. This is the lastReply *Message provided in the BistreamFunc. The BistreamFunc should fill in this lastReply with any final JobSerz payload it wishes to send; this is optional. On the client side, the Client.RequestBistreaming() is used to start bi-streaming. It returns a Bistreamer. This Bistreamer has a ReadCh that will receive this final message (as well as all other download messages). See the cli_test.go Test065_bidirectional_download_and_upload for example use.
A BistreamFunc is run on its own goroutine. It can start new goroutines, if it wishes, but this is not required. An additional (new) goroutine may be useful to reduce the latency of message handling while simultaneously reading from req.HDR.UploadsCh for uploads and writing to downloads with sendDownloadPartToClient(), as both of these are blocking, synchronous, operations. If you do so, be sure to handle goroutine cancellation and cleanup if the ctx provided is cancelled.
The sendDownloadPartToClient() helper function is used to write download Messages. It properly assigns the HDR.StreamPart sequence numbers and HDR.Typ as one of CallDownloadBegin, CallDownloadMore, and CallDownloadEnd). The BistreamFunc should call sendDownloadPartToClient() with last=true to signal the end of the download, in which case HDR.Typ CallDownloadEnd will be set on the sent Message.
To provide back-pressure by default, the sendDownloadPartToClient() call is synchronous and will return only when the message is sent. If you wish to continue to process uploads while sending a download part, your BistreamFunc can call the provided sendDownloadPartToClient() in a goroutine that you start for this purpose. The sendDownloadPartToClient() call is goroutine safe, as it uses its own internal sync.Mutex to ensure only one send is in progress at a time.
A BistreamFunc by default communicates download messages to its originating client. However other clients can also be sent messages. The Server.SendOneWayMessage() and Server.SendMessage() operations on the Server can be used for this purpose.
Visit the example.go implementation of ServeBistreamState.ServeBistream() to see it in action.
type Bistreamer ¶ added in v1.3.0
type Bistreamer struct { ReadDownloadsCh <-chan *Message WriteCh chan<- *Message ErrorCh <-chan *Message // contains filtered or unexported fields }
Bistreamer is the client side handle to talking with a server func that does bistreaming: the client can stream to the server func, and the server func can, symmetrically, stream to the client. The basics of TCP are finally available to users.
func (*Bistreamer) Begin ¶ added in v1.5.0
func (b *Bistreamer) Begin(ctx context.Context, req *Message) (err error)
func (*Bistreamer) CallID ¶ added in v1.3.0
func (s *Bistreamer) CallID() string
func (*Bistreamer) Name ¶ added in v1.3.0
func (s *Bistreamer) Name() string
func (*Bistreamer) Seqno ¶ added in v1.3.0
func (s *Bistreamer) Seqno() uint64
func (*Bistreamer) UploadMore ¶ added in v1.3.0
type BuiltinTypes ¶ added in v1.1.0
type BuiltinTypes struct {
Placeholder int `zid:"0"` // greenpack refuses to serialize an empty struct.
}
BuiltinTypes in example.go is part of the tests.
func (BuiltinTypes) Array ¶ added in v1.1.0
func (BuiltinTypes) Array(args *Args, reply *[2]int) error
BuiltinTypes.Array in example.go is part of the tests.
func (*BuiltinTypes) DecodeMsg ¶ added in v1.1.0
func (z *BuiltinTypes) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (BuiltinTypes) EncodeMsg ¶ added in v1.1.0
func (z BuiltinTypes) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (BuiltinTypes) Map ¶ added in v1.1.0
func (BuiltinTypes) Map(args *Args, reply *map[int]int) error
BuiltinTypes.Map in example.go is part of the tests.
func (BuiltinTypes) MarshalMsg ¶ added in v1.1.0
func (z BuiltinTypes) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (BuiltinTypes) Msgsize ¶ added in v1.1.0
func (z BuiltinTypes) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (BuiltinTypes) Slice ¶ added in v1.1.0
func (BuiltinTypes) Slice(args *Args, reply *[]int) error
BuiltinTypes.Slice in example.go is part of the tests.
func (*BuiltinTypes) UnmarshalMsg ¶ added in v1.1.0
func (z *BuiltinTypes) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*BuiltinTypes) UnmarshalMsgWithCfg ¶ added in v1.1.0
func (z *BuiltinTypes) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
func (BuiltinTypes) WantsContext ¶ added in v1.1.0
BuiltinTypes.WantsContext in example.go is part of the tests. Here, mimic Array's reply.
type Call ¶ added in v1.0.42
type Call struct { ServiceMethod string // The name of the service and method to call. Args Green // The argument to the function (*struct). Reply Green // The reply from the function (*struct). Error error // After completion, the error status. Done chan *Call // Receives *Call when Go is complete. }
Call represents an active net/rpc RPC.
type CallType ¶ added in v1.1.29
type CallType int
const ( CallNone CallType = 0 CallRPC CallType = 1 CallNetRPC CallType = 2 CallRequestBistreaming CallType = 3 // these are also RPCs from the client to the // server to start an rsync operation. CallRsyncStep0_ClientRequestsRead CallType = 4 CallRsyncStep1_SenderOverview CallType = 5 CallRsyncStep2_AckOverview CallType = 22 CallRsyncStep3_SenderProvidesDeltas CallType = 23 CallRsyncStep4_ReaderAcksDeltasFin CallType = 24 // All type numbers >= 10 are one-way calls. CallOneWay CallType = 10 CallRPCReply CallType = 11 CallKeepAlive CallType = 12 CallCancelPrevious CallType = 13 // we could not complete a request CallError CallType = 14 // client sends a stream to the server, in an Upload: CallUploadBegin CallType = 15 // one of these; and CallUploadMore CallType = 16 // possibly many of these; and CallUploadEnd CallType = 17 // just one of these to finish. // the opposite: when client wants to get a stream // from the server. CallRequestDownload CallType = 18 // The server responds to CallRequestDownload with CallDownloadBegin CallType = 19 // one of these to start; CallDownloadMore CallType = 20 // possibly many of these; CallDownloadEnd CallType = 21 // and one of these to finish. )
func (*CallType) DecodeMsg ¶ added in v1.1.29
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (CallType) MarshalMsg ¶ added in v1.1.29
MarshalMsg implements msgp.Marshaler
func (CallType) Msgsize ¶ added in v1.1.29
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*CallType) UnmarshalMsg ¶ added in v1.1.29
UnmarshalMsg implements msgp.Unmarshaler
func (*CallType) UnmarshalMsgWithCfg ¶ added in v1.1.29
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
A Client starts requests, and (might) wait for responses.
func NewClient ¶
NewClient creates a new client. Call Start() to begin a connection. The name setting allows users to track multiple instances of Clients, and the Client.Name() method will retreive it.
func (*Client) Call ¶ added in v1.0.42
Call implements the net/rpc Client.Call() API; its docs:
Call invokes the named function, waits for it to complete, and returns its error status.
Added: octx is an optional context for cancelling the job. It can be nil.
func (*Client) GetErrorChForCallID ¶ added in v1.5.1
func (*Client) GetErrorsForCallID ¶ added in v1.5.1
func (*Client) GetOneRead ¶
GetOneRead responds on ch with the first incoming message whose Seqno matches seqno, then auto unregisters itself after that single send on ch.
func (*Client) GetReadIncomingCh ¶
GetReadIncomingCh creates and returns a buffered channel that reads incoming messages that are server-pushed (not associated with a round-trip rpc call request/response pair).
func (*Client) GetReadIncomingChForCallID ¶ added in v1.3.0
GetReadIncomingChForCallID creates and returns a buffered channel that reads incoming messages that are server-pushed (not associated with a round-trip rpc call request/response pair). It filters for those with callID.
func (*Client) GetReads ¶
GetReads registers to get any received messages on ch. It is similar to GetReadIncomingCh but for when ch already exists and you do not want a new one.
func (*Client) GetReadsForCallID ¶ added in v1.3.0
GetReads registers to get any received messages on ch. It is similar to GetReadIncomingCh but for when ch already exists and you do not want a new one. It filters for CallID
func (*Client) Go ¶ added in v1.0.42
func (c *Client) Go(serviceMethod string, args Green, reply Green, done chan *Call, octx context.Context) *Call
Go implements the net/rpc Client.Go() API; its docs:
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
octx is an optional context, for early cancelling of a job. It can be nil.
func (*Client) NewBistreamer ¶ added in v1.5.0
func (c *Client) NewBistreamer(bistreamerName string) (b *Bistreamer, err error)
NewBistream creates a new Bistreamer but does no communication yet. Just for setting up reader/writer goroutines before everything starts.
func (*Client) NewDownloader ¶ added in v1.6.0
func (*Client) OneWaySend ¶
OneWaySend sends a message without expecting or waiting for a response. The cancelJobCh is optional, and can be nil. If msg.HDR.CallID is set, we will preserve it.
func (*Client) RequestBistreaming ¶ added in v1.3.0
func (*Client) RequestDownload ¶ added in v1.3.0
func (*Client) RsyncClientSide ¶ added in v1.6.0
func (cli *Client) RsyncClientSide()
func (*Client) SendAndGetReply ¶
func (c *Client) SendAndGetReply(req *Message, cancelJobCh <-chan struct{}) (reply *Message, err error)
SendAndGetReply starts a round-trip RPC call. We will wait for a response before retuning. The requestStopCh is optional; it can be nil. A context.Done() like channel can be supplied there to cancel the job before a reply comes back.
UPDATE: a DEFAULT timeout is in force now. Because server failure or blink (down then up) can leave us stalled forever, we put in a default timeout of 10 seconds, if not otherwise specified. If you expect your call to take more than a few seconds, you should set the timeout directly with SendAndGetReplyWithTimeout() or pass in a cancelJobCh here to manage it. Otherwise, to handle the common case when we expect very fast replies, if cancelJobCh is nil, we will cancel the job if it has not finished after 10 seconds.
func (*Client) SendAndGetReplyWithCtx ¶ added in v1.2.4
func (c *Client) SendAndGetReplyWithCtx(ctx context.Context, req *Message) (reply *Message, err error)
SendAndGetReplyWithCtx is like SendAndGetReply(), with the additional feature that it will send a remote cancellation request when the ctx is cancelled or if the ctx.Deadline() time (if set) is surpassed. Note that none of the Values inside ctx, if set, will be transmitted to the remote call, because the context.Context API provides no method to enumerate them. Such values are likely not serializable in any case.
A similar deadline effect can be acheived just by setting the req.HDR.Deadline field in a SendAndGetReply() call. This may also be more efficient on the client, because the client need not wait for the remote cancellation response to be sent and received. However this is a little racey: the server could suceed and be in the process of replying when the client hits the deadline. In this case the client might retry a call that actually did finish, and end up doing the call twice. This is also always a hazard with servers crashing before they can finish responding. Ideally server APIs are idempotent to guard against this.
If the req.HDR.Deadline is already set (not zero), then we do not touch it. If it is zero and ctx has a deadline, we set it as the req.HDR.Deadline. We leave it to the user to coordinate/update these two ways of setting a dealine, knowing that the req.HDR.Deadline will win, if set.
func (*Client) SendAndGetReplyWithTimeout ¶
func (c *Client) SendAndGetReplyWithTimeout(timeout time.Duration, req *Message) (reply *Message, err error)
SendAndGetReplyWithTimeout expires the call after timeout.
func (*Client) Start ¶ added in v1.1.15
Start dials the server. That is, Start attemps to connect to config.ClientDialToHostPort. The err will come back with any problems encountered.
func (*Client) UngetOneRead ¶ added in v1.5.3
func (*Client) UngetReads ¶
UngetReads reverses what GetReads does: un-register and have ch be deaf from now on. Idempotent: if ch is already gone, no foul is reported.
func (*Client) UploadBegin ¶ added in v1.3.0
func (c *Client) UploadBegin( ctx context.Context, serviceName string, msg *Message, ) (strm *Uploader, err error)
UploadBegin sends the msg to the server to execute with the func that has registed with RegisterUploaderReadererFunc() -- at the moment there can only be one such func registered at a time. UploadBegin() will contact it, and Uploader.UploadMore() will, as it suggests, send another Message.
We maintain FIFO arrival of Messages at the server as follows (despite having each server side func callback executing in a goroutine).
1. Since the client side uses the same channel into the send loop for both UploadBegin and UploadMore, these calls will be properly ordered into the send loop on the client/sending side.
2. The TCP/QUIC stream maintains FIFO order of its messages as it delivers them to the server.
3. On the server, in the TCP/QUIC read loop, we queue messages in FIFO order into a large buffered channel before we spin up a goroutine once at UploadBegin time to handle all the subsequent messages in the order they were queued.
This also yeilds an efficient design. While normal OneWayFunc and TwoWayFunc messages each start their own new goroutine to avoid long-running functions starving the server's read loop, a UploadReadFunc only utilizes a single new goroutine to process all messages sent in a stream.
type ClientCodec ¶ added in v1.0.42
type ClientCodec interface { WriteRequest(*Request, msgp.Encodable) error ReadResponseHeader(*Response) error ReadResponseBody(msgp.Decodable) error Close() error }
ClientCodec is part of the net/rpc API. Its docs:
A ClientCodec implements writing of RPC requests and reading of RPC responses for the client side of an RPC session. The client calls [ClientCodec.WriteRequest] to write a request to the connection and calls [ClientCodec.ReadResponseHeader] and [ClientCodec.ReadResponseBody] in pairs to read responses. The client calls [ClientCodec.Close] when finished with the connection. ReadResponseBody may be called with a nil argument to force the body of the response to be read and then discarded. See NewClient's comment for information about concurrent access.
type Config ¶
type Config struct { // ServerAddr host:port where the server should listen. ServerAddr string // optional. Can be used to suggest that the // client use a specific host:port. NB: For QUIC, by default, the client and // server will share the same port if they are in the same process. // In that case this setting will definitely be ignored. ClientHostPort string // Who the client should contact ClientDialToHostPort string // TCP false means TLS-1.3 secured. true here means do TCP only; with no encryption. TCPonly_no_TLS bool // UseQUIC cannot be true if TCPonly_no_TLS is true. UseQUIC bool // If true, then we do not share same UDP port between a QUIC // client and server (in the same process). Used // for testing client shutdown paths too. NoSharePortQUIC bool // path to certs/ like certificate // directory on the live filesystem. CertPath string // SkipVerifyKeys true allows any incoming // key to be signed by // any CA; it does not have to be ours. Obviously // this discards almost all access control; it // should rarely be used unless communication // with the any random agent/hacker/public person // is desired. SkipVerifyKeys bool ClientKeyPairName string // default "client" means use certs/client.crt and certs/client.key ServerKeyPairName string // default "node" means use certs/node.crt and certs/node.key // key. It must be 32 bytes (or more). Ideally // it should be generated from crypto/rand. // The `selfy -gensym outpath` command will // write 32 randomly bytes to output. PreSharedKeyPath string // These are timeouts for connection and transport tuning. // The defaults of 0 mean wait forever. // // Generally we want our send loops to wait forever because // if the cut off a send mid-message, it is hard to recover; // we don't pass back up the stack how much of the broken // message was sent, so the only thing we can do then is tear // down the connection pair and re-connect. It is much // better to just dedicate the sendLoops to writing for as // long as it takes than to set a WriteTimeout. ConnectTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration ServerSendKeepAlive time.Duration ClientSendKeepAlive time.Duration // CompressAlgo choices are in magic7.go; // The current choices are "" (default compression, "s2" at the moment), // or: "s2" (Klaus Post's faster SnappyV2, good for incompressibles); // "lz4", (a very fast compressor; see https://lz4.org/); // "zstd:01" (fastest setting for Zstandard, very little compression); // "zstd:03", (the Zstandard 'default' level; slower but more compression); // "zstd:07", (even more compression, even slower); // "zstd:11", (slowest version of Zstandard, the most compression). // // Note! empty string means we use DefaultUseCompressAlgo // (at the top of cli.go), which is currently "s2". // To turn off compression, you must use the // CompressionOff setting. CompressAlgo string CompressionOff bool // Intially speak HTTP and only // accept CONNECT requests that // we turn into our protocol. // Only works with TCPonly_no_TLS true also, // at the moment. Also adds on another // round trip. HTTPConnectRequired bool // contains filtered or unexported fields }
Config is the same struct type for both NewClient and NewServer setup.
Config says who to contact (for a client), or where to listen (for a server and/or client); and sets how strong a security posture we adopt.
Copying a Config is fine, but it should be a simple shallow copy to preserve the shared *sharedTransport struct. See/use the Config.Clone() method if in doubt.
nitty gritty details/dev note: the `shared` pointer here is the basis of port (and file handle) reuse where a single process can maintain a server and multiple clients in a "star" pattern. This only works with QUIC of course, and is one of the main reasons to use QUIC.
The shared pointer is reference counted and the underlying net.UDPConn is only closed when the last instance in use is Close()-ed.
type Downloader ¶ added in v1.3.0
type Downloader struct { ReadDownloadsCh <-chan *Message ErrorCh <-chan *Message // contains filtered or unexported fields }
Downloader is used when the client receives stream from server. It is returned by RequestDownload() or NewDownloader().
func (*Downloader) BeginDownload ¶ added in v1.6.0
func (b *Downloader) BeginDownload(ctx context.Context, path string) (err error)
func (*Downloader) CallID ¶ added in v1.3.0
func (s *Downloader) CallID() string
func (*Downloader) Name ¶ added in v1.3.0
func (s *Downloader) Name() string
func (*Downloader) Seqno ¶ added in v1.3.0
func (s *Downloader) Seqno() uint64
type Embed ¶ added in v1.1.0
type Embed struct {
Simple `zid:"0"`
}
Embed in example.go is part of the tests.
func (*Embed) DecodeMsg ¶ added in v1.1.0
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Embed) MarshalMsg ¶ added in v1.1.0
MarshalMsg implements msgp.Marshaler
func (Embed) Msgsize ¶ added in v1.1.0
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Embed) UnmarshalMsg ¶ added in v1.1.0
UnmarshalMsg implements msgp.Unmarshaler
func (*Embed) UnmarshalMsgWithCfg ¶ added in v1.1.0
type HDR ¶ added in v1.0.42
type HDR struct { // Nc is supplied to reveal the LocalAddr() or RemoteAddr() end points. // Do not read from, or write to, this connection; // that will cause the RPC connection to fail. Nc net.Conn `msg:"-"` Created time.Time `zid:"0"` // HDR creation time stamp. From string `zid:"1"` // originator host:port address. To string `zid:"2"` // destination host:port address. ServiceName string `zid:"11"` // registered name to call. // arguments/parameters for the call. should be short to keep the HDR small. // big stuff should be serialized in JobSerz. Args map[string]string `zid:"12"` Subject string `zid:"3"` // in net/rpc, the "Service.Method" ServiceName Seqno uint64 `zid:"4"` // user (client) set sequence number for each call (same on response). Typ CallType `zid:"5"` // see constants above. CallID string `zid:"6"` // 20 bytes pseudo random base-64 coded string (same on response). Serial int64 `zid:"7"` // system serial number LocalRecvTm time.Time `zid:"8"` // allow standard []byte oriented message to cancel too. Ctx context.Context `msg:"-"` // Deadline is optional, but if it is set on the client, // the server side context.Context will honor it. Deadline time.Time `zid:"9"` // if non-zero, set this deadline in the remote Ctx // The CallID will be identical on // all parts of the same stream. StreamPart int64 `zid:"10"` // NoSystemCompression turns off any usual // compression that the rpc25519 system // applies, for just sending this one Message. // // Not normally a needed (or a good idea), // this flag is for efficiency when the // user has implemented their own custom compression // scheme for the JobSerz data payload. // // By checking this flag, the system can // avoid wasting time attempting // to compress a second time; since the // user has, hereby, marked this Message // as incompressible. // // Not matched in reply compression; // this flag will not affect the usual // compression-matching in responses. // For those purposes, it is ignored. NoSystemCompression bool `zid:"13"` // contains filtered or unexported fields }
HDR provides header information and details about the transport. It is the first thing in every Message. It is public so that clients can understand the context of their calls. Traditional `net/rpc` API users can use the `ctx context.Context` first argument form of callback methods and get an *HDR with HDRFromContext() as in the README.md introduction. Reproduced here:
func (s *Service) GetsContext(ctx context.Context, args *Args, reply *Reply) error { if hdr, ok := HDRFromContext(ctx); ok { fmt.Printf("GetsContext called with HDR = '%v'; "+ "HDR.Nc.RemoteAddr() gives '%v'; HDR.Nc.LocalAddr() gives '%v'\n", hdr.String(), hdr.Nc.RemoteAddr(), hdr.Nc.LocalAddr()) } }
func HDRFromBytes ¶ added in v1.0.42
func HDRFromContext ¶ added in v1.2.7
HDRFromContext returns the User value stored in ctx, if any.
func HDRFromGreenpack ¶ added in v1.0.42
HDRFromGreenpack will unmarshal the header into the returned struct. The [greenpack format](https://github.com/glycerine/greenpack) is expected.
func (*HDR) AsGreenpack ¶ added in v1.0.42
AsGreenpack will marshall hdr into the o output bytes. The scratch bytes can be nil or reused and returned to avoid allocation. The [greenpack format](https://github.com/glycerine/greenpack) is used.
func (*HDR) DecodeMsg ¶ added in v1.0.42
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*HDR) Equal ¶ added in v1.0.42
Equal compares two *HDR structs field by field for structural equality
func (*HDR) MarshalMsg ¶ added in v1.0.42
MarshalMsg implements msgp.Marshaler
func (*HDR) Msgsize ¶ added in v1.0.42
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*HDR) UnmarshalMsg ¶ added in v1.0.42
UnmarshalMsg implements msgp.Unmarshaler
func (*HDR) UnmarshalMsgWithCfg ¶ added in v1.0.42
type Hello ¶ added in v1.1.30
type Hello struct {
Placeholder int `zid:"0"` // must have public field or greenpack will ignore it.
}
Hello in example.go is part of the tests.
func (*Hello) DecodeMsg ¶ added in v1.1.30
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Hello) MarshalMsg ¶ added in v1.1.30
MarshalMsg implements msgp.Marshaler
func (Hello) Msgsize ¶ added in v1.1.30
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Hello) UnmarshalMsg ¶ added in v1.1.30
UnmarshalMsg implements msgp.Unmarshaler
func (*Hello) UnmarshalMsgWithCfg ¶ added in v1.1.30
type InvalidRequest ¶ added in v1.1.0
type InvalidRequest struct {
Placeholder int `zid:"0"`
}
InvalidRequest used instead of struct{} since greenpack needs one member element.
func (*InvalidRequest) DecodeMsg ¶ added in v1.1.0
func (z *InvalidRequest) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (InvalidRequest) EncodeMsg ¶ added in v1.1.0
func (z InvalidRequest) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (InvalidRequest) MarshalMsg ¶ added in v1.1.0
func (z InvalidRequest) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (InvalidRequest) Msgsize ¶ added in v1.1.0
func (z InvalidRequest) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*InvalidRequest) UnmarshalMsg ¶ added in v1.1.0
func (z *InvalidRequest) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*InvalidRequest) UnmarshalMsgWithCfg ¶ added in v1.1.0
func (z *InvalidRequest) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type MatchHashPair ¶ added in v1.6.0
type MatchHashPair struct { A *RsyncChunk `zid:"0"` B *RsyncChunk `zid:"1"` }
func (*MatchHashPair) DecodeMsg ¶ added in v1.6.0
func (z *MatchHashPair) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*MatchHashPair) EncodeMsg ¶ added in v1.6.0
func (z *MatchHashPair) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*MatchHashPair) MarshalMsg ¶ added in v1.6.0
func (z *MatchHashPair) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*MatchHashPair) Msgsize ¶ added in v1.6.0
func (z *MatchHashPair) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*MatchHashPair) UnmarshalMsg ¶ added in v1.6.0
func (z *MatchHashPair) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*MatchHashPair) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *MatchHashPair) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type Message ¶
type Message struct { // HDR contains header information. HDR HDR `zid:"0"` // JobSerz is the "body" of the message. // The user provides and interprets this. JobSerz []byte `zid:"1"` // JobErrs returns error information from the server-registered // user-defined callback functions. JobErrs string `zid:"2"` // LocalErr is not serialized on the wire by the server. // It communicates only local (client/server side) information. // // Callback functions convey // errors in JobErrs (by returning an error); // or in-band within JobSerz. LocalErr error `msg:"-"` // DoneCh.WhenClosed will be closed on the client when the one-way is // sent or the round-trip call completes. // NewMessage() automatically allocates DoneCh correctly and // should be used when creating a new Message (on the client to send). DoneCh *loquet.Chan[Message] `msg:"-"` // contains filtered or unexported fields }
Message transports JobSerz []byte slices for the user, who can de-serialize them they wish. The HDR header field provides transport details.
func MessageFromGreenpack ¶ added in v1.0.28
MessageFromGreenpack unmarshals the by slice into a Message and returns it. The [greenpack format](https://github.com/glycerine/greenpack) is expected.
func NewMessage ¶
func NewMessage() *Message
NewMessage allocates a new Message with a DoneCh properly created.
func NewMessageFromBytes ¶
NewMessageFromBytes calls NewMessage() and sets by as the JobSerz field.
func (*Message) AsGreenpack ¶ added in v1.0.29
AsGreenpack marshalls m into o. The scratch workspace can be nil or reused to avoid allocation. The [greenpack format](https://github.com/glycerine/greenpack) is used. The m.JobSerz payload must be <= maxMessage-1024, or we will return ErrTooLarge without trying to serialize it.
func (*Message) AsJSON ¶ added in v1.1.54
AsJSON returns JSON bytes via msgp.CopyToJSON() or msgp.UnmarshalAsJSON()
func (*Message) DecodeMsg ¶ added in v1.0.26
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*Message) MarshalMsg ¶ added in v1.0.26
MarshalMsg implements msgp.Marshaler
func (*Message) Msgsize ¶ added in v1.0.26
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Message) UnmarshalMsg ¶ added in v1.0.26
UnmarshalMsg implements msgp.Unmarshaler
func (*Message) UnmarshalMsgWithCfg ¶ added in v1.0.26
type MustBeCancelled ¶ added in v1.2.0
type MustBeCancelled struct { // as greenpack efficiently does nothing without any member elements. Placeholder int `zid:"0"` }
The MustBeCancelled struct in example.go is part of the tests. See cli_test.go Test040 for details.
func NewMustBeCancelled ¶ added in v1.2.0
func NewMustBeCancelled() *MustBeCancelled
NewMustBeCancelled in example.go is part of the tests. See cli_test.go Test040 for details.
func (*MustBeCancelled) DecodeMsg ¶ added in v1.2.0
func (z *MustBeCancelled) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (MustBeCancelled) EncodeMsg ¶ added in v1.2.0
func (z MustBeCancelled) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (MustBeCancelled) MarshalMsg ¶ added in v1.2.0
func (z MustBeCancelled) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*MustBeCancelled) MessageAPI_HangUntilCancel ¶ added in v1.2.0
func (s *MustBeCancelled) MessageAPI_HangUntilCancel(req, reply *Message) error
MessageAPI_HangUntilCancel in example.go is part of the tests. See cli_test.go Test040 for details.
func (MustBeCancelled) Msgsize ¶ added in v1.2.0
func (z MustBeCancelled) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*MustBeCancelled) UnmarshalMsg ¶ added in v1.2.0
func (z *MustBeCancelled) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*MustBeCancelled) UnmarshalMsgWithCfg ¶ added in v1.2.0
func (z *MustBeCancelled) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
func (*MustBeCancelled) WillHangUntilCancel ¶ added in v1.2.0
WillHangUntilCancel in example.go is part of the tests. See cli_test.go Test040 for details.
type NetConnWrapper ¶ added in v1.0.5
type NetConnWrapper struct { quic.Stream quic.Connection }
NetConnWrapper is exported so that clients like `goq` and others that want to inspect that context of their calls can do so.
func (*NetConnWrapper) Close ¶ added in v1.1.33
func (w *NetConnWrapper) Close() error
type OneWayFunc ¶ added in v1.0.39
type OneWayFunc func(req *Message)
OneWayFunc is the simpler sibling to the above. A OneWayFunc will not return anything to the sender.
As above req.JobSerz [] byte contains the job payload.
type PerCallID_FileToDiskState ¶ added in v1.5.0
type PerCallID_FileToDiskState struct { CallID string T0 time.Time OverrideFilename string // if set, use instead of "readFile" in Args. FnameTmp string FnameFinal string Randomness string Fd *os.File BytesWrit int64 Blake3hash *myblake3.Blake3 PartsSeen map[int64]bool SeenCount int }
func NewPerCallID_FileToDiskState ¶ added in v1.5.0
func NewPerCallID_FileToDiskState(callID string) *PerCallID_FileToDiskState
func (*PerCallID_FileToDiskState) WriteOneMsgToFile ¶ added in v1.5.0
func (s *PerCallID_FileToDiskState) WriteOneMsgToFile(req *Message, suffix string, last bool) (err error)
type Reply ¶ added in v1.1.0
type Reply struct {
C int `zid:"0"`
}
Reply in example.go is part of the tests.
func (*Reply) DecodeMsg ¶ added in v1.1.0
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Reply) MarshalMsg ¶ added in v1.1.0
MarshalMsg implements msgp.Marshaler
func (Reply) Msgsize ¶ added in v1.1.0
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Reply) UnmarshalMsg ¶ added in v1.1.0
UnmarshalMsg implements msgp.Unmarshaler
func (*Reply) UnmarshalMsgWithCfg ¶ added in v1.1.0
type Request ¶ added in v1.0.42
type Request struct { ServiceMethod string `zid:"0"` // format: "Service.Method" Seq uint64 `zid:"1"` // sequence number chosen by client // contains filtered or unexported fields }
Request is part of the net/rpc API. Its docs:
Request is a header written before every RPC call. It is used internally but documented here as an aid to debugging, such as when analyzing network traffic.
func (*Request) DecodeMsg ¶ added in v1.0.42
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Request) MarshalMsg ¶ added in v1.0.42
MarshalMsg implements msgp.Marshaler
func (Request) Msgsize ¶ added in v1.0.42
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Request) UnmarshalMsg ¶ added in v1.0.42
UnmarshalMsg implements msgp.Unmarshaler
func (*Request) UnmarshalMsgWithCfg ¶ added in v1.0.42
type Response ¶ added in v1.0.42
type Response struct { ServiceMethod string `zid:"0"` // echoes that of the Request Seq uint64 `zid:"1"` // echoes that of the request Error string `zid:"2"` // error, if any. // contains filtered or unexported fields }
Response is part of the net/rpc API. Its docs:
Response is a header written before every RPC return. It is used internally but documented here as an aid to debugging, such as when analyzing network traffic.
func (*Response) DecodeMsg ¶ added in v1.0.42
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Response) MarshalMsg ¶ added in v1.0.42
MarshalMsg implements msgp.Marshaler
func (Response) Msgsize ¶ added in v1.0.42
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Response) UnmarshalMsg ¶ added in v1.0.42
UnmarshalMsg implements msgp.Unmarshaler
func (*Response) UnmarshalMsgWithCfg ¶ added in v1.0.42
type RsyncChunk ¶ added in v1.6.0
type RsyncChunk struct { ChunkNumber int `zid:"0"` // zero based index into Chunks slice. Beg int `zid:"1"` Endx int `zid:"2"` Hash string `zid:"3"` Len int `zid:"4"` }
func (*RsyncChunk) DecodeMsg ¶ added in v1.6.0
func (z *RsyncChunk) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncChunk) EncodeMsg ¶ added in v1.6.0
func (z *RsyncChunk) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RsyncChunk) MarshalMsg ¶ added in v1.6.0
func (z *RsyncChunk) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*RsyncChunk) Msgsize ¶ added in v1.6.0
func (z *RsyncChunk) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncChunk) UnmarshalMsg ¶ added in v1.6.0
func (z *RsyncChunk) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncChunk) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *RsyncChunk) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type RsyncDiff ¶ added in v1.6.0
type RsyncDiff struct { HostA string `zid:"0"` PathA string `zid:"1"` HashesA *RsyncHashes `zid:"2"` HostB string `zid:"3"` PathB string `zid:"4"` HashesB *RsyncHashes `zid:"5"` Both []*MatchHashPair `zid:"6"` OnlyA []*RsyncChunk `zid:"7"` OnlyB []*RsyncChunk `zid:"8"` }
func (*RsyncDiff) DecodeMsg ¶ added in v1.6.0
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncDiff) MarshalMsg ¶ added in v1.6.0
MarshalMsg implements msgp.Marshaler
func (*RsyncDiff) Msgsize ¶ added in v1.6.0
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncDiff) UnmarshalMsg ¶ added in v1.6.0
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncDiff) UnmarshalMsgWithCfg ¶ added in v1.6.0
type RsyncHashes ¶ added in v1.6.0
type RsyncHashes struct { // uniquely idenitify this hash set. Rsync0CallID string `zid:"16"` IsFromSender bool `zid:"17"` Created time.Time `zid:"18"` Host string `zid:"0"` Path string `zid:"1"` ModTime time.Time `zid:"2"` FileSize int64 `zid:"3"` FileMode uint32 `zid:"4"` FileOwner string `zid:"5"` FileOwnerID uint32 `zid:"6"` FileGroup string `zid:"7"` FileGroupID uint32 `zid:"8"` // other data, extension mechanism. Not used presently. FileMeta []byte `zid:"9"` // HashName is e.g. "blake3.32B" HashName string `zid:"10"` FullFileHashSum string `zid:"11"` // ChunkerName is e.g. "fastcdc", or "ultracdc" ChunkerName string `zid:"12"` CDC_Config *jcdc.CDC_Config `zid:"13"` // NumChunks gives len(Chunks) for convenience. NumChunks int `zid:"14"` Chunks []*RsyncChunk `zid:"15"` }
RsyncHashes stores CDC (Content Dependent Chunking) chunks for a given Path on a given Host, using a specified chunking algorithm (e.g. "jcdc"), its parameters, and a specified hash function (e.g. "blake3.32B"
func SummarizeBytesInCDCHashes ¶ added in v1.6.0
func SummarizeFileInCDCHashes ¶ added in v1.6.0
func SummarizeFileInCDCHashes(host, path string) (hashes *RsyncHashes, err error)
func (*RsyncHashes) DecodeMsg ¶ added in v1.6.0
func (z *RsyncHashes) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncHashes) Diff ¶ added in v1.6.0
func (a *RsyncHashes) Diff(b *RsyncHashes) (d *RsyncDiff)
func (*RsyncHashes) EncodeMsg ¶ added in v1.6.0
func (z *RsyncHashes) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RsyncHashes) MarshalMsg ¶ added in v1.6.0
func (z *RsyncHashes) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*RsyncHashes) Msgsize ¶ added in v1.6.0
func (z *RsyncHashes) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncHashes) String ¶ added in v1.6.0
func (h *RsyncHashes) String() string
func (*RsyncHashes) UnmarshalMsg ¶ added in v1.6.0
func (z *RsyncHashes) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncHashes) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *RsyncHashes) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type RsyncStep0_ClientRequestsRead ¶ added in v1.6.0
type RsyncStep0_ClientRequestsRead struct { ReaderHost string `zid:"0"` ReaderPath string `zid:"1"` ReaderLenBytes int64 `zid:"2"` ReaderModTime time.Time `zid:"3"` // if available/cheap, send ReaderFullHash string `zid:"4"` }
rsync operation for a single file. Steps and the structs that go with each step:
NB: the client always has to start a request; the server cannot reach out to a client. But the client can still request a file, and thus request to be the reader.
Optional step 0: client requests to be the reader. The server should send the file requested.
Or in step 1: client requests to send a file. The client can begin immediately with step 1, there really is no step 0 when the client is sending. The client just sends the RsyncStep1_SenderOverview.
So only if the client wants to read a file from the server does the client need to send this:
func (*RsyncStep0_ClientRequestsRead) DecodeMsg ¶ added in v1.6.0
func (z *RsyncStep0_ClientRequestsRead) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncStep0_ClientRequestsRead) EncodeMsg ¶ added in v1.6.0
func (z *RsyncStep0_ClientRequestsRead) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RsyncStep0_ClientRequestsRead) MarshalMsg ¶ added in v1.6.0
func (z *RsyncStep0_ClientRequestsRead) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*RsyncStep0_ClientRequestsRead) Msgsize ¶ added in v1.6.0
func (z *RsyncStep0_ClientRequestsRead) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncStep0_ClientRequestsRead) UnmarshalMsg ¶ added in v1.6.0
func (z *RsyncStep0_ClientRequestsRead) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncStep0_ClientRequestsRead) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *RsyncStep0_ClientRequestsRead) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type RsyncStep1_SenderOverview ¶ added in v1.6.0
type RsyncStep1_SenderOverview struct { SenderHost string `zid:"0"` SenderPath string `zid:"1"` SenderLenBytes int64 `zid:"2"` SenderModTime time.Time `zid:"3"` // if available/cheap, send SenderFullHash string `zid:"4"` }
1) sender sends path, length, mod time of file. Sender sends RsyncStep1_SenderOverview to reader. This starts the first of two round-trips. Note only 0 or 1 are are "RPC" like-calls in our lingo. The other steps/Messages (2,3,4) are one-ways with the same CallID.
Note that the server will also send a final CallRPCReply when the Call from 0 or 1 finishes; but it should carry no content; the cli still needs to process 4 even if it gets such a reply during a client read. In detail:
So our rsync-like protocol is either:
for client read:
cli(0)->srv(1)->cli(2)->srv(3 + CallRPCReply to 0)->cli(4); or
for client send:
cli(1)->srv(2)->cli(3)->srv(4 + CallRPCReply to 1);
This means that the server has to be ready to listen for and handle 1,2,3,4; while the client has to listen for and handle 2,3,4. The client is always the one sending 0 or 1.
(We use the same code on both cli and srv to processes these, to keep symmetrical correctness.)
To do step 1 (client acts as sender, it sends: .
func (*RsyncStep1_SenderOverview) DecodeMsg ¶ added in v1.6.0
func (z *RsyncStep1_SenderOverview) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncStep1_SenderOverview) EncodeMsg ¶ added in v1.6.0
func (z *RsyncStep1_SenderOverview) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RsyncStep1_SenderOverview) MarshalMsg ¶ added in v1.6.0
func (z *RsyncStep1_SenderOverview) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*RsyncStep1_SenderOverview) Msgsize ¶ added in v1.6.0
func (z *RsyncStep1_SenderOverview) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncStep1_SenderOverview) UnmarshalMsg ¶ added in v1.6.0
func (z *RsyncStep1_SenderOverview) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncStep1_SenderOverview) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *RsyncStep1_SenderOverview) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type RsyncStep2_AckOverview ¶ added in v1.6.0
type RsyncStep2_AckOverview struct { // if true, no further action needed. // ReaderHashes can be nil then. ReaderMatchesSenderAllGood bool `zid:"0"` ReaderHashes *RsyncHashes `zid:"1"` }
2) receiver/reader end gets path to the file, its length and modification time stamp. If length and time stamp math, stop. Ack back all good. Else ack back with RsyncHashes, "here are the chunks I have" and the whole file checksum. Reader replies to sender with RsyncStep2_AckOverview.
func (*RsyncStep2_AckOverview) DecodeMsg ¶ added in v1.6.0
func (z *RsyncStep2_AckOverview) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncStep2_AckOverview) EncodeMsg ¶ added in v1.6.0
func (z *RsyncStep2_AckOverview) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RsyncStep2_AckOverview) MarshalMsg ¶ added in v1.6.0
func (z *RsyncStep2_AckOverview) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*RsyncStep2_AckOverview) Msgsize ¶ added in v1.6.0
func (z *RsyncStep2_AckOverview) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncStep2_AckOverview) UnmarshalMsg ¶ added in v1.6.0
func (z *RsyncStep2_AckOverview) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncStep2_AckOverview) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *RsyncStep2_AckOverview) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type RsyncStep3_SenderProvidesDeltas ¶ added in v1.6.0
type RsyncStep3_SenderProvidesDeltas struct { SenderHashes *RsyncHashes `zid:"0"` // needs to be streamed too. ChunkDiff *RsyncDiff `zid:"1"` DeltaHashesPreCompression []string `zid:"2"` CompressionAlgo string `zid:"3"` // Bistream this separately as a file, // because it can be so big; if we have // no diff compression available it will // be the whole file anyway(!) // Hence we'll want to add compression to // the bistream download/upload actions. //DeltaData [][]byte `zid:"4"` DeltaDataStreamedPath string `zid:"4"` }
3) sender chunks the file, does the diff, and then sends along just the changed chunks, along with the chunk structure of the file on the sender so reader can reassemble it; and the whole file checksum. Sender sends RsyncStep3_SenderProvidesDeltas to reader.
This step rsync may well send a very large message, much larger than our 1MB or so maxMessage size. So rsync may need to use a Bistream that can handle lots of separate messages and reassemble them. For that matter, the RsyncHashes in step 2 can be large too. As a part of the rsync protocol we want to be able to send "large files" that are actually large, streamed messages. We observe these may need to be backed by disk rather than memory to keep memory requirements sane.
Thought/possibility: we could save them to /tmp since that might be memory backed or storage backed. Make that an option, but if we use a streaming Bistream download to disk then we'll handle the large message of step 2/3 problem.
The thing is, we would like to use rsync underneath the bistream of a big file transparently. Circular. Ideally the rsync part can be used transparently by any streaming large file need. Lets start by layering rsync on top of Bistreaming, but we can add a separate header idea of a whole message worth of meta data for the stream file that can give the rsync step message so we know what to do with the file. TODO: Add compression to the built in Download/upload protocols; use a Args["compression"] setting to indicate how to uncompress it before writing to disk. That would mean leaving the headers uncompressed! Right now they are serialized inline with the message.
func (*RsyncStep3_SenderProvidesDeltas) DecodeMsg ¶ added in v1.6.0
func (z *RsyncStep3_SenderProvidesDeltas) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncStep3_SenderProvidesDeltas) EncodeMsg ¶ added in v1.6.0
func (z *RsyncStep3_SenderProvidesDeltas) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RsyncStep3_SenderProvidesDeltas) MarshalMsg ¶ added in v1.6.0
func (z *RsyncStep3_SenderProvidesDeltas) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*RsyncStep3_SenderProvidesDeltas) Msgsize ¶ added in v1.6.0
func (z *RsyncStep3_SenderProvidesDeltas) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncStep3_SenderProvidesDeltas) UnmarshalMsg ¶ added in v1.6.0
func (z *RsyncStep3_SenderProvidesDeltas) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncStep3_SenderProvidesDeltas) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *RsyncStep3_SenderProvidesDeltas) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type RsyncStep4_ReaderAcksDeltasFin ¶ added in v1.6.0
type RsyncStep4_ReaderAcksDeltasFin struct { ReaderHost string `zid:"0"` ReaderPath string `zid:"1"` ReaderLenBytes int64 `zid:"2"` ReaderModTime time.Time `zid:"3"` ReaderFullHash string `zid:"4"` }
4) reader gets the diff, the changed chunks (DeltaData), and it already has the current file structure; write out a new file with the correct chunks in the correct order. (Decompressing chunks before writing them). Reader verifies the final blake3 checksum, and sets the new ModTime on the file. Reader does a final ack of sending back RsyncStep4_ReaderAcksDeltasFin to the sender. This completes the rsync operation, which took two round-trips from sender to reader.
func (*RsyncStep4_ReaderAcksDeltasFin) DecodeMsg ¶ added in v1.6.0
func (z *RsyncStep4_ReaderAcksDeltasFin) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (*RsyncStep4_ReaderAcksDeltasFin) EncodeMsg ¶ added in v1.6.0
func (z *RsyncStep4_ReaderAcksDeltasFin) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RsyncStep4_ReaderAcksDeltasFin) MarshalMsg ¶ added in v1.6.0
func (z *RsyncStep4_ReaderAcksDeltasFin) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*RsyncStep4_ReaderAcksDeltasFin) Msgsize ¶ added in v1.6.0
func (z *RsyncStep4_ReaderAcksDeltasFin) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*RsyncStep4_ReaderAcksDeltasFin) UnmarshalMsg ¶ added in v1.6.0
func (z *RsyncStep4_ReaderAcksDeltasFin) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
func (*RsyncStep4_ReaderAcksDeltasFin) UnmarshalMsgWithCfg ¶ added in v1.6.0
func (z *RsyncStep4_ReaderAcksDeltasFin) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
type ServeBistreamState ¶ added in v1.4.7
type ServeBistreamState struct{}
ServeBistreamState is used by Test065_bidirectional_streaming test.
func (*ServeBistreamState) ServeBistream ¶ added in v1.4.7
func (bi *ServeBistreamState) ServeBistream( srv *Server, ctx context.Context, req *Message, uploadsFromClientCh <-chan *Message, sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error, lastReply *Message, ) (err error)
ServeBistream is an example of a BistreamFunc, a server side registered function for bi-streaming (doing both upload and download simultaneously). See cli_test.go Test065_bidirectional_download_and_upload for a test that uses this method.
type Server ¶
type Server struct { // RemoteConnectedCh sends the remote host:port address // when the server gets a new client, // See srv_test.go Test004_server_push for example, // where it is used to avoid a race/panic. RemoteConnectedCh chan *ServerClient // contains filtered or unexported fields }
Servers read and respond to requests. Two APIs are available.
Using the rpc25519.Message based API:
Register1Func() and Register2Func() register callbacks.
Using the net/rpc API:
Server.Register() registers structs with callback methods on them.
The net/rpc API is implemented as a layer on top of the rpc25519.Message based API. Both can be used concurrently if desired.
func NewServer ¶
NewServer will keep its own copy of config. If config is nil, the server will make its own upon Start().
func (*Server) Register ¶ added in v1.0.42
Register implements the net/rpc Server.Register() API. Its docs:
Register publishes in the server the set of methods of the receiver value that satisfy the following conditions:
- exported method of exported type
- two arguments, both of exported type
- the second argument is a pointer
- one return value, of type error
It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error using package log. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.
rpc25519 addendum:
Callback methods in the `net/rpc` style traditionally look like this first `NoContext` example below. We now allow a context.Context as an additional first parameter. The ctx will a pointer to the `rpc25519.HDR` header from the incoming Message set on it. Call rpc25519.HDRFromContext() to retreive it.
func (s *Service) NoContext(args *Args, reply *Reply) error
* new:
func (s *Service) GetsContext(ctx context.Context, args *Args, reply *Reply) error { if hdr, ok := rpc25519.HDRFromContext(ctx); ok { fmt.Printf("GetsContext called with HDR = '%v'; "+ "HDR.Nc.RemoteAddr() gives '%v'; HDR.Nc.LocalAddr() gives '%v'\n", h.String(), h.Nc.RemoteAddr(), h.Nc.LocalAddr()) } else { fmt.Println("HDR not found") } }
func (*Server) Register1Func ¶ added in v1.0.39
func (s *Server) Register1Func(serviceName string, callme1 OneWayFunc)
Register1Func tells the server about a func or method that will not reply. See the OneWayFunc definition.
func (*Server) Register2Func ¶ added in v1.0.39
func (s *Server) Register2Func(serviceName string, callme2 TwoWayFunc)
Register2Func tells the server about a func or method that will have a returned Message value. See the TwoWayFunc definition.
func (*Server) RegisterBistreamFunc ¶ added in v1.3.5
func (s *Server) RegisterBistreamFunc(name string, callme BistreamFunc)
func (*Server) RegisterName ¶ added in v1.0.42
RegisterName is like [Register] but uses the provided name for the type instead of the receiver's concrete type.
func (*Server) RegisterServerSendsDownloadFunc ¶ added in v1.3.0
func (s *Server) RegisterServerSendsDownloadFunc(name string, callme ServerSendsDownloadFunc)
func (*Server) RegisterUploadReaderFunc ¶ added in v1.3.0
func (s *Server) RegisterUploadReaderFunc(name string, callmeUploadReader UploadReaderFunc)
RegisterUploadReaderFunc tells the server about a func or method to handle uploads. See the UploadReaderFunc definition.
func (*Server) RsyncServerSide ¶ added in v1.6.0
func (Server *Server) RsyncServerSide( srv *Server, ctx context.Context, req *Message, uploadsFromClientCh <-chan *Message, sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error, lastReply *Message, ) (err error)
RsyncServerSide implements the server side of our rsync-like protocol. It is a BistreamFunc.
func (*Server) SendMessage ¶
func (s *Server) SendMessage(callID, subject, destAddr string, data []byte, seqno uint64, errWriteDur *time.Duration) error
SendMessage can be used on the server to push data to one of the connected clients.
The Message msg should have msg.JobSerz set, as well as the HDR fields Subject, CallID, and Seqno. The NewCallID() can be used to generate a random (without conflicts with prior CallID if needed/not matching a previous call.
If the HDR.To destination address is not already connected to the server, the ErrNetConnectionNotFound error will be returned.
errWriteDur is how long we pause waiting for the writing goroutine to send the message or give us a fast error reply. Early discovery of client disconnect can allow us to try other (worker) clients, rather than wait for pings or other slow error paths.
The errWriteDur can be set to a few seconds if this would save the caller a minute of two of waiting to discover the send is unlikely to suceed; or to time.Duration(0) if they want no pause after writing Message to the connection. The default is 30 msec. It is a guess and aims at balance: allowing enough time to get an error back from quic-go if we are going to discover "Application error 0x0 (remote)" right away, and not wanting to stall the caller too much.
func (*Server) SendOneWayMessage ¶ added in v1.3.0
func (s *Server) SendOneWayMessage(ctx context.Context, msg *Message, errWriteDur *time.Duration) error
SendOneWayMessage is the same as SendMessage above except that it takes a fully prepared msg to avoid API churn when new HDR fields are added/needed. msg.HDR.Type must be >= CallOneWay (10), to try and catch mis-use of this when the user actually wants a round-trip call.
func (*Server) ServeHTTP ¶ added in v1.1.33
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
ServeHTTP implements an http.Handler that answers RPC requests.
type ServerClient ¶ added in v1.0.135
type ServerClient struct { Remote string GoneCh chan struct{} }
type ServerCodec ¶ added in v1.0.42
type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(msgp.Decodable) error WriteResponse(*Response, msgp.Encodable) error // Close can be called multiple times and must be idempotent. Close() error }
ServerCodec is part of the net/rpc API. Its docs:
A ServerCodec implements reading of RPC requests and writing of RPC responses for the server side of an RPC session. The server calls [ServerCodec.ReadRequestHeader] and [ServerCodec.ReadRequestBody] in pairs to read requests from the connection, and it calls [ServerCodec.WriteResponse] to write a response back. The server calls [ServerCodec.Close] when finished with the connection. ReadRequestBody may be called with a nil argument to force the body of the request to be read and discarded. See NewClient's comment for information about concurrent access.
type ServerError ¶ added in v1.0.42
type ServerError string
ServerError represents an error that has been returned from the remote side of the RPC connection.
func (ServerError) Error ¶ added in v1.0.42
func (e ServerError) Error() string
type ServerSendsDownloadFunc ¶ added in v1.3.0
type ServerSendsDownloadFunc func( srv *Server, ctx context.Context, req *Message, sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error, lastReply *Message, ) (err error)
ServerSendsDownloadFunc is used to send a stream to the client on the streamToClientChan. Use Server.RegisterServerSendsDownloadFunc() to register it.
type ServerSendsDownloadState ¶ added in v1.3.0
type ServerSendsDownloadState struct{}
func NewServerSendsDownloadState ¶ added in v1.6.0
func NewServerSendsDownloadState() *ServerSendsDownloadState
func (*ServerSendsDownloadState) ServerSendsDownload ¶ added in v1.3.0
func (ssss *ServerSendsDownloadState) ServerSendsDownload( srv *Server, ctx context.Context, req *Message, sendDownloadPartToClient func(ctx context.Context, msg *Message, last bool) error, lastReply *Message, ) (err error)
ServerSendsDownload is used by cmd/srv/server.go; so when
srv -serve
serves downloads to
cli -download path
ServerSendsDownload has type ServerSendsDownloadFunc, and gets registered on the server with srv.RegisterServerSendsDownloadFunc().
type ServerSendsDownloadStateTest ¶ added in v1.6.0
type ServerSendsDownloadStateTest struct{}
func NewServerSendsDownloadStateTest ¶ added in v1.6.0
func NewServerSendsDownloadStateTest() *ServerSendsDownloadStateTest
func (*ServerSendsDownloadStateTest) ServerSendsDownloadTest ¶ added in v1.6.0
func (ssss *ServerSendsDownloadStateTest) ServerSendsDownloadTest(srv *Server, ctx context.Context, req *Message, sendStreamPart func(ctx context.Context, msg *Message, last bool) error, lastReply *Message) (err error)
ServerSendsDownload is used by Test055_streaming_server_to_client. It demonstrates how a registered server func can stream to the client. ServerSendsDownload has type ServerSendsDownloadFunc, and gets registered on the server with srv.RegisterServerSendsDownloadFunc().
type ServerSideUploadState ¶ added in v1.4.7
type ServerSideUploadState struct {
// contains filtered or unexported fields
}
ServerSideUploadState is used by Test045_streaming_client_to_server (upload) in cli_test.go and cmd/srv/server.go to demonstrate streaming a large (or infinite) file in small parts, from client to server, all while keeping FIFO message order.
func NewServerSideUploadState ¶ added in v1.4.7
func NewServerSideUploadState() *ServerSideUploadState
NewServerSideUploadState returns a new ServerSideUploadState. This is part of the cli_test.go Test045 mechanics.
func (*ServerSideUploadState) ReceiveFileInParts ¶ added in v1.4.7
func (st *ServerSideUploadState) ReceiveFileInParts(ctx context.Context, req *Message, lastReply *Message, deadCallID string) (err error)
ReceiveFileInParts is used by Test045_streaming_client_to_server in cli_test.go to demonstrate streaming from client to server.
See the cmd/cli/client.go and cmd/srv/server.go and their (cli -sendfile) and (srv -readfile) flags for a full implementation of an scp-like utility that uses this method.
ReceiveFileInParts is an UploadReaderFunc and is registered on the Server with the Server.RegisterUploadReaderFunc() call.
Notice that since we get a callback-per-message, and these messages can be from different clients, we must track the state of each of them in their own PerCallIDUploadState. This is a counterpoint in the design space: compare the Bistream and Download versions that are invoked once per call initiation. We have to manage the state of all clients, whereas they have to handle channels.
What happens if the client connection goes down? We still have state here. We'd like it to get cleaned up if the connection is lost. We use the deadCallID for that. It it is set, then clean up after that CallID and return. ctx, req, and lastReply will be nil, so no other work is possible.
type Simple ¶ added in v1.1.0
type Simple int
Simple in example.go is part of the tests.
func (*Simple) DecodeMsg ¶ added in v1.1.0
DecodeMsg implements msgp.Decodable We treat empty fields as if we read a Nil from the wire.
func (Simple) MarshalMsg ¶ added in v1.1.0
MarshalMsg implements msgp.Marshaler
func (Simple) Msgsize ¶ added in v1.1.0
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Simple) UnmarshalMsg ¶ added in v1.1.0
UnmarshalMsg implements msgp.Unmarshaler
func (*Simple) UnmarshalMsgWithCfg ¶ added in v1.1.0
type TwoWayFunc ¶ added in v1.0.39
TwoWayFunc is the user's own function that they register with the server for remote procedure calls.
The user's Func may not want to return anything. In that case they should register a OneWayFunc instead.
req.JobSerz []byte contains the job payload.
Implementers of TwoWayFunc should assign their return []byte to reply.JobSerz. reply.Jobserz can also be left nil, of course.
Any errors can be returned on reply.JobErrs; this is optional. Note that JobErrs is a string value rather than an error.
The system will overwrite the reply.HDR.{To,From} fields when sending the reply, so the user should not bother setting those. The one exception to this rule is the reply.HDR.Subject string, which can be set by the user to return user-defined information. The reply will still be matched to the request on the HDR.Seqno, so a change of HDR.Subject will not change which goroutine receives the reply.
type UploadReaderFunc ¶ added in v1.3.0
type UploadReaderFunc func(ctx context.Context, req *Message, lastReply *Message, deadCallID string) error
A UploadReaderFunc receives messages from a Client's upload. It corresponds to the client-side Uploader, created by Client.UploadBegin().
For a quick example, see the ReceiveFileInParts() implementation in the example.go file. It is a method on the ServerSideUploadFunc struct that holds state between the callbacks to ReceiveFileInParts().
A UploadReaderFunc is like a OneWayFunc, but it generally should also be a method or closure to capture the state it needs, as it will receive multiple req *Message up-calls from the same client Stream. It should return a non-nil error to tell the client to stop sending. A nil return means we are fine and want to continue to receive more Messages from the same Stream. The req.HDR.CallID can be used to identify distinct Streams, and the req.HDR.StreamPart will convey their order which will start at 0 and count up.
The lastReply argument will be nil until the Client calls Stream.More() with the last argument set to true. The user/client is telling the UploadReaderFunc not to expect any further messages. The UploadReaderFunc can then fill in the lastReply message with any finishing detail, and it will be sent back to the client.
Note that even when lastReply is not nil, req may still have the tail content of the stream, and so generally req should be processed before considering if this is the last message and a final lastReply should also be filled out.
For cleanup/avoiding memory leaks: If deadCallID is not the empty string, then the connection for this CallID has died and we should cleanup its resources. ctx, req, and lastReply will all be nil in this case.
type Uploader ¶ added in v1.3.0
type Uploader struct { ReadCh <-chan *Message ErrorCh <-chan *Message // contains filtered or unexported fields }
Uploader helps the client to make a series of non-blocking (one-way) calls to a remote server's UploadReaderFunc which must have been already registered on the server.