Documentation ¶
Index ¶
- Constants
- Variables
- func FormatRequestID(n uint32) []byte
- func Handle(op string, fn interface{})
- func HandleBufferNotification(name string, fn BufferNoteHandler)
- func HandleBufferRequest(op string, fn BufferReqHandler)
- func HandleNotification(name string, fn interface{})
- func HandleStreamRequest(op string, fn StreamReqHandler)
- func MakeHeartbeatMsg(load uint16) []byte
- func MakeMsg(t MsgType, id, name3 string, wait, size int) []byte
- func Pipe(handlers *Handlers, limits Limits) (*Sock, *Sock, error)
- func ReadVersion(s io.Reader) (uint8, error)
- func Serve(how, addr string, config *tls.Config, acceptHandler SockHandler) error
- func WriteVersion(s io.Writer) (int, error)
- type BufferNoteHandler
- type BufferReqHandler
- type Handlers
- func (h *Handlers) FindBufferRequestHandler(op string) BufferReqHandler
- func (h *Handlers) FindNotificationHandler(name string) BufferNoteHandler
- func (h *Handlers) FindStreamRequestHandler(op string) StreamReqHandler
- func (h *Handlers) Handle(op string, fn interface{})
- func (h *Handlers) HandleBufferNotification(name string, fn BufferNoteHandler)
- func (h *Handlers) HandleBufferRequest(op string, fn BufferReqHandler)
- func (h *Handlers) HandleNotification(name string, fn interface{})
- func (h *Handlers) HandleStreamRequest(op string, fn StreamReqHandler)
- type Limits
- type MsgType
- type Request
- type Response
- type Server
- type Sock
- func (s *Sock) Addr() string
- func (s *Sock) Adopt(c io.ReadWriteCloser)
- func (s *Sock) BufferNotify(name string, buf []byte) error
- func (s *Sock) BufferRequest(op string, buf []byte) ([]byte, error)
- func (s *Sock) Close() error
- func (s *Sock) CloseError(code int) error
- func (s *Sock) Conn() io.ReadWriteCloser
- func (s *Sock) Connect(how, addr string, config *tls.Config, limits Limits) error
- func (s *Sock) Handshake() error
- func (s *Sock) Notify(name string, v interface{}) error
- func (s *Sock) Read(limits Limits) error
- func (s *Sock) Request(op string, in interface{}, out interface{}) error
- func (s *Sock) SendHeartbeat(load float32) error
- func (s *Sock) SendRequest(r *Request, reschan chan Response) error
- func (s *Sock) StreamRequest(op string) (*StreamRequest, chan Response)
- type SockHandler
- type StreamReqHandler
- type StreamRequest
- type WebSocketServer
Constants ¶
const ( MsgTypeSingleReq = MsgType('r') MsgTypeStreamReq = MsgType('s') MsgTypeStreamReqPart = MsgType('p') MsgTypeSingleRes = MsgType('R') MsgTypeStreamRes = MsgType('S') MsgTypeErrorRes = MsgType('E') MsgTypeRetryRes = MsgType('e') MsgTypeNotification = MsgType('n') MsgTypeHeartbeat = MsgType('h') MsgTypeProtocolError = MsgType('f') )
Protocol message types
const ( ProtocolErrorAbnormal = 0 ProtocolErrorUnsupported = 1 ProtocolErrorInvalidMsg = 2 ProtocolErrorTimeout = 3 )
ProtocolError codes
const ProtocolVersion = uint8(1)
Version of this protocol
Variables ¶
var ( ErrAbnormal = errors.New("abnormal condition") ErrUnsupported = errors.New("unsupported protocol") ErrInvalidMsg = errors.New("invalid protocol message") ErrTimeout = errors.New("timeout") )
var DefaultHandlers = NewHandlers()
var ErrUnexpectedStreamingRes = errors.New("unexpected streaming response")
Returned by (Sock)BufferRequest when a streaming response is received
var HeartbeatMsgMaxLoad = 0xffff
Maximum value of a heartbeat's "load"
Functions ¶
func FormatRequestID ¶
Returns a 4-byte representation of a 32-bit integer, suitable for use as an integer-based request ID.
func Handle ¶
func Handle(op string, fn interface{})
Handle operation with automatic JSON encoding of values.
`fn` must conform to one of the following signatures:
func(*Sock, string, interface{}) (interface{}, error) -- takes socket, op and parameters func(*Sock, interface{}) (interface{}, error) -- takes socket and parameters func(interface{}) (interface{}, error) -- takes parameters, but no socket func(*Sock) (interface{}, error) -- takes no parameters func() (interface{},error) -- takes no socket or parameters
Where optionally the `interface{}` return value can be omitted, i.e:
func(*Sock, string, interface{}) error func(*Sock, interface{}) error func(interface{}) error func(*Sock) error func() error
If `op` is empty, handle all requests which don't have a specific handler registered.
func HandleBufferNotification ¶
func HandleBufferNotification(name string, fn BufferNoteHandler)
Handle notifications of a certain name with raw input buffers. If `name` is empty, handle all notifications which don't have a specific handler registered.
func HandleBufferRequest ¶
func HandleBufferRequest(op string, fn BufferReqHandler)
Handle operation with raw input and output buffers. If `op` is empty, handle all requests which don't have a specific handler registered.
func HandleNotification ¶
func HandleNotification(name string, fn interface{})
Handle notifications of a certain name with automatic JSON encoding of values.
`fn` must conform to one of the following signatures:
func(s *Sock, name string, v interface{}) -- takes socket, name and parameters func(name string, v interface{}) -- takes name and parameters, but no socket func(v interface{}) -- takes only parameters
If `name` is empty, handle all notifications which don't have a specific handler registered.
func HandleStreamRequest ¶
func HandleStreamRequest(op string, fn StreamReqHandler)
Handle operation by reading and writing directly from/to the underlying stream. If `op` is empty, handle all requests which don't have a specific handler registered.
func MakeHeartbeatMsg ¶
Create a slice of bytes representing a heartbeat message
func Pipe ¶
Creates two sockets which are connected to each other without any resource limits. If `handlers` is nil, DefaultHandlers are used. If `limits` is nil, DefaultLimits are used.
func ReadVersion ¶
Read the version the other end implements. Returns an error if this side's protocol is incompatible with the other side's version.
Types ¶
type BufferNoteHandler ¶
type BufferReqHandler ¶
If a handler panics, it's assumed that the effect of the panic was isolated to the active request. Panic is recovered, a stack trace is logged, and connection is closed.
type Handlers ¶
type Handlers struct {
// contains filtered or unexported fields
}
func NewHandlers ¶
func NewHandlers() *Handlers
func (*Handlers) FindBufferRequestHandler ¶
func (h *Handlers) FindBufferRequestHandler(op string) BufferReqHandler
Look up a single-buffer handler for operation `op`. Returns `nil` if not found.
func (*Handlers) FindNotificationHandler ¶
func (h *Handlers) FindNotificationHandler(name string) BufferNoteHandler
Look up a handler for notification `name`. Returns `nil` if not found.
func (*Handlers) FindStreamRequestHandler ¶
func (h *Handlers) FindStreamRequestHandler(op string) StreamReqHandler
Look up a stream handler for operation `op`. Returns `nil` if not found.
func (*Handlers) HandleBufferNotification ¶
func (h *Handlers) HandleBufferNotification(name string, fn BufferNoteHandler)
See HandleBufferNotification()
func (*Handlers) HandleBufferRequest ¶
func (h *Handlers) HandleBufferRequest(op string, fn BufferReqHandler)
See HandleBufferRequest()
func (*Handlers) HandleNotification ¶
See HandleNotification()
func (*Handlers) HandleStreamRequest ¶
func (h *Handlers) HandleStreamRequest(op string, fn StreamReqHandler)
See HandleStreamRequest()
type Limits ¶
type Limits interface { // Maximum amount of time allowed to read a buffer request. 0 = no timeout. // Defaults to 30 seconds. ReadTimeout() time.Duration SetReadTimeout(time.Duration) // contains filtered or unexported methods }
DefaultLimits does not limit buffer requests, and disables stream requests.
NoLimits does not limit buffer requests or stream requests, nor does it have a read timeout.
func NewLimits ¶
Create new Limits, limiting request processing.
`streamRequestLimit` limits the amount of stream requests but works together with `requestLimit` meaning that we can handle `requestLimit` requests of any type, but no more than
`streamRequestLimit` of the streaming kind. Say `streamRequestLimit=5` and `requestLimit=10`, and we are currently processing 5 streaming requests, we can handle an additional 5 buffered requests, but no more streaming requests.
- If both `requestLimit` and `streamRequestLimit` are 0, buffer requests are not limited and stream requests are disabled.
- If `streamRequestLimit` is 0, buffer requests are limited to `requestLimit` and stream requests are disabled.
- If `requestLimit` is 0, buffer requests aren't limited, but stream requests are limited to `streamRequestLimit`.
type Response ¶
func (*Response) IsRetry ¶
True if response is a "server can't handle it right now, please retry" (RetryResult)
func (*Response) IsStreaming ¶
True if this is part of a streaming response (StreamResult)
type Server ¶
type Server struct { // Handlers associated with this listener. Accepted sockets inherit the value. Handlers *Handlers // Limits. Accepted sockets are subject to the same limits. Limits Limits // Function to be invoked just after a new socket connection has been accepted and // protocol handshake has successfully completed. At this point the socket is ready // to be used. However the function will be called in the socket's "read" goroutine, // meaning no messages will be received on the socket until this function returns. AcceptHandler SockHandler // Template value for accepted sockets. Defaults to 0 (no automatic heartbeats) HeartbeatInterval time.Duration // Template value for accepted sockets. Defaults to nil OnHeartbeat func(load int, t time.Time) // contains filtered or unexported fields }
Accepts socket connections
func Listen ¶
Start a `how` server listening for connections at `addr`. You need to call Accept() on the returned socket to start accepting connections. `how` and `addr` are passed to `net.Listen()` and thus any values accepted by net.Listen are valid. The returned server has Handlers=DefaultHandlers and Limits=DefaultLimits set.
type Sock ¶
type Sock struct { // Handlers associated with this socket Handlers *Handlers // Associate some application-specific data with this socket UserData interface{} // Enable streaming requests and set the limit for how many streaming requests this socket // can handle at the same time. Setting this to `0` disables streaming requests altogether // (the default) while setting this to a large number might be cause for security concerns // as a malicious peer could send many "start stream" messages, but never sending // any "end stream" messages, slowly exhausting memory. StreamReqLimit int // A function to be called when the socket closes. // If the socket was closed because of a protocol error, `code` is >=0 and represents a // ProtocolError* constant. CloseHandler func(s *Sock, code int) // Automatically retry requests which can be retried AutoRetryRequests bool // HeartbeatInterval controls how much time a socket waits between sending its heartbeats. // If this is 0, automatic sending of heartbeats is disabled. Defaults to 20 seconds. HeartbeatInterval time.Duration // If not nil, this function is invoked when a heartbeat is received OnHeartbeat func(load int, t time.Time) // contains filtered or unexported fields }
func Connect ¶
Connect to a server via `how` at `addr`. Unless there's an error, the returned socket is already reading in a different goroutine and is ready to be used.
func (*Sock) Adopt ¶
func (s *Sock) Adopt(c io.ReadWriteCloser)
Adopt an I/O stream, which should already be in a "connected" state. After adopting a new connection, you should call Handshake to perform the protocol handshake, followed by Read to read messages.
func (*Sock) BufferNotify ¶
Send a single-buffer notification
func (*Sock) BufferRequest ¶
Send a single-buffer request, wait for and return the response. Automatically retries the request if needed.
func (*Sock) CloseError ¶
Close this socket because of a protocol error
func (*Sock) Conn ¶
func (s *Sock) Conn() io.ReadWriteCloser
Access the socket's underlying connection
func (*Sock) Handshake ¶
Before reading any messages over a socket, handshake must happen. This function will block until the handshake either succeeds or fails.
func (*Sock) Read ¶
After completing a successful handshake, call this function to read messages received on this socket. Does not return until the socket is closed. If HeartbeatInterval > 0 this method also sends automatic heartbeats.
func (*Sock) Request ¶
Send a single-value request where the input and output values are JSON-encoded
func (*Sock) SendHeartbeat ¶
func (*Sock) SendRequest ¶
Send a single-buffer request. A response should be received from reschan.
func (*Sock) StreamRequest ¶
func (s *Sock) StreamRequest(op string) (*StreamRequest, chan Response)
Send a multi-buffer streaming request
type SockHandler ¶
type SockHandler func(*Sock)
type StreamReqHandler ¶
End of stream (EOS) is indicated when <-rch == nil.
type StreamRequest ¶
type StreamRequest struct {
// contains filtered or unexported fields
}
func (*StreamRequest) End ¶
func (r *StreamRequest) End() error
func (*StreamRequest) Write ¶
func (r *StreamRequest) Write(b []byte) error
type WebSocketServer ¶
type WebSocketServer struct { Limits Handlers *Handlers OnAccept SockHandler // Template value for accepted sockets. Defaults to 0 (no automatic heartbeats) HeartbeatInterval time.Duration // Template value for accepted sockets. Defaults to nil OnHeartbeat func(load int, t time.Time) Server *websocket.Server }
func WebSocketHandler ¶
func WebSocketHandler() *WebSocketServer
Handler that can be used with the http package
func (*WebSocketServer) ServeHTTP ¶
func (s *WebSocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
examples
|
|
pipe
A simple example of two connected sockets communicating with each other
|
A simple example of two connected sockets communicating with each other |
stream
Demonstrates using streaming requests and results
|
Demonstrates using streaming requests and results |