Documentation ¶
Overview ¶
Package chanserv provides a simple message queue framework based on nested Go channels that are served over AstraNet.
Index ¶
- Constants
- Variables
- type Client
- type ClientOption
- func ClientDialTimeout(d time.Duration) ClientOption
- func ClientFrameBufferSize(size int) ClientOption
- func ClientFrameReadTimeout(d time.Duration) ClientOption
- func ClientMasterReadTimeout(d time.Duration) ClientOption
- func ClientMasterWriteTimeout(d time.Duration) ClientOption
- func ClientOnError(fn func(err error)) ClientOption
- func ClientSourceBufferSize(size int) ClientOption
- type Frame
- type MetaData
- type Multiplexer
- type RequestTag
- type Server
- type ServerOption
- func ServerChanAcceptTimeout(d time.Duration) ServerOption
- func ServerFrameWriteTimeout(d time.Duration) ServerOption
- func ServerMasterReadTimeout(d time.Duration) ServerOption
- func ServerMasterWriteTimeout(d time.Duration) ServerOption
- func ServerMaxErrorMass(mass int) ServerOption
- func ServerOnChanError(fn func(err error)) ServerOption
- func ServerOnError(fn func(err error)) ServerOption
- func ServerOnMaxErrorMass(fn func(mass int, err error)) ServerOption
- func ServerServingTimeout(d time.Duration) ServerOption
- func ServerSourcingTimeout(d time.Duration) ServerOption
- func ServerUseCompression(v bool) ServerOption
- type Source
- type SourceFunc
Constants ¶
const FrameSizeLimit = 100 * 1024 * 1024
FrameSizeLimit specifies the maximum size of the payload in a frame (100 MiB). This limit may be increased or lifted in the future.
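As a rough illustration of the limit, the sketch below validates a payload size before sending. The names frameSizeLimit, errWrongSize and checkSize are local stand-ins, and pairing the limit with ErrWrongSize is an assumption; this page does not say which error chanserv returns for oversized frames.

```go
package main

import (
	"errors"
	"fmt"
)

// frameSizeLimit mirrors chanserv.FrameSizeLimit: 100 MiB.
const frameSizeLimit = 100 * 1024 * 1024

// errWrongSize mirrors chanserv.ErrWrongSize. Using it for oversized
// payloads is an assumption made for this sketch.
var errWrongSize = errors.New("wrong frame size")

// checkSize is a hypothetical helper that rejects payload sizes
// above the limit or below zero.
func checkSize(n int) error {
	if n < 0 || n > frameSizeLimit {
		return errWrongSize
	}
	return nil
}

func main() {
	fmt.Println(frameSizeLimit) // 104857600
	fmt.Println(checkSize(16) == nil)
	fmt.Println(checkSize(frameSizeLimit+1) == errWrongSize)
}
```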
Variables ¶
var CompressionHeader = []byte("lz4!")
var ErrWrongSize = errors.New("wrong frame size")
var ErrWrongUncompressedSize = errors.New("wrong uncompressed frame size")
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface {
	// LookupAndPost tries to discover the given vAddr and posts the body to the server's SourceFunc.
	// Provide additional tags to change the behaviour of the service discovery and to set additional
	// request params. Returns a new source subscription, or an error if any. The subscription channel
	// is closed upon a network error or upon success on the remote side.
	LookupAndPost(vAddr string, body []byte, tags map[RequestTag]string) (<-chan Source, error)
}
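A minimal sketch of how the nested channels returned by LookupAndPost might be consumed: an outer loop over sources and an inner loop over each source's frames. The interfaces are trimmed local copies, and memFrame, memSource, fakeSources and consume are hypothetical helpers that stand in for a real client, so no network is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// Frame and Source are trimmed local copies of the chanserv interfaces.
type Frame interface{ Bytes() []byte }

type Source interface {
	Header() []byte
	Out() <-chan Frame
}

// memFrame and memSource are hypothetical in-memory stand-ins.
type memFrame []byte

func (f memFrame) Bytes() []byte { return f }

type memSource struct {
	header []byte
	out    chan Frame
}

func (s *memSource) Header() []byte    { return s.header }
func (s *memSource) Out() <-chan Frame { return s.out }

// fakeSources stands in for the channel returned by LookupAndPost:
// n sources, each carrying framesEach frames, all channels closed.
func fakeSources(n, framesEach int) <-chan Source {
	sources := make(chan Source, n)
	for i := 0; i < n; i++ {
		out := make(chan Frame, framesEach)
		for j := 0; j < framesEach; j++ {
			out <- memFrame(fmt.Sprintf("frame %d/%d", i, j))
		}
		close(out)
		sources <- &memSource{header: []byte("src"), out: out}
	}
	close(sources)
	return sources
}

// consume drains every announced source in its own goroutine and
// returns the total number of frames received.
func consume(sources <-chan Source) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for src := range sources {
		wg.Add(1)
		go func(src Source) {
			defer wg.Done()
			for range src.Out() {
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(src)
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(consume(fakeSources(3, 4))) // 12
}
```

Draining each source in its own goroutine keeps a slow source from blocking the others; whether that is needed depends on the buffer sizes configured on the client.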
func NewClient ¶
func NewClient(mpx Multiplexer, opts ...ClientOption) Client
NewClient initializes a new client using the provided multiplexer for the network capabilities. Refer to the client options if you want to specify timeouts and error callbacks.
type ClientOption ¶
type ClientOption func(c *client)
ClientOption applies a configuration option to the client.
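The option functions follow the functional options pattern. The sketch below shows how such options could be applied over the documented defaults (10s dial timeout, frame buffer of 1024, source buffer of 128). The client struct, its field names and newClient are hypothetical; the real client type is unexported and its layout is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// client is a hypothetical stand-in for the unexported chanserv client.
// The field names are assumptions made for this sketch.
type client struct {
	dialTimeout   time.Duration
	frameBufSize  int
	sourceBufSize int
}

// ClientOption has the same shape as the documented type.
type ClientOption func(c *client)

func ClientDialTimeout(d time.Duration) ClientOption {
	return func(c *client) { c.dialTimeout = d }
}

func ClientFrameBufferSize(size int) ClientOption {
	return func(c *client) { c.frameBufSize = size }
}

// newClient starts from the documented defaults and applies
// the options in the order they were passed.
func newClient(opts ...ClientOption) *client {
	c := &client{
		dialTimeout:   10 * time.Second,
		frameBufSize:  1024,
		sourceBufSize: 128,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newClient(ClientDialTimeout(2 * time.Second))
	fmt.Println(c.dialTimeout, c.frameBufSize, c.sourceBufSize) // 2s 1024 128
}
```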
func ClientDialTimeout ¶
func ClientDialTimeout(d time.Duration) ClientOption
ClientDialTimeout specifies the timeout for dialing the remote host. Default value: 10s.
func ClientFrameBufferSize ¶
func ClientFrameBufferSize(size int) ClientOption
ClientFrameBufferSize specifies the frame channel buffer size. Default value: 1024.
func ClientFrameReadTimeout ¶
func ClientFrameReadTimeout(d time.Duration) ClientOption
ClientFrameReadTimeout specifies the timeout for reading frame data bytes from descendant chan connections that have been discovered.
func ClientMasterReadTimeout ¶
func ClientMasterReadTimeout(d time.Duration) ClientOption
ClientMasterReadTimeout specifies the timeout for reading source announcement bytes from the master connection.
func ClientMasterWriteTimeout ¶
func ClientMasterWriteTimeout(d time.Duration) ClientOption
ClientMasterWriteTimeout specifies the timeout for writing the request body bytes after the connection has been accepted by the server.
func ClientOnError ¶
func ClientOnError(fn func(err error)) ClientOption
ClientOnError specifies a function to call upon an error.
func ClientSourceBufferSize ¶
func ClientSourceBufferSize(size int) ClientOption
ClientSourceBufferSize specifies the source announcements channel buffer size. Default value: 128.
type Frame ¶
type Frame interface {
	// Bytes returns a byte representation of the payload.
	Bytes() []byte
}
Frame represents the payload to send over the channel, leaving the serialisation logic to the user.
For example, a Message struct can implement the Bytes() method using Cap'n Proto or Protocol Buffers to return its representation as bytes.
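A minimal sketch of a type that satisfies Frame. It uses encoding/json from the standard library only to stay self-contained; Cap'n Proto or Protocol Buffers would slot into Bytes() the same way. Message and its fields are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Frame is a local copy of the chanserv interface.
type Frame interface {
	Bytes() []byte
}

// Message is a hypothetical application type.
type Message struct {
	ID   int    `json:"id"`
	Text string `json:"text"`
}

// Bytes serialises the message as JSON. Bytes has no error result,
// so an encoding failure yields nil here.
func (m Message) Bytes() []byte {
	b, err := json.Marshal(m)
	if err != nil {
		return nil
	}
	return b
}

// Compile-time check that Message implements Frame.
var _ Frame = Message{}

func main() {
	msg := Message{ID: 1, Text: "hello"}
	fmt.Println(string(msg.Bytes())) // {"id":1,"text":"hello"}
}
```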
type MetaData ¶
type MetaData interface {
	// RemoteAddr indicates the originating node's virtual address, e.g. VHyWCWr39kI:1697777
	RemoteAddr() string
}
MetaData describes the source. It is usually available on the client side only and is created by chanserv itself.
type Multiplexer ¶
type Multiplexer interface {
	Bind(net, laddr string) (net.Listener, error)
	DialTimeout(network string, address string, timeout time.Duration) (net.Conn, error)
}
Multiplexer can be any muxer that is able to bind to some address and dial some address. Chanserv assumes this would be the AstraNet multiplexer that can handle millions of streams.
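Because the interface only asks for binding and dialing, a plain TCP implementation can stand in for local experiments. The sketch below is such a stand-in, not the AstraNet multiplexer: tcpMux and roundTrip are hypothetical, the network argument is ignored, and real chanserv deployments use virtual addresses that plain TCP does not understand. The first parameter of Bind is renamed to network in this copy so that it does not shadow the net package.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// Multiplexer is a local copy of the chanserv interface.
type Multiplexer interface {
	Bind(network, laddr string) (net.Listener, error)
	DialTimeout(network string, address string, timeout time.Duration) (net.Conn, error)
}

// tcpMux is a hypothetical multiplexer backed by plain TCP.
// It ignores the network argument and always uses "tcp".
type tcpMux struct{}

func (tcpMux) Bind(_, laddr string) (net.Listener, error) {
	return net.Listen("tcp", laddr)
}

func (tcpMux) DialTimeout(_, address string, timeout time.Duration) (net.Conn, error) {
	return net.DialTimeout("tcp", address, timeout)
}

// roundTrip binds a loopback listener, dials it through the same
// multiplexer and returns what the accepting side wrote.
func roundTrip(mpx Multiplexer, msg string) (string, error) {
	l, err := mpx.Bind("", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer l.Close()

	go func() {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		_, _ = conn.Write([]byte(msg))
	}()

	conn, err := mpx.DialTimeout("", l.Addr().String(), time.Second)
	if err != nil {
		return "", err
	}
	defer conn.Close()
	_ = conn.SetReadDeadline(time.Now().Add(time.Second))

	// The accepting side closes after writing, so ReadAll stops at EOF.
	buf, err := io.ReadAll(conn)
	return string(buf), err
}

func main() {
	got, err := roundTrip(tcpMux{}, "ping")
	fmt.Println(got, err)
}
```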
type RequestTag ¶
type RequestTag int
RequestTag specifies additional options of a client's request.
const (
	TagMeta RequestTag = iota
	// TagBucket specifies the bucket hash for the hash-based balancing algorithm.
	// Use this if your multiplexer can take a hash into account when dialing hosts.
	TagBucket
)
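A short sketch of building the tags map that LookupAndPost accepts. The type and constants are local copies; bucketTags and the value format ("user-42") are hypothetical, since this page does not say what form the bucket hash must take.

```go
package main

import "fmt"

// RequestTag and its constants are local copies of the chanserv declarations.
type RequestTag int

const (
	TagMeta RequestTag = iota
	TagBucket
)

// bucketTags is a hypothetical helper that builds a tags map carrying
// only a bucket key. The expected value format is an assumption.
func bucketTags(bucket string) map[RequestTag]string {
	return map[RequestTag]string{TagBucket: bucket}
}

func main() {
	tags := bucketTags("user-42")
	fmt.Println(int(TagMeta), int(TagBucket), tags[TagBucket]) // 0 1 user-42
}
```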
type Server ¶
type Server interface {
	// ListenAndServe starts to listen for incoming connections on vAddr,
	// and emits frame sources using the provided SourceFunc.
	ListenAndServe(vAddr string, src SourceFunc) error
}
func NewServer ¶
func NewServer(mpx Multiplexer, opts ...ServerOption) Server
NewServer initializes a new server using the provided multiplexer for the network capabilities. Refer to the server options if you want to specify timeouts and error callbacks.
type ServerOption ¶
type ServerOption func(s *server)
ServerOption applies a configuration option to the server.
func ServerChanAcceptTimeout ¶
func ServerChanAcceptTimeout(d time.Duration) ServerOption
ServerChanAcceptTimeout specifies how long frame channels wait to be discovered by the client. Default value: 30s.
func ServerFrameWriteTimeout ¶
func ServerFrameWriteTimeout(d time.Duration) ServerOption
ServerFrameWriteTimeout specifies the timeout for writing the frame bytes to the descendant chan connection.
func ServerMasterReadTimeout ¶
func ServerMasterReadTimeout(d time.Duration) ServerOption
ServerMasterReadTimeout specifies the timeout for reading the client's request body after the master connection has been accepted.
func ServerMasterWriteTimeout ¶
func ServerMasterWriteTimeout(d time.Duration) ServerOption
ServerMasterWriteTimeout specifies the timeout for writing the source announcement parts to the master connection (two parts).
func ServerMaxErrorMass ¶
func ServerMaxErrorMass(mass int) ServerOption
ServerMaxErrorMass specifies the number of sequential serving errors tolerated before some action is taken, usually a 30-second sleep before a new retry.
func ServerOnChanError ¶
func ServerOnChanError(fn func(err error)) ServerOption
ServerOnChanError specifies a function to call upon a frame chan serving error.
func ServerOnError ¶
func ServerOnError(fn func(err error)) ServerOption
ServerOnError specifies a function to call upon a master chan serving error.
func ServerOnMaxErrorMass ¶
func ServerOnMaxErrorMass(fn func(mass int, err error)) ServerOption
ServerOnMaxErrorMass specifies the action to perform when the error mass becomes critical. By default it sleeps for 30 seconds before a new retry.
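One plausible reading of the error mass mechanism, sketched under assumptions: a counter of sequential errors that resets on success and triggers a callback at the threshold. The errorMass type, the reset after the callback and countCallbacks are all hypothetical; the actual bookkeeping inside chanserv is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a sample serving error used by the sketch.
var errBoom = errors.New("accept failed")

// errorMass is a hypothetical tracker of sequential serving errors.
type errorMass struct {
	max   int
	mass  int
	onMax func(mass int, err error)
}

// observe records one serving result. A nil error resets the mass,
// and reaching max triggers the callback. Resetting the mass after
// the callback is an assumption of this sketch.
func (e *errorMass) observe(err error) {
	if err == nil {
		e.mass = 0
		return
	}
	e.mass++
	if e.mass >= e.max {
		e.onMax(e.mass, err)
		e.mass = 0
	}
}

// countCallbacks feeds a sequence of results to a tracker and
// reports how many times the callback fired.
func countCallbacks(max int, results []error) int {
	calls := 0
	em := &errorMass{max: max, onMax: func(int, error) { calls++ }}
	for _, err := range results {
		em.observe(err)
	}
	return calls
}

func main() {
	results := []error{errBoom, errBoom, nil, errBoom, errBoom, errBoom}
	fmt.Println(countCallbacks(3, results)) // 1
}
```

In a real server the callback would be the place for a back-off such as the default 30-second sleep; the sketch only counts invocations.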
func ServerServingTimeout ¶
func ServerServingTimeout(d time.Duration) ServerOption
ServerServingTimeout specifies the overall serving timeout. This timeout sets a deadline for the master channel and all the descendant frame channels.
func ServerSourcingTimeout ¶
func ServerSourcingTimeout(d time.Duration) ServerOption
ServerSourcingTimeout specifies the timeout of source announcements in the master channel. The sourcing func must publish all the sources and close the sourcing channel before this timeout expires.
func ServerUseCompression ¶
func ServerUseCompression(v bool) ServerOption
ServerUseCompression specifies whether the server must use compression for the output streams.
type Source ¶
type Source interface {
	// Header gets the application data associated with this source. The source implementation
	// is not required to return any header bytes.
	Header() []byte
	// Meta returns MetaData that was created by chanserv on the client side.
	Meta() MetaData
	// Out is a read-only channel of frames, generated by some source.
	// On the server side the channel must be closed after sending all the available frames,
	// on the client side it will be closed by chanserv upon a network/timeout error or success on the remote side.
	Out() <-chan Frame
}
Source represents an announcement of a new frame source.
type SourceFunc ¶
SourceFunc emits frame sources based on the request data provided. On the server side the channel must be closed after sending all the source announcements; on the client side it will be closed by chanserv upon a network/timeout error or success on the remote side.
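A minimal server-side sketch of the closing rules described above: every frame channel is closed once its frames are sent, and the announcement channel is closed once all sources are published. The declaration of SourceFunc is not shown on this page, so the signature func(reqBody []byte) <-chan Source used here is an assumption; textFrame, wordSource and wordsFunc are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the chanserv interfaces.
type Frame interface{ Bytes() []byte }

type MetaData interface{ RemoteAddr() string }

type Source interface {
	Header() []byte
	Meta() MetaData
	Out() <-chan Frame
}

// SourceFunc uses an assumed signature; the real declaration
// is not reproduced on this page.
type SourceFunc func(reqBody []byte) <-chan Source

// textFrame is a hypothetical frame carrying a single word.
type textFrame string

func (f textFrame) Bytes() []byte { return []byte(f) }

// wordSource is a hypothetical source emitting the words of one line.
type wordSource struct {
	header []byte
	out    chan Frame
}

func (s *wordSource) Header() []byte    { return s.header }
func (s *wordSource) Meta() MetaData    { return nil } // metadata is a client-side concern
func (s *wordSource) Out() <-chan Frame { return s.out }

// wordsFunc announces one source per line of the request body. Each
// source emits the words of its line as frames and is then closed,
// and the announcement channel is closed after the last source.
func wordsFunc(reqBody []byte) <-chan Source {
	sources := make(chan Source)
	go func() {
		defer close(sources)
		for _, line := range strings.Split(string(reqBody), "\n") {
			words := strings.Fields(line)
			out := make(chan Frame, len(words))
			for _, w := range words {
				out <- textFrame(w)
			}
			close(out)
			sources <- &wordSource{header: []byte(line), out: out}
		}
	}()
	return sources
}

// Compile-time check against the assumed signature.
var _ SourceFunc = wordsFunc

func main() {
	for src := range wordsFunc([]byte("a b\nc")) {
		for f := range src.Out() {
			fmt.Printf("%s: %s\n", src.Header(), f.Bytes())
		}
	}
}
```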