buffstreams

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2015 License: Apache-2.0 Imports: 7 Imported by: 27

README

BuffStreams GoDoc Build Status

Streaming Protocol Buffers messages over TCP in Golang

What is BuffStreams?

BuffStreams is a set of abstraction over TCPConns for streaming connections that write data in a format involving the length of the message + the message payload itself (like Protocol Buffers, hence the name).

BuffStreams gives you a simple interface to start a (blocking or non) listener on a given port, which will stream arrays of raw bytes into a callback you provide it. In this way, BuffStreams is not so much a daemon, but a library to build networked services that can communicate over TCP using Protocol Buffer messages.

Why BuffStreams

I was writing a few different projects for fun in Golang, and kept writing code something like what is in the library, but less organized. I decided to focus on the networking code, pulling it out and improving it so I knew it could be trusted to perform reliably across projects.

Ethos

There is nothing special or magical about Buffstreams, or the code in here. The idea isn't that it's a better, faster socket abstraction - it's to do as much of the boilerplate for you when handling streaming data like protobuff messages, with as little impact to performance as possible. Currently, Buffstreams is able to do over 1.1 million messsages per second, at 110 bytes per message on a single listening socket which saturates a 1gig nic.

The idea of Buffstreams is to do the boring parts and handle common errors, enabling you to write systems on top of it, while performing with as little overhead as possible.

How does it work?

Since protobuff messages lack any kind of natural delimeter, BuffStreams uses the method of adding a fixed header of bytes (which is configurable) that describes the size of the actual payload. This is handled for you, by the call to write. You never need to pack on the size yourself.

On the server side, it will listen for these payloads, read the fixed header, and then the subsequent message. The server must have the same maximum size as the client for this to work. BuffStreams will then pass the byte array to a callback you provided for handling messages received on that port. Deserializing the messages and interpreting their value is up to you.

One important note is that internally, BuffStreams does not actually use or rely on the Protocol Buffers library itself in any way. All serialization / deserialization is handled by the client prior to / after interactions with BuffStreams. In this way, you could theoretically use this library to stream any data over TCP that uses the same strategy of a fixed header of bytes + a subsequent message body.

Currently, I have only used it for ProtocolBuffers messages.

Logging

You can optionally enable logging of errors, although this naturally comes with a performance penalty under extreme load.

Benchmarks

I've tried very hard to optimize BuffStreams as best as possible, striving to keep it's averages above 1M messages per second, with no errors during transit.

See Bench

How do I use it?

Download the library

go get "github.com/StabbyCutyou/buffstreams"

Import the library

import "github.com/StabbyCutyou/buffstreams"

Listening for connections

One of the core objects in Buffstreams is the TCPListener. This struct allows you to open a socket on a local port, and begin waiting for clients to connect. Once a connection is made, each full message written by the client will be received by the Listener, and a callback you define will be invoked with the message contents (an array of bytes).

To begin listening, first create a TCPListenerConfig object to define how the listener should behave. A sample TCPListenerConfig might look like this:

cfg := TCPListenerConfig {
  EnableLogging: false, // true will have log messages printed to stdout/stderr, via log
  MaxMessageSize: 4098,
  Callback: func(byte[])error{return nil} // Any function type that adheres to this signature, you'll need to deserialize in here if need be
  Address: FormatAddress("", strconv.Itoa(5031)) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl, err := buffstreams.ListenTCP(cfg)

Once you've opened a listener this way, the socket is now in use, but the listener itself has not yet begun to accept connections.

To do so, you have two choices. By default, this operation will block the current thread. If you want to avoid that, and use a fire and forget approach, you can call

err := btl.StartListeningAsync()

If there is an error while starting up, it will be returned by this method. Alternatively, if you want to handle running the call yourself, or don't care that it blocks, you can call

err := btl.StartListening()

The ListenCallback

The way Buffstreams handles acting over the incoming messages is to let you provide a callback to operate on the bytes. ListenCallback takes in an array/slice of bytes, and returns an error.

type ListenCallback func([]byte) error

The callback will receive the raw bytes for a given protobuff message. The header containing the size will have been removed. It is the callbacks responsibility to deserialize and act upon the message.

The Listener gets the message, your callback does the work.

A sample callback might start like so:

  func ListenCallbackExample ([]byte data) error {
    msg := &message.ImportantProtoBuffStreamingMessage{}
    err := proto.Unmarshal(data, msg)
    // Now you do some stuff with msg
    ...
  }

The callback is currently run in it's own goroutine, which also handles reading from the connection until the reader disconnects, or there is an error. Any errors reading from a connection incoming will be up to the client to handle.

Writing messages

To begin writing messages, you'll need to dial a TCPWriter using TCPWriterConfig

cfg := TCPWriterConfig {
  EnableLogging: false, // true will have log messages printed to stdout/stderr, via log
  MaxMessageSize: 4098, // You want this to match the MaxMessageSize the server expects for messages on that socket
  Address: FormatAddress("127.0.0.1", strconv.Itoa(5031)) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}

Once you have a configuration object, you can Dial out.

btw, err := buffstreams.DialTCP(cfg)

This will open a connection to the endpoint at the specified location. From there, you can write your data

  bytesWritten, err := btw.Write(msgBytes, true)

If there is an error in writing, that connection will be closed and be reopened on the next write. There is no guarantee if any the bytesWritten value will be >0 or not in the event of an error which results in a reconnect.

Manager

There is a third option, the provided Manager class. This class will give you a simple but effective Manager abstraction over dialing and listening over ports, managing the connections for you. You provide the normal configuration for dialing out or listening for incoming connections, and let the manager hold onto the references. The Manager is considered threadsafe, as it internally uses locks to ensure consistency and coordination between concurrent access to the connections being held.

The Manager is not really a "Pool", in that it doesn't open and hold X connections for you to re-use. However, it maintains many of the same behaviors as a pool, including caching and re-using connections, and as mentioned is threadsafe.

Creating a Manager

bm := buffstreams.NewManager()

Listening on a port. Manager always makes this asyncrhonous and non blocking

// Assuming you've got a configuration object cfg, see above
err := bm.StartListening(cfg)

Dialing out to a remote endpoint

// Assuming you've got a configuration object cfg, see above
err := bm.Dial(cfg)

Having opened a connection, writing to that connection in a constant fashion

bytesWritten, err := bm.Write("127.0.0.1:5031", dataBytes)

The Manager will keep listening and dialed out connections cached internally. Once you open one, it'll be kept open. The writer will match your incoming write destination, such that any time you write to that same address, the correct writer will be re-used. The listening connection will simply remain open, waiting to receive requests.

You can forcibly close these connections, by calling either

err := bm.CloseListener("127.0.0.1:5031")

or

err := bm.CloseWriter("127.0.0.1:5031")

Roadmap

  • Release proper set of benchmarks, including more real-world cases
  • Configurable retry for the client, configurable errored-message queue for user to define failover process to handle.
  • Optional channel based streaming approach instead of callbacks
  • Further library optimizations via tools such as pprof
  • gb maybe?

LICENSE

Apache v2 - See LICENSE

Documentation

Overview

Package buffstreams provides a simple interface for creating and storing sockets that have a pre-defined set of behaviors, making them simple to use for consuming streaming protocol buffer messages over TCP

Index

Constants

View Source
const DefaultMaxMessageSize int = 4096

DefaultMaxMessageSize is the value that is used if a ManagerConfig indicates a MaxMessageSize of 0

View Source
const Version string = "1.0.0"

Version is the official semver for the library

Variables

View Source
var ErrAlreadyOpened = errors.New("A connection to this ip / port is already open.")

ErrAlreadyOpened represents the error where a caller has tried to open the same ip / port address more than once.

View Source
var ErrNotOpened = errors.New("A connection to this ip / port must be opened first.")

ErrNotOpened represents the error where a caller has tried to use a socket to an address that they have not opened yet.

Functions

func FormatAddress

func FormatAddress(address string, port string) string

FormatAddress is to cover the event that you want/need a programmtically correct way to format an address/port to use with StartListening or WriteTo

Types

type ListenCallback

type ListenCallback func([]byte) error

ListenCallback is a function type that calling code will need to implement in order to receive arrays of bytes from the socket. Each slice of bytes will be stripped of the size header, meaning you can directly serialize the raw slice. You would then perform your custom logic for interpretting the message, before returning. You can optionally return an error, which in turn will be logged if EnableLogging is set to true.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager represents the object used to govern interactions between tcp endpoints. You can use it to read from and write to streaming or non-streaming TCP connections and have it handle packaging data with a header describing the size of the data payload. This is to make it easy to work with wire formats like ProtocolBuffers, which require a custom-delimeter situation to be sent in a streaming fashion.

func NewManager

func NewManager() *Manager

NewManager creates a new *Manager based on the provided ManagerConfig

func (*Manager) CloseListener

func (bm *Manager) CloseListener(address string) error

CloseListener lets you send a signal to a TCPListener that tells it to stop accepting new requests. It will finish any requests in flight.

func (*Manager) CloseWriter

func (bm *Manager) CloseWriter(address string) error

CloseWriter lets you send a signal to a TCPWriter that tells it to stop accepting new requests. It will finish any requests in flight.

func (*Manager) Dial

func (bm *Manager) Dial(cfg TCPWriterConfig) error

Dial must be called before attempting to write. This is because the TCPWriter need certain configuration information, which should be provided upfront. Once the connection is open, there should be no need to check on it's status. WriteTo will attempt to re-use or rebuild the connection using the existing connection if any errors occur on a write. It will return the number of bytes written. While the TCPWriter makes every attempt to continue to send bytes until they are all written, you should always check to make sure this number matches the bytes you attempted to write, due to very exceptional cases.

func (*Manager) StartListening

func (bm *Manager) StartListening(cfg TCPListenerConfig) error

StartListening is an asyncrhonous, non-blocking method. It begins listening on the given port, and fire off a goroutine for every client connection it receives. That goroutine will read the fixed header, then the message payload, and then invoke the povided ListenCallbacl. In the event of an transport error, it will disconnect the client. It is the clients responsibility to re-connect if needed.

func (*Manager) Write

func (bm *Manager) Write(address string, data []byte) (int, error)

Write allows you to dial to a remote or local TCP endpoint, and send a series of bytes as messages. Each array of bytes you pass in will be pre-pended with it's size within the size of the pre-defined maximum message size. If the connection isn't open yet, WriteTo will open it, and cache it. If for anyreason the connection breaks, it will be disposed a. If not all bytes can be written, WriteTo will keep trying until the full message is delivered, or the connection is broken.

type TCPListener

type TCPListener struct {
	// contains filtered or unexported fields
}

TCPListener represents the abstraction over a raw TCP socket for reading streaming protocolbuffer data without having to write a ton of boilerplate

func ListenTCP

func ListenTCP(cfg TCPListenerConfig) (*TCPListener, error)

ListenTCP creates a TCPListener, and opens it's local connection to allow it to begin receiving, once you're ready to. So the connection is open, but it is not yet attempting to handle connections.

func (*TCPListener) Close

func (btl *TCPListener) Close()

Close represents a way to signal to the Listener that it should no longer accept incoming connections, and begin to shutdown.

func (*TCPListener) StartListening

func (btl *TCPListener) StartListening() error

StartListening represents a way to start accepting TCP connections, which are handled by the Callback provided upon initialization. This method will block the current executing thread / go-routine.

func (*TCPListener) StartListeningAsync

func (btl *TCPListener) StartListeningAsync() error

StartListeningAsync represents a way to start accepting TCP connections, which are handled by the Callback provided upon initialization. It does the listening in a go-routine, so as not to block.

type TCPListenerConfig

type TCPListenerConfig struct {
	// Controls how large the largest Message may be. The server will reject any messages whose clients
	// header size does not match this configuration
	MaxMessageSize int
	// Controls the ability to enable logging errors occuring in the library
	EnableLogging bool
	// The local address to listen for incoming connections on. Typically, you exclude
	// the ip, and just provide port, ie: ":5031"
	Address string
	// The callback to invoke once a full set of message bytes has been received. It
	// is your responsibility to handle parsing the incoming message and handling errors
	// inside the callback
	Callback ListenCallback
}

TCPListenerConfig representss the information needed to begin listening for incoming messages.

type TCPWriter

type TCPWriter struct {
	// contains filtered or unexported fields
}

TCPWriter represents the abstraction over a raw TCP socket for writing streaming protocolbuffer data without having to write a ton of boilerplate

func DialTCP

func DialTCP(cfg TCPWriterConfig) (*TCPWriter, error)

DialTCP creates a TCPWriter, and dials a connection to the remote endpoint. It does not begin writing anything until you begin to do so

func (*TCPWriter) Close

func (btw *TCPWriter) Close() error

Close will immediately call close on the connection to the remote endpoint. You should not call this if other threads may be using the underlying socktet, unless you control it in a mutex of some kind.

func (*TCPWriter) Reopen

func (btw *TCPWriter) Reopen() error

Reopen allows you to close and re-establish a connection to the existing Address without needing to create a whole new TCPWriter object

func (*TCPWriter) Write

func (btw *TCPWriter) Write(data []byte) (int, error)

Write allows you to send a stream of bytes as messages. Each array of bytes you pass in will be pre-pended with it's size. If the connection isn't open you will receive an error. If not all bytes can be written, Write will keep trying until the full message is delivered, or the connection is broken.

type TCPWriterConfig

type TCPWriterConfig struct {
	// Controls how large the largest Message may be. The server will reject any messages whose clients
	// header size does not match this configuration
	MaxMessageSize int
	// Controls the ability to enable logging errors occuring in the library
	EnableLogging bool
	// Address is the address to connect to for writing streaming messages
	Address string
}

TCPWriterConfig representss the information needed to begin listening for writing messages.

Directories

Path Synopsis
test
message
Package message is a generated protocol buffer package.
Package message is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL