Documentation ¶
Overview ¶
Package swp implements the same Sliding Window Protocol that TCP uses for flow-control and reliable, ordered delivery.
The Nats event bus (https://nats.io/) is a software model of a hardware multicast switch. Nats provides multicast, but no guarantees of delivery and no flow-control. This works fine as long as your downstream read/subscribe capacity is larger than your publishing rate.
If your nats publisher ever produces faster than your subscriber can keep up, you may overrun your buffers and drop messages. If your sender is local and replaying a disk file of traffic over nats, you are guaranteed to exhaust even the largest of the internal nats client buffers. In addition, you may want guaranteed order of delivery (even with dropped messages), which swp provides.
Hence swp was built to provide flow-control and reliable, ordered delivery on top of the nats event bus. It reproduces the TCP sliding window and flow-control mechanism in a Session between two nats clients. It provides flow control between exactly two nats endpoints; in many cases this is sufficient to allow all subscribers to keep up. If you have a wide variation in consumer performance, establish the rate-controlling swp Session between your producer and your slowest consumer.
There is also a Session.RegisterAsap() API that can be used to obtain possibly-out-of-order and possibly-duplicated but as-soon-as-possible delivery (similar to what nats gives you natively), while retaining the flow-control required to avoid client-buffer overrun. This can be used in tandem with the main always-ordered-and-lossless API if so desired.
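To make the sliding-window idea in this overview concrete, here is a minimal sketch of a sender that transmits only while the number of unacknowledged packets stays below a window size. The type `window` and its methods are hypothetical names invented for this illustration; they are not part of the swp API, and the sketch assumes simple cumulative acks.

```go
package main

import "fmt"

// window is a hypothetical, simplified sender-side sliding window.
// It is not the swp implementation; it only illustrates the idea.
type window struct {
	size          int64 // maximum number of unacked packets allowed in flight
	lastAckRecv   int64 // highest sequence number acked so far (cumulative)
	lastFrameSent int64 // highest sequence number sent so far
}

// canSend reports whether another packet may be sent without
// exceeding the window.
func (w *window) canSend() bool {
	return w.lastFrameSent-w.lastAckRecv < w.size
}

// send records that the next sequence number was transmitted and returns it.
func (w *window) send() int64 {
	w.lastFrameSent++
	return w.lastFrameSent
}

// ack records a cumulative acknowledgement up to and including seq.
// Acks for unsent or already-acked sequence numbers are ignored.
func (w *window) ack(seq int64) {
	if seq > w.lastAckRecv && seq <= w.lastFrameSent {
		w.lastAckRecv = seq
	}
}

func main() {
	w := &window{size: 3, lastAckRecv: -1, lastFrameSent: -1}
	for w.canSend() {
		fmt.Println("sent", w.send())
	}
	fmt.Println("blocked:", !w.canSend())
	w.ack(0) // the receiver acknowledges the first packet
	fmt.Println("can send again:", w.canSend())
}
```

The sender stalls once the window is full and resumes only when an ack arrives, which is the back-pressure that plain nats publishing lacks.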
Index ¶
- Variables
- func Blake2bOfBytes(by []byte) []byte
- func GetSubscripCap(s *nats.Subscription) (bytecap int64, msgcap int64)
- func HistoryEqual(a, b []*Packet) bool
- func InWindow(seqno, min, max int64) bool
- func NewSessionNonce() string
- func SetSubscriptionLimits(sub *nats.Subscription, msgLimit int64, bytesLimit int64) error
- func StartGnatsd(host string, port int) (*server.Server, error)
- type AsapHelper
- type BigFile
- type ByteAccount
- type CertConfig
- type Clock
- type ConnectReq
- type EventRingBuf
- func (b *EventRingBuf) AddEventCheckOverflow(e time.Time) bool
- func (b *EventRingBuf) Adopt(me []time.Time)
- func (b *EventRingBuf) Advance(n int)
- func (b *EventRingBuf) ReadPtrs(p []time.Time) (n int, err error)
- func (b *EventRingBuf) ReadWithoutAdvance(p []time.Time) (n int, err error)
- func (b *EventRingBuf) Reset()
- func (r *EventRingBuf) String() string
- func (b *EventRingBuf) TwoContig(makeCopy bool) (first []time.Time, second []time.Time)
- func (b *EventRingBuf) WritePtrs(p []time.Time) (n int, err error)
- type Flow
- type FlowCtrl
- type InOrderSeq
- type NatsClient
- type NatsClientConfig
- type NatsNet
- type Network
- type Packet
- type RTT
- type ReadRequest
- type RealClock
- type RecvState
- func (r *RecvState) Close(zr *closeReq) error
- func (r *RecvState) Connect(dest string, simulateUnderTestLostSynCount int, timeout time.Duration, ...) (remoteNonce string, err error)
- func (r *RecvState) HeldAsString() string
- func (r *RecvState) Start() error
- func (r *RecvState) Stop()
- func (r *RecvState) UpdateControl(pack *Packet)
- type RxqSlot
- type SWP
- type SenderState
- func (s *SenderState) ComputeInflight() (bytesInflight int64, msgInflight int64)
- func (s *SenderState) GetDeadlineDur(flow Flow) time.Duration
- func (s *SenderState) GetErr() (err error)
- func (s *SenderState) GetRecvLastFrameClientConsumed() int64
- func (s *SenderState) SetErr(err error)
- func (s *SenderState) SetRecvLastFrameClientConsumed(nfe int64)
- func (s *SenderState) Start(sess *Session)
- func (s *SenderState) Stop()
- func (s *SenderState) UpdateRTT(pack *Packet)
- type Session
- func (sess *Session) Close() error
- func (s *Session) Connect(dest string) error
- func (s *Session) ConnectIfNeeded(dest string, simulateLostSynCount int) error
- func (s *Session) CountPacketsReadConsumed() int64
- func (s *Session) CountPacketsSentForTransfer() int64
- func (s *Session) GetErr() (err error)
- func (s *Session) IncrPacketsReadConsumed(n int64) int64
- func (s *Session) IncrPacketsSentForTransfer(n int64) int64
- func (s *Session) IncrementClockOnReceiveForTesting()
- func (s *Session) Push(pack *Packet)
- func (s *Session) Read(fillme []byte) (n int, err error)
- func (sess *Session) RecvFile() (*BigFile, error)
- func (s *Session) RegisterAsap(rcvUnordered chan *Packet, limit int64) error
- func (s *Session) SelfConsumeForTesting()
- func (sess *Session) SendFile(path string, writeme []byte, tm time.Time) (*BigFile, error)
- func (s *Session) SetConnectDefaults()
- func (s *Session) SetErr(err error)
- func (s *Session) Stop()
- func (s *Session) Write(payload []byte) (n int, err error)
- type SessionConfig
- type SimClock
- type SimNet
- type SubReport
- type Sum
- type SynAckAck
- type TcpAction
- func (z *TcpAction) DecodeMsg(dc *msgp.Reader) (err error)
- func (z TcpAction) EncodeMsg(en *msgp.Writer) (err error)
- func (z TcpAction) MarshalMsg(b []byte) (o []byte, err error)
- func (z TcpAction) Msgsize() (s int)
- func (i TcpAction) String() string
- func (z *TcpAction) UnmarshalMsg(bts []byte) (o []byte, err error)
- type TcpEvent
- func (z *TcpEvent) DecodeMsg(dc *msgp.Reader) (err error)
- func (z TcpEvent) EncodeMsg(en *msgp.Writer) (err error)
- func (z TcpEvent) MarshalMsg(b []byte) (o []byte, err error)
- func (z TcpEvent) Msgsize() (s int)
- func (i TcpEvent) String() string
- func (z *TcpEvent) UnmarshalMsg(bts []byte) (o []byte, err error)
- type TcpState
- func (z *TcpState) DecodeMsg(dc *msgp.Reader) (err error)
- func (z TcpState) EncodeMsg(en *msgp.Writer) (err error)
- func (z TcpState) MarshalMsg(b []byte) (o []byte, err error)
- func (z TcpState) Msgsize() (s int)
- func (i TcpState) String() string
- func (z *TcpState) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (s *TcpState) UpdateTcp(e TcpEvent, fromState TcpState) TcpAction
- type TermConfig
- type TerminatedError
- func (z *TerminatedError) DecodeMsg(dc *msgp.Reader) (err error)
- func (z TerminatedError) EncodeMsg(en *msgp.Writer) (err error)
- func (t *TerminatedError) Error() string
- func (z TerminatedError) MarshalMsg(b []byte) (o []byte, err error)
- func (z TerminatedError) Msgsize() (s int)
- func (z *TerminatedError) UnmarshalMsg(bts []byte) (o []byte, err error)
- type TxqSlot
Constants ¶
This section is empty.
Variables ¶
var ErrConnectWhenNotListen = fmt.Errorf("connect request when receiver was not in Listen state")
var ErrSenderClosed = fmt.Errorf("got senderClosed")
var ErrSessDone = fmt.Errorf("got session Done")
var ErrShutdown = fmt.Errorf("shutdown in progress")
var ErrTimeoutClose = fmt.Errorf("timed-out after 10 seconds waiting to get Close() message through.")
Functions ¶
func Blake2bOfBytes ¶
func GetSubscripCap ¶
func GetSubscripCap(s *nats.Subscription) (bytecap int64, msgcap int64)
GetSubscripCap returns the byte and message count capacity left for this subscription.
func HistoryEqual ¶
HistoryEqual lets one easily compare a send and a recv history.
func NewSessionNonce ¶
func NewSessionNonce() string
func SetSubscriptionLimits ¶
func SetSubscriptionLimits(sub *nats.Subscription, msgLimit int64, bytesLimit int64) error
SetSubscriptionLimits changes the limits in the subscription sub.
Types ¶
type AsapHelper ¶
type AsapHelper struct {
    ReqStop chan bool
    Done chan bool

    // drop packets at this size limit,
    // but discard the first rather than
    // the last, so new info can be seen
    // rather than stale.
    Limit int64
    // contains filtered or unexported fields
}
AsapHelper is a simple queue goroutine that delivers packets to ASAP clients as soon as they become available. Packets may be dropped, duplicated, or misordered, but they will be delivered as soon as possible.
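The Limit comment above describes a drop-the-oldest policy. The following is a minimal sketch of such a bounded queue, assuming a plain slice as storage; `dropOldestQueue` and `push` are hypothetical names and do not reflect how AsapHelper is actually implemented.

```go
package main

import "fmt"

// dropOldestQueue is a hypothetical bounded FIFO that discards the
// oldest entry when full, so that newer items are kept over stale ones.
type dropOldestQueue struct {
	limit int
	items []int64
}

// push appends v; if the queue is already at its limit, the oldest
// item is dropped first. It returns true if something was dropped.
func (q *dropOldestQueue) push(v int64) (dropped bool) {
	if q.limit > 0 && len(q.items) >= q.limit {
		q.items = q.items[1:]
		dropped = true
	}
	q.items = append(q.items, v)
	return dropped
}

func main() {
	q := &dropOldestQueue{limit: 3}
	for seq := int64(0); seq < 5; seq++ {
		q.push(seq)
	}
	// Only the three most recent sequence numbers remain queued.
	fmt.Println(q.items)
}
```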
func NewAsapHelper ¶
func NewAsapHelper(rcvUnordered chan *Packet, max int64) *AsapHelper
NewAsapHelper creates a new AsapHelper. Callers provide rcvUnordered, on which they should then do blocking receives to acquire new *Packets out of order but As Soon As Possible.
func (*AsapHelper) Start ¶
func (r *AsapHelper) Start()
Start starts the AsapHelper tiny queuing service.
type BigFile ¶
type BigFile struct {
    Filepath string
    SizeInBytes int64
    Blake2b []byte
    SendTime time.Time
    Data []byte
}
BigFile represents the transmission of a file; size is limited only by memory. Since the primary use is to capture the state that is being held in memory, this is a reasonable limit.
If you require larger-than-memory transfers, you can always do infinitely sized streams by using the Session implementations of the standard io.Reader and io.Writer interfaces (the Read() and Write() methods) directly.
For example, you could use io.Copy() to copy from a file on one end to a file on the other end of two nats clients joined by a *Session.
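As a rough sketch of that streaming pattern, the code below runs io.Copy on both ends of an in-memory io.Pipe. The pipe halves merely stand in for the sending and receiving Sessions, and `roundTrip` is a hypothetical helper. How a real Session signals end-of-stream to the reader is not shown in this documentation, so the EOF handling here is an assumption of the sketch.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// roundTrip streams payload through a pipe with io.Copy on both ends
// and returns what arrived on the receiving side.
func roundTrip(payload string) (string, error) {
	// pr and pw stand in for the receiving and sending Sessions.
	pr, pw := io.Pipe()

	// Sending side: copy from the source (a file, in practice) into the writer.
	go func() {
		_, err := io.Copy(pw, strings.NewReader(payload))
		pw.CloseWithError(err) // a nil err makes the reader see io.EOF
	}()

	// Receiving side: copy from the reader into the destination.
	var out bytes.Buffer
	_, err := io.Copy(&out, pr)
	return out.String(), err
}

func main() {
	got, err := roundTrip("state snapshot")
	fmt.Println(got, err)
}
```

Because io.Copy works in fixed-size chunks, neither side needs to hold the whole stream in memory.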
func (*BigFile) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type ByteAccount ¶
type ByteAccount struct {
NumBytesAcked int64
}
ByteAccount should be accessed with atomics to avoid data races.
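A minimal sketch of what atomic access could look like, using sync/atomic on a local struct with the same shape. The type `byteAccount` and the helpers `addAcked` and `loadAcked` are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// byteAccount mirrors the shape of ByteAccount for illustration only.
type byteAccount struct {
	NumBytesAcked int64
}

// addAcked atomically adds n to the acked-byte total and returns the new total.
func addAcked(a *byteAccount, n int64) int64 {
	return atomic.AddInt64(&a.NumBytesAcked, n)
}

// loadAcked atomically reads the acked-byte total.
func loadAcked(a *byteAccount) int64 {
	return atomic.LoadInt64(&a.NumBytesAcked)
}

func main() {
	var acct byteAccount
	var wg sync.WaitGroup
	// Several goroutines update the counter concurrently without a data race.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			addAcked(&acct, 100)
		}()
	}
	wg.Wait()
	fmt.Println(loadAcked(&acct))
}
```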
func (*ByteAccount) DecodeMsg ¶
func (z *ByteAccount) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (ByteAccount) EncodeMsg ¶
func (z ByteAccount) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (ByteAccount) MarshalMsg ¶
func (z ByteAccount) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (ByteAccount) Msgsize ¶
func (z ByteAccount) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*ByteAccount) UnmarshalMsg ¶
func (z *ByteAccount) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
type CertConfig ¶
type CertConfig struct {
// contains filtered or unexported fields
}
func (*CertConfig) CertLoad ¶
func (cc *CertConfig) CertLoad() error
func (*CertConfig) Init ¶
func (cc *CertConfig) Init(tlsDir string)
type ConnectReq ¶
type ConnectReq struct {
    DestInbox string
    Err error
    RemoteNonce string
    Done chan bool
    // contains filtered or unexported fields
}
func NewConnectReq ¶
func NewConnectReq(dest string) *ConnectReq
type EventRingBuf ¶
type EventRingBuf struct {
    A []time.Time
    N int // MaxView, the total size of A, whether or not in use.
    Window time.Duration
    Beg int // start of in-use data in A
    Readable int // number of pointers available in A (in use)
}
EventRingBuf is a fixed-size circular ring buffer of time.Time values.
func NewEventRingBuf ¶
func NewEventRingBuf(maxEventsStored int, maxTimeWindow time.Duration) *EventRingBuf
NewEventRingBuf is the constructor. It will internally allocate a slice of size maxEventsStored.
func (*EventRingBuf) AddEventCheckOverflow ¶
func (b *EventRingBuf) AddEventCheckOverflow(e time.Time) bool
AddEventCheckOverflow adds event e and removes any events that are older than event e in the ring by more than Window. Returns true if the ring is full after the addition of e. Hence a true response indicates we have seen N events within Window of time.
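The behaviour described above amounts to detecting N events within a time window. Below is a minimal slice-based sketch of that check; it is not the ring-buffer implementation. The names `eventWindow`, `add`, and `replay` are hypothetical, events are assumed to arrive in time order, and dropping the oldest entry when the window is already full is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"time"
)

// eventWindow is a hypothetical, slice-based stand-in for the ring:
// it keeps at most max event times that fall within window of the newest.
type eventWindow struct {
	max    int
	window time.Duration
	events []time.Time
}

// add records e, evicts events older than e by more than window, and
// reports whether max events are now held, i.e. whether max events
// were seen within window.
func (w *eventWindow) add(e time.Time) bool {
	// Evict from the front while the oldest event is too old.
	for len(w.events) > 0 && e.Sub(w.events[0]) > w.window {
		w.events = w.events[1:]
	}
	// If still full, drop the oldest to make room for e.
	if len(w.events) >= w.max {
		w.events = w.events[1:]
	}
	w.events = append(w.events, e)
	return len(w.events) >= w.max
}

// replay feeds events at the given offsets from a fixed start time
// into a fresh eventWindow and returns the result of each add.
func replay(max int, window time.Duration, offsets ...time.Duration) []bool {
	w := &eventWindow{max: max, window: window}
	start := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	results := make([]bool, 0, len(offsets))
	for _, off := range offsets {
		results = append(results, w.add(start.Add(off)))
	}
	return results
}

func main() {
	ms := time.Millisecond
	// Three events inside one second fill the window; a much later
	// event evicts them and the window is no longer full.
	fmt.Println(replay(3, time.Second, 0, 400*ms, 800*ms, 5000*ms))
}
```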
func (*EventRingBuf) Adopt ¶
func (b *EventRingBuf) Adopt(me []time.Time)
Adopt(): non-standard.
For efficiency's sake, (possibly) take ownership of already allocated slice offered in me.
If me is large we will adopt it, and we will potentially then write to the me buffer. If we already have a bigger buffer, copy me into the existing buffer instead.
func (*EventRingBuf) Advance ¶
func (b *EventRingBuf) Advance(n int)
Advance(): non-standard, but better than Next(), because we don't have to unwrap our buffer and pay the cpu time for the copy that unwrapping may need. Useful in conjunction with, or after, ReadWithoutAdvance().
func (*EventRingBuf) ReadPtrs ¶
func (b *EventRingBuf) ReadPtrs(p []time.Time) (n int, err error)
ReadPtrs():
Adapted from bytes.Buffer.Read(): ReadPtrs reads the next len(p) time.Time values from the buffer or until the buffer is drained. The return value n is the number of values read. If the buffer has no data to return, err is io.EOF (unless len(p) is zero); otherwise it is nil.
func (*EventRingBuf) ReadWithoutAdvance ¶
func (b *EventRingBuf) ReadWithoutAdvance(p []time.Time) (n int, err error)
ReadWithoutAdvance(): if you want to Read the data and leave it in the buffer, so as to peek ahead for example.
func (*EventRingBuf) Reset ¶
func (b *EventRingBuf) Reset()
Reset quickly forgets any data stored in the ring buffer. The data is still there, but the ring buffer will ignore it and overwrite those buffers as new data comes in.
func (*EventRingBuf) String ¶
func (r *EventRingBuf) String() string
func (*EventRingBuf) TwoContig ¶
TwoContig returns all readable pointers, but in two separate slices, to avoid copying. The two slices are from the same buffer, but are not contiguous. Either or both may be empty slices.
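The two-slice view can be sketched as follows for a circular buffer described by a start index and a readable count. This sketch uses int elements rather than time.Time for brevity, and `twoContig` with its parameters is a hypothetical stand-alone function, not the method above.

```go
package main

import "fmt"

// twoContig returns the readable region of a circular buffer as two
// slices of the backing array: the part from beg up to the end of buf,
// and the wrapped-around part starting at index 0. No data is copied.
func twoContig(buf []int, beg, readable int) (first, second []int) {
	n := len(buf)
	if readable == 0 || n == 0 {
		return nil, nil
	}
	end := beg + readable
	if end <= n {
		// The readable region does not wrap.
		return buf[beg:end], nil
	}
	// The readable region wraps past the end of buf.
	return buf[beg:n], buf[:end-n]
}

func main() {
	buf := []int{10, 11, 12, 13, 14}
	// Four readable items starting at index 3: indexes 3, 4, 0, 1.
	first, second := twoContig(buf, 3, 4)
	fmt.Println(first, second)
}
```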
func (*EventRingBuf) WritePtrs ¶
func (b *EventRingBuf) WritePtrs(p []time.Time) (n int, err error)
WritePtrs writes len(p) time.Time values from p to the underlying ring, b.A. It returns the number of values written from p (0 <= n <= len(p)) and any error encountered that caused the write to stop early. WritePtrs must return a non-nil error if it returns n < len(p).
type Flow ¶
type Flow struct {
    // Control messages such as acks and keepalives
    // should not be blocked by flow-control (for
    // correctness/resumption from no-flow), so we need
    // to reserve extra headroom in the nats
    // subscription limits of this much to
    // allow resumption of flow.
    //
    // These reserved headroom settings can be
    // manually made larger before calling Start()
    // -- and might need to be if you are running
    // very large windowMsgSz and/or windowByteSz; or
    // if you have large messages. After Start()
    // the nats buffer sizes on the subscription are
    // fixed and ReservedByteCap and ReservedMsgCap
    // are not consulted again.
    ReservedByteCap int64
    ReservedMsgCap int64

    // flow control params:
    // These are advertised to senders with
    // both acks and data segments, and kept
    // up to date as conditions change.
    AvailReaderBytesCap int64
    AvailReaderMsgCap int64

    // Estimate of the round-trip-time from
    // the remote-end point of view. In nanoseconds.
    RemoteRttEstNsec int64

    // Estimate of the standard deviation of
    // the round-trip-time from the remote-end
    // point of view. In nanoseconds.
    RemoteRttSdNsec int64

    // The number of RTT observations in the RemoteRtt
    // estimates above, also avoids double counting
    // by acting like a (possibly gappy) sequence number.
    RemoteRttN int64

    // figure out if we should close connection
    // by tracking LastHeardFromDownstream
    LastHeardFromDownstream time.Time
}
FlowCtrl data is shared by sender and receiver, so use the sender.FlowCt.UpdateFlow() method to safely serialize access.
type FlowCtrl ¶
type FlowCtrl struct {
    Flow Flow
    // contains filtered or unexported fields
}
FlowCtrl serializes access to the Flow state information so that sender and receiver don't trample reads/writes.
func (*FlowCtrl) GetFlow ¶
GetFlow atomically returns a copy of the current Flow; it does not itself call UpdateFlow, but one should have done so recently to get the most up-to-date info
func (*FlowCtrl) UpdateFlow ¶
func (r *FlowCtrl) UpdateFlow(who string, net Network, availReaderMsgCap int64, availReaderBytesCap int64, pack *Packet) Flow
UpdateFlow updates the flow information. It returns the latest info in the Flow structure.
Updates r.Flow.RemoteRttEstNsec from pack, since if pack is non-nil, it is coming from the receiver.
NB: availReaderMsgCap is ignored if < 0, so use -1 to indicate no update (just query existing values). Same with availReaderBytesCap.
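The "negative means no update" convention can be sketched like this. The types `flow` and `flowCtrl` are hypothetical, reduced stand-ins, and the use of a sync.Mutex for serialization is an assumption of the sketch; the documentation only says that access is serialized.

```go
package main

import (
	"fmt"
	"sync"
)

// flow and flowCtrl are hypothetical, reduced versions of Flow and FlowCtrl.
type flow struct {
	AvailReaderBytesCap int64
	AvailReaderMsgCap   int64
}

type flowCtrl struct {
	mu   sync.Mutex
	flow flow
}

// update stores any non-negative capacity and returns a copy of the
// resulting flow. A negative argument leaves that field unchanged, so
// update(-1, -1) acts as a pure query.
func (c *flowCtrl) update(msgCap, bytesCap int64) flow {
	c.mu.Lock()
	defer c.mu.Unlock()
	if msgCap >= 0 {
		c.flow.AvailReaderMsgCap = msgCap
	}
	if bytesCap >= 0 {
		c.flow.AvailReaderBytesCap = bytesCap
	}
	return c.flow
}

func main() {
	var c flowCtrl
	c.update(100, 64*1024)
	fmt.Println(c.update(-1, -1)) // query only, nothing changes
	fmt.Println(c.update(50, -1)) // update the message cap only
}
```

Returning a copy rather than a pointer means callers can read the snapshot without holding the lock.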
type InOrderSeq ¶
type InOrderSeq struct {
Seq []*Packet
}
InOrderSeq represents ordered (and gapless) data as delivered to the consumer application. The application requests it by asking on the Session.ReadMessagesCh channel.
type NatsClient ¶
type NatsClient struct {
    Nc *nats.Conn
    Scrip *nats.Subscription
    Cfg *NatsClientConfig
    Subject string
}
NatsClient wraps a nats.Conn, a nats.Subscription, and a message arrival channel into an easy to use and easy to configure (even with TLS) structure.
func NewNatsClient ¶
func NewNatsClient(cfg *NatsClientConfig) *NatsClient
NewNatsClient creates a new NatsClient.
func NewNatsClientAlreadyStarted ¶
func NewNatsClientAlreadyStarted(cfg *NatsClientConfig, nc *nats.Conn) *NatsClient
func (*NatsClient) Close ¶
func (s *NatsClient) Close()
Close unsubscribes from the nats subscription and closes the nats.Conn connection.
func (*NatsClient) MakeSub ¶
func (s *NatsClient) MakeSub(subject string, hand nats.MsgHandler) error
MakeSub creates a nats subscription on subject with the hand as the callback handler.
type NatsClientConfig ¶
type NatsClientConfig struct {
    // ====================
    // user supplied
    // ====================
    Host string
    Port int
    NatsNodeName string // client node name, for nats-top.
    Subject string
    SkipTLS bool

    // helpful for test code to auto-crash on error
    AsyncErrPanics bool
    ErrorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error)
    ReportSlowConsumerErrors bool

    // ====================
    // Init() fills in:
    // ====================
    ServerList string
    NatsAsyncErrCh chan asyncErr
    NatsConnClosedCh chan *nats.Conn
    NatsConnDisconCh chan *nats.Conn
    NatsConnReconCh chan *nats.Conn
    Opts []nats.Option
    Certs CertConfig
}
NatsClientConfig provides the nats configuration for the NatsClient.
func NewNatsClientConfig ¶
func NewNatsClientConfig( host string, port int, myname string, subject string, skipTLS bool, asyncErrCrash bool, errorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error), ) *NatsClientConfig
NewNatsClientConfig creates a new config struct to provide to NewNatsClient.
func (*NatsClientConfig) Init ¶
func (cfg *NatsClientConfig) Init()
Init initializes the nats options and loads the TLS certificates, if any.
type NatsNet ¶
type NatsNet struct {
    Cli *NatsClient
    Halt *idem.Halter
    // contains filtered or unexported fields
}
NatsNet connects to nats using the Network interface.
func NewNatsNet ¶
func NewNatsNet(cli *NatsClient) *NatsNet
NewNatsNet makes a new NatsNet based on an actual nats client.
func (*NatsNet) BufferCaps ¶
BufferCaps returns the byte and message limits currently in effect, so that flow control can be used to avoid sender overrunning them.
func (*NatsNet) Listen ¶
Listen starts receiving packets addressed to inbox on the returned channel.
type Network ¶
type Network interface {
    // Send transmits the packet. It is send and pray; no
    // guarantee of delivery is made by the Network.
    Send(pack *Packet, why string) error

    // Listen starts receiving packets addressed to inbox on the returned channel.
    Listen(inbox string) (chan *Packet, error)

    // Flush waits for roundtrip to gnatsd broker to complete; or
    // for 60 seconds to elapse.
    Flush()
}
Network describes our network abstraction, and is implemented by SimNet and NatsNet.
type Packet ¶
type Packet struct {
    From string
    Dest string

    // uniquely identify a file/session with a randomly
    // chosen then fixed nonce. See NewSessionNonce() to generate.
    FromSessNonce string
    DestSessNonce string

    // ArrivedAtDestTm is timestamped by
    // the receiver immediately when the
    // packet arrives at the Dest receiver.
    ArrivedAtDestTm time.Time

    // DataSendTm is stamped anew on each data send and retry
    // from the sender. Acks by the receiver are
    // stamped with the AckReplyTm field, and DataSendTm
    // is copied to the ack to make one-way estimates (if
    // your clocks are synced) viable. Currently we do
    // as TCP does and compute RTT based on the roundtrip
    // from the originating endpoint, which avoids issues
    // around clock skew.
    //
    // For the rationale for updating the timestamp on each
    // retry, see the discussion of the Karn/Partridge
    // algorithm, p302 of Peterson and Davie 1996.
    //
    // Summary: Accurate RTT requires accurate association of
    // ack with which retry it is responding to, and thus
    // time of the retry is the relevant one for RTT computation.
    // If internally you need to know when a data packet was
    // first/originally transmitted, see the TxqSlot.OrigSendTime
    // value.
    DataSendTm time.Time

    SeqNum int64

    // SeqRetry: 0, 1, 2: allow
    // accurate RTT estimates. Only on
    // data frames; acks/control will be negative.
    SeqRetry int64

    AckNum int64

    // AckRetry: which SeqRetry attempt for
    // the AckNum this AckNum is associated with.
    AckRetry int64
    AckReplyTm time.Time

    // things like Fin, FinAck, DataAck, KeepAlive are
    // all TcpEvents.
    TcpEvent TcpEvent

    // Convey the state in keepalives in place of
    // retry for the estabAck. Otherwise if estabAck
    // is lost, the client doing Connect() can get stuck
    // in SynSent.
    FromTcpState TcpState

    // like the byte count AdvertisedWindow in TCP, but
    // since nats has both byte and message count
    // limits, we want to convey these instead.
    AvailReaderBytesCap int64
    AvailReaderMsgCap int64

    // Estimate of the round-trip-time (RTT) from
    // the sender's point of view. In nanoseconds.
    // Allows mostly passive receivers to have
    // an accurate view of the end-to-end RTT.
    FromRttEstNsec int64

    // Estimate of the standard deviation of
    // the round-trip-time from the sender's
    // point of view. In nanoseconds.
    FromRttSdNsec int64

    // number of RTT observations in the From RTT
    // estimates above, also avoids double counting.
    FromRttN int64

    // CumulBytesTransmitted should give the total accumulated
    // count of bytes ever transmitted on this session
    // from `From` to `Dest`.
    // On data payloads, CumulBytesTransmitted allows
    // the receiver to figure out how
    // big any gaps are, so as to give accurate flow control
    // byte count info. The CumulBytesTransmitted count
    // should include this packet's len(Data), assuming
    // this is a data packet.
    CumulBytesTransmitted int64

    Data []byte

    // DataOffset tells us
    // where to start reading from in Data. It
    // allows us to have consumed only part of
    // a packet on one Read().
    DataOffset int

    // checksum of Data
    Blake2bChecksum []byte

    // those waiting for when this particular
    // Packet is acked by the
    // recipient can allocate a bchan.New(1) here and wait for a
    // channel receive on <-CliAcked.Ch
    CliAcked *bchan.Bchan `msg:"-"` // omit from serialization

    Accounting *ByteAccount `msg:"-"` // omit from serialization
}
Packet is what is transmitted between Sender A and Receiver B, where A and B are the two endpoints in a given Session. (Endpoints are specified by the strings localInbox and destInbox in the NewSession constructor.)
Packets also flow symmetrically from Sender B to Receiver A.
Special packets have AckOnly, KeepAlive, or Closing flagged; normal packets are data segments that have none of these flags set. Only normal data packets are tracked for timeout and retry purposes.
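The role of DataOffset, letting one Read() consume only part of a packet, can be sketched as below. The reduced `packet` type and the `readFrom` helper are hypothetical and only illustrate the bookkeeping; they are not the Session.Read implementation.

```go
package main

import "fmt"

// packet is a hypothetical, reduced Packet with only the fields
// needed to show partial consumption.
type packet struct {
	Data       []byte
	DataOffset int
}

// readFrom copies unread bytes from p into fillme, advances
// p.DataOffset, and reports how many bytes were copied and whether
// the packet is now fully consumed.
func readFrom(p *packet, fillme []byte) (n int, done bool) {
	n = copy(fillme, p.Data[p.DataOffset:])
	p.DataOffset += n
	return n, p.DataOffset == len(p.Data)
}

func main() {
	p := &packet{Data: []byte("hello world")}
	buf := make([]byte, 6) // smaller than the payload, so two reads are needed
	for {
		n, done := readFrom(p, buf)
		fmt.Printf("%q\n", buf[:n])
		if done {
			break
		}
	}
}
```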
func CopyPacketSansData ¶
func (*Packet) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type RTT ¶
RTT provides round-trip time estimation. Currently it is implemented as a simple single exponential moving average with alpha = 0.1 and no seasonal/cyclic terms.
func (*RTT) GetEstimate ¶
GetEstimate returns the current estimate.
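A single exponential moving average with alpha = 0.1 can be sketched as follows. The type `rttEMA` and its methods are hypothetical, and seeding the estimate with the first sample is an assumption of this sketch rather than documented behaviour of RTT.

```go
package main

import "fmt"

// rttEMA is a hypothetical single exponential moving average of
// round-trip times, in nanoseconds.
type rttEMA struct {
	alpha float64
	est   float64
	n     int64
}

// addSample folds one observed RTT into the estimate. The first
// sample seeds the estimate directly (an assumption of this sketch).
func (r *rttEMA) addSample(rttNsec float64) {
	if r.n == 0 {
		r.est = rttNsec
	} else {
		// new estimate = alpha*sample + (1-alpha)*old estimate
		r.est = r.alpha*rttNsec + (1-r.alpha)*r.est
	}
	r.n++
}

// estimate returns the current smoothed RTT in nanoseconds.
func (r *rttEMA) estimate() float64 { return r.est }

func main() {
	r := &rttEMA{alpha: 0.1}
	for _, sample := range []float64{100, 200, 200, 200} {
		r.addSample(sample)
		fmt.Printf("sample=%.0f estimate=%.2f\n", sample, r.estimate())
	}
}
```

With a small alpha the estimate moves slowly toward new samples, which damps the effect of a single slow round trip.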
type ReadRequest ¶
func NewReadRequest ¶
func NewReadRequest(p []byte) *ReadRequest
type RealClock ¶
type RealClock struct{}
RealClock just passes the Now() call to time.Now().
var RealClk RealClock
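The Clock interface itself is not reproduced in this section, so the sketch below assumes a single Now() method, as the RealClock description suggests. The names `clock`, `realClock`, `fakeClock`, and `expired` are hypothetical; the point is only that timeout logic written against an interface can be driven by a simulated clock in tests.

```go
package main

import (
	"fmt"
	"time"
)

// clock is a hypothetical single-method interface in the spirit of Clock.
type clock interface {
	Now() time.Time
}

// realClock defers to the system clock.
type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }

// fakeClock returns a time that only moves when advance is called,
// which makes timeout logic deterministic in tests.
type fakeClock struct {
	t time.Time
}

func (f *fakeClock) Now() time.Time          { return f.t }
func (f *fakeClock) advance(d time.Duration) { f.t = f.t.Add(d) }

// expired reports whether deadline has passed according to c.
func expired(c clock, deadline time.Time) bool {
	return c.Now().After(deadline)
}

func main() {
	fc := &fakeClock{t: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)}
	deadline := fc.Now().Add(50 * time.Millisecond)
	fmt.Println(expired(fc, deadline)) // simulated time has not moved yet
	fc.advance(100 * time.Millisecond)
	fmt.Println(expired(fc, deadline)) // simulated time is past the deadline
	fmt.Println(expired(realClock{}, time.Now().Add(time.Hour)))
}
```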
type RecvState ¶
type RecvState struct {
    Clk Clock
    Net Network
    Inbox string
    RemoteInbox string
    NextFrameExpected int64
    LastFrameClientConsumed int64
    Rxq []*RxqSlot
    RecvWindowSize int64
    RecvWindowSizeBytes int64
    Timeout time.Duration
    RecvHistory []*Packet

    MsgRecv chan *Packet

    Halt *idem.Halter
    DoSendClosingCh chan *closeReq

    RecvSz int64
    DiscardCount int64

    LastMsgConsumed int64
    LargestSeqnoRcvd int64
    MaxCumulBytesTrans int64
    LastByteConsumed int64

    LastAvailReaderBytesCap int64
    LastAvailReaderMsgCap int64

    RcvdButNotConsumed map[int64]*Packet

    ReadyForDelivery []*Packet
    ReadMessagesCh chan InOrderSeq
    NumHeldMessages chan int64

    // If AsapOn is true the receiver will
    // forward packets for delivery to a
    // client as soon as they arrive
    // but without ordering guarantees;
    // and we may also drop packets if
    // the receive doesn't happen within
    // 100 msec.
    //
    // The client must have previously called
    // Session.RegisterAsap and provided a
    // channel to receive *Packet on.
    //
    // As-soon-as-possible delivery has no
    // effect on the flow-control properties
    // of the session, nor on the delivery
    // to the one-time/order-preserved clients.
    AsapOn bool

    TcpState TcpState

    AppCloseCallback func()
    AcceptReadRequest chan *ReadRequest
    ConnectCh chan *ConnectReq

    LocalSessNonce string
    RemoteSessNonce string

    KeepAliveInterval time.Duration
    // contains filtered or unexported fields
}
RecvState tracks the receiver's sliding window state.
func NewRecvState ¶
func NewRecvState( net Network, recvSz int64, recvSzBytes int64, timeout time.Duration, inbox string, snd *SenderState, clk Clock, nonce string, destInbox string, keepAliveInterval time.Duration, ) *RecvState
NewRecvState makes a new RecvState manager.
func (*RecvState) Close ¶
Close is gentler than Stop(). It politely notifies the remote side to go through its full shutdown sequence. It will return before that sequence is complete.
func (*RecvState) HeldAsString ¶
HeldAsString turns r.RcvdButNotConsumed into a string for convenience of display.
func (*RecvState) Start ¶
Start begins receiving. RecvState receives both data and acks from earlier sends. Start launches a goroutine in the background.
func (*RecvState) UpdateControl ¶
UpdateControl updates our flow control parameters r.LastAvailReaderMsgCap and r.LastAvailReaderBytesCap based on the most recently observed Packet delivery status, and tells the sender about this indirectly with an r.snd.FlowCt.UpdateFlow() update.
Called with each pack that RecvState receives, which is passed to UpdateFlow().
type SWP ¶
type SWP struct {
    Sender *SenderState
    Recver *RecvState
}
SWP holds the Sliding Window Protocol state
func NewSWP ¶
func NewSWP(net Network, windowMsgCount int64, windowByteCount int64, timeout time.Duration, inbox string, destInbox string, clk Clock, keepAliveInterval time.Duration, nonce string) *SWP
NewSWP makes a new sliding window protocol manager, holding both sender and receiver components.
type SenderState ¶
type SenderState struct {
    Clk Clock
    Net Network
    Inbox string
    Dest string

    LastAckRec int64
    LastFrameSent int64
    Txq []*TxqSlot

    SenderWindowSize int64
    Timeout time.Duration

    // the main goroutine safe way to request
    // sending a packet:
    BlockingSend chan *Packet

    GotPack chan *Packet

    Halt *idem.Halter
    SendHistory []*Packet
    SendSz int64
    SendAck chan *Packet
    DiscardCount int64

    LastSendTime time.Time
    LastHeardFromDownstream time.Time
    KeepAliveInterval time.Duration

    // after this many failed keepalives, we
    // close down the session. Set to less than 1
    // to disable the auto-close.
    NumFailedKeepAlivesBeforeClosing int

    SentButNotAckedByDeadline *retree
    SentButNotAckedBySeqNum *retree

    // flow control params
    // last seen from our downstream
    // receiver, we throttle ourselves
    // based on these.
    LastSeenAvailReaderBytesCap int64
    LastSeenAvailReaderMsgCap int64

    // do synchronized access via GetFlow()
    // and UpdateFlow(s.Net)
    FlowCt *FlowCtrl
    TotalBytesSent int64
    TotalBytesSentAndAcked int64

    // tell the receiver that sender is terminating
    SenderShutdown chan bool

    DoSendClosingCh chan *closeReq

    LocalSessNonce string
    RemoteSessNonce string
    // contains filtered or unexported fields
}
SenderState tracks the sender's sliding window state. To avoid circular deadlocks, the SenderState never talks directly to the RecvState. The RecvState will tell the Sender stuff on GotPack.
Acks, retries, keep-alives, and original-data: these are the four types of sends we do. Also now a closing message is sent on shutdown.
func NewSenderState ¶
func NewSenderState( net Network, sendSz int64, timeout time.Duration, inbox string, destInbox string, clk Clock, keepAliveInterval time.Duration, nonce string, ) *SenderState
NewSenderState constructs a new SenderState struct.
func (*SenderState) ComputeInflight ¶
func (s *SenderState) ComputeInflight() (bytesInflight int64, msgInflight int64)
ComputeInflight returns the number of bytes and messages that are in-flight: they have been sent but not yet acked.
func (*SenderState) GetDeadlineDur ¶
func (s *SenderState) GetDeadlineDur(flow Flow) time.Duration
GetDeadlineDur returns the duration until the receive deadline using a weighted average of our observed RTT info and the remote end's observed RTT info. Add it to time.Now() before using.
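As a rough sketch of that calculation, the function below blends a local and a remote RTT estimate and pads the result by a number of standard deviations, in the spirit of the retry-deadline description under NewSession. The function `deadlineDur`, the weight `w`, and the multiplier `k` are all hypothetical; the actual weights used by GetDeadlineDur are not stated in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// deadlineDur blends a local and a remote RTT estimate with weight w
// on the local one, then pads the result by k standard deviations.
// Both w and k are assumptions of this sketch.
func deadlineDur(localRTT, remoteRTT, sd time.Duration, w float64, k int64) time.Duration {
	blended := time.Duration(w*float64(localRTT) + (1-w)*float64(remoteRTT))
	return blended + time.Duration(k)*sd
}

func main() {
	d := deadlineDur(40*time.Millisecond, 60*time.Millisecond, 5*time.Millisecond, 0.5, 2)
	// The result is a duration; add it to the current time to get a deadline.
	deadline := time.Now().Add(d)
	fmt.Println(d, deadline.After(time.Now()))
}
```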
func (*SenderState) GetErr ¶
func (s *SenderState) GetErr() (err error)
func (*SenderState) GetRecvLastFrameClientConsumed ¶
func (s *SenderState) GetRecvLastFrameClientConsumed() int64
func (*SenderState) SetErr ¶
func (s *SenderState) SetErr(err error)
func (*SenderState) SetRecvLastFrameClientConsumed ¶
func (s *SenderState) SetRecvLastFrameClientConsumed(nfe int64)
func (*SenderState) Start ¶
func (s *SenderState) Start(sess *Session)
Start initiates the SenderState goroutine, which manages sends, timeouts, and resends.
func (*SenderState) UpdateRTT ¶
func (s *SenderState) UpdateRTT(pack *Packet)
type Session ¶
type Session struct {
    Cfg *SessionConfig
    Swp *SWP
    Destination string
    MyInbox string

    Net Network
    ReadMessagesCh chan InOrderSeq
    AcceptReadRequest chan *ReadRequest

    // Halt.Done.Chan is closed if session is terminated.
    // This will happen if the remote session stops
    // responding and is thus declared dead, as well
    // as after an explicit close.
    Halt *idem.Halter

    NumFailedKeepAlivesBeforeClosing int

    ConnectTimeout time.Duration
    ConnectAttempts int

    RemoteSenderClosed chan bool

    LocalSessNonce string
    RemoteSessNonce string
    // contains filtered or unexported fields
}
Session tracks a given point-to-point session and its sliding window state for one of the end-points.
func NewSession ¶
func NewSession(cfg SessionConfig) (*Session, error)
NewSession makes a new Session, and calls Swp.Start to begin the sliding-window-protocol.
If windowByteSz is negative or less than windowMsgSz, we estimate a byte size based on 10kb messages and the given windowMsgSz. It is much better to give the aggregate byte limit you want specifically. Negative windowByteSz is merely a convenience for writing tests.
The timeout parameter controls how often we wake up to check for packets that need to be retried.
Packet timers (retry deadlines) are established using an exponential moving average + some standard deviations of the observed round-trip time of Ack packets. Retries reset the DataSendTm so there is never confusion as to which retry is being acked. The Ack packet method is not subject to two-clock skew because the same clock is used for both begin and end measurements.
func SetupRecvStream ¶
func SetupRecvStream( nc *nats.Conn, serverHost string, serverNport int, clientName string, mysubj string, fromsubj string, skipTLS bool, errorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error), ignoreSlowConsumerErrors bool, ) (*Session, error)
SetupRecvStream sets up to receive a stream of bytes over the existing nc, which should be a live *nats.Conn that is already connected to a server. We will use nc to subscribe to the mysubj topic.
If errorCallbackFunc is nil, we will panic on nats errors, with one exception: we respect the ignoreSlowConsumerErrors flag. So if ignoreSlowConsumerErrors is set, we won't panic on a slow consumer, even if errorCallbackFunc is nil.
After SetupRecvStream(), just do
n, err := sessB.Read(p)
with the sessB returned. This will read from the session stream into p, a []byte. This is the standard io.Reader interface and its Read() method.
func SetupSendStream ¶
func SetupSendStream( nc *nats.Conn, serverHost string, serverNport int, clientName string, mysubj string, fromsubj string, skipTLS bool, errorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error), ignoreSlowConsumerErrors bool, ) (*Session, error)
After SetupSendStream(), just do
n, err := sessA.Write(writeme)
with the sessA returned.
func (*Session) ConnectIfNeeded ¶
If not established, ConnectIfNeeded does the 3-way connect handshake to exchange Session nonces.
func (*Session) CountPacketsReadConsumed ¶
CountPacketsReadConsumed reports on how many packets the application has read from the session.
func (*Session) CountPacketsSentForTransfer ¶
CountPacketsSentForTransfer reports on how many packets the application has written to the session.
func (*Session) IncrPacketsReadConsumed ¶
IncrPacketsReadConsumed increments packetsConsumed and returns the new total.
func (*Session) IncrPacketsSentForTransfer ¶
IncrPacketsSentForTransfer increments the count of packets sent for transfer and returns the new total.
func (*Session) IncrementClockOnReceiveForTesting ¶
func (s *Session) IncrementClockOnReceiveForTesting()
IncrementClockOnReceiveForTesting supports testing by incrementing the Clock automatically when a packet is received. It expects a SimClock to be in use, so to avoid mixing testing and production this call will panic if that assumption is violated.
func (*Session) Push ¶
Push sends a message packet, blocking until there is a flow-control slot available. The Push will block due to flow control to avoid over-running the receiving client's buffers.
Upon return we are not guaranteed that the message has reached the broker or the client. If you want to Flush() to the broker, use the PushGetAck() method instead. This will hurt throughput, but may be needed if you are going to shut down right after the send (to avoid dropping the packet before it reaches the broker).
You can use s.CountPacketsSentForTransfer() to get the total count of packets Push()-ed so far.
func (*Session) RecvFile ¶
RecvFile will check for corruption and report it in err. We may also return the corrupted *BigFile structure for further examination.
func (*Session) RegisterAsap ¶
RegisterAsap registers a callback channel, rcvUnordered, which will receive *Packet values that are unordered and may have gaps in their sequence (where packets were dropped). However, the channel will see the packets as soon as possible. The session is still flow controlled, so if the receiver throttles the sender, packets may be delayed. Clients should be prepared to deal with duplicated, dropped, and mis-ordered packets on the rcvUnordered channel. The limit argument sets how many messages are queued before we drop the oldest.
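The "queue up to limit, then drop the oldest" behavior can be pictured with a buffered channel. This is only a sketch of the policy under the assumption of a single producer and a limit of at least 1; `offerDropOldest` is a hypothetical helper and says nothing about how the package implements its queue.

```go
package main

import "fmt"

// offerDropOldest (hypothetical helper) queues v on ch. If ch is full it
// discards the oldest queued value to make room, and reports whether it did.
// ASSUMPTIONS: one producer only, and cap(ch) >= 1.
func offerDropOldest(ch chan int, v int) (dropped bool) {
	for {
		select {
		case ch <- v:
			return dropped
		default: // queue full, fall through and evict
		}
		select {
		case <-ch:
			dropped = true
		default: // a consumer emptied the queue meanwhile; retry the send
		}
	}
}

func main() {
	ch := make(chan int, 2) // capacity plays the role of the limit argument
	for seq := 1; seq <= 4; seq++ {
		offerDropOldest(ch, seq)
	}
	fmt.Println(<-ch, <-ch) // the two newest values remain
}
```

A consumer of such a channel sees gaps (here sequence numbers 1 and 2 never arrive), which is why clients are told to tolerate dropped packets.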
func (*Session) SelfConsumeForTesting ¶
func (s *Session) SelfConsumeForTesting()
SelfConsumeForTesting sets up a reader to read all produced messages automatically. You can use CountPacketsReadConsumed() to see the total number consumed thus far.
func (*Session) SetConnectDefaults ¶
func (s *Session) SetConnectDefaults()
type SessionConfig ¶
type SessionConfig struct {
	// the network to use, NatsNet or SimNet
	Net Network

	// where we listen
	LocalInbox string

	// the remote destination topic for our messages
	DestInbox string

	// capacity of our receive buffers in message count
	WindowMsgCount int64

	// capacity of our receive buffers in byte count
	WindowByteSz int64

	// how often we wakeup and check
	// if packets need to be retried.
	Timeout time.Duration

	KeepAliveInterval time.Duration

	// set to -1 to disable auto-close. If
	// not set (or left at 0), then we default
	// to 50 (so after 50 keep-alive intervals
	// with no remote contact, we close the session).
	NumFailedKeepAlivesBeforeClosing int

	// the clock (real or simulated) to use
	Clk Clock

	TermCfg TermConfig
}
SessionConfig configures a Session.
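The NumFailedKeepAlivesBeforeClosing comment describes three cases: -1 disables auto-close, 0 means "use the default of 50", and a positive value is used as given. A small sketch of that rule follows; `keepAliveLimit` is a hypothetical helper, and treating every negative value like -1 is an assumption, since only -1 is documented.

```go
package main

import "fmt"

// keepAliveLimit (hypothetical helper) resolves the configured value into the
// number of missed keep-alive intervals tolerated, and whether auto-close is on.
func keepAliveLimit(configured int) (limit int, autoClose bool) {
	switch {
	case configured < 0: // ASSUMPTION: any negative value behaves like -1
		return 0, false
	case configured == 0: // unset: documented default
		return 50, true
	default:
		return configured, true
	}
}

func main() {
	fmt.Println(keepAliveLimit(0))
	fmt.Println(keepAliveLimit(-1))
	fmt.Println(keepAliveLimit(10))
}
```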
type SimClock ¶
SimClock simulates time passing. Call Advance to increment the time.
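For readers unfamiliar with simulated clocks, the idea is that time only moves when the test says so. The following is a generic sketch, not the package's SimClock: the type `fakeClock` and its `Now` method are assumptions, and only the name Advance comes from the description above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeClock is a hypothetical stand-in for a simulated clock.
type fakeClock struct {
	mu  sync.Mutex
	now time.Time
}

// Now returns the current simulated time.
func (c *fakeClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// Advance moves simulated time forward by d and returns the new time.
func (c *fakeClock) Advance(d time.Duration) time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
	return c.now
}

// elapsedMillis advances a fresh clock by each step and reports the total
// simulated time that passed, in milliseconds.
func elapsedMillis(steps ...time.Duration) int64 {
	c := &fakeClock{now: time.Unix(0, 0)}
	start := c.Now()
	for _, d := range steps {
		c.Advance(d)
	}
	return c.Now().Sub(start).Milliseconds()
}

func main() {
	fmt.Println(elapsedMillis(time.Second, 2*time.Second))
}
```

Retry timers driven by such a clock can be tested deterministically, without sleeping.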
type SimNet ¶
type SimNet struct {
	Net      map[string]chan *Packet
	LossProb float64
	Latency  time.Duration

	FilterThisEvent map[TcpEvent]*int
	FilterCount     int // only filter this many, if > 0

	TotalSent map[string]int64
	TotalRcvd map[string]int64

	// simulate loss of the first packets
	DiscardOnce int64

	// simulate re-ordering of packets by setting this to 1
	SimulateReorderNext int

	// simulate duplicating the next packet
	DuplicateNext uint32

	// enforce that advertised windows are never
	// violated by having more messages in flight
	// than have been advertised.
	Advertised map[string]int64
	Inflight   map[string]int64

	ReqStop chan bool
	Done    chan bool

	AllowBlackHoleSends bool
	// contains filtered or unexported fields
}
SimNet simulates a network with the given latency and loss characteristics. See NewSimNet.
func NewSimNet ¶
NewSimNet makes a network simulator. The latency is one-way trip time; lossProb is the probability of the packet getting lost on the network.
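As an illustration of what a per-packet loss probability means, the sketch below drops each packet independently with probability lossProb. It is not SimNet's code; `keptWithSeed` is a hypothetical helper and the independent-draw model is an assumption.

```go
package main

import (
	"fmt"
	"math/rand"
)

// keptWithSeed (hypothetical helper) pushes n packets through a lossy link
// and returns how many survive. Each packet is dropped independently with
// probability lossProb; the seed makes the run repeatable.
func keptWithSeed(seed int64, n int, lossProb float64) int {
	rng := rand.New(rand.NewSource(seed))
	kept := 0
	for i := 0; i < n; i++ {
		// Float64 is in [0,1), so lossProb 0 keeps all and 1 drops all.
		if rng.Float64() >= lossProb {
			kept++
		}
	}
	return kept
}

func main() {
	fmt.Println(keptWithSeed(1, 1000, 0))
	fmt.Println(keptWithSeed(1, 1000, 1))
}
```

Since the latency is a one-way trip time, an Ack in such a simulation returns no sooner than twice the latency after the data packet was sent.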
type SubReport ¶
type SubReport struct {
	Delivered          int64
	Dropped            int
	MaxMsgsQueued      int
	MaxBytesQueued     int
	PendingMsg         int
	PendingBytes       int
	LimitMsg           int
	LimitBytes         int
	SubscriptionActive bool
}
SubReport is the output of ReportOnSubscription.
func ReportOnSubscription ¶
func ReportOnSubscription(s *nats.Subscription) *SubReport
ReportOnSubscription describes the status of the nats subscription s in terms of a SubReport that details the backing memory used and how close to the limits the client currently is.
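One way to turn such a report into a single "how full is this subscription" number is sketched below. The struct `subUsage` copies only four SubReport field names so the example runs on its own; `fillFraction` is a hypothetical helper, and treating a limit of zero or less as "no limit" is an assumption of this sketch.

```go
package main

import "fmt"

// subUsage mirrors the four SubReport fields this sketch needs.
type subUsage struct {
	PendingMsg, PendingBytes int
	LimitMsg, LimitBytes     int
}

// fillFraction (hypothetical helper) returns the larger of the message and
// byte fill fractions, so 1.0 means one of the limits has been reached.
func fillFraction(u subUsage) float64 {
	frac := func(pending, limit int) float64 {
		if limit <= 0 { // ASSUMPTION: non-positive limit means unlimited
			return 0
		}
		return float64(pending) / float64(limit)
	}
	m := frac(u.PendingMsg, u.LimitMsg)
	b := frac(u.PendingBytes, u.LimitBytes)
	if b > m {
		return b
	}
	return m
}

func main() {
	u := subUsage{PendingMsg: 50, LimitMsg: 100, PendingBytes: 10, LimitBytes: 1000}
	fmt.Println(fillFraction(u))
}
```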
type Sum ¶
type Sum struct {
	ObsKeepRateFromA float64
	ObsKeepRateFromB float64
	// contains filtered or unexported fields
}
Sum is output by the SimNet.Summary function to summarize the packet losses in a network simulation.
type SynAckAck ¶
type SynAckAck struct {
	// EventSyn => also conveys SessionNonce
	TcpEvent TcpEvent

	// SessionNonce identifies the file in this session (replaces port numbers).
	// Should always be present. See NewSessionNonce() to generate.
	SessionNonce string

	// NextExpected should be 0 if fresh start;
	// i.e. we know nothing from prior eavesdropping on
	// any prior multicast of this SessionNonce. Otherwise,
	// NextExpected and NackList convey knowledge of what we have
	// and don't have to allow the sender to skip
	// repeating the packets.
	//
	// This is the next serial number that the receiver has not received.
	//
	// Only present on TcpEvent == EventSynAck.
	NextExpected int64

	// NackList can be an empty slice.
	// NackList is a list of missing packets (on the receiver side) that we are aware of.
	// (Nack=negative acknowledgement).
	// Only present on TcpEvent == EventSynAck.
	NackList []int64
}
SynAckAck is used as the encoded Data for EventSyn, EventSynAck, and EventEstabAck: connection setup.
func (*SynAckAck) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type TcpAction ¶
type TcpAction int
func (TcpAction) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type TcpEvent ¶
type TcpEvent int
here so that msgp can know what it is
const (
	EventNil          TcpEvent = 0
	EventStartListen  TcpEvent = 1 // server enters Listen state
	EventStartConnect TcpEvent = 2 // client enters SynSent state

	// server enters SynReceived state, client enters SynReceived (simultaneous open)
	EventSyn TcpEvent = 3 // server enters SynReceived

	// client enters Established, does EventSendEstabAck
	EventSynAck TcpEvent = 4 // client enters Established

	// server enters Established
	EventEstabAck TcpEvent = 5

	EventStartClose        TcpEvent = 6
	EventApplicationClosed TcpEvent = 7
	EventFin               TcpEvent = 8
	EventFinAck            TcpEvent = 9

	// common acks of data during Established state
	// aka AckOnly
	EventDataAck TcpEvent = 10

	EventData TcpEvent = 11 // a data packet. Most common. state is Established.

	// a keepalive
	EventKeepAlive TcpEvent = 12
)
func (TcpEvent) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type TcpState ¶
type TcpState int
const (
	// init sequence
	Fresh       TcpState = 0
	Closed      TcpState = 1
	Listen      TcpState = 2 // server, passive open.
	SynReceived TcpState = 3
	SynSent     TcpState = 4 // client, active open.

	// the numbering is significant, since we'll
	// test that state is >= Established before
	// enforcing SessNonce matching (in recv.go).
	Established TcpState = 5

	// close sequence
	// if we timeout in this state, we go to Closed.
	CloseInitiatorHasSentFin TcpState = 6 // FinWait1

	// if we timeout in this state, we go to Closed.
	CloseResponderGotFin TcpState = 7 // CloseWait
)
func (TcpState) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
func (TcpState) Msgsize ¶
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*TcpState) UnmarshalMsg ¶
UnmarshalMsg implements msgp.Unmarshaler
func (*TcpState) UpdateTcp ¶
UpdateTcp is a simplified version of the TCP state machine. We open with the same 3-way handshake, with Syn, SynAck, and EstabAck; but simplify the close process, opting to avoid the four-way close handshake. Typically our sessions will disappear quickly so this quick close down was more practical.
Any TcpAction returned is executed by the RecvState.doTcpAction method in recv.go.
Argument e is the received event, coming from the remote endpoint.
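The opening handshake can be traced with a tiny transition function. This sketch uses its own lower-case types and covers only the transitions named in the TcpEvent comments (Syn, SynAck, EstabAck, plus simultaneous open); it is not the UpdateTcp implementation, and it ignores actions, timeouts, and the close sequence.

```go
package main

import "fmt"

type state int
type event int

const (
	listen state = iota // server, passive open
	synSent             // client, active open
	synReceived
	established
)

const (
	evSyn event = iota
	evSynAck
	evEstabAck
)

// next returns the state after receiving e while in s.
// Pairs not listed leave the state unchanged (an assumption of this sketch).
func next(s state, e event) state {
	switch {
	case s == listen && e == evSyn:
		return synReceived // server got the client's Syn
	case s == synSent && e == evSyn:
		return synReceived // simultaneous open
	case s == synSent && e == evSynAck:
		return established // client got the server's SynAck
	case s == synReceived && e == evEstabAck:
		return established // server got the client's EstabAck
	}
	return s
}

func main() {
	client, server := synSent, listen
	server = next(server, evSyn)      // client's Syn arrives at the server
	client = next(client, evSynAck)   // server's SynAck arrives at the client
	server = next(server, evEstabAck) // client's EstabAck arrives at the server
	fmt.Println(client == established, server == established)
}
```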
type TermConfig ¶
type TerminatedError ¶
type TerminatedError struct {
Msg string
}
func (*TerminatedError) DecodeMsg ¶
func (z *TerminatedError) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (TerminatedError) EncodeMsg ¶
func (z TerminatedError) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*TerminatedError) Error ¶
func (t *TerminatedError) Error() string
func (TerminatedError) MarshalMsg ¶
func (z TerminatedError) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (TerminatedError) Msgsize ¶
func (z TerminatedError) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*TerminatedError) UnmarshalMsg ¶
func (z *TerminatedError) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler