Documentation ¶
Overview ¶
Package buffstreams provides a simple interface for creating and storing sockets with a pre-defined set of behaviors, making them easy to use for consuming streaming protocol buffer messages over TCP.
Constants ¶
const DefaultMaxMessageSize int = 4096
DefaultMaxMessageSize is the value that is used if a ManagerConfig indicates a MaxMessageSize of 0
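The fallback described above can be sketched as follows. This is only an illustration of the documented behavior: the helper name `effectiveMaxMessageSize` is hypothetical and is not part of buffstreams, and the local constant simply mirrors the documented value.

```go
package main

import "fmt"

// defaultMaxMessageSize mirrors the documented DefaultMaxMessageSize constant.
const defaultMaxMessageSize int = 4096

// effectiveMaxMessageSize is a hypothetical helper (not a buffstreams API).
// It returns the configured size, or the default when the configured size is 0.
func effectiveMaxMessageSize(configured int) int {
	if configured == 0 {
		return defaultMaxMessageSize
	}
	return configured
}

func main() {
	fmt.Println(effectiveMaxMessageSize(0))    // prints 4096
	fmt.Println(effectiveMaxMessageSize(1024)) // prints 1024
}
```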
const Version string = "2.0.0"
Version is the official semver for the library
Variables ¶
var (
	// ErrZeroBytesReadHeader is thrown when the value parsed from the header is not valid
	ErrZeroBytesReadHeader = errors.New("0 Bytes parsed from header. Connection Closed")
	// ErrLessThanZeroBytesReadHeader is thrown when the value parsed from the header caused some kind of underrun
	ErrLessThanZeroBytesReadHeader = errors.New("Less than zero bytes parsed from header. Connection Closed")
)
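As a rough sketch of how header errors like these might be produced and checked, the example below validates a parsed size and matches the result with `errors.Is`. The lowercase error variables are local stand-ins for the documented ones, and `validateHeaderSize` is a hypothetical helper; the library's actual parsing logic is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented sentinel errors.
var (
	errZeroBytesReadHeader         = errors.New("0 Bytes parsed from header. Connection Closed")
	errLessThanZeroBytesReadHeader = errors.New("Less than zero bytes parsed from header. Connection Closed")
)

// validateHeaderSize is a hypothetical check on the size parsed from a header.
func validateHeaderSize(size int) error {
	switch {
	case size == 0:
		return errZeroBytesReadHeader
	case size < 0:
		return errLessThanZeroBytesReadHeader
	default:
		return nil
	}
}

func main() {
	for _, size := range []int{12, 0, -3} {
		err := validateHeaderSize(size)
		switch {
		case errors.Is(err, errZeroBytesReadHeader):
			fmt.Println(size, "-> zero-length header")
		case errors.Is(err, errLessThanZeroBytesReadHeader):
			fmt.Println(size, "-> negative header")
		default:
			fmt.Println(size, "-> ok")
		}
	}
}
```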
var ErrAlreadyOpened = errors.New("A connection to this ip / port is already open.")
ErrAlreadyOpened represents the error where a caller has tried to open the same ip / port address more than once.
var ErrNotOpened = errors.New("A connection to this ip / port must be opened first.")
ErrNotOpened represents the error where a caller has tried to use a socket to an address that they have not opened yet.
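The open-once, open-before-use rules behind these two errors can be illustrated with a small address-keyed table. This is a sketch under assumptions: the `registry` type and its methods are hypothetical, and the error variables are local stand-ins for ErrAlreadyOpened and ErrNotOpened rather than the library's own values.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins for the documented errors.
var (
	errAlreadyOpened = errors.New("A connection to this ip / port is already open.")
	errNotOpened     = errors.New("A connection to this ip / port must be opened first.")
)

// registry is a hypothetical address-keyed table of open sockets.
type registry struct {
	mu   sync.Mutex
	open map[string]bool
}

func newRegistry() *registry {
	return &registry{open: make(map[string]bool)}
}

// Open records address as open, or fails if it was already opened.
func (r *registry) Open(address string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.open[address] {
		return errAlreadyOpened
	}
	r.open[address] = true
	return nil
}

// Use succeeds only for an address that was opened first.
func (r *registry) Use(address string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.open[address] {
		return errNotOpened
	}
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.Use("127.0.0.1:5031"))  // not opened yet
	fmt.Println(r.Open("127.0.0.1:5031")) // <nil>
	fmt.Println(r.Open("127.0.0.1:5031")) // already open
}
```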
Functions ¶
func FormatAddress ¶
FormatAddress covers the case where you want or need a programmatically correct way to format an address/port for use with StartListening or WriteTo.
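The signature of FormatAddress is not shown on this page, so the sketch below does not call it. Instead it shows one standard-library way to build the same kind of host:port string, using `net.JoinHostPort`. The helper name `joinAddress` and its parameters are hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// joinAddress is a hypothetical helper, not the library's FormatAddress.
// net.JoinHostPort adds brackets around IPv6 literals where needed.
func joinAddress(host string, port int) string {
	return net.JoinHostPort(host, strconv.Itoa(port))
}

func main() {
	fmt.Println(joinAddress("127.0.0.1", 5031)) // prints 127.0.0.1:5031
	fmt.Println(joinAddress("", 5031))          // prints :5031
	fmt.Println(joinAddress("::1", 5031))       // prints [::1]:5031
}
```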
Types ¶
type ListenCallback ¶
ListenCallback is a function type that calling code must implement in order to receive slices of bytes from the socket. Each slice of bytes will be stripped of the size header, meaning you can directly deserialize the raw slice. You would then perform your custom logic for interpreting the message before returning. You can optionally return an error, which will be logged if EnableLogging is set to true.
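The type definition is not reproduced on this page. Based on the description (it receives the payload bytes and may return an error), a callback plausibly has the shape `func([]byte) error`; that shape is an assumption here. The sketch below uses a local stand-in type and treats the payload as plain text, where real code would unmarshal a protocol buffer message.

```go
package main

import (
	"errors"
	"fmt"
)

// listenCallback is a local stand-in type. The shape func([]byte) error is
// an assumption based on the description, not a confirmed definition.
type listenCallback func(payload []byte) error

// handleMessage treats the payload as plain text purely for illustration.
// A real callback would deserialize a protocol buffer message here.
func handleMessage(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	fmt.Printf("received %d bytes: %s\n", len(payload), payload)
	return nil
}

func main() {
	var cb listenCallback = handleMessage
	if err := cb([]byte("hello")); err != nil {
		fmt.Println("callback error:", err)
	}
	if err := cb(nil); err != nil {
		fmt.Println("callback error:", err)
	}
}
```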
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager represents the object used to govern interactions between TCP endpoints. You can use it to read from and write to streaming or non-streaming TCP connections and have it handle packaging data with a header describing the size of the data payload. This makes it easy to work with wire formats like Protocol Buffers, which require custom delimiting to be sent in a streaming fashion.
func NewManager ¶
func NewManager() *Manager
NewManager creates and returns a new *Manager.
func (*Manager) CloseListener ¶
CloseListener lets you send a signal to a TCPListener that tells it to stop accepting new requests. It will finish any requests in flight.
func (*Manager) CloseWriter ¶
CloseWriter lets you send a signal to a TCPWriter that tells it to stop accepting new requests. It will finish any requests in flight.
func (*Manager) Dial ¶
func (bm *Manager) Dial(cfg *TCPConnConfig) error
Dial must be called before attempting to write, because the TCPWriter needs certain configuration information, which should be provided up front. Once the connection is open, there should be no need to check on its status. WriteTo will attempt to re-use the existing connection, or rebuild it if any errors occur on a write. It will return the number of bytes written. While the TCPWriter makes every attempt to continue to send bytes until they are all written, you should always check that this number matches the number of bytes you attempted to write, to guard against very exceptional cases.
func (*Manager) StartListening ¶
func (bm *Manager) StartListening(cfg TCPListenerConfig) error
StartListening is an asynchronous, non-blocking method. It begins listening on the given port and fires off a goroutine for every client connection it receives. That goroutine will read the fixed header, then the message payload, and then invoke the provided ListenCallback. In the event of a transport error, it will disconnect the client. It is the client's responsibility to re-connect if needed.
func (*Manager) Write ¶
Write allows you to dial a remote or local TCP endpoint and send a series of bytes as messages. Each slice of bytes you pass in will be prepended with its size, within the size of the pre-defined maximum message size. If the connection isn't open yet, WriteTo will open it and cache it. If for any reason the connection breaks, it will be disposed of. If not all bytes can be written, WriteTo will keep trying until the full message is delivered or the connection is broken.
type TCPConn ¶
type TCPConn struct {
// contains filtered or unexported fields
}
TCPConn is an abstraction over the normal net.TCPConn, but optimized for writing data encoded in a length+data format, like you would treat networked protocol buffer messages.
func DialTCP ¶
func DialTCP(cfg *TCPConnConfig) (*TCPConn, error)
DialTCP creates a TCPWriter, and dials a connection to the remote endpoint. It does not begin writing anything until you begin to do so.
func (*TCPConn) Close ¶
Close will immediately call close on the connection to the remote endpoint. Per the golang source code for the netFD object, this call uses a special mutex to control access to the underlying pool of readers/writers. This call should be threadsafe, so that any other threads writing will finish, or be blocked, when this is invoked.
func (*TCPConn) Reopen ¶
Reopen allows you to close and re-establish a connection to the existing Address without needing to create a whole new TCPWriter object.
func (*TCPConn) Write ¶
Write allows you to send a stream of bytes as messages. Each slice of bytes you pass in will be prepended with its size. If the connection isn't open, you will receive an error. If not all bytes can be written, Write will keep trying until the full message is delivered or the connection is broken.
type TCPConnConfig ¶
type TCPConnConfig struct {
	// Controls how large the largest Message may be. The server will reject any messages whose clients
	// header size does not match this configuration.
	MaxMessageSize int
	// Address is the address to connect to for writing streaming messages.
	Address string
}
TCPConnConfig represents the information needed to open a connection for writing messages.
type TCPListener ¶
type TCPListener struct {
// contains filtered or unexported fields
}
TCPListener represents the abstraction over a raw TCP socket for reading streaming protocol buffer data without having to write a ton of boilerplate.
func ListenTCP ¶
func ListenTCP(cfg TCPListenerConfig) (*TCPListener, error)
ListenTCP creates a TCPListener and opens its local connection so that it can begin receiving once you're ready. The connection is open, but it is not yet attempting to handle connections.
func (*TCPListener) Close ¶
func (t *TCPListener) Close()
Close signals to the Listener that it should no longer accept incoming connections and should begin to shut down.
func (*TCPListener) StartListening ¶
func (t *TCPListener) StartListening() error
StartListening represents a way to start accepting TCP connections, which are handled by the Callback provided upon initialization. This method will block the current executing thread / go-routine.
func (*TCPListener) StartListeningAsync ¶
func (t *TCPListener) StartListeningAsync() error
StartListeningAsync represents a way to start accepting TCP connections, which are handled by the Callback provided upon initialization. It does the listening in a go-routine, so as not to block.
type TCPListenerConfig ¶
type TCPListenerConfig struct {
	// Controls how large the largest Message may be. The server will reject any messages whose clients
	// header size does not match this configuration
	MaxMessageSize int
	// Controls the ability to enable logging errors occurring in the library
	EnableLogging bool
	// The local address to listen for incoming connections on. Typically, you exclude
	// the ip, and just provide port, ie: ":5031"
	Address string
	// The callback to invoke once a full set of message bytes has been received. It
	// is your responsibility to handle parsing the incoming message and handling errors
	// inside the callback
	Callback ListenCallback
}
TCPListenerConfig represents the information needed to begin listening for incoming messages.