Documentation ¶
Overview ¶
Package gofast implement high performance symmetric protocol for on the wire data transport. Each Transport{} instance encapsulate a single socket connection, and application routines can concurrently post, request, stream messages on the same transport (same socket). Internally gofast library maintain a global collection of all active Transport that are created during the lifetime of application.
NOTE: Opaque-space, is range of uint64 values reserved for tagging packets, and configured via settings argument supplied to the Transport construction. Note that Opaque-space can't be less than TagOpaqueStart.
Messages are golang objects implementing the Message{} interface. Message objects need to be subscribed with transport before they are exchanged over the transport. It is also expected that applications using gofast should pre-define messages and their Ids.
Message IDs need to be unique for every type of message transferred using gofast protocol, following id range is reserved for internal use:
0x1000 - 0x100F -- reserved messageid.
transport instantiation steps:
setts := gosettings.Settings{"log.level": "info", "log.file": logfile} golog.SetLogger(nil /* use-default-logging */, setts) LogComponents("gofast") t := NewTransport(conn, &ver, nil, settings) t.SubscribeMessage(&msg1, handler1) // subscribe message t.SubscribeMessage(&msg2, handler2) // subscribe another message t.Handshake() t.FlushPeriod(tm) // optional t.SendHeartbeat(tm) // optional
If your application is using a custom logger, implement golog.Logger{} interface on your custom logger and supply that as first argument to SetLogger().
Index ¶
- Constants
- Variables
- func DefaultSettings(start, end int64) s.Settings
- func Listhandler(w http.ResponseWriter, r *http.Request)
- func LogComponents(components ...string)
- func Stat(name string) map[string]uint64
- func Stats() map[string]uint64
- func Statshandler(w http.ResponseWriter, r *http.Request)
- type BinMessage
- type Message
- type RequestCallback
- type Stream
- type StreamCallback
- type Streamer
- type Transport
- func (t *Transport) Close() error
- func (t *Transport) DefaultHandler(handler RequestCallback) *Transport
- func (t *Transport) FlushPeriod(ms time.Duration)
- func (t *Transport) Handshake() error
- func (t *Transport) IsClosed() bool
- func (t *Transport) LocalAddr() net.Addr
- func (t *Transport) Name() string
- func (t *Transport) PeerVersion() Version
- func (t *Transport) Ping(echo string) (string, error)
- func (t *Transport) Post(msg Message, flush bool) error
- func (t *Transport) RemoteAddr() net.Addr
- func (t *Transport) Request(msg Message, flush bool, resp Message) error
- func (t *Transport) SendHeartbeat(ms time.Duration)
- func (t *Transport) Silentsince() time.Duration
- func (t *Transport) Stat() map[string]uint64
- func (t *Transport) Stream(msg Message, flush bool, rxcallb StreamCallback) (*Stream, error)
- func (t *Transport) SubscribeMessage(msg Message, handler RequestCallback) *Transport
- func (t *Transport) Whoami() (wai Whoami, err error)
- type Transporter
- type Version
- type Version64
- type Whoami
- func (msg *Whoami) Decode(in []byte) int64
- func (msg *Whoami) Encode(out []byte) []byte
- func (msg *Whoami) ID() uint64
- func (msg *Whoami) Name() string
- func (msg *Whoami) Repr() string
- func (msg *Whoami) Size() int64
- func (msg *Whoami) String() string
- func (msg *Whoami) Tags() string
- func (msg *Whoami) Version() Version
Constants ¶
const TagOpaqueEnd = 15309735
TagOpaqueEnd ending value for opaque-space
const TagOpaqueStart = 266
TagOpaqueStart starting value for opaque-space
Variables ¶
var ErrorInvalidTag = errors.New("gofast.invalidtag")
ErrorInvalidTag if supplied tag is not supported by the gofast package.
Functions ¶
func DefaultSettings ¶
DefaultSettings for gofast, start and end arguments are used to generate opaque id for streams.
Configurable parameters:
"buffersize" (int64, default: 512)
Maximum size that a single message will need for encoding.
"batchsize" (int64, default:1 )
Number of messages to batch before writing to socket, transport will create a local buffer of size buffersize * batchsize.
"chansize" (int64, default: 100000)
Buffered channel size to use for internal go-routines.
"opaque.start" (int64, default: <start-argument>)
Starting opaque range, inclusive. must be > TagOpaqueStart
"opaque.end" (int64, default: <end-argument>)
Ending opaque range, inclusive.
"tags" (int64, default: "")
Comma separated list of tags to apply, in specified order.
"gzip.level" (int64, default: <flate.BestSpeed>)
Gzip compression level, if `tags` contain "gzip".
func Listhandler ¶
func Listhandler(w http.ResponseWriter, r *http.Request)
Listhandler http handler to return list of active transport.
NOTE: This handler is used by gofast/http package. Typically application are not expected to use this function directly.
func LogComponents ¶
func LogComponents(components ...string)
LogComponents enable logging. By default logging is disabled, if applications want log information for gofast components call this function with "self" or "all" or "gofast" as argument.
func Stat ¶
Stat count for named transport object, every Transport instance can be named.
Available statistics:
"n_tx", number of messages transmitted.
"n_flushes", number of times message-batches where flushed.
"n_txbyte", number of bytes transmitted on socket.
"n_txpost", number of post messages transmitted.
"n_txreq", number of request messages transmitted.
"n_txresp", number of response messages transmitted.
"n_txstart", number of start messages transmitted, indicates the number of streams started by this local node.
"n_txstream", number of stream messages transmitted.
"n_txfin", number of finish messages transmitted, indicates the number of streams closed by this local node, should always match "n_txstart" plus active streams.
"n_rx", number of packets received.
"n_rxbyte", number of bytes received from socket.
"n_rxpost", number of post messages received.
"n_rxreq", number of request messages received.
"n_rxresp", number of response messages received.
"n_rxstart", number of start messages received, indicates the number of streams started by the remote node.
"n_rxstream", number of stream messages received.
"n_rxfin", number of finish messages received, indicates the number of streams closed by the remote node, should always match "n_rxstart" plus active streams.
"n_rxbeats", number of heartbeats received.
"n_dropped", bytes dropped.
"n_mdrops", messages dropped.
Note that `n_dropped` and `n_mdrops` are counted because gofast supports either end to finish an ongoing stream of messages. It might be normal to see non-ZERO values.
func Stats ¶
Stats return consolidated counts of all transport objects. Refer gofast.Stat() api for more information.
func Statshandler ¶
func Statshandler(w http.ResponseWriter, r *http.Request)
Statshandler http handler to handle statistics endpoint, returns statistics for specified transport or aggregate statistics of all transports, based on the query parameters.
NOTE: This handler is used by gofast/http package. Typically application are not expected to use this function directly.
Types ¶
type BinMessage ¶
BinMessage is a tuple of {id, encodedmsg-slice}. This type is used on the receiver side of the transport.
type Message ¶
type Message interface { // ID return a unique message identifier. ID() uint64 // Encode message to binary blob. Encode(out []byte) []byte // Decode this message from a binary blob. Decode(in []byte) (n int64) // Size is maximum memory foot-print needed for encoding this message. Size() int64 // String representation of this message, used for logging. String() string }
Message interface, shall implemented by all messages exchanged via gofast-transport.
type RequestCallback ¶
type RequestCallback func(*Stream, BinMessage) StreamCallback
RequestCallback handler called for an incoming post, or request, or stream message. On either side of the connection, RequestCallback initiates a new incoming exchange - be it a Post or Request or Stream type. Applications should first register request-handler for expecting message-ids, and also register a default handler to catch all other messages:
Stream pointer will be nil if incoming message is a POST.
Handler shall not block and must be as light-weight as possible.
If request is initiating a stream of messages from remote, handler should return a stream-callback. StreamCallback will dispatched for every new messages on this stream.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream for a newly started stream on the transport. Sender can initiate a new stream by calling Transport.Stream() API, while receiver will return a Stream instance via RequestCallback.
type StreamCallback ¶
type StreamCallback func(BinMessage, bool)
StreamCallback handler called for an incoming message on a stream, the boolean argument, if false, indicates whether remote has closed.
type Streamer ¶
type Streamer interface { // Response to a request, to batch the response pass flush as false. Response(msg Message, flush bool) error // Stream a single message, to batch the message pass flush as false. Stream(msg Message, flush bool) error // Close this stream. Close() error }
Streamer interface to send/receive bidirectional streams of several messages.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is a peer-to-peer transport enabler.
func NewTransport ¶
func NewTransport( name string, conn Transporter, version Version, setts s.Settings) (*Transport, error)
NewTransport encapsulate a transport over this connection, one connection one transport.
func (*Transport) DefaultHandler ¶
func (t *Transport) DefaultHandler(handler RequestCallback) *Transport
DefaultHandler register a default handler to handle all messages. If incoming message-id does not have a matching handler, default handler is dispatched, it is more like a catch-all for incoming messages.
NOTE: handler shall not block and must be as light-weight as possible
func (*Transport) FlushPeriod ¶
FlushPeriod to periodically flush batched packets.
func (*Transport) Handshake ¶
Handshake with remote, shall be called after NewTransport(), before application messages are exchanged between nodes. Specifically, following information will be gathered from romote:
- Peer version, can later be queried via PeerVersion() API.
- Tags settings.
func (*Transport) PeerVersion ¶
PeerVersion from peer node.
func (*Transport) RemoteAddr ¶
RemoteAddr of this connection.
func (*Transport) Request ¶
Request a response from peer. Caller is expected to pass reference to an expected response message, this also implies that every request can expect only one response type. This also have an added benefit of reducing the memory pressure on GC.
func (*Transport) SendHeartbeat ¶
SendHeartbeat periodically to remote peer, this can help in detecting inactive, or half-open connections.
func (*Transport) Silentsince ¶
Silentsince returns the timestamp of last heartbeat message received from peer. If ZERO, remote is not using the heart-beat mechanism.
func (*Transport) Stat ¶
Stat shall return the stat counts for this transport. Refer gofast.Stat() api for more information.
func (*Transport) SubscribeMessage ¶
func (t *Transport) SubscribeMessage(msg Message, handler RequestCallback) *Transport
SubscribeMessage that shall be exchanged via this transport. Only subscribed messages can be exchanged. And for every incoming message with its ID equal to msg.ID(), handler will be dispatch.
NOTE: handler shall not block and must be as light-weight as possible
type Transporter ¶
type Transporter interface { Read(b []byte) (n int, err error) Write(b []byte) (n int, err error) LocalAddr() net.Addr RemoteAddr() net.Addr Close() error }
Transporter interface to send and receive packets, connection object shall implement this interface.
type Version ¶
type Version interface { // Less than supplied version. Less(ver Version) bool // Equal to the supplied version. Equal(ver Version) bool // Encode version into array of bytes. Encode(out []byte) []byte // Decode array of bytes to version object. Decode(in []byte) (n int64) // Size return memory foot-print encoded version Size() int64 // String representation of version, for logging. String() string }
Version interface to exchange and manage transport's version.
type Version64 ¶
type Version64 uint64
Version64 example version implementation.
type Whoami ¶
type Whoami struct {
// contains filtered or unexported fields
}
Whoami messages exchanged by remotes.
func (*Whoami) Name ¶
Name return name of the transport, either local or remote based on the context in which Whoami was obtained.