leanstreams

package
v1.6.1
Published: Feb 26, 2024 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Documentation

Overview

Package buffstreams provides a simple interface for creating and storing sockets that have a pre-defined set of behaviors, making them simple to use for consuming streaming protocol buffer messages over TCP.

Constants

const DefaultMaxMessageSize int = 4096

DefaultMaxMessageSize is the value that is used if a ManagerConfig indicates a MaxMessageSize of 0
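
The fallback described above can be sketched as follows. This is a minimal illustration, not the library's code: effectiveMaxMessageSize is a hypothetical helper name, and the constant is redeclared locally so the snippet compiles on its own.

```go
package main

import "fmt"

// DefaultMaxMessageSize mirrors the package constant documented above.
const DefaultMaxMessageSize int = 4096

// effectiveMaxMessageSize is a hypothetical helper (not part of the package).
// It returns the default when the configured size is 0, as the constant's
// documentation describes, and the configured size otherwise.
func effectiveMaxMessageSize(configured int) int {
	if configured == 0 {
		return DefaultMaxMessageSize
	}
	return configured
}

func main() {
	fmt.Println(effectiveMaxMessageSize(0))    // prints 4096
	fmt.Println(effectiveMaxMessageSize(1024)) // prints 1024
}
```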

const Version string = "2.0.0"

Version is the official semver for the library

Variables

var (
	// ErrZeroBytesReadHeader is thrown when the value parsed from the header is not valid
	ErrZeroBytesReadHeader = errors.New("0 Bytes parsed from header. Connection Closed")
	// ErrLessThanZeroBytesReadHeader is thrown when the value parsed from the header caused some kind of underrun
	ErrLessThanZeroBytesReadHeader = errors.New("Less than zero bytes parsed from header. Connection Closed")
)

Functions

func FormatAddress

func FormatAddress(address string, port string) string

FormatAddress covers the case where you want or need a programmatically correct way to format an address and port for use with StartListening or WriteTo.
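
The page does not show how FormatAddress joins its arguments. The sketch below assumes a "host:port" result and uses net.JoinHostPort from the standard library as a stand-in; formatAddress is a hypothetical local function, and the real implementation may behave differently (for example with IPv6 literals).

```go
package main

import (
	"fmt"
	"net"
)

// formatAddress is a hypothetical stand-in for FormatAddress. It assumes the
// desired result is "host:port" and delegates to net.JoinHostPort.
func formatAddress(address string, port string) string {
	return net.JoinHostPort(address, port)
}

func main() {
	fmt.Println(formatAddress("127.0.0.1", "5031")) // prints 127.0.0.1:5031
	// An empty host yields the listen-on-all-interfaces form.
	fmt.Println(formatAddress("", "5031")) // prints :5031
}
```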

Types

type Connection added in v1.3.0

type Connection struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewConnection added in v1.3.0

func NewConnection(addr string, tlsConfig *tls.Config, maxPayloadSizeInBytes int) *Connection

func (*Connection) Client added in v1.3.0

func (c *Connection) Client() *TCPClient

func (*Connection) Close added in v1.3.0

func (c *Connection) Close() error

func (*Connection) Connect added in v1.3.0

func (c *Connection) Connect()

type ListenCallback

type ListenCallback func([]byte) error

ListenCallback is a function type that calling code must implement in order to receive slices of bytes from the socket. Each slice is stripped of its size header, so you can deserialize the raw slice directly. Perform your custom logic for interpreting the message before returning. You can optionally return an error, which is logged if EnableLogging is set to true.
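
A callback of this shape might look like the sketch below. The ListenCallback type is redeclared locally so the snippet is self-contained, and handleMessage is a hypothetical name; real code would typically unmarshal the payload into a generated protocol buffer type at the marked point.

```go
package main

import (
	"errors"
	"fmt"
)

// ListenCallback mirrors the type documented above so the sketch compiles
// without importing the package.
type ListenCallback func([]byte) error

// handleMessage is a hypothetical callback. The payload arrives without the
// size header, so it can be decoded directly.
func handleMessage(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	// Real code would decode here, for example with proto.Unmarshal from
	// google.golang.org/protobuf/proto. This sketch only reports the size.
	fmt.Printf("received %d bytes\n", len(payload))
	return nil
}

func main() {
	var cb ListenCallback = handleMessage
	if err := cb([]byte("hello")); err != nil {
		fmt.Println("callback error:", err)
	}
	if err := cb(nil); err != nil {
		fmt.Println("callback error:", err)
	}
}
```

Returning an error rather than panicking keeps the decision about logging with the listener.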

type Logger added in v1.4.0

type Logger interface {
	Printf(v string, args ...interface{})
}

type MetricRegistrar added in v1.4.3

type MetricRegistrar interface {
	Set(name string, value float64, labels ...string)
}
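
Neither interface is described on this page, so the following is only a sketch of how they could be satisfied. The interfaces are redeclared locally; memoryRegistrar and newStderrLogger are hypothetical names. The one firm point is that *log.Logger from the standard library has a matching Printf method and therefore satisfies Logger.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"sync"
)

// Logger and MetricRegistrar mirror the interfaces documented above.
type Logger interface {
	Printf(v string, args ...interface{})
}

type MetricRegistrar interface {
	Set(name string, value float64, labels ...string)
}

// memoryRegistrar is a hypothetical in-memory MetricRegistrar for tests.
type memoryRegistrar struct {
	mu     sync.Mutex
	values map[string]float64
}

func newMemoryRegistrar() *memoryRegistrar {
	return &memoryRegistrar{values: make(map[string]float64)}
}

// Set stores the latest value under name. Labels are ignored in this sketch.
func (r *memoryRegistrar) Set(name string, value float64, labels ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.values[name] = value
}

// get returns the last value stored under name, or 0 if none was stored.
func (r *memoryRegistrar) get(name string) float64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.values[name]
}

// newStderrLogger returns a standard library logger as a Logger.
func newStderrLogger() Logger {
	return log.New(os.Stderr, "leanstreams: ", log.LstdFlags)
}

func main() {
	reg := newMemoryRegistrar()
	var metrics MetricRegistrar = reg
	metrics.Set("open_connections", 3)

	logger := newStderrLogger()
	logger.Printf("recorded metric %s", "open_connections")
	fmt.Println(reg.get("open_connections")) // prints 3
}
```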

type TCPClient

type TCPClient struct {
	MaxMessageSize int

	sync.Mutex
	// contains filtered or unexported fields
}

TCPClient is an abstraction over the normal net.TCPConn, but optimized for writing data encoded in a length+data format, like you would treat networked protocol buffer messages.
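
To illustrate what a length+data format means, here is a minimal framing sketch. The header width (4 bytes) and byte order (big-endian) are assumptions chosen for the example; this page does not state what the library uses on the wire. encodeFrame and writeFrame are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// headerSize is an assumption for this sketch, not the library's wire format.
const headerSize = 4

// encodeFrame returns the payload prefixed with its length as a 4-byte
// big-endian unsigned integer.
func encodeFrame(payload []byte) []byte {
	frame := make([]byte, headerSize+len(payload))
	binary.BigEndian.PutUint32(frame[:headerSize], uint32(len(payload)))
	copy(frame[headerSize:], payload)
	return frame
}

// writeFrame writes header and payload in a single Write call so that
// concurrent writers cannot interleave a header with another payload.
func writeFrame(w io.Writer, payload []byte) (int, error) {
	return w.Write(encodeFrame(payload))
}

func main() {
	var buf bytes.Buffer
	n, err := writeFrame(&buf, []byte("hello"))
	fmt.Println(n, err, buf.Bytes())
	// prints 9 <nil> [0 0 0 5 104 101 108 108 111]
}
```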

func DialTCP

func DialTCP(cfg *TCPClientConfig) (*TCPClient, error)

DialTCP creates a TCPClient and dials a connection to the remote endpoint. Nothing is written until you call Write.

func DialTCPUntilConnected

func DialTCPUntilConnected(cfg *TCPClientConfig, timeout time.Duration) (*TCPClient, error)
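
DialTCPUntilConnected carries no description on this page. Judging by its name and timeout parameter alone, it plausibly retries the dial until it succeeds or the timeout elapses. The sketch below shows that general pattern with the standard library; dialUntilConnected, its retryEvery parameter, and demo are hypothetical and make no claim about the library's actual behavior.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// dialUntilConnected is a hypothetical helper that retries a TCP dial until
// it succeeds or the overall timeout has elapsed.
func dialUntilConnected(addr string, timeout, retryEvery time.Duration) (net.Conn, error) {
	deadline := time.Now().Add(timeout)
	lastErr := errors.New("timeout elapsed before the first attempt")
	for {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			return nil, fmt.Errorf("dial %s: gave up after %s: %w", addr, timeout, lastErr)
		}
		// Each attempt is bounded by the time left before the deadline.
		conn, err := net.DialTimeout("tcp", addr, remaining)
		if err == nil {
			return conn, nil
		}
		lastErr = err
		time.Sleep(retryEvery)
	}
}

// demo dials a listener on an ephemeral loopback port.
func demo() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	defer ln.Close()

	conn, err := dialUntilConnected(ln.Addr().String(), 2*time.Second, 50*time.Millisecond)
	if err != nil {
		return err
	}
	return conn.Close()
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Println("connected")
}
```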

func (*TCPClient) Close

func (c *TCPClient) Close() error

Close will wait until any Open calls finish, then call close on the connection to the remote endpoint. Per the Go source code for the netFD object, this call uses a special mutex to control access to the underlying pool of readers/writers. This call should be safe for concurrent use, so that any other goroutines writing will finish, or be blocked, when this is invoked.

func (*TCPClient) Open

func (c *TCPClient) Open() error

Open dials a connection to the remote endpoint.

func (*TCPClient) Reopen

func (c *TCPClient) Reopen() error

Reopen allows you to close and re-establish a connection to the existing Address without needing to create a whole new TCPClient object.

func (*TCPClient) Write

func (c *TCPClient) Write(data []byte) (int, error)

type TCPClientConfig

type TCPClientConfig struct {
	// Controls how large the largest Message may be. The server will reject any messages whose clients
	// header size does not match this configuration.
	MaxMessageSize int
	// Address is the address to connect to for writing streaming messages.
	Address string

	TLSConfig *tls.Config

	Logger Logger
}

TCPClientConfig represents the information needed to begin writing messages.

type TCPListener

type TCPListener struct {
	ConnConfig *TCPServerConfig

	Address string
	// contains filtered or unexported fields
}

TCPListener represents the abstraction over a raw TCP socket for reading streaming protocol buffer data without having to write a ton of boilerplate.

func ListenTCP

func ListenTCP(cfg TCPListenerConfig) (*TCPListener, error)

ListenTCP creates a TCPListener and opens its local connection so that it can begin receiving once you are ready. The connection is open, but it is not yet attempting to handle connections.

func (*TCPListener) Close

func (t *TCPListener) Close()

Close signals to the Listener that it should no longer accept incoming connections and should shut down.

func (*TCPListener) OpenConnections

func (t *TCPListener) OpenConnections() int

func (*TCPListener) RestartListeningAsync

func (t *TCPListener) RestartListeningAsync() error

func (*TCPListener) StartListeningAsync

func (t *TCPListener) StartListeningAsync() error

StartListeningAsync starts accepting TCP connections, which are handled by the Callback provided upon initialization. It does the listening in a goroutine, so as not to block.
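
The non-blocking behavior described above follows a common Go pattern: run the accept loop in its own goroutine and hand each connection to a handler. The sketch below shows that pattern with the standard library only; startListeningAsync and demo are hypothetical names, and the library's internals may be organized differently.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// startListeningAsync is a hypothetical helper. It accepts connections in a
// background goroutine and returns immediately.
func startListeningAsync(ln net.Listener, handle func(net.Conn)) {
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				// Accept fails once the listener is closed, ending the loop.
				return
			}
			// Each connection gets its own goroutine.
			go handle(conn)
		}
	}()
}

// demo sends "ping" to a loopback listener and returns what the handler read.
func demo() (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()

	received := make(chan string, 1)
	startListeningAsync(ln, func(c net.Conn) {
		defer c.Close()
		data, _ := io.ReadAll(c) // reads until the client closes its side
		received <- string(data)
	})

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	if _, err := conn.Write([]byte("ping")); err != nil {
		conn.Close()
		return "", err
	}
	conn.Close()
	return <-received, nil
}

func main() {
	got, err := demo()
	fmt.Println(got, err)
}
```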

type TCPListenerConfig

type TCPListenerConfig struct {
	// Controls how large the largest Message may be. The server will reject any messages whose clients
	// header size does not match this configuration
	MaxMessageSize int
	// Controls the ability to enable logging errors occurring in the library
	Logger Logger

	// The local address to listen for incoming connections on. Typically, you exclude
	// the ip, and just provide port, ie: ":5031"
	Address string
	// The callback to invoke once a full set of message bytes has been received. It
	// is your responsibility to handle parsing the incoming message and handling errors
	// inside the callback
	Callback ListenCallback

	TLSConfig           *tls.Config
	MetricRegistrar     MetricRegistrar
	ConnCountMetricName string
}

TCPListenerConfig represents the information needed to begin listening for incoming messages.

type TCPServer

type TCPServer struct {
	MaxMessageSize int
	// contains filtered or unexported fields
}

TCPServer is an abstraction over the normal net.TCPConn, but optimized for reading data encoded in a length+data format, like you would treat networked protocol buffer messages.

func (*TCPServer) Close

func (c *TCPServer) Close() error

Close will immediately call close on the connection to the remote endpoint. Per the Go source code for the netFD object, this call uses a special mutex to control access to the underlying pool of readers/writers. This call should be safe for concurrent use, so that any other goroutines writing will finish, or be blocked, when this is invoked.

func (*TCPServer) Read

func (c *TCPServer) Read(b []byte) (int, error)
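
Read has no description here. As a rough picture of what reading one length-prefixed message involves, the sketch below reads a header, validates the size, then reads the payload. The 4-byte big-endian header, the size check against a maximum, and all names (readFrame, decodeFrame, the local error values) are assumptions for illustration and are not taken from the library.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// Local sentinel errors for the sketch, loosely modeled on the package's
// header errors.
var (
	errZeroBytesReadHeader = errors.New("0 bytes parsed from header")
	errMessageTooLarge     = errors.New("message exceeds maximum size")
)

// headerSize is an assumption for this sketch, not the library's wire format.
const headerSize = 4

// readFrame reads one length-prefixed message from r and returns its payload.
func readFrame(r io.Reader, maxMessageSize int) ([]byte, error) {
	header := make([]byte, headerSize)
	if _, err := io.ReadFull(r, header); err != nil {
		return nil, err
	}
	size := binary.BigEndian.Uint32(header)
	if size == 0 {
		return nil, errZeroBytesReadHeader
	}
	if int64(size) > int64(maxMessageSize) {
		return nil, errMessageTooLarge
	}
	payload := make([]byte, size)
	// io.ReadFull keeps reading until the whole payload has arrived.
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// decodeFrame is a convenience wrapper for in-memory frames.
func decodeFrame(frame []byte, maxMessageSize int) ([]byte, error) {
	return readFrame(bytes.NewReader(frame), maxMessageSize)
}

func main() {
	payload, err := decodeFrame([]byte{0, 0, 0, 2, 'o', 'k'}, 4096)
	fmt.Printf("%q %v\n", payload, err) // prints "ok" <nil>
}
```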

type TCPServerConfig

type TCPServerConfig struct {
	// Controls how large the largest Message may be. The server will reject any messages whose clients
	// header size does not match this configuration.
	MaxMessageSize int
	// Address is the address to connect to for writing streaming messages.
	Address string

	TLSConfig *tls.Config
}

TCPServerConfig represents the information needed to begin listening for writing messages.

Directories

Path            Synopsis
test/message    Package message is a generated protocol buffer package.
