gotalk

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2015 License: Apache-2.0 Imports: 22 Imported by: 0

README

gotalk

Gotalk exists to make it easy for programs to talk with one another over the internet, like a web app coordinating with a web server, or a bunch of programs dividing work amongst eachother.

A terribly boring amateur comic strip

Gotalk takes the natural approach of bidirectional and concurrent communication — any peer have the ability to expose "operations" as well as asking other peers to perform operations. The traditional restrictions of who can request and who can respond usually associated with a client-server model is nowhere to be found in gotalk.

Gotalk in a nutshell

Bidirectional — There's no discrimination on capabilities depending on who connected or who accepted. Both "servers" and "clients" can expose operations as well as send requests to the other side.

Concurrent — Requests, results, and notifications all share a single connection without blocking eachother by means of pipelining. There's no serialization on request-result or even for a single large message, as the gotalk protocol is frame-based and multiplexes messages over a single connection. This means you can perform several requests at once without having to think about queueing or blocking.

Diagram of how Gotalk uses connection pipelining

Simple — Gotalk has a simple and opinionated API with very few components. You expose an operation via "handle" and send requests via "request".

Debuggable — The Gotalk protocol's wire format is ASCII-based for easy on-the-wire inspection of data. For example, here's a protocol message representing an operation request: r0001005hello00000005world. The Gotalk protocol can thus be operated over any reliable byte transport.

Practical — Gotalk includes a JavaScript implementation for Web Sockets alongside the full-featured Go implementation, making it easy to build real-time web applications. The Gotalk source code also includes a number of easily-readable examples.

By example

There are a few examples in the examples directory demonstrating Gotalk. But let's explore a simple program right now — here's a little something written in Go which demonstrates the use of an operation named "greet":

func server() {
  gotalk.Handle("greet", func(in GreetIn) (GreetOut, error) {
    return GreetOut{"Hello " + in.Name}, nil
  })
  if err := gotalk.Serve("tcp", "localhost:1234"); err != nil {
    log.Fatalln(err)
  }
}

func client() {
  s, err := gotalk.Connect("tcp", "localhost:1234")
  if err != nil {
    log.Fatalln(err)
  }
  greeting := &GreetOut{}
  if err := s.Request("greet", GreetIn{"Rasmus"}, greeting); err != nil {
    log.Fatalln(err)
  }
  log.Printf("greeting: %+v\n", greeting)
  s.Close()
}

Let's look at the above example in more detail, broken apart to see what's going on.

We begin by importing the gotalk library together with log which we use for printing to the console:

package main
import (
  "log"
  "github.com/rsms/gotalk"
)

We define two types: Expected input (request parameters) and output (result) for our "greet" operation:

type GreetIn struct {
  Name string `json:"name"`
}
type GreetOut struct {
  Greeting string `json:"greeting"`
}

Registers a process-global request handler for an operation called "greet" accepting parameters of type GreetIn, returning results of type GreetOut:

func server() {
  gotalk.Handle("greet", func(in GreetIn) (GreetOut, error) {
    return GreetOut{"Hello " + in.Name}, nil
  })

Finally at the bottom of our server function we call gotalk.Serve, which starts a local TCP server on port 1234:

  if err := gotalk.Serve("tcp", "localhost:1234"); err != nil {
    log.Fatalln(err)
  }
}

In out client function we start by connecting to the server:

func client() {
  s, err := gotalk.Connect("tcp", "localhost:1234")
  if err != nil {
    log.Fatalln(err)
  }

Finally we send a request for "greet" and print the result:

  greeting := GreetOut{}
  if err := s.Request("greet", GreetIn{"Rasmus"}, &greeting); err != nil {
    log.Fatalln(err)
  }
  log.Printf("greeting: %+v\n", greeting)

  s.Close()
}

Output:

greeting: {Greeting:Hello Rasmus}

Gotalk in the web browser

Gotalk is implemented not only in the full-fledged Go package, but also in a JavaScript library. This allows writing web apps talking Gotalk via Web Sockets possible.

// server.go:
package main
import (
  "net/http"
  "github.com/rsms/gotalk"
)
func main() {
  gotalk.Handle("echo", func(in string) (string, error) {
    return in, nil
  })
  http.Handle("/gotalk/", gotalk.WebSocketHandler())
  http.Handle("/", http.FileServer(http.Dir(".")))
  err := http.ListenAndServe("localhost:1234", nil)
  if err != nil {
    panic(err)
  }
}

In our html document, we begin by registering any operations we can handle:

<!-- index.html -->
<body>
<script type="text/javascript" src="/gotalk/gotalk.js"></script>
<script>
gotalk.handle('greet', function (params, result) {
  result({ greeting: 'Hello ' + params.name });
});
</script>

Notice how we load a JavaScript from "/gotalk/gotalk.js" — a gotalk web socket server embeds a matching web browser JS library which it returns from {path where gotalk web socket is mounted}/gotalk.js. It uses Etag cache validation, so you shouldn't need to think about "cache busting" the URL.

We can't "listen & accept" connections in a web browser, but we can "connect" so we do just that:

<!-- index.html -->
<body>
<script type="text/javascript" src="/gotalk/gotalk.js"></script>
<script>
gotalk.handle('greet', function (params, result) {
  result({ greeting: 'Hello ' + params.name });
});

var s = gotalk.connection().on('open', function () {
  // do something useful
}).on('close', function (err) {
  if (err.isGotalkProtocolError) return console.error(err);
});
</script>

This is enough for enabling the server to do things in the browser ...

But you probably want to have the browser send requests to the server, so let's send a "echo" request just as our connection opens:

var s = gotalk.connection().on('open', function () {
  s.request("echo", "Hello world", function (err, result) {
    if (err) return console.error('echo failed:', err);
    console.log('echo result:', result);
  });
});

We could rewrite our code like this to allow some UI component to send a request:

var s = gotalk.connection();

button.addEventListener('click', function () {
  s.request("echo", "Hello world", function (err, result) {
    if (err) return console.error('echo failed:', err);
    console.log('echo result:', result);
  });
});

The request will fail with an error "socket is closed" if the user clicks our button while the connection isn't open.

There are two ways to open a connection on a socket: Sock.prototype.open which simply opens a connection, and Sock.prototype.openKeepAlive which keeps the connection open, reconnecting as needed with exponential back-off and internet reachability knowledge. gotalk.connection() is a short-hand for creating a new Sock with gotalk.defaultHandlers and then calling openKeepAlive on it.

Protocol and wire format

The wire format is designed to be human-readable and flexible; it's byte-based and can be efficiently implemented in a number of environments ranging from HTTP and WebSocket in a web browser to raw TCP in Go or C. The protocol provides only a small set of operations on which more elaborate operations can be modeled by the user.

This document describes protocol version 1

Here's a complete description of the protocol:

conversation    = ProtocolVersion Message*
message         = SingleRequest | StreamRequest
                | SingleResult | StreamResult
                | ErrorResult | RetryResult
                | Notification | ProtocolError

ProtocolVersion = <hexdigit> <hexdigit>

SingleRequest   = "r" requestID operation payload
StreamRequest   = "s" requestID operation payload StreamReqPart*
StreamReqPart   = "p" requestID payload
SingleResult    = "R" requestID payload
StreamResult    = "S" requestID payload StreamResult*
ErrorResult     = "E" requestID payload
RetryResult     = "e" requestID wait payload
Notification    = "n" name payload
Heartbeat       = "h" load time
ProtocolError   = "f" code

requestID       = <byte> <byte> <byte> <byte>

operation       = text3
name            = text3
wait            = hexUInt8
code            = hexUInt8
time            = hexUInt8
load            = hexUInt4

text3           = text3Size text3Value
text3Size       = hexUInt3
text3Value      = <<byte>{text3Size} as utf8 text>

payload         = payloadSize payloadData
payloadSize     = hexUInt8
payloadData     = <byte>{payloadSize}

hexUInt3        = <hexdigit> <hexdigit> <hexdigit>
hexUInt4        = <hexdigit> <hexdigit> <hexdigit> <hexdigit>
hexUInt8        = <hexdigit> <hexdigit> <hexdigit> <hexdigit>
                  <hexdigit> <hexdigit> <hexdigit> <hexdigit>
Handshake

A conversation begins with the protocol version:

01  -- ProtocolVersion 1

If the version of the protocol spoken by the other end is not supported by the reader, a ProtocolError message is sent with code 1 and the connection is terminated. Otherwise, any messages are read and/or written.

Single-payload requests and results

This is a "single-payload" request ...

+------------------ SingleRequest
|   +---------------- requestID   "0001"
|   |      +--------- operation   "echo" (text3Size 4, text3Value "echo")
|   |      |       +- payloadSize 25
|   |      |       |
r0001004echo00000019{"message":"Hello World"}

... and a corresponding "single-payload" result:

+------------------ SingleResult
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 25
|   |       |
R000100000019{"message":"Hello World"}

Each request is identified by exactly three bytes—the requestID—which is requestor-specific and has no purpose beyond identity, meaning the value is never interpreted. 4 bytes can express 4 294 967 296 different values, meaning we can send up to 4 294 967 295 requests while another request is still being served. Should be enough.

These "single" requests & results are the most common protocol messages, and as their names indicates, their payloads follow immediately after the header. For large payloads this can become an issue when dealing with many concurrent requests over a single connection, for which there's a more complicated "streaming" request & result type which we will explore later on.

Faults

There are two types of replies indicating a fault: ErrorResult for requestor faults and RetryResult for responder faults.

If a request is faulty, like missing some required input data or sent over an unauthorized connection, an "error" is send as the reply instead of a regular result:

+------------------ ErrorResult
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 38
|   |       |
E000100000026{"error":"Unknown operation \"echo\""}

A request that produces an error should not be retried as-is, similar to the 400-class of errors of the HTTP protocol.

In the scenario a fault occurs on the responder side, like suffering a temporary internal error or is unable to complete the request because of resource starvation, a RetryResult is sent as the reply to a request:

+-------------------- RetryResult
|   +------------------ requestID   "0001"
|   |       +---------- wait        0
|   |       |       +-- payloadSize 20
|   |       |       |
e00010000000000000014"service restarting"

In this case — where wait is zero — the requestor is free to retry the request at its convenience.

However in some scenarios the responder might require the requestor to wait for some time before retrying the request, in which case the wait property has a non-zero value:

+-------------------- RetryResult
|   +------------------ requestID   "0001"
|   |       +---------- wait        5000 ms
|   |       |       +-- payloadSize 20
|   |       |       |
e00010000138800000014"request rate limit"

In this case the requestor must not retry the request until at least 5000 milliseconds has passed.

If the protocol communication itself experiences issues—e.g. an illegal message is received—a ProtocolError is written and the connection is closed.

ProtocolError codes:

Code Meaning Comments
0 Abnormal Closed because of an abnormal condition (e.g. server fault, etc)
1 Unsupported protocol The other side does not support the callers protocol
2 Invalid message An invalid message was transmitted
3 Timeout The other side closed the connection because communicating took too long

Example of a peer which does not support the version of the protocol spoken by the sender:

+-------- ProtocolError
|       +-- code 1
|       |
f00000001
Streaming requests and results

For more complicated scenarios there are "streaming-payload" requests and results at our disposal. This allows transmitting of large amounts of data without the need for large buffers. For example this could be used to forward audio data to audio playback hardware, or to transmit a large file off of slow media like a tape drive or hard-disk drive.

Because transmitting a streaming request or result does not occupy "the line" (single-payloads are transmitted serially), they can also be useful when there are many concurrent requests happening over a single connection.

Here's an example of a "streaming-payload" request ...

+------------------ StreamRequest
|   +---------------- requestID   "0001"
|   |      +--------- operation   "echo" (text3Size 4, text3Value "echo")
|   |      |       +- payloadSize 11
|   |      |       |
s0001004echo0000000b{"message":

+------------------ streamReqPart
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 14
|   |       |
p00010000000e"Hello World"}

+------------------ streamReqPart
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 0 (end of stream)
|   |       |
p000100000000

... followed by a "streaming-payload" result:

+------------------ StreamResult (1st part)
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 11
|   |       |
S00010000000b{"message":

+------------------ StreamResult (2nd part)
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 14
|   |       |
S00010000000e"Hello World"}

+------------------ StreamResult
|   +---------------- requestID   "0001"
|   |       +-------- payloadSize 0 (end of stream)
|   |       |
S000100000000

Streaming requests occupy resources on the responder's side for the duration of the "stream session". Therefore handling of streaming requests should be limited and "RetryResult" used to throttle requests:

+-------------------- RetryResult
|   +------------------ requestID   "0001"
|   |       +---------- wait        5000 ms
|   |       |       +-- payloadSize 19
|   |       |       |
e00010000138800000013"stream rate limit"

This means that the requestor must not send any new requests until wait time has passed.

Notifications

When there's no expectation on a response, Gotalk provides a "notification" message type:

+---------------------- Notification
|              +--------- name        "chat message" (text3Size 12, text3Value "chat message")
|              |       +- payloadSize 46
|              |       |
n00cchat message0000002e{"message":"Hi","from":"nthn","room":"gonuts"}

Notifications are never replied to nor can they cause "error" results. Applications needing acknowledgement of notification delivery might consider using a request instead.

Heartbeats

Because most responders will limit the time it waits for reads, a heartbeat message is send at a certain interval. When a heartbeat is sent is up to the implementation.

A heartbeat contains the sender's local time in the form of an unsigned 32-bit UNIX timestamp. This is enought to cover usage until 2106. I really hope gotalk is nowhere to be found in 2106.

It also contains an optional "load" value, indicating how pressured, or under what load, the sender is. 0 means "idle" and 65535 (0xffff) means "omg I think I'm dying." This can be used to distribute work to less loaded responders in a load-balancing setup.

+------------------ Heartbeat
|   +--------- load 2
|   |       +- time 2015-02-08 22:09:30 UTC
|   |       |
h000254d7de9a
Notes

Requests and results does not need to match on the "single" vs "streaming" detail — it's perfectly fine to send a streaming request and read a single response, or send a single response just to receive a streaming result. The payload type is orthogonal to the message type, with the exception of an error response which is always a "single-payload" message, carrying any information about the error in its payload. Note however that the current version of the Go package does not provide a high-level API for mixed-kind request-response handling.

For transports which might need "heartbeats" to stay alive, like some raw TCP connections over the internet, the suggested way to implement this is by notifications, e.g. send a "heartbeat" notification at a ceretain interval while no requests are being sent. The Gotalk protocol does not include a "heartbeat" feature because of this reason, as well as the fact that some transports (like web socket) already provide "heartbeat" features.

Other implementations

MIT license

Copyright (c) 2015 Rasmus Andersson http://rsms.me/

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Documentation

Index

Constants

View Source
const (
	MsgTypeSingleReq     = MsgType('r')
	MsgTypeStreamReq     = MsgType('s')
	MsgTypeStreamReqPart = MsgType('p')
	MsgTypeSingleRes     = MsgType('R')
	MsgTypeStreamRes     = MsgType('S')
	MsgTypeErrorRes      = MsgType('E')
	MsgTypeRetryRes      = MsgType('e')
	MsgTypeNotification  = MsgType('n')
	MsgTypeHeartbeat     = MsgType('h')
	MsgTypeProtocolError = MsgType('f')
)

Protocol message types

View Source
const (
	ProtocolErrorAbnormal    = 0
	ProtocolErrorUnsupported = 1
	ProtocolErrorInvalidMsg  = 2
	ProtocolErrorTimeout     = 3
)

ProtocolError codes

View Source
const ProtocolVersion = uint8(1)

Version of this protocol

Variables

View Source
var (
	ErrAbnormal    = errors.New("abnormal condition")
	ErrUnsupported = errors.New("unsupported protocol")
	ErrInvalidMsg  = errors.New("invalid protocol message")
	ErrTimeout     = errors.New("timeout")
)
View Source
var DefaultHandlers = NewHandlers()
View Source
var ErrUnexpectedStreamingRes = errors.New("unexpected streaming response")

Returned by (Sock)BufferRequest when a streaming response is recieved

View Source
var HeartbeatMsgMaxLoad = 0xffff

Maximum value of a heartbeat's "load"

Functions

func FormatRequestID

func FormatRequestID(n uint32) []byte

Returns a 4-byte representation of a 32-bit integer, suitable an integer-based request ID.

func Handle

func Handle(op string, fn interface{})

Handle operation with automatic JSON encoding of values.

`fn` must conform to one of the following signatures:

func(*Sock, string, interface{}) (interface{}, error) -- takes socket, op and parameters
func(*Sock, interface{}) (interface{}, error)         -- takes socket and parameters
func(interface{}) (interface{}, error)                -- takes parameters, but no socket
func(*Sock) (interface{}, error)                      -- takes no parameters
func() (interface{},error)                            -- takes no socket or parameters

Where optionally the `interface{}` return value can be omitted, i.e:

func(*Sock, string, interface{}) error
func(*Sock, interface{}) error
func(interface{}) error
func(*Sock) error
func() error

If `op` is empty, handle all requests which doesn't have a specific handler registered.

func HandleBufferNotification

func HandleBufferNotification(name string, fn BufferNoteHandler)

Handle notifications of a certain name with raw input buffers. If `name` is empty, handle all notifications which doesn't have a specific handler registered.

func HandleBufferRequest

func HandleBufferRequest(op string, fn BufferReqHandler)

Handle operation with raw input and output buffers. If `op` is empty, handle all requests which doesn't have a specific handler registered.

func HandleNotification

func HandleNotification(name string, fn interface{})

Handle notifications of a certain name with automatic JSON encoding of values.

`fn` must conform to one of the following signatures:

func(s *Sock, name string, v interface{}) -- takes socket, name and parameters
func(name string, v interface{})          -- takes name and parameters, but no socket
func(v interface{})                       -- takes only parameters

If `name` is empty, handle all notifications which doesn't have a specific handler registered.

func HandleStreamRequest

func HandleStreamRequest(op string, fn StreamReqHandler)

Handle operation by reading and writing directly from/to the underlying stream. If `op` is empty, handle all requests which doesn't have a specific handler registered.

func MakeHeartbeatMsg

func MakeHeartbeatMsg(load uint16) []byte

Create a slice of bytes representing a heartbeat message

func MakeMsg

func MakeMsg(t MsgType, id, name3 string, wait, size int) []byte

Create a slice of bytes representing a message (w/o any payload)

func Pipe

func Pipe(handlers *Handlers, limits Limits) (*Sock, *Sock, error)

Creates two sockets which are connected to eachother without any resource limits. If `handlers` is nil, DefaultHandlers are used. If `limits` is nil, DefaultLimits are used.

func ReadVersion

func ReadVersion(s io.Reader) (uint8, error)

Read the version the other end implements. Returns an error if this side's protocol is incompatible with the other side's version.

func Serve

func Serve(how, addr string, config *tls.Config, acceptHandler SockHandler) error

Start a `how` server accepting connections at `addr`

func WriteVersion

func WriteVersion(s io.Writer) (int, error)

Write the version this protocol implements to `s`

Types

type BufferNoteHandler

type BufferNoteHandler func(s *Sock, name string, payload []byte)

type BufferReqHandler

type BufferReqHandler func(s *Sock, op string, payload []byte) ([]byte, error)

If a handler panics, it's assumed that the effect of the panic was isolated to the active request. Panic is recovered, a stack trace is logged, and connection is closed.

type Handlers

type Handlers struct {
	// contains filtered or unexported fields
}

func NewHandlers

func NewHandlers() *Handlers

func (*Handlers) FindBufferRequestHandler

func (h *Handlers) FindBufferRequestHandler(op string) BufferReqHandler

Look up a single-buffer handler for operation `op`. Returns `nil` if not found.

func (*Handlers) FindNotificationHandler

func (h *Handlers) FindNotificationHandler(name string) BufferNoteHandler

Look up a handler for notification `name`. Returns `nil` if not found.

func (*Handlers) FindStreamRequestHandler

func (h *Handlers) FindStreamRequestHandler(op string) StreamReqHandler

Look up a stream handler for operation `op`. Returns `nil` if not found.

func (*Handlers) Handle

func (h *Handlers) Handle(op string, fn interface{})

See Handle()

func (*Handlers) HandleBufferNotification

func (h *Handlers) HandleBufferNotification(name string, fn BufferNoteHandler)

See HandleBufferNotification()

func (*Handlers) HandleBufferRequest

func (h *Handlers) HandleBufferRequest(op string, fn BufferReqHandler)

See HandleBufferRequest()

func (*Handlers) HandleNotification

func (h *Handlers) HandleNotification(name string, fn interface{})

See HandleNotification()

func (*Handlers) HandleStreamRequest

func (h *Handlers) HandleStreamRequest(op string, fn StreamReqHandler)

See HandleStreamRequest()

type Limits

type Limits interface {
	// Maximum amount of time allowed to read a buffer request. 0 = no timeout.
	// Defaults to 30 seconds.
	ReadTimeout() time.Duration
	SetReadTimeout(time.Duration)
	// contains filtered or unexported methods
}
var DefaultLimits Limits = &noLimitNoStream{30 * time.Second}

DefaultLimits does not limit buffer requests, and disables stream requests.

var NoLimits Limits = noLimit(false)

NoLimits does not limit buffer requests or stream requests, not does it have a read timeout.

func NewLimits

func NewLimits(requestLimit uint32, streamRequestLimit uint32) Limits

Create new Limits, limiting request processing.

`streamRequestLimit` limits the amount of stream requests but works together with `requestLimit` meaning that we can handle `requestLimit` requests of any type, but no more than

`streamRequestLimit` of the streaming kind. Say `streamRequestLimit=5` and `requestLimit=10`, and we are currently processing 5 streaming requests, we can handle an additional 5 buffered requests, but no more streaming requests.

  • If both `requestLimit` and `streamRequestLimit` is 0, buffer requests are not limited and stream requests are disabled.
  • If `streamRequestLimit` is 0, buffer requests are limited to `requestLimit` and stream requests are disabled.
  • If `requestLimit` is 0, buffer requests aren't limited, but stream requests are limited to `streamRequestLimit`.

type MsgType

type MsgType byte

Protocol message type

func ReadMsg

func ReadMsg(s io.Reader) (t MsgType, id, name3 string, wait, size uint32, err error)

Read a message from `s` If t is MsgTypeHeartbeat, wait==load, size==time

type Request

type Request struct {
	MsgType
	Op   string
	Data []byte
}

func NewRequest

func NewRequest(op string, buf []byte) *Request

Creates a new single request

type Response

type Response struct {
	MsgType
	Data []byte
	Wait time.Duration // only valid when IsRetry()==true
}

func (*Response) Error

func (r *Response) Error() string

Returns a string describing the error, when IsError()==true

func (*Response) IsError

func (r *Response) IsError() bool

True if this response is a requestor error (ErrorResult)

func (*Response) IsRetry

func (r *Response) IsRetry() bool

True if response is a "server can't handle it right now, please retry" (RetryResult)

func (*Response) IsStreaming

func (r *Response) IsStreaming() bool

True if this is part of a streaming response (StreamResult)

type Server

type Server struct {
	// Handlers associated with this listener. Accepted sockets inherit the value.
	Handlers *Handlers

	// Limits. Accepted sockets are subject to the same limits.
	Limits Limits

	// Function to be invoked just after a new socket connection has been accepted and
	// protocol handshake has sucessfully completed. At this point the socket is ready
	// to be used. However the function will be called in the socket's "read" goroutine,
	// meaning no messages will be received on the socket until this function returns.
	AcceptHandler SockHandler

	// Template value for accepted sockets. Defaults to 0 (no automatic heartbeats)
	HeartbeatInterval time.Duration

	// Template value for accepted sockets. Defaults to nil
	OnHeartbeat func(load int, t time.Time)
	// contains filtered or unexported fields
}

Accepts socket connections

func Listen

func Listen(how, addr string, config *tls.Config) (*Server, error)

Start a `how` server listening for connections at `addr`. You need to call Accept() on the returned socket to start accepting connections. `how` and `addr` are passed to `net.Listen()` and thus any values accepted by net.Listen are valid. The returned server has Handlers=DefaultHandlers and Limits=DefaultLimits set.

func NewServer

func NewServer(h *Handlers, limits Limits, l net.Listener) *Server

Create a new server already listening on `l`

func (*Server) Accept

func (s *Server) Accept() error

Accept connections. Blocks until Close() is called or an error occurs.

func (*Server) Addr

func (s *Server) Addr() string

Address this server is listening at

func (*Server) Close

func (s *Server) Close() error

Stop listening for and accepting connections

type Sock

type Sock struct {
	// Handlers associated with this socket
	Handlers *Handlers

	// Associate some application-specific data with this socket
	UserData interface{}

	// Enable streaming requests and set the limit for how many streaming requests this socket
	// can handle at the same time. Setting this to `0` disables streaming requests alltogether
	// (the default) while setting this to a large number might be cause for security concerns
	// as a malicious peer could send many "start stream" messages, but never sending
	// any "end stream" messages, slowly exhausting memory.
	StreamReqLimit int

	// A function to be called when the socket closes.
	// If the socket was closed because of a protocol error, `code` is >=0 and represents a
	// ProtocolError* constant.
	CloseHandler func(s *Sock, code int)

	// Automatically retry requests which can be retried
	AutoRetryRequests bool

	// HeartbeatInterval controls how much time a socket waits between sending its heartbeats.
	// If this is 0, automatic sending of heartbeats is disabled. Defaults to 20 seconds.
	HeartbeatInterval time.Duration

	// If not nil, this function is invoked when a heartbeat is recevied
	OnHeartbeat func(load int, t time.Time)
	// contains filtered or unexported fields
}

func Connect

func Connect(how, addr string, config *tls.Config) (*Sock, error)

Connect to a server via `how` at `addr`. Unless there's an error, the returned socket is already reading in a different goroutine and is ready to be used.

func NewSock

func NewSock(h *Handlers) *Sock

func (*Sock) Addr

func (s *Sock) Addr() string

Address of this socket

func (*Sock) Adopt

func (s *Sock) Adopt(c io.ReadWriteCloser)

Adopt an I/O stream, which should already be in a "connected" state. After adopting a new connection, you should call Handshake to perform the protocol handshake, followed by Read to read messages.

func (*Sock) BufferNotify

func (s *Sock) BufferNotify(name string, buf []byte) error

Send a single-buffer notification

func (*Sock) BufferRequest

func (s *Sock) BufferRequest(op string, buf []byte) ([]byte, error)

Send a single-buffer request, wait for and return the response. Automatically retries the request if needed.

func (*Sock) Close

func (s *Sock) Close() error

Close this socket

func (*Sock) CloseError

func (s *Sock) CloseError(code int) error

Close this socket because of a protocol error

func (*Sock) Conn

func (s *Sock) Conn() io.ReadWriteCloser

Access the socket's underlying connection

func (*Sock) Connect

func (s *Sock) Connect(how, addr string, config *tls.Config, limits Limits) error

Connect to a server via `how` at `addr`

func (*Sock) Handshake

func (s *Sock) Handshake() error

Before reading any messages over a socket, handshake must happen. This function will block until the handshake either succeeds or fails.

func (*Sock) Notify

func (s *Sock) Notify(name string, v interface{}) error

Send a single-value request where the value is JSON-encoded

func (*Sock) Read

func (s *Sock) Read(limits Limits) error

After completing a succesful handshake, call this function to read messages received to this socket. Does not return until the socket is closed. If HeartbeatInterval > 0 this method also sends automatic heartbeats.

func (*Sock) Request

func (s *Sock) Request(op string, in interface{}, out interface{}) error

Send a single-value request where the input and output values are JSON-encoded

func (*Sock) SendHeartbeat

func (s *Sock) SendHeartbeat(load float32) error

func (*Sock) SendRequest

func (s *Sock) SendRequest(r *Request, reschan chan Response) error

Send a single-buffer request. A response should be received from reschan.

func (*Sock) StreamRequest

func (s *Sock) StreamRequest(op string) (*StreamRequest, chan Response)

Send a multi-buffer streaming request

type SockHandler

type SockHandler func(*Sock)

type StreamReqHandler

type StreamReqHandler func(s *Sock, name string, rch chan []byte, out io.WriteCloser) error

EOS when <-rch==nil

type StreamRequest

type StreamRequest struct {
	// contains filtered or unexported fields
}

func (*StreamRequest) End

func (r *StreamRequest) End() error

func (*StreamRequest) Write

func (r *StreamRequest) Write(b []byte) error

type WebSocketServer

type WebSocketServer struct {
	Limits
	Handlers *Handlers
	OnAccept SockHandler

	// Template value for accepted sockets. Defaults to 0 (no automatic heartbeats)
	HeartbeatInterval time.Duration

	// Template value for accepted sockets. Defaults to nil
	OnHeartbeat func(load int, t time.Time)

	Server *websocket.Server
}

func WebSocketHandler

func WebSocketHandler() *WebSocketServer

Handler that can be used with the http package

func (*WebSocketServer) ServeHTTP

func (s *WebSocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

Directories

Path Synopsis
examples
pipe
A simple example of two connected sockets communicating with eachother
A simple example of two connected sockets communicating with eachother
stream
Demonstrates using streaming requests and results Demonstrates
Demonstrates using streaming requests and results Demonstrates
tcp

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL