swp

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2017 License: MIT, MIT Imports: 26 Imported by: 6

README

go-sliding-window

Docs: https://godoc.org/github.com/glycerine/go-sliding-window

In a picture:


    /------------------ swp flow-controls these 2 end points -------------\
    |                                                                     |
    V                                                                     V
publisher ---tcp-->  gnatsd  ---tcp-->  nats go-client lib -in-proc-> subscriber
   (a)                 (b)                      (c)                      (d)


legend:
(a) is your go pub process
(b) is the gnatsd server process [ref 3]
(c) nats-client lib [ref 4]
(d) your go subscriber

The problem solved here:
the (c) internal buffers can overflow if (a) is a fast publisher.

Note: (c) and (d) are compiled together into your go subscriber.
executive summary

The problem

Referring to the figure above, when the publisher (a) is faster than the subscriber (d), the buffers in (c) can overflow. Parts (c) and (d) are compiled into your subscriber process.

This overflow can happen even though the sub links from (a)->(b) and (b)->(c) are individually flow-controlled, because there is no end-to-end feedback that includes (d).

The solution

swp provides flow-control between (a) and (d), giving your publisher feedback about how much your subscriber can handle.

Implication: nats can be used for both control-plane and data delivery.

Note: swp is optional, and is layered on top of nats. It does not change the nats service at all. It simply provides additional guarantees between two endpoints connected over nats. Of course other subscribers listening to the same publisher will see the same rate of events that (d) does.

description

Package swp implements the same Sliding Window Protocol that TCP uses for flow-control and reliable, ordered delivery.

The Nats event bus (https://nats.io/) is a software model of a hardware multicast switch. Nats provides multicast, but no guarantees of delivery and no flow-control. This works fine as long as your downstream read/subscribe capacity is larger than your publishing rate. If you don't know nats, reference 1 is a great introduction.

If your nats publisher ever produces faster than your subscriber can keep up, you may overrun your buffers and drop messages. If your sender is local and replaying a disk file of traffic over nats, you are guaranteed to exhaust even the largest of the internal nats client buffers. In addition you may wish the enforced order of delivery (even with dropped messages), which swp provides.

discussion

swp was built to provide flow-control and reliable, ordered delivery on top of the nats event bus. It reproduces the TCP sliding window and flow-control mechanism in a Session between two nats clients. It provides flow control between exactly two nats endpoints; in many cases this is sufficient to allow all subscribers to keep up. If you have a wide variation in consumer performance, establish the rate-controlling swp Session between your producer and your slowest consumer.

There is also a Session.RegisterAsap() API that can be used to obtain possibly-out-of-order and possibly-duplicated but as-soon-as-possible delivery (similar to that which nats give you natively), while retaining the flow-control required to avoid client-buffer overrun. This can be used in tandem with the main always-ordered-and-lossless API if so desired.

API documentation

Docs: https://godoc.org/github.com/glycerine/go-sliding-window

notes

An implementation of the sliding window protocol (SWP) in Go.

This algorithm is the same one that TCP uses for reliability, ordering, and flow-control.

Reference: pp118-120, Computer Networks: A Systems Approach by Peterson and Davie, Morgan Kaufmann Publishers, 1996. For flow control implementation details see section 6.2.4 "Sliding Window Revisited", pp296-299.

Per Peterson and Davie, the SWP has three benefits:

  • SWP reliably delivers messages across an unreliable link. SWP accomplishes this by acknowledging messages, and automatically resending messages that do not get acknowledged within a timeout.

  • SWP preserves the order in which messages are transmitted and received, by attaching sequence numbers and holding off on delivery until ordered delivery is obtained.

  • SWP can provide flow control. Overly fast senders can be throttled by slower receivers. We implement this here; it was the main motivation for swp development.

status

Working and useful. The library was test-driven and features a network simulator that simulates packet reordering, duplication, and loss. We pass tests with 20% packet loss easily, and we pass tests for lock-step flow control down to one message only in-flight at a time (just as a verification of flow control; do not actually run this way in production if you want performance and/or packet-reordering support!). Round-trip time estimation and smoothing from the observed data-to-ack round trips has been implemented to set retry-deadlines, similar to how TCP operates.

In short, the library is ready, working, and quite useful.

next steps

What could be improved: the network simulator could be improved by adding a chaos monkey mode that is even more aggressive about re-ordering and duplicating packets.

credits

Author: Jason E. Aten, Ph.D.

License: MIT

references

[1] https://www.youtube.com/watch?v=5GcAgMPECxE

[2] https://github.com/nats-io/gnatsd/issues/217

[3] https://github.com/nats-io/gnatsd

[4] https://github.com/nats-io/nats

Documentation

Overview

Package swp implements the same Sliding Window Protocol that TCP uses for flow-control and reliable, ordered delivery.

The Nats event bus (https://nats.io/) is a software model of a hardware multicast switch. Nats provides multicast, but no guarantees of delivery and no flow-control. This works fine as long as your downstream read/subscribe capacity is larger than your publishing rate.

If your nats publisher evers produces faster than your subscriber can keep up, you may overrun your buffers and drop messages. If your sender is local and replaying a disk file of traffic over nats, you are guanateed to exhaust even the largest of the internal nats client buffers. In addition you may wish guaranteed order of delivery (even with dropped messages), which swp provides.

Hence swp was built to provide flow-control and reliable, ordered delivery on top of the nats event bus. It reproduces the TCP sliding window and flow-control mechanism in a Session between two nats clients. It provides flow control between exactly two nats endpoints; in many cases this is sufficient to allow all subscribers to keep up. If you have a wide variation in consumer performance, establish the rate-controlling swp Session between your producer and your slowest consumer.

There is also a Session.RegisterAsap() API that can be used to obtain possibly-out-of-order and possibly-duplicated but as-soon-as-possible delivery (similar to that which nats give you natively), while retaining the flow-control required to avoid client-buffer overrun. This can be used in tandem with the main always-ordered-and-lossless API if so desired.

Index

Constants

This section is empty.

Variables

View Source
var ErrConnectWhenNotListen = fmt.Errorf("connect request when receiver was not in Listen state")
View Source
var ErrSenderClosed = fmt.Errorf("got senderClosed")
View Source
var ErrSessDone = fmt.Errorf("got session Done")
View Source
var ErrShutdown = fmt.Errorf("shutdown in progress")
View Source
var ErrTimeoutClose = fmt.Errorf("timed-out after 10 seconds waiting to get Close() message through.")

Functions

func Blake2bOfBytes

func Blake2bOfBytes(by []byte) []byte

func GetSubscripCap

func GetSubscripCap(s *nats.Subscription) (bytecap int64, msgcap int64)

GetSubscipCaps returns the byte and messge count capacity left for this subscription.

func HistoryEqual

func HistoryEqual(a, b []*Packet) bool

HistoryEqual lets one easily compare and send and a recv history

func InWindow

func InWindow(seqno, min, max int64) bool

InWindow returns true iff seqno is in [min, max].

func NewSessionNonce

func NewSessionNonce() string

func SetSubscriptionLimits

func SetSubscriptionLimits(sub *nats.Subscription,
	msgLimit int64,
	bytesLimit int64) error

SetSubscriptionLimits changes the limits in the subscription sub.

func StartGnatsd

func StartGnatsd(host string, port int) (*server.Server, error)

Types

type AsapHelper

type AsapHelper struct {
	ReqStop chan bool
	Done    chan bool

	// drop packets at this size limite,
	// but discard the first rather than
	// the last, so new info can be seen
	// rather than stale.
	Limit int64
	// contains filtered or unexported fields
}

AsapHelper is a simple queue goroutine that delivers packets to ASAP clients as soon as they become avaialable. Packets may be dropped, duplicated, or misordered, but they will be delivered as soon as possible.

func NewAsapHelper

func NewAsapHelper(rcvUnordered chan *Packet, max int64) *AsapHelper

NewAsapHelper creates a new AsapHelper. Callers provide rcvUnordered which which they should then do blocking receives on to aquire new *Packets out of order but As Soon As Possible.

func (*AsapHelper) Start

func (r *AsapHelper) Start()

Start starts the AsapHelper tiny queuing service.

func (*AsapHelper) Stop

func (r *AsapHelper) Stop()

Stop shuts down the AsapHelper goroutine.

type BigFile

type BigFile struct {
	Filepath    string
	SizeInBytes int64
	Blake2b     []byte
	SendTime    time.Time
	Data        []byte
}

BigFile represents the transmission of a file; size is limited only by memory. Since the primary use is to capture the state that is being held in memory, this is a reasonable limit.

If you requirer larger than memory transfers, can always do infinitely sized streams by using the Session implementations of the stardard io.Reader and io.Writer interfaces (the Read() and Write() methods) directly.

For example, you could use io.Copy() to copy a file on one end and a file on the other end of two nats clients joined by *Session.

func (*BigFile) DecodeMsg

func (z *BigFile) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*BigFile) EncodeMsg

func (z *BigFile) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*BigFile) MarshalMsg

func (z *BigFile) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*BigFile) Msgsize

func (z *BigFile) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*BigFile) UnmarshalMsg

func (z *BigFile) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type ByteAccount

type ByteAccount struct {
	NumBytesAcked int64
}

ByteAccount should be accessed with atomics to avoid data races.

func (*ByteAccount) DecodeMsg

func (z *ByteAccount) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (ByteAccount) EncodeMsg

func (z ByteAccount) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (ByteAccount) MarshalMsg

func (z ByteAccount) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (ByteAccount) Msgsize

func (z ByteAccount) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*ByteAccount) UnmarshalMsg

func (z *ByteAccount) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type CertConfig

type CertConfig struct {
	// contains filtered or unexported fields
}

func (*CertConfig) CertLoad

func (cc *CertConfig) CertLoad() error

func (*CertConfig) Init

func (cc *CertConfig) Init(tlsDir string)

type Clock

type Clock interface {

	// Now provides the present (or simulated) time
	Now() time.Time
}

Clock interface allows test code to control time advancement

type ConnectReq

type ConnectReq struct {
	DestInbox   string
	Err         error
	RemoteNonce string
	Done        chan bool
	// contains filtered or unexported fields
}

func NewConnectReq

func NewConnectReq(dest string) *ConnectReq

type EventRingBuf

type EventRingBuf struct {
	A        []time.Time
	N        int // MaxView, the total size of A, whether or not in use.
	Window   time.Duration
	Beg      int // start of in-use data in A
	Readable int // number of pointers available in A (in use)
}

EventRingBuf:

a fixed-size circular ring buffer of interface{}

func NewEventRingBuf

func NewEventRingBuf(maxEventsStored int, maxTimeWindow time.Duration) *EventRingBuf

constructor. NewEventRingBuf will allocate internally a slice of size maxViewInBytes.

func (*EventRingBuf) AddEventCheckOverflow

func (b *EventRingBuf) AddEventCheckOverflow(e time.Time) bool

AddEventCheckOverflow adds event e and removes any events that are older than event e in the ring by more than Window. Returns true if the ring is full after the addition of e. Hence a true response indicates we have seen N events within Window of time.

func (*EventRingBuf) Adopt

func (b *EventRingBuf) Adopt(me []time.Time)

Adopt(): non-standard.

For efficiency's sake, (possibly) take ownership of already allocated slice offered in me.

If me is large we will adopt it, and we will potentially then write to the me buffer. If we already have a bigger buffer, copy me into the existing buffer instead.

func (*EventRingBuf) Advance

func (b *EventRingBuf) Advance(n int)

Advance(): non-standard, but better than Next(), because we don't have to unwrap our buffer and pay the cpu time for the copy that unwrapping may need. Useful in conjuction/after ReadWithoutAdvance() above.

func (*EventRingBuf) ReadPtrs

func (b *EventRingBuf) ReadPtrs(p []time.Time) (n int, err error)

ReadPtrs():

from bytes.Buffer.Read(): Read reads the next len(p) time.Time pointers from the buffer or until the buffer is drained. The return value n is the number of bytes read. If the buffer has no data to return, err is io.EOF (unless len(p) is zero); otherwise it is nil.

func (*EventRingBuf) ReadWithoutAdvance

func (b *EventRingBuf) ReadWithoutAdvance(p []time.Time) (n int, err error)

ReadWithoutAdvance(): if you want to Read the data and leave it in the buffer, so as to peek ahead for example.

func (*EventRingBuf) Reset

func (b *EventRingBuf) Reset()

Reset quickly forgets any data stored in the ring buffer. The data is still there, but the ring buffer will ignore it and overwrite those buffers as new data comes in.

func (*EventRingBuf) String

func (r *EventRingBuf) String() string

func (*EventRingBuf) TwoContig

func (b *EventRingBuf) TwoContig(makeCopy bool) (first []time.Time, second []time.Time)

TwoContig returns all readable pointers, but in two separate slices, to avoid copying. The two slices are from the same buffer, but are not contiguous. Either or both may be empty slices.

func (*EventRingBuf) WritePtrs

func (b *EventRingBuf) WritePtrs(p []time.Time) (n int, err error)

WritePtrs writes len(p) time.Time values from p to the underlying ring, b.A. It returns the number of bytes written from p (0 <= n <= len(p)) and any error encountered that caused the write to stop early. Write must return a non-nil error if it returns n < len(p).

type Flow

type Flow struct {
	// Control messages such as acks and keepalives
	// should not be blocked by flow-control (for
	// correctness/resumption from no-flow), so we need
	// to reserve extra headroom in the nats
	// subscription limits of this much to
	// allow resumption of flow.
	//
	// These reserved headroom settings can be
	// manually made larger before calling Start()
	//  -- and might need to be if you are running
	// very large windowMsgSz and/or windowByteSz; or
	// if you have large messages. After Start()
	// the nats buffer sizes on the subscription are
	// fixed and ReservedByteCap and ReservedMsgCap
	// are not consulted again.
	ReservedByteCap int64
	ReservedMsgCap  int64

	// flow control params:
	// These to advertised to senders with
	// both acks and data segments, and kept
	// up to date as conditions change.
	AvailReaderBytesCap int64
	AvailReaderMsgCap   int64

	// Estimate of the round-trip-time from
	// the remote-end point of view. In nanoseconds.
	RemoteRttEstNsec int64

	// Estimate of the standard deviation of
	// the round-trip-time from the remote-end
	// point of view. In nanoseconds.
	RemoteRttSdNsec int64

	// The number of RTT observations in the RemoteRtt
	// estimates above, also avoids double counting
	// by acting like a (possibly gappy) sequence number.
	RemoteRttN int64

	// figure out if we should close connection
	// by tracking LastHeardFromDownstream
	LastHeardFromDownstream time.Time
}

FlowCtrl data is shared by sender and receiver, so use the sender.FlowCt.UpdateFlow() method to safely serialize access.

type FlowCtrl

type FlowCtrl struct {
	Flow Flow
	// contains filtered or unexported fields
}

FlowCtrl serializes access to the Flow state information so that sender and receiver don't trample reads/writes.

func (*FlowCtrl) GetFlow

func (r *FlowCtrl) GetFlow() Flow

GetFlow atomically returns a copy of the current Flow; it does not itself call UpdateFlow, but one should have done so recently to get the most up-to-date info

func (*FlowCtrl) UpdateFlow

func (r *FlowCtrl) UpdateFlow(who string, net Network,
	availReaderMsgCap int64, availReaderBytesCap int64,
	pack *Packet) Flow

UpdateFlow updates the flow information. It returns the latest info in the Flow structure.

Updates r.Flow.RemoteRttEstNsec from pack, since if pack is non-nil, it is coming from the receiver.

NB: availReaderMsgCap is ignored if < 0, so use -1 to indicate no update (just query existing values). Same with availReaderBytesCap.

type InOrderSeq

type InOrderSeq struct {
	Seq []*Packet
}

InOrderSeq represents ordered (and gapless) data as delivered to the consumer application. The appliation requests it by asking on the Session.ReadMessagesCh channel.

type NatsClient

type NatsClient struct {
	Nc      *nats.Conn
	Scrip   *nats.Subscription
	Cfg     *NatsClientConfig
	Subject string
}

NatsClient wraps a nats.Conn, a nats.Subscription, and a message arrival channel into an easy to use and easy to configure (even with TLS) structure.

func NewNatsClient

func NewNatsClient(cfg *NatsClientConfig) *NatsClient

NewNatsClient creates a new NatsClient.

func NewNatsClientAlreadyStarted

func NewNatsClientAlreadyStarted(cfg *NatsClientConfig, nc *nats.Conn) *NatsClient

func (*NatsClient) Close

func (s *NatsClient) Close()

Close unsubscribes from the nats subscription and closes the nats.Conn connection.

func (*NatsClient) MakeSub

func (s *NatsClient) MakeSub(subject string, hand nats.MsgHandler) error

MakeSub creates a nats subscription on subject with the hand as the callback handler.

func (*NatsClient) Start

func (s *NatsClient) Start() error

Start connects to the gnatsd server.

type NatsClientConfig

type NatsClientConfig struct {
	// ====================
	// user supplied
	// ====================
	Host string
	Port int

	NatsNodeName string // client node name, for nats-top.
	Subject      string

	SkipTLS bool

	// helpful for test code to auto-crash on error
	AsyncErrPanics bool

	ErrorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error)

	ReportSlowConsumerErrors bool

	// ====================
	// Init() fills in:
	// ====================
	ServerList string

	NatsAsyncErrCh   chan asyncErr
	NatsConnClosedCh chan *nats.Conn
	NatsConnDisconCh chan *nats.Conn
	NatsConnReconCh  chan *nats.Conn

	Opts  []nats.Option
	Certs CertConfig
}

NatsClientConfig provides the nats configuration for the NatsClient.

func NewNatsClientConfig

func NewNatsClientConfig(
	host string,
	port int,
	myname string,
	subject string,
	skipTLS bool,
	asyncErrCrash bool,
	errorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error),

) *NatsClientConfig

NewNatsClientConfig creates a new config struct to provide to NewNatsClient.

func (*NatsClientConfig) Init

func (cfg *NatsClientConfig) Init()

Init initializes the nats options and loads the TLS certificates, if any.

type NatsNet

type NatsNet struct {
	Cli *NatsClient

	Halt *idem.Halter
	// contains filtered or unexported fields
}

NatsNet connects to nats using the Network interface.

func NewNatsNet

func NewNatsNet(cli *NatsClient) *NatsNet

NewNatsNet makes a new NataNet based on an actual nats client.

func (*NatsNet) BufferCaps

func (n *NatsNet) BufferCaps() (bytecap int64, msgcap int64)

BufferCaps returns the byte and message limits currently in effect, so that flow control can be used to avoid sender overrunning them.

func (*NatsNet) Flush

func (n *NatsNet) Flush()

func (*NatsNet) Listen

func (n *NatsNet) Listen(inbox string) (chan *Packet, error)

Listen starts receiving packets addressed to inbox on the returned channel.

func (*NatsNet) Send

func (n *NatsNet) Send(pack *Packet, why string) error

Send blocks until Send has started (but not until acked).

func (*NatsNet) Stop

func (n *NatsNet) Stop()

type Network

type Network interface {

	// Send transmits the packet. It is send and pray; no
	// guarantee of delivery is made by the Network.
	Send(pack *Packet, why string) error

	// Listen starts receiving packets addressed to inbox on the returned channel.
	Listen(inbox string) (chan *Packet, error)

	// Flush waits for roundtrip to gnatsd broker to complete; or
	// for 60 seconds to elapse.
	Flush()
}

Network describes our network abstraction, and is implemented by SimNet and NatsNet.

type Packet

type Packet struct {
	From string
	Dest string

	// uniquely identify a file/session with a randomly
	// chosen then fixed nonce. See NewSessionNonce() to generate.
	FromSessNonce string
	DestSessNonce string

	// ArrivedAtDestTm is timestamped by
	// the receiver immediately when the
	// packet arrives at the Dest receiver.
	ArrivedAtDestTm time.Time

	// DataSendTm is stamped anew on each data send and retry
	// from the sender. Acks by the receiver are
	// stamped with the AckReplyTm field, and DataSendTm
	// is copied to the ack to make one-way estimates (if
	// your clocks are synced) viable. Currently we do
	// as TCP does and compute RTT based on the roundtrip
	// from the originating endpoint, which avoids issues
	// around clock skew.
	//
	// For the rationale for updating the timestamp on each
	// retry, see the discussion of the Karn/Partridge
	// algorithm, p302 of Peterson and Davie 1996.
	//
	// Summary: Accurate RTT requires accurate association of
	// ack with which retry it is responding to, and thus
	// time of the retry is the relevant one for RTT computation.
	// If internally you need to know when a data packet was
	// first/originally transmitted, see the TxqSlot.OrigSendTime
	// value.
	DataSendTm time.Time

	SeqNum int64

	// SeqRetry: 0, 1, 2: allow
	// accurate RTT estimates. Only on
	// data frames; acks/control will be negative.
	SeqRetry int64

	AckNum int64

	// AckRetry: which SeqRetry attempt for
	// the AckNum this AckNum is associated with.
	AckRetry   int64
	AckReplyTm time.Time

	// things like Fin, FinAck, DataAck, KeepAlive are
	// all TcpEvents.
	TcpEvent TcpEvent

	// Convey the state in keepalives in place of
	// retry for the estabAck. Otherwise if estabAck
	// is lost, the client doing Connect() can get stuck
	// in SynSent.
	FromTcpState TcpState

	// like the byte count AdvertisedWindow in TCP, but
	// since nats has both byte and message count
	// limits, we want convey these instead.
	AvailReaderBytesCap int64
	AvailReaderMsgCap   int64

	// Estimate of the round-trip-time (RTT) from
	// the senders point of view. In nanoseconds.
	// Allows mostly passive recievers to have
	// an accurate view of the end-to-end RTT.
	FromRttEstNsec int64

	// Estimate of the standard deviation of
	// the round-trip-time from the senders
	// point of view. In nanoseconds.
	FromRttSdNsec int64

	// number of RTT observations in the From RTT
	// estimates above, also avoids double counting.
	FromRttN int64

	// CumulBytesTransmitted should give the total accumulated
	// count of bytes ever transmitted on this session
	// from `From` to `Dest`.
	// On data payloads, CumulBytesTransmitted allows
	// the receiver to figure out how
	// big any gaps are, so as to give accurate flow control
	// byte count info. The CumulBytesTransmitted count
	// should include this packet's len(Data), assuming
	// this is a data packet.
	CumulBytesTransmitted int64

	Data []byte

	// DataOffset tells us
	// where to start reading from in Data. It
	// allows us to have consumed only part of
	// a packet on one Read().
	DataOffset int

	// checksum of Data
	Blake2bChecksum []byte

	// those waiting for when this particular
	// Packet is acked by the
	// recipient can allocate a bchan.New(1) here and wait for a
	// channel receive on <-CliAcked.Ch
	CliAcked *bchan.Bchan `msg:"-"` // omit from serialization

	Accounting *ByteAccount `msg:"-"` // omit from serialization
}

Packet is what is transmitted between Sender A and Receiver B, where A and B are the two endpoints in a given Session. (Endpoints are specified by the strings localInbox and destInbox in the NewSession constructor.)

Packets also flow symmetrically from Sender B to Receiver A.

Special packets have AckOnly, KeepAlive, or Closing flagged; otherwise normal packets are data segments that have neither of these flags set. Only normal data packets are tracked for timeout and retry purposes.

func CopyPacketSansData

func CopyPacketSansData(p *Packet) *Packet

func (*Packet) DecodeMsg

func (z *Packet) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Packet) EncodeMsg

func (z *Packet) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Packet) MarshalMsg

func (z *Packet) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Packet) Msgsize

func (z *Packet) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Packet) UnmarshalMsg

func (z *Packet) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RTT

type RTT struct {
	Est   float64
	Alpha float64
	N     int64
	Sd    sdTracker
}

RTT provides round-trip time estimation. Currently it is implemented as a simple single exponential moving average with alpha = 0.1 and no seasonal/cyclic terms.

func NewRTT

func NewRTT() *RTT

NewRTT makes a new RTT.

func (*RTT) AddSample

func (r *RTT) AddSample(newSample time.Duration)

AddSample adds a new RTT sample to the estimate.

func (*RTT) GetEstimate

func (r *RTT) GetEstimate() time.Duration

GetEstimate returns the current estimate.

func (*RTT) GetSd

func (r *RTT) GetSd() time.Duration

GetSd returns the standard deviation of the samples seen so far.

type ReadRequest

type ReadRequest struct {
	Done chan bool
	N    int
	Err  error
	P    []byte
}

func NewReadRequest

func NewReadRequest(p []byte) *ReadRequest

type RealClock

type RealClock struct{}

RealClock just passes the Now() call to time.Now().

var RealClk RealClock

func (RealClock) Now

func (c RealClock) Now() time.Time

Now returns time.Now().

type RecvState

type RecvState struct {
	Clk                     Clock
	Net                     Network
	Inbox                   string
	RemoteInbox             string
	NextFrameExpected       int64
	LastFrameClientConsumed int64
	Rxq                     []*RxqSlot
	RecvWindowSize          int64
	RecvWindowSizeBytes     int64

	Timeout     time.Duration
	RecvHistory []*Packet

	MsgRecv chan *Packet

	Halt *idem.Halter

	DoSendClosingCh chan *closeReq
	RecvSz          int64
	DiscardCount    int64

	LastMsgConsumed    int64
	LargestSeqnoRcvd   int64
	MaxCumulBytesTrans int64
	LastByteConsumed   int64

	LastAvailReaderBytesCap int64
	LastAvailReaderMsgCap   int64

	RcvdButNotConsumed map[int64]*Packet

	ReadyForDelivery []*Packet
	ReadMessagesCh   chan InOrderSeq
	NumHeldMessages  chan int64

	// If AsapOn is true the recevier will
	// forward packets for delivery to a
	// client as soon as they arrive
	// but without ordering guarantees;
	// and we may also drop packets if
	// the receive doesn't happen within
	// 100 msec.
	//
	// The client must have previously called
	// Session.RegisterAsap and provided a
	// channel to receive *Packet on.
	//
	// As-soon-as-possible delivery has no
	// effect on the flow-control properties
	// of the session, nor on the delivery
	// to the one-time/order-preserved clients.
	AsapOn bool

	TcpState TcpState

	AppCloseCallback  func()
	AcceptReadRequest chan *ReadRequest
	ConnectCh         chan *ConnectReq

	LocalSessNonce  string
	RemoteSessNonce string

	KeepAliveInterval time.Duration
	// contains filtered or unexported fields
}

RecvState tracks the receiver's sliding window state.

func NewRecvState

func NewRecvState(
	net Network,
	recvSz int64,
	recvSzBytes int64,
	timeout time.Duration,
	inbox string,
	snd *SenderState,
	clk Clock,
	nonce string,
	destInbox string,
	keepAliveInterval time.Duration,

) *RecvState

NewRecvState makes a new RecvState manager.

func (*RecvState) Close

func (r *RecvState) Close(zr *closeReq) error

Close is gentler than Stop(). It politely notifies the remote side to go through its full shutdown sequence. It will return before that sequence is complete.

func (*RecvState) Connect

func (r *RecvState) Connect(dest string, simulateUnderTestLostSynCount int, timeout time.Duration, numAttempts int) (remoteNonce string, err error)

func (*RecvState) HeldAsString

func (r *RecvState) HeldAsString() string

HeldAsString turns r.RcvdButNotConsumed into a string for convenience of display.

func (*RecvState) Start

func (r *RecvState) Start() error

Start begins receiving. RecvStates receives both data and acks from earlier sends. Start launches a go routine in the background.

func (*RecvState) Stop

func (r *RecvState) Stop()

Stop the RecvState componennt

func (*RecvState) UpdateControl

func (r *RecvState) UpdateControl(pack *Packet)

UpdateFlowControl updates our flow control parameters r.LastAvailReaderMsgCap and r.LastAvailReaderBytesCap based on the most recently observed Packet deliveried status, and tells the sender about this indirectly with an r.snd.FlowCt.UpdateFlow() update.

Called with each pack that RecvState receives, which is passed to UpdateFlow().

type RxqSlot

type RxqSlot struct {
	Received bool
	Pack     *Packet
}

RxqSlot is the receiver's sliding window element.

type SWP

type SWP struct {
	Sender *SenderState
	Recver *RecvState
}

SWP holds the Sliding Window Protocol state

func NewSWP

func NewSWP(net Network, windowMsgCount int64, windowByteCount int64,
	timeout time.Duration, inbox string, destInbox string, clk Clock, keepAliveInterval time.Duration, nonce string) *SWP

NewSWP makes a new sliding window protocol manager, holding both sender and receiver components.

func (*SWP) Connect

func (swp *SWP) Connect(dest string, simulateLostSynCount int, timeout time.Duration, attempts int) (remoteNonce string, err error)

func (*SWP) Start

func (s *SWP) Start(sess *Session)

Start the sliding window protocol

func (*SWP) Stop

func (s *SWP) Stop()

Stop the sliding window protocol

type SenderState

type SenderState struct {
	Clk              Clock
	Net              Network
	Inbox            string
	Dest             string
	LastAckRec       int64
	LastFrameSent    int64
	Txq              []*TxqSlot
	SenderWindowSize int64

	Timeout time.Duration

	// the main goroutine safe way to request
	// sending a packet:
	BlockingSend chan *Packet

	GotPack chan *Packet

	Halt        *idem.Halter
	SendHistory []*Packet
	SendSz      int64
	SendAck     chan *Packet

	DiscardCount int64

	LastSendTime            time.Time
	LastHeardFromDownstream time.Time
	KeepAliveInterval       time.Duration

	// after this many failed keepalives, we
	// close down the session. Set to less than 1
	// to disable the auto-close.
	NumFailedKeepAlivesBeforeClosing int

	SentButNotAckedByDeadline *retree
	SentButNotAckedBySeqNum   *retree

	// flow control params
	// last seen from our downstream
	// receiver, we throttle ourselves
	// based on these.
	LastSeenAvailReaderBytesCap int64
	LastSeenAvailReaderMsgCap   int64

	// do synchronized access via GetFlow()
	// and UpdateFlow(s.Net)
	FlowCt                 *FlowCtrl
	TotalBytesSent         int64
	TotalBytesSentAndAcked int64

	// tell the receiver that sender is terminating
	SenderShutdown chan bool

	DoSendClosingCh chan *closeReq

	LocalSessNonce  string
	RemoteSessNonce string
	// contains filtered or unexported fields
}

SenderState tracks the sender's sliding window state. To avoid circular deadlocks, the SenderState never talks directly to the RecvState. The RecvState will tell the Sender stuff on GotPack.

Acks, retries, keep-alives, and original-data: these are the four types of sends we do. Also now a closing message is sent on shutdown.

func NewSenderState

func NewSenderState(
	net Network,
	sendSz int64,
	timeout time.Duration,
	inbox string,
	destInbox string,
	clk Clock,
	keepAliveInterval time.Duration,
	nonce string,

) *SenderState

NewSenderState constructs a new SenderState struct.

func (*SenderState) ComputeInflight

func (s *SenderState) ComputeInflight() (bytesInflight int64, msgInflight int64)

ComputeInflight returns the number of bytes and messages that are in-flight: they have been sent but not yet acked.

func (*SenderState) GetDeadlineDur

func (s *SenderState) GetDeadlineDur(flow Flow) time.Duration

GetDeadlineDur returns the duration until the receive deadline using a weighted average of our observed RTT info and the remote end's observed RTT info. Add it to time.Now() before using.

func (*SenderState) GetErr

func (s *SenderState) GetErr() (err error)

func (*SenderState) GetRecvLastFrameClientConsumed

func (s *SenderState) GetRecvLastFrameClientConsumed() int64

func (*SenderState) SetErr

func (s *SenderState) SetErr(err error)

func (*SenderState) SetRecvLastFrameClientConsumed

func (s *SenderState) SetRecvLastFrameClientConsumed(nfe int64)

func (*SenderState) Start

func (s *SenderState) Start(sess *Session)

Start initiates the SenderState goroutine, which manages sends, timeouts, and resends

func (*SenderState) Stop

func (s *SenderState) Stop()

Stop the SenderState componennt

func (*SenderState) UpdateRTT

func (s *SenderState) UpdateRTT(pack *Packet)

type Session

type Session struct {
	Cfg         *SessionConfig
	Swp         *SWP
	Destination string
	MyInbox     string

	Net               Network
	ReadMessagesCh    chan InOrderSeq
	AcceptReadRequest chan *ReadRequest

	// Halt.Done.Chan is closed if session is terminated.
	// This will happen if the remote session stops
	// responding and is thus declared dead, as well
	// as after an explicit close.
	Halt *idem.Halter

	NumFailedKeepAlivesBeforeClosing int
	ConnectTimeout                   time.Duration
	ConnectAttempts                  int

	RemoteSenderClosed chan bool

	LocalSessNonce  string
	RemoteSessNonce string
	// contains filtered or unexported fields
}

Session tracks a given point-to-point sesssion and its sliding window state for one of the end-points.

func NewSession

func NewSession(cfg SessionConfig) (*Session, error)

NewSession makes a new Session, and calls Swp.Start to begin the sliding-window-protocol.

If windowByteSz is negative or less than windowMsgSz, we estimate a byte size based on 10kb messages and the given windowMsgSz. It is much better to give the aggregate byte limit you want specifically. Negative windowByteSz is merely a convenience for writing tests.

The timeout parameter controls how often we wake up to check for packets that need to be retried.

Packet timers (retry deadlines) are established using an exponential moving average + some standard deviations of the observed round-trip time of Ack packets. Retries reset the DataSentTm so there is never confusion as to which retry is being acked. The Ack packet method is not subject to two-clock skew because the same clock is used for both begin and end measurements.

func SetupRecvStream

func SetupRecvStream(
	nc *nats.Conn,
	serverHost string,
	serverNport int,
	clientName string,
	mysubj string,
	fromsubj string,
	skipTLS bool,
	errorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error),
	ignoreSlowConsumerErrors bool,

) (*Session, error)

SetupRecvStream sets up to receives a stream on bytes over the existing nc, which should be a live *nats.Conn that is already connected to a server. We will the nc subscribe to listen on the mysubj topic.

if errorCallbackFunc is nil, we will panic on nats errors, with one exception: we respect the ignoreSlowConsumerErrors flag. So if ignoreSlowConsumerErrors we won't panic on a slow consumer, even if errorCallbackFunc is nil.

after SetupRecvStream(), just do

n, err := sessB.Read(p)

with the sessB returned. This will read from the session stream, into a p []byte. This is the standard interface Reader and Read() method.

func SetupSendStream

func SetupSendStream(
	nc *nats.Conn,
	serverHost string,
	serverNport int,
	clientName string,
	mysubj string,
	fromsubj string,
	skipTLS bool,
	errorCallbackFunc func(c *nats.Conn, s *nats.Subscription, e error),
	ignoreSlowConsumerErrors bool,

) (*Session, error)

just do

n, err := sessA.Write(writeme)

with the sessA returned.

func (*Session) Close

func (sess *Session) Close() error

func (*Session) Connect

func (s *Session) Connect(dest string) error

Connect takes an inbox (subject) as dest.

func (*Session) ConnectIfNeeded

func (s *Session) ConnectIfNeeded(dest string, simulateLostSynCount int) error

if not established, do the connect 3-way handshake to exchange Session nonces.

func (*Session) CountPacketsReadConsumed

func (s *Session) CountPacketsReadConsumed() int64

CountPacketsReadConsumed reports on how many packets the application has read from the session.

func (*Session) CountPacketsSentForTransfer

func (s *Session) CountPacketsSentForTransfer() int64

CountPacketsSentForTransfer reports on how many packets. the application has written to the session.

func (*Session) GetErr

func (s *Session) GetErr() (err error)

func (*Session) IncrPacketsReadConsumed

func (s *Session) IncrPacketsReadConsumed(n int64) int64

IncrPacketsReadConsumed increment packetsConsumed and return the new total.

func (*Session) IncrPacketsSentForTransfer

func (s *Session) IncrPacketsSentForTransfer(n int64) int64

IncrPacketsSentForTransfer increment packetsConsumed and return the new total.

func (*Session) IncrementClockOnReceiveForTesting

func (s *Session) IncrementClockOnReceiveForTesting()

IncrementClockOnReceiveForTesting supports testing by incrementing the Clock automatically when a packet is received. Expects a SimClock to be in use, and so to avoid mixing testing and prod this call will panic if that assumption in violated.

func (*Session) Push

func (s *Session) Push(pack *Packet)

Push sends a message packet, blocking until there is a flow-control slot available. The Push will block due to flow control to avoid over-running the receiving clients buffers.

Upon return we are not guaranteed that the the message has reached the broker or the client. If you want to Flush() to the broker, use the PushGetAck() method instead. This will hurt throughput, but may be needed if you are going to shutdown right after the send (to avoid dropping the packet before it reaches the broker).

You can use s.CountPacketsSentForTransfer() to get the total count of packets Push()-ed so far.

func (*Session) Read

func (s *Session) Read(fillme []byte) (n int, err error)

Read implements io.Reader

func (*Session) RecvFile

func (sess *Session) RecvFile() (*BigFile, error)

RecvFile will check for corruption and report it in err. We may also return the corrupted *BigFile structure for further examination.

func (*Session) RegisterAsap

func (s *Session) RegisterAsap(rcvUnordered chan *Packet, limit int64) error

RegisterAsap registers a call back channel, rcvUnordered, which will get *Packet that are unordered and possibly have gaps in their sequence (where packets where dropped). However the channel will see the packets as soon as possible. The session will still be flow controlled however, so if the receiver throttles the sender, packets may be delayed. Clients should be prepared to deal with duplicated, dropped, and mis-ordered packets on the rcvUnordered channel. The limit argument sets how many messages are queued before we drop the oldest.

func (*Session) SelfConsumeForTesting

func (s *Session) SelfConsumeForTesting()

SelfConsumeForTesting sets up a reader to read all produced messages automatically. You can use CountPacketsReadConsumed() to see the total number consumed thus far.

func (*Session) SendFile

func (sess *Session) SendFile(path string, writeme []byte, tm time.Time) (*BigFile, error)

func (*Session) SetConnectDefaults

func (s *Session) SetConnectDefaults()

func (*Session) SetErr

func (s *Session) SetErr(err error)

func (*Session) Stop

func (s *Session) Stop()

Stop shutsdown the session

func (*Session) Write

func (s *Session) Write(payload []byte) (n int, err error)

Write implements io.Writer, chopping p into packet sized pieces if need be, and sending then in order over the flow-controlled Session s.

simulateLostSynCount should be 0 unless testing for lost SYN.

type SessionConfig

type SessionConfig struct {

	// the network to use, NatsNet or SimNet
	Net Network

	// where we listen
	LocalInbox string

	// the remote destination topic for our messages
	DestInbox string

	// capacity of our receive buffers in message count
	WindowMsgCount int64

	// capacity of our receive buffers in byte count
	WindowByteSz int64

	// how often we wakeup and check
	// if packets need to be retried.
	Timeout time.Duration

	KeepAliveInterval time.Duration

	// set to -1 to disable auto-close. If
	// not set (or left at 0), then we default
	// to 50 (so after 50 keep-alive intervals
	// with no remote contact, we close the session).
	NumFailedKeepAlivesBeforeClosing int

	// the clock (real or simulated) to use
	Clk Clock

	TermCfg TermConfig
}

SessionConfig configures a Session.

type SimClock

type SimClock struct {
	When time.Time
	// contains filtered or unexported fields
}

SimClock simulates time passing. Call Advance to increment the time.

func (*SimClock) Advance

func (c *SimClock) Advance(d time.Duration) time.Time

Advance causes the simulated clock to advance by d.

func (*SimClock) Now

func (c *SimClock) Now() time.Time

Now provides the simulated current time.

func (*SimClock) Set

func (c *SimClock) Set(w time.Time)

Set sets the SimClock time to w, and w will be returned by Now() until another Set or Advance call is made.

type SimNet

type SimNet struct {
	Net      map[string]chan *Packet
	LossProb float64
	Latency  time.Duration

	FilterThisEvent map[TcpEvent]*int
	FilterCount     int // only filter this many, if > 0

	TotalSent map[string]int64
	TotalRcvd map[string]int64

	// simulate loss of the first packets
	DiscardOnce int64

	// simulate re-ordering of packets by setting this to 1
	SimulateReorderNext int

	// simulate duplicating the next packet
	DuplicateNext uint32

	// enforce that advertised windows are never
	// violated by having more messages in flight
	// than have been advertised.
	Advertised map[string]int64
	Inflight   map[string]int64

	ReqStop chan bool
	Done    chan bool

	AllowBlackHoleSends bool
	// contains filtered or unexported fields
}

SimNet simulates a network with the given latency and loss characteristics. See NewSimNet.

func NewSimNet

func NewSimNet(lossProb float64, latency time.Duration) *SimNet

NewSimNet makes a network simulator. The latency is one-way trip time; lossProb is the probability of the packet getting lost on the network.

func (*SimNet) Flush

func (net *SimNet) Flush()

func (*SimNet) Listen

func (sim *SimNet) Listen(inbox string) (chan *Packet, error)

Listen returns a channel that will be sent on when packets have Dest inbox.

func (*SimNet) Send

func (sim *SimNet) Send(pack *Packet, why string) error

Send sends the packet pack to pack.Dest. The why annoation is optional and allows the logs to illumate the purpose of each send (ack, keepAlive, data, etc).

func (*SimNet) Summary

func (net *SimNet) Summary() *Sum

Summary summarizes the packet drops in a Sum report.

type SubReport

type SubReport struct {
	Delivered          int64
	Dropped            int
	MaxMsgsQueued      int
	MaxBytesQueued     int
	PendingMsg         int
	PendingBytes       int
	LimitMsg           int
	LimitBytes         int
	SubscriptionActive bool
}

SubReport is the output of ReportOnSubscriptions.

func ReportOnSubscription

func ReportOnSubscription(s *nats.Subscription) *SubReport

ReportOnSubscription describes the status of the nats subscription s in terms of a SubReport with details the backing memory used and how close to the limits the client currently is.

type Sum

type Sum struct {
	ObsKeepRateFromA float64
	ObsKeepRateFromB float64
	// contains filtered or unexported fields
}

Sum is output by the Simnet.Summary function to summarize the packet losses in a network simulation.

func (*Sum) Print

func (s *Sum) Print()

Print displays the Sum report.

type SynAckAck

type SynAckAck struct {

	// EventSyn => also conveys SessionNonce
	TcpEvent TcpEvent

	// SessionNonce identifies the file in this session (replaces port numbers).
	// Should always be present.  See NewSessionNonce() to generate.
	SessionNonce string

	// NextExpected should be 0 if fresh start;
	// i.e. we know nothing from prior evesdropping on
	// any prior multicast of this SessionNonce. Otherwise,
	// NextExpected and NackList convey knowledge of what we have
	// and don't have to allow the sender to skip
	// repeating the packets.
	//
	// This is the next serial number that the receiver has not received.
	//
	// Only present on TcpEvent == EventSynAck.
	NextExpected int64

	// NackList can be an empty slice.
	// Nacklist is a list of missing packets (on the receiver side) that we are aware of.
	// (Nack=negative acknowledgement).
	// Only present on TcpEvent == EventSynAck.
	NackList []int64
}

SynAckAck is used as the encoded Data for EventSyn, EventSynAck, and EventEstabAck: connection setup.

func (*SynAckAck) DecodeMsg

func (z *SynAckAck) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*SynAckAck) EncodeMsg

func (z *SynAckAck) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*SynAckAck) MarshalMsg

func (z *SynAckAck) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*SynAckAck) Msgsize

func (z *SynAckAck) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*SynAckAck) UnmarshalMsg

func (z *SynAckAck) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type TcpAction

type TcpAction int
const (
	NoAction     TcpAction = 0
	SendSyn      TcpAction = 1
	SendSynAck   TcpAction = 2
	SendEstabAck TcpAction = 3
	SendFin      TcpAction = 4
	SendFinAck   TcpAction = 5
	DoAppClose   TcpAction = 6 // after App has closed, then SendFinAck
	SendDataAck  TcpAction = 7
)

func (*TcpAction) DecodeMsg

func (z *TcpAction) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (TcpAction) EncodeMsg

func (z TcpAction) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (TcpAction) MarshalMsg

func (z TcpAction) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (TcpAction) Msgsize

func (z TcpAction) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (TcpAction) String

func (i TcpAction) String() string

func (*TcpAction) UnmarshalMsg

func (z *TcpAction) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type TcpEvent

type TcpEvent int

here so that msgp can know what it is

const (
	EventNil          TcpEvent = 0
	EventStartListen  TcpEvent = 1 // server enters Listen state
	EventStartConnect TcpEvent = 2

	// client enters SynSent state
	// server enters SynReceived state, client enters SynReceived (simultaneous open)
	EventSyn TcpEvent = 3

	// server enters SynReceived
	// client enters Established, does EventSendEstabAck
	EventSynAck TcpEvent = 4

	// client enters Established
	// server enters Established
	EventEstabAck TcpEvent = 5

	EventStartClose        TcpEvent = 6
	EventApplicationClosed TcpEvent = 7

	EventFin    TcpEvent = 8
	EventFinAck TcpEvent = 9

	// common acks of data during Established state
	// aka AckOnly
	EventDataAck TcpEvent = 10
	EventData    TcpEvent = 11 // a data packet. Most common. state is Established.

	// a keepalive
	EventKeepAlive TcpEvent = 12
)

func (*TcpEvent) DecodeMsg

func (z *TcpEvent) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (TcpEvent) EncodeMsg

func (z TcpEvent) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (TcpEvent) MarshalMsg

func (z TcpEvent) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (TcpEvent) Msgsize

func (z TcpEvent) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (TcpEvent) String

func (i TcpEvent) String() string

func (*TcpEvent) UnmarshalMsg

func (z *TcpEvent) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type TcpState

type TcpState int
const (
	// init sequence
	Fresh       TcpState = 0
	Closed      TcpState = 1
	Listen      TcpState = 2 // server, passive open.
	SynReceived TcpState = 3
	SynSent     TcpState = 4 // client, active open.

	// the numbering is significant, since we'll
	// test that state is >= Established before
	// enforcing SessNonce matching (in recv.go).
	Established TcpState = 5

	// close sequence
	// if we timeout in this state, we go to Closed.
	CloseInitiatorHasSentFin TcpState = 6 // FinWait1

	// if we timeout in this state, we go to Closed.
	CloseResponderGotFin TcpState = 7 // CloseWait
)

func (*TcpState) DecodeMsg

func (z *TcpState) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (TcpState) EncodeMsg

func (z TcpState) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (TcpState) MarshalMsg

func (z TcpState) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (TcpState) Msgsize

func (z TcpState) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (TcpState) String

func (i TcpState) String() string

func (*TcpState) UnmarshalMsg

func (z *TcpState) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*TcpState) UpdateTcp

func (s *TcpState) UpdateTcp(e TcpEvent, fromState TcpState) TcpAction

UpdateTcp is a simplified version of the TCP state machine. We open with the same 3-way handshake, with Syn, SynAck, and EstabAck; but simplify the close process, opting to avoid the four-way close handshake. Typically our sessions will disappear quickly so this quick close down was more practical.

Any TcpAction returned is executed by the RecvState.doTcpAction method in recv.go.

Argument e is the received event, coming from the remote endpoint.

type TermConfig

type TermConfig struct {
	// how long a window we use for termination
	// checking. Ignored if 0.
	TermWindowDur time.Duration

	// how many unacked packets we can see inside
	// TermWindowDur before giving up and terminating
	// the session. Ignored if 0.
	TermUnackedLimit int
}

type TerminatedError

type TerminatedError struct {
	Msg string
}

func (*TerminatedError) DecodeMsg

func (z *TerminatedError) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (TerminatedError) EncodeMsg

func (z TerminatedError) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*TerminatedError) Error

func (t *TerminatedError) Error() string

func (TerminatedError) MarshalMsg

func (z TerminatedError) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (TerminatedError) Msgsize

func (z TerminatedError) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*TerminatedError) UnmarshalMsg

func (z *TerminatedError) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type TxqSlot

type TxqSlot struct {
	OrigSendTime  time.Time
	RetryDeadline time.Time
	RetryDur      time.Duration
	Pack          *Packet
}

TxqSlot is the sender's sliding window element.

func (*TxqSlot) String

func (s *TxqSlot) String() string

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL