gofast

package module
v0.0.0-...-5c982d6 Latest Latest
Published: Feb 10, 2021 License: MIT Imports: 22 Imported by: 0

README

Summary

High performance protocol for distributed applications.


  • CBOR (Concise Binary Object Representation) based protocol, avoids yet another protocol frame. Well-formed gofast packets are fully CBOR compliant.
  • Symmetric protocol - communication starts with the client initiating the connection to the server, but thereafter client and server can exchange messages like peers, that is, both ends can:
    • POST messages to remote node.
    • REQUEST a RESPONSE from remote node.
    • Start one or more bi-directional STREAMs with remote node.
  • Concurrent requests on a single connection, improves throughput when latency is high.
  • Configurable batching of packets scheduled for transmission.
  • Periodic flusher for batching responses and streams.
  • Send periodic heartbeat to remote node.
  • Add transport level compression like gzip, lzw ...
  • Sub-μs protocol overhead.
  • Scales with number of connections and number of cores.
  • And most importantly - does not attempt to solve all the world's problems.

dev-notes

  • Transport{} is safe for concurrent access.
  • Stream{} is not safe for concurrent access.

Frame-format

Frames encode a packet in CBOR compliant form, identifying the exchange as one of the following:

Post-request, client posts a packet and expects no response:

| 0xd9 0xd9f7 | 0xc6 | packet |

Request-response, client makes a request and expects a single response:

| 0xd9 0xd9f7 | 0x81 | packet |

Bi-directional streaming, where client and server will have to close the stream by sending a 0xff:

 | 0xd9 0xd9f7         | 0x9f | packet1    |
        | 0xd9 0xd9f7  | 0xc7 | packet2    |
        ...
        | 0xd9 0xd9f7  | 0xc8 | end-packet |
  • 0xd9 says frame is a tag with 2-byte extension.
  • Following two bytes 0xd9f7 is tag-number Tag-55799.
  • As per the RFC - 0xd9 0xd9f7 appears not to be in use as a distinguishing mark for frequently used file types.
  • Maximum length of a packet can be 4GB.
  • 0xc6 is gofast reserved tag (tagvalue-6) to denote that the following packet is a post.
  • 0x81 denotes a cbor array of single item, a special meaning for new request that expects a single response from remote.
  • 0x9f denotes a cbor array of indefinite items, a special meaning for a new request that starts a bi-directional stream.
  • 0xc7 is gofast reserved tag (tagvalue-7) to denote that the following packet is part of a stream.
  • 0xc8 is gofast reserved tag (tagvalue-8) to denote that this packet is an end-packet closing the bi-directional stream.
  • Packet shall always be encoded as CBOR byte-array.
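The prefix bytes listed above can be checked mechanically. The following is a minimal sketch, not the library's own decoder: classifyFrame and the Kind* names are hypothetical, and the byte values are taken directly from the list above.

```go
package main

import (
	"errors"
	"fmt"
)

// Exchange kinds, named after the frame-format section. These identifiers
// are illustrative and are not part of the gofast API.
const (
	KindPost    = "post"
	KindRequest = "request"
	KindStart   = "stream-start"
	KindStream  = "stream"
	KindFinish  = "stream-finish"
)

var errBadFrame = errors.New("not a gofast frame prefix")

// classifyFrame inspects the 4-byte frame prefix: 0xd9 0xd9 0xf7
// (CBOR tag 55799) followed by the byte that identifies the exchange.
func classifyFrame(b []byte) (string, error) {
	if len(b) < 4 || b[0] != 0xd9 || b[1] != 0xd9 || b[2] != 0xf7 {
		return "", errBadFrame
	}
	switch b[3] {
	case 0xc6: // tag 6, post
		return KindPost, nil
	case 0x81: // array of one item, request expecting one response
		return KindRequest, nil
	case 0x9f: // indefinite-length array, start of a stream
		return KindStart, nil
	case 0xc7: // tag 7, message within a stream
		return KindStream, nil
	case 0xc8: // tag 8, end-packet closing the stream
		return KindFinish, nil
	}
	return "", errBadFrame
}

func main() {
	kind, err := classifyFrame([]byte{0xd9, 0xd9, 0xf7, 0x9f})
	fmt.Println(kind, err)
}
```

Only the first four bytes are examined here; the packet that follows is handled separately.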

Except for post-request, the exchange between client and server is always symmetrical.

Packet-format

A packet is a CBOR byte-array that can carry tags and payloads. It has the following format:

  | len | tag1 |         payload1               |
               | tag2 |      payload2           |
                      | tag3 |   payload3       |
                             | tag4 |  hdr-data |
  • Entire packet is encoded as CBOR byte-array.
  • len is nothing but the byte-array length (Major-type-2).
  • Payload shall always be encoded as CBOR byte-array.
  • HDR-DATA shall always be encoded as CBOR map.
  • Tags are uint64 numbers that will either be prefixed to payload or hdr-data.
  • Tag1 will always be an opaque number falling within a reserved tag-space called opaque-space.
  • Opaque-space should not start before 256.
  • Tag2, Tag3 can be one of the values predefined by this library.
  • Final embedded tag, in this case tag4, shall always be tagMsg (value 43).
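Since the whole packet is wrapped as a CBOR byte-array, the len field is the standard major-type-2 head. A minimal sketch of that head encoding follows; appendBytesHead and wrapPacket are hypothetical helper names, and the encoding rules are those of the CBOR specification rather than anything specific to gofast.

```go
package main

import "fmt"

// appendBytesHead appends the CBOR major-type-2 (byte string) head for a
// body of n bytes. A uint32 length covers the 4GB packet limit mentioned
// in the frame-format notes.
func appendBytesHead(out []byte, n uint32) []byte {
	switch {
	case n < 24:
		// Length fits into the low 5 bits of the initial byte.
		return append(out, 0x40|byte(n))
	case n <= 0xff:
		return append(out, 0x58, byte(n))
	case n <= 0xffff:
		return append(out, 0x59, byte(n>>8), byte(n))
	default:
		return append(out, 0x5a, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	}
}

// wrapPacket prefixes body with its byte-string head.
func wrapPacket(body []byte) []byte {
	out := appendBytesHead(nil, uint32(len(body)))
	return append(out, body...)
}

func main() {
	fmt.Printf("% x\n", wrapPacket([]byte{0xd9, 0x01, 0x0a, 0x40}))
}
```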

hdr-data

  • TagId, identifies message with unique id.
  • TagData, identifies the encoded message as a byte array.

end-packet

    | len | tag1  | 0x40 | 0xff |
  • End-packet does not carry any useful payload, it simply signifies a stream close.
  • For that, tag1 opaque-value is required to identify the stream.
  • 0x40 is the single-byte payload for this packet, which says that the payload-data is ZERO bytes.
  • The last 0xff is important since it will match with 0x9f that indicates a stream-start as Indefinite array of items.

Reading a frame from socket

Framing of packets is done such that any gofast packet will be at least 9 bytes long. Here is how it happens:

  • The smallest payload is at least 1 byte long, because it is encoded as a CBOR byte-array or as end-packet (0xff).
  • Every payload is prefixed with an opaque-tag, which is always >= 256 in value. That is at least 3 bytes.
  • Entire packet is encoded as a CBOR byte-array, that is at least 1 more byte of overhead.
  • And finally framing always takes up 4 bytes.

That is a total of: 1 + 3 + 1 + 4

Incidentally these 9 bytes are enough to learn how many more bytes to read from the socket to complete the entire packet.
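To illustrate why 9 bytes suffice: after the 4 framing bytes, the packet's byte-array head occupies at most 5 bytes when the length is limited to 4GB, so the full length is always visible in bytes 4 to 8. The sketch below assumes exactly that layout; bytesRemaining is a hypothetical helper and not the library's reader.

```go
package main

import (
	"errors"
	"fmt"
)

const minFrame = 9 // smallest gofast frame, per the arithmetic above

// bytesRemaining takes the first 9 bytes of a frame and returns how many
// more bytes must be read from the socket to complete it.
func bytesRemaining(hdr []byte) (int64, error) {
	if len(hdr) < minFrame {
		return 0, errors.New("need at least 9 bytes")
	}
	if hdr[0] != 0xd9 || hdr[1] != 0xd9 || hdr[2] != 0xf7 {
		return 0, errors.New("missing tag-55799 prefix")
	}
	head := hdr[4] // initial byte of the packet's CBOR byte-array
	if head>>5 != 2 {
		return 0, errors.New("packet is not a CBOR byte-array")
	}
	var headLen, bodyLen int64
	info := head & 0x1f
	switch {
	case info < 24:
		headLen, bodyLen = 1, int64(info)
	case info == 24:
		headLen, bodyLen = 2, int64(hdr[5])
	case info == 25:
		headLen, bodyLen = 3, int64(hdr[5])<<8|int64(hdr[6])
	case info == 26:
		headLen = 5
		bodyLen = int64(hdr[5])<<24 | int64(hdr[6])<<16 |
			int64(hdr[7])<<8 | int64(hdr[8])
	default:
		return 0, errors.New("unsupported length encoding")
	}
	rem := 4 + headLen + bodyLen - minFrame
	if rem < 0 {
		return 0, errors.New("frame shorter than the 9-byte minimum")
	}
	return rem, nil
}

func main() {
	hdr := []byte{0xd9, 0xd9, 0xf7, 0xc6, 0x45, 0xd9, 0x01, 0x0a, 0x40}
	n, err := bytesRemaining(hdr)
	fmt.Println(n, err)
}
```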

Regarding Opaque-value

By design, opaque values should be >= 256. These are ephemeral tag values that do not carry any meaning other than identifying the stream. Opaque values are continuously reused for the lifetime of the connection. Users are expected to give a range of these ephemeral tag-values, and gofast will skip reserved TAGS.
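The 3-byte cost of an opaque tag follows from how CBOR encodes tag numbers (major type 6). A minimal sketch, with appendTagHead as a hypothetical helper name; it also shows that opaque values above 65535 need 5 bytes, so 3 bytes is the minimum rather than a fixed size.

```go
package main

import "fmt"

// appendTagHead appends the CBOR major-type-6 head for tag number n.
func appendTagHead(out []byte, n uint64) []byte {
	switch {
	case n < 24:
		return append(out, 0xc0|byte(n))
	case n <= 0xff:
		return append(out, 0xd8, byte(n))
	case n <= 0xffff:
		return append(out, 0xd9, byte(n>>8), byte(n))
	case n <= 0xffffffff:
		return append(out, 0xda, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	default:
		return append(out, 0xdb,
			byte(n>>56), byte(n>>48), byte(n>>40), byte(n>>32),
			byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	}
}

func main() {
	fmt.Printf("% x\n", appendTagHead(nil, 266))      // TagOpaqueStart
	fmt.Printf("% x\n", appendTagHead(nil, 55799))    // frame prefix tag
	fmt.Printf("% x\n", appendTagHead(nil, 15309735)) // TagOpaqueEnd
}
```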

Reserved-tags

The following CBOR tags are reserved for the gofast protocol. Some of these tags may be standardised by the CBOR specification, but the choice of these tag values speeds up frame encoding and decoding.

tag-6 TagPost, following tagged CBOR byte-array carries a POST request.

tag-7 TagStream, following tagged CBOR byte-array carries a STREAM message.

tag-8 TagFinish, following is tagged CBOR breakstop (0xff) item.

tag-43 TagMsg, following CBOR map carries message header and data.

tag-44 TagId, used as key in CBOR header-data mapping to unique message ID.

tag-45 TagData, used as key in CBOR header-data mapping to message, binary encoded as CBOR byte-array.

tag-46 TagGzip, following CBOR byte array is compressed using gzip encoding.

tag-47 TagLzw, following CBOR byte array is compressed using lzw encoding.

These reserved tags are not part of CBOR specification or IANA registry, please refer/follow issue #1.
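For quick reference, the list above can be written as a lookup table. This is only an illustrative sketch: the constant names mirror the names used in the list, but whether the package exports them under these exact identifiers is an assumption, and tagName is a hypothetical helper.

```go
package main

import "fmt"

// Tag numbers as listed in the reserved-tags section.
const (
	TagPost   uint64 = 6
	TagStream uint64 = 7
	TagFinish uint64 = 8
	TagMsg    uint64 = 43
	TagId     uint64 = 44
	TagData   uint64 = 45
	TagGzip   uint64 = 46
	TagLzw    uint64 = 47
)

var tagNames = map[uint64]string{
	TagPost:   "TagPost",
	TagStream: "TagStream",
	TagFinish: "TagFinish",
	TagMsg:    "TagMsg",
	TagId:     "TagId",
	TagData:   "TagData",
	TagGzip:   "TagGzip",
	TagLzw:    "TagLzw",
}

// tagName reports the reserved name for tag, if it has one.
func tagName(tag uint64) (string, bool) {
	name, ok := tagNames[tag]
	return name, ok
}

func main() {
	fmt.Println(tagName(43))
	fmt.Println(tagName(266)) // opaque-space values are not reserved names
}
```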

Sizing

Depending on the configuration, the following heap allocations affect memory sizing.

  • A batch of packets is copied into a single buffer before flushing to the socket: writebuf := make([]byte, batchsize*buffersize). The remaining allocations scale with the configured opaque-space range [opaque.start, opaque.end].
  • As many stream{} objects will be pre-created and pooled: ((opaque.end-opaque.start)+1) * sizeof(stream{})
  • Each stream will allocate 3 buffers for sending/receiving packets. buffersize * 3
  • As many txproto{} objects will be pre-created and pooled: ((opaque.end-opaque.start)+1) * sizeof(txproto{})
  • As many tx protocol encode buffers will be pre-created and pooled: ((opaque.end-opaque.start)+1) * buffersize
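The formulas above can be added up into a rough estimate. This is a back-of-the-envelope sketch, not a measurement: the sizing type and its fields are hypothetical, sizeof(stream{}) and sizeof(txproto{}) are supplied by the caller because their real values are not given here, and the three per-stream buffers are assumed to be allocated for every pooled stream.

```go
package main

import "fmt"

// sizing collects the inputs of the formulas in the sizing section.
type sizing struct {
	BufferSize  int64 // "buffersize" setting
	BatchSize   int64 // "batchsize" setting
	OpaqueStart int64 // "opaque.start" setting
	OpaqueEnd   int64 // "opaque.end" setting
	StreamSize  int64 // stand-in for sizeof(stream{}), caller supplied
	TxprotoSize int64 // stand-in for sizeof(txproto{}), caller supplied
}

// estimate returns the approximate number of heap bytes pre-allocated.
func (s sizing) estimate() int64 {
	n := s.OpaqueEnd - s.OpaqueStart + 1 // size of the opaque range, inclusive

	writebuf := s.BatchSize * s.BufferSize // single batching buffer
	streams := n * s.StreamSize            // pooled stream{} objects
	streamBufs := n * 3 * s.BufferSize     // 3 buffers per stream (assumed per pooled stream)
	txprotos := n * s.TxprotoSize          // pooled txproto{} objects
	txBufs := n * s.BufferSize             // pooled tx encode buffers

	return writebuf + streams + streamBufs + txprotos + txBufs
}

func main() {
	s := sizing{
		BufferSize: 512, BatchSize: 1,
		OpaqueStart: 1000, OpaqueEnd: 1999,
		StreamSize: 100, TxprotoSize: 50, // placeholder struct sizes
	}
	fmt.Println(s.estimate(), "bytes")
}
```

The per-stream buffers dominate, so narrowing the opaque range or reducing buffersize has the largest effect on the total.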

Panic and Recovery

Panics are to be expected when APIs are misused. Programmers might choose to ignore errors, but not panics. For example:

  • When subscribing a message whose ID is already reserved.
  • When a transport is created with an invalid opaque-range.
  • When two transports are created with the same name (all transport instances are named).
  • When trying to close an invalid transport.
  • When unforeseen panic happens in doTx() routine, it recovers from panic, dumps the stack-trace, closes the transport and exits.
  • When unforeseen panic happens in doRx() routine, it recovers from panic, dumps the stack-trace, closes the transport and exits.
  • When unforeseen panic happens in syncRx() routine, it recovers from panic, dumps the stack trace, closes the transport, all live pending streams and exits.
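The recover-dump-close behaviour described for the internal routines is a common Go pattern. A minimal sketch of that pattern, under the assumption that it resembles what the routines do; runGuarded, closer and fakeTransport are hypothetical names, and this is not the library's code.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// closer is the part of a transport this sketch needs.
type closer interface {
	Close() error
}

// runGuarded runs loop. If loop panics, it recovers, prints the panic value
// and stack trace, closes c, and reports that a panic was recovered.
func runGuarded(name string, c closer, loop func()) (recovered bool) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("%s recovered from panic: %v\n%s", name, r, debug.Stack())
			c.Close()
			recovered = true
		}
	}()
	loop()
	return false
}

// fakeTransport stands in for a real transport in this example.
type fakeTransport struct{ closed bool }

func (f *fakeTransport) Close() error {
	f.closed = true
	return nil
}

func main() {
	ft := &fakeTransport{}
	runGuarded("doRx", ft, func() { panic("unexpected frame") })
	fmt.Println("transport closed:", ft.closed)
}
```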

How to contribute


  • Pick an issue, or create a new issue. Provide adequate documentation for the issue.
  • Assign the issue or get it assigned.
  • Work on the code, once finished, raise a pull request.
  • Gofast is written in golang, hence expected to follow the global guidelines for writing go programs.
  • If the changeset is more than a few lines, please generate a report card.
  • As of now, branch master is the development branch.

Documentation

Overview

Package gofast implements a high performance symmetric protocol for on-the-wire data transport. Each Transport{} instance encapsulates a single socket connection, and application routines can concurrently post, request, and stream messages on the same transport (same socket). Internally, the gofast library maintains a global collection of all active Transport instances created during the lifetime of the application.

NOTE: Opaque-space is the range of uint64 values reserved for tagging packets, configured via the settings argument supplied at Transport construction. The opaque-space can't start below TagOpaqueStart.

Messages are golang objects implementing the Message{} interface. Message objects need to be subscribed with transport before they are exchanged over the transport. It is also expected that applications using gofast should pre-define messages and their Ids.

Message IDs need to be unique for every type of message transferred using gofast protocol, following id range is reserved for internal use:

0x1000 - 0x100F -- reserved messageid.

transport instantiation steps:

setts := gosettings.Settings{"log.level": "info", "log.file": logfile}
golog.SetLogger(nil /* use-default-logging */, setts)
LogComponents("gofast")

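// Arguments follow the documented signature: name, conn, version, settings.
// "example" is a placeholder transport name.
t, err := NewTransport("example", conn, &ver, settings)
if err != nil {
	// handle the error
}
t.SubscribeMessage(&msg1, handler1) // subscribe message
t.SubscribeMessage(&msg2, handler2) // subscribe another message
if err := t.Handshake(); err != nil {
	// handle the error
}
t.FlushPeriod(tm)                   // optional
t.SendHeartbeat(tm)                 // optional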

If your application is using a custom logger, implement golog.Logger{} interface on your custom logger and supply that as first argument to SetLogger().

Index

Constants

const TagOpaqueEnd = 15309735

TagOpaqueEnd ending value for opaque-space

const TagOpaqueStart = 266

TagOpaqueStart starting value for opaque-space

Variables

var ErrorInvalidTag = errors.New("gofast.invalidtag")

ErrorInvalidTag if supplied tag is not supported by the gofast package.

Functions

func DefaultSettings

func DefaultSettings(start, end int64) s.Settings

DefaultSettings for gofast, start and end arguments are used to generate opaque id for streams.

Configurable parameters:

"buffersize" (int64, default: 512)

Maximum size that a single message will need for encoding.

"batchsize" (int64, default: 1)

Number of messages to batch before writing to socket, transport
will create a local buffer of size buffersize * batchsize.

"chansize" (int64, default: 100000)

Buffered channel size to use for internal go-routines.

"opaque.start" (int64, default: <start-argument>)

Starting opaque range, inclusive. Must be > TagOpaqueStart.

"opaque.end" (int64, default: <end-argument>)

Ending opaque range, inclusive.

"tags" (string, default: "")

Comma separated list of tags to apply, in specified order.

"gzip.level" (int64, default: <flate.BestSpeed>)

Gzip compression level, if `tags` contain "gzip".
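Before passing start and end to DefaultSettings, an application may want to check the range itself. The sketch below is an assumption-laden illustration: checkOpaqueRange is a hypothetical helper, the constants repeat the documented values of TagOpaqueStart and TagOpaqueEnd, the lower bound follows the stricter reading (start > TagOpaqueStart), and the upper bound (end <= TagOpaqueEnd) is inferred from the constant's description rather than stated explicitly.

```go
package main

import (
	"errors"
	"fmt"
)

// Values copied from the Constants section.
const (
	tagOpaqueStart = 266
	tagOpaqueEnd   = 15309735
)

// checkOpaqueRange validates an inclusive [start, end] opaque range.
func checkOpaqueRange(start, end int64) error {
	if start <= tagOpaqueStart {
		return fmt.Errorf("opaque.start %d must be > %d", start, tagOpaqueStart)
	}
	if end > tagOpaqueEnd {
		return fmt.Errorf("opaque.end %d must be <= %d", end, tagOpaqueEnd)
	}
	if start > end {
		return errors.New("opaque.start must not exceed opaque.end")
	}
	return nil
}

func main() {
	fmt.Println(checkOpaqueRange(1000, 2000))
	fmt.Println(checkOpaqueRange(266, 2000))
}
```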

func Listhandler

func Listhandler(w http.ResponseWriter, r *http.Request)

Listhandler http handler to return list of active transports.

NOTE: This handler is used by gofast/http package. Typically applications are not expected to use this function directly.

func LogComponents

func LogComponents(components ...string)

LogComponents enables logging. Logging is disabled by default; applications that want log information for gofast components should call this function with "self", "all" or "gofast" as argument.

func Stat

func Stat(name string) map[string]uint64

Stat count for named transport object, every Transport instance can be named.

Available statistics:

"n_tx", number of messages transmitted.

"n_flushes", number of times message-batches were flushed.

"n_txbyte", number of bytes transmitted on socket.

"n_txpost", number of post messages transmitted.

"n_txreq", number of request messages transmitted.

"n_txresp", number of response messages transmitted.

"n_txstart", number of start messages transmitted, indicates the number of streams started by this local node.

"n_txstream", number of stream messages transmitted.

"n_txfin", number of finish messages transmitted, indicates the number of streams closed by this local node, should always match "n_txstart" plus active streams.

"n_rx", number of packets received.

"n_rxbyte", number of bytes received from socket.

"n_rxpost", number of post messages received.

"n_rxreq", number of request messages received.

"n_rxresp", number of response messages received.

"n_rxstart", number of start messages received, indicates the number of streams started by the remote node.

"n_rxstream", number of stream messages received.

"n_rxfin", number of finish messages received, indicates the number of streams closed by the remote node, should always match "n_rxstart" plus active streams.

"n_rxbeats", number of heartbeats received.

"n_dropped", bytes dropped.

"n_mdrops", messages dropped.

Note that `n_dropped` and `n_mdrops` are counted because gofast supports either end to finish an ongoing stream of messages. It might be normal to see non-ZERO values.

func Stats

func Stats() map[string]uint64

Stats returns consolidated counts of all transport objects. Refer to the gofast.Stat() api for more information.
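Consolidating counts amounts to summing the per-transport maps key by key. A minimal sketch of that idea, assuming plain addition per key; mergeStats is a hypothetical helper and not how the package necessarily computes its totals.

```go
package main

import "fmt"

// mergeStats sums several statistics maps key by key.
func mergeStats(all ...map[string]uint64) map[string]uint64 {
	total := make(map[string]uint64)
	for _, st := range all {
		for key, count := range st {
			total[key] += count
		}
	}
	return total
}

func main() {
	a := map[string]uint64{"n_tx": 10, "n_rx": 4}
	b := map[string]uint64{"n_tx": 5, "n_rxbeats": 2}
	total := mergeStats(a, b)
	fmt.Println(total["n_tx"], total["n_rx"], total["n_rxbeats"])
}
```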

func Statshandler

func Statshandler(w http.ResponseWriter, r *http.Request)

Statshandler http handler to handle statistics endpoint, returns statistics for specified transport or aggregate statistics of all transports, based on the query parameters.

NOTE: This handler is used by gofast/http package. Typically applications are not expected to use this function directly.

Types

type BinMessage

type BinMessage struct {
	ID   uint64
	Data []byte
}

BinMessage is a tuple of {id, encodedmsg-slice}. This type is used on the receiver side of the transport.

type Message

type Message interface {
	// ID return a unique message identifier.
	ID() uint64

	// Encode message to binary blob.
	Encode(out []byte) []byte

	// Decode this message from a binary blob.
	Decode(in []byte) (n int64)

	// Size is maximum memory foot-print needed for encoding this message.
	Size() int64

	// String representation of this message, used for logging.
	String() string
}

Message interface, shall be implemented by all messages exchanged via gofast-transport.
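A small type satisfying this interface might look as follows. This is a sketch under assumptions: the interface is repeated locally so the example is self-contained, Echo and its message id 0x2001 are hypothetical (chosen outside the reserved 0x1000 - 0x100F range), and Encode is assumed to write into the supplied buffer and return the encoded slice.

```go
package main

import "fmt"

// Message repeats the interface shown above for a self-contained example.
type Message interface {
	ID() uint64
	Encode(out []byte) []byte
	Decode(in []byte) (n int64)
	Size() int64
	String() string
}

// Echo is a hypothetical message carrying a text payload.
type Echo struct {
	Text string
}

// ID returns a placeholder id outside the reserved range.
func (m *Echo) ID() uint64 { return 0x2001 }

// Encode reuses out's storage and returns the encoded bytes.
func (m *Echo) Encode(out []byte) []byte { return append(out[:0], m.Text...) }

// Decode reads the whole input as text and reports the bytes consumed.
func (m *Echo) Decode(in []byte) int64 {
	m.Text = string(in)
	return int64(len(in))
}

// Size is the number of bytes Encode needs.
func (m *Echo) Size() int64 { return int64(len(m.Text)) }

func (m *Echo) String() string { return fmt.Sprintf("Echo(%q)", m.Text) }

// Compile-time check that *Echo implements Message.
var _ Message = (*Echo)(nil)

func main() {
	msg := &Echo{Text: "hello"}
	buf := msg.Encode(make([]byte, 0, msg.Size()))

	var got Echo
	n := got.Decode(buf)
	fmt.Println(n, got.String())
}
```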

type RequestCallback

type RequestCallback func(*Stream, BinMessage) StreamCallback

RequestCallback is the handler called for an incoming post, request, or stream message. On either side of the connection, RequestCallback initiates a new incoming exchange, be it a Post, Request or Stream type. Applications should first register request-handlers for the message-ids they expect, and also register a default handler to catch all other messages.

Stream pointer will be nil if incoming message is a POST.

Handler shall not block and must be as light-weight as possible.

If the request initiates a stream of messages from the remote, the handler should return a stream-callback. StreamCallback will be dispatched for every new message on this stream.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream for a newly started stream on the transport. Sender can initiate a new stream by calling Transport.Stream() API, while receiver will return a Stream instance via RequestCallback.

func (*Stream) Close

func (s *Stream) Close() error

Close this stream.

func (*Stream) Response

func (s *Stream) Response(msg Message, flush bool) error

Response to a request, to batch the response pass flush as false.

func (*Stream) Stream

func (s *Stream) Stream(msg Message, flush bool) (err error)

Stream a single message, to batch the message pass flush as false.

func (*Stream) Transport

func (s *Stream) Transport() *Transport

Transport return the underlying transport carrying this stream.

type StreamCallback

type StreamCallback func(BinMessage, bool)

StreamCallback is the handler called for an incoming message on a stream. The boolean argument, if false, indicates that the remote has closed the stream.
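Because handlers are expected not to block, one way to write a stream callback is to hand messages to a buffered channel and return immediately. The sketch below declares local copies of BinMessage and StreamCallback so it runs on its own; forwardTo is a hypothetical helper, and both dropping messages when the channel is full and copying Data before forwarding are design assumptions, not documented library behaviour.

```go
package main

import "fmt"

// Local copies of the documented types, for a self-contained example.
type BinMessage struct {
	ID   uint64
	Data []byte
}

type StreamCallback func(BinMessage, bool)

// forwardTo returns a callback that forwards messages to ch without
// blocking, and closes ch when the remote closes the stream.
func forwardTo(ch chan<- BinMessage) StreamCallback {
	closed := false
	return func(msg BinMessage, ok bool) {
		if closed {
			return
		}
		if !ok { // remote has closed the stream
			closed = true
			close(ch)
			return
		}
		// Copy the payload in case the caller reuses its buffer (assumption).
		cp := BinMessage{ID: msg.ID, Data: append([]byte(nil), msg.Data...)}
		select {
		case ch <- cp:
		default: // channel full: drop rather than block the receive path
		}
	}
}

func main() {
	ch := make(chan BinMessage, 4)
	cb := forwardTo(ch)
	cb(BinMessage{ID: 1, Data: []byte("first")}, true)
	cb(BinMessage{}, false)
	for msg := range ch {
		fmt.Println(msg.ID, string(msg.Data))
	}
}
```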

type Streamer

type Streamer interface {
	// Response to a request, to batch the response pass flush as false.
	Response(msg Message, flush bool) error

	// Stream a single message, to batch the message pass flush as false.
	Stream(msg Message, flush bool) error

	// Close this stream.
	Close() error
}

Streamer interface to send/receive bidirectional streams of several messages.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is a peer-to-peer transport enabler.

func NewTransport

func NewTransport(
	name string, conn Transporter, version Version,
	setts s.Settings) (*Transport, error)

NewTransport encapsulate a transport over this connection, one connection one transport.

func (*Transport) Close

func (t *Transport) Close() error

Close this transport, connection shall be closed as well.

func (*Transport) DefaultHandler

func (t *Transport) DefaultHandler(handler RequestCallback) *Transport

DefaultHandler register a default handler to handle all messages. If incoming message-id does not have a matching handler, default handler is dispatched, it is more like a catch-all for incoming messages.

NOTE: handler shall not block and must be as light-weight as possible

func (*Transport) FlushPeriod

func (t *Transport) FlushPeriod(ms time.Duration)

FlushPeriod to periodically flush batched packets.

func (*Transport) Handshake

func (t *Transport) Handshake() error

Handshake with remote, shall be called after NewTransport(), before application messages are exchanged between nodes. Specifically, the following information will be gathered from the remote:

  • Peer version, can later be queried via PeerVersion() API.
  • Tags settings.

func (*Transport) IsClosed

func (t *Transport) IsClosed() bool

IsClosed return whether this transport is closed or not.

func (*Transport) LocalAddr

func (t *Transport) LocalAddr() net.Addr

LocalAddr of this connection.

func (*Transport) Name

func (t *Transport) Name() string

Name returns the transport-name.

func (*Transport) PeerVersion

func (t *Transport) PeerVersion() Version

PeerVersion from peer node.

func (*Transport) Ping

func (t *Transport) Ping(echo string) (string, error)

Ping pong with peer, returns the pong string.

func (*Transport) Post

func (t *Transport) Post(msg Message, flush bool) error

Post request to peer.

func (*Transport) RemoteAddr

func (t *Transport) RemoteAddr() net.Addr

RemoteAddr of this connection.

func (*Transport) Request

func (t *Transport) Request(msg Message, flush bool, resp Message) error

Request a response from peer. Caller is expected to pass a reference to the expected response message, which also implies that every request can expect only one response type. This has the added benefit of reducing memory pressure on the GC.

func (*Transport) SendHeartbeat

func (t *Transport) SendHeartbeat(ms time.Duration)

SendHeartbeat periodically to remote peer, this can help in detecting inactive, or half-open connections.

func (*Transport) Silentsince

func (t *Transport) Silentsince() time.Duration

Silentsince returns the timestamp of last heartbeat message received from peer. If ZERO, remote is not using the heart-beat mechanism.

func (*Transport) Stat

func (t *Transport) Stat() map[string]uint64

Stat shall return the stat counts for this transport. Refer to the gofast.Stat() api for more information.

func (*Transport) Stream

func (t *Transport) Stream(
	msg Message, flush bool, rxcallb StreamCallback) (*Stream, error)

Stream a bi-directional stream with peer.

func (*Transport) SubscribeMessage

func (t *Transport) SubscribeMessage(msg Message, handler RequestCallback) *Transport

SubscribeMessage registers a message that shall be exchanged via this transport. Only subscribed messages can be exchanged. For every incoming message with its ID equal to msg.ID(), handler will be dispatched.

NOTE: handler shall not block and must be as light-weight as possible

func (*Transport) Whoami

func (t *Transport) Whoami() (wai Whoami, err error)

Whoami shall return remote's Whoami.

type Transporter

type Transporter interface {
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Close() error
}

Transporter interface to send and receive packets, connection object shall implement this interface.
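The method set above is a subset of net.Conn, so standard connections should satisfy it. The sketch below repeats the interface locally and demonstrates this with an in-memory net.Pipe; pipePair and sendAndReceive are hypothetical helpers for the example only.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// Transporter repeats the interface shown above.
type Transporter interface {
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Close() error
}

// Compile-time check: any net.Conn can be used as a Transporter.
var _ Transporter = (net.Conn)(nil)

// pipePair returns two connected in-memory endpoints.
func pipePair() (Transporter, Transporter) {
	a, b := net.Pipe()
	return a, b
}

// sendAndReceive writes msg on a and reads it back from b.
func sendAndReceive(a, b Transporter, msg string) (string, error) {
	if msg == "" {
		return "", nil
	}
	errc := make(chan error, 1)
	go func() {
		// net.Pipe is synchronous, so the write runs concurrently with the read.
		_, err := a.Write([]byte(msg))
		errc <- err
	}()
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(b, buf); err != nil {
		return "", err
	}
	if err := <-errc; err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	a, b := pipePair()
	defer a.Close()
	defer b.Close()
	got, err := sendAndReceive(a, b, "ping")
	fmt.Println(got, err)
}
```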

type Version

type Version interface {
	// Less than supplied version.
	Less(ver Version) bool

	// Equal to the supplied version.
	Equal(ver Version) bool

	// Encode version into array of bytes.
	Encode(out []byte) []byte

	// Decode array of bytes to version object.
	Decode(in []byte) (n int64)

	// Size return memory foot-print encoded version
	Size() int64

	// String representation of version, for logging.
	String() string
}

Version interface to exchange and manage transport's version.

type Version64

type Version64 uint64

Version64 example version implementation.

func (*Version64) Decode

func (v *Version64) Decode(in []byte) int64

Decode implement Version interface{}.

func (*Version64) Encode

func (v *Version64) Encode(out []byte) []byte

Encode implement Version interface{}.

func (*Version64) Equal

func (v *Version64) Equal(other Version) bool

Equal implement Version interface{}.

func (*Version64) Less

func (v *Version64) Less(other Version) bool

Less implement Version interface{}.

func (*Version64) Size

func (v *Version64) Size() int64

Size implement Version interface{}.

func (*Version64) String

func (v *Version64) String() string

String implement Version interface{}.

type Whoami

type Whoami struct {
	// contains filtered or unexported fields
}

Whoami messages exchanged by remotes.

func (*Whoami) Decode

func (msg *Whoami) Decode(in []byte) int64

Decode implement Message interface{}.

func (*Whoami) Encode

func (msg *Whoami) Encode(out []byte) []byte

Encode implement Message interface{}.

func (*Whoami) ID

func (msg *Whoami) ID() uint64

ID implement Message interface{}.

func (*Whoami) Name

func (msg *Whoami) Name() string

Name return name of the transport, either local or remote based on the context in which Whoami was obtained.

func (*Whoami) Repr

func (msg *Whoami) Repr() string

func (*Whoami) Size

func (msg *Whoami) Size() int64

Size implement Message interface{}.

func (*Whoami) String

func (msg *Whoami) String() string

String implement Message interface{}.

func (*Whoami) Tags

func (msg *Whoami) Tags() string

Tags return comma separated value of tags, either local or remote based on the context in which Whoami was obtained.

func (*Whoami) Version

func (msg *Whoami) Version() Version

Version return endpoint's Version, either local or remote based on the context in which Whoami was obtained.

Directories

Path Synopsis
Package http implements gofast http endpoints, subscribed to net/http.DefaultServeMux.
