msgring

package module
v0.0.0-...-67a8577 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2018 License: BSD-3-Clause Imports: 19 Imported by: 2

README

MsgRing

Development Repository

Experimental: No stable version of this package yet exists; it is still in early development.

Tools for using a ring as a messaging hub, easing communication between nodes in the ring.

See github.com/gholt/ring for more information on the ring code itself.

API Documentation

This is the latest development area for the package.
Eventually a stable version of the package will be established but, for now, all things about this package are subject to change.

Copyright See AUTHORS. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var TCP_MSG_RING_VERSION = []byte("TCPMSGRINGv00001")

Functions

func CanonicalHostPort

func CanonicalHostPort(hostport string, defaultPort int) (string, error)

Types

type LogFunc

type LogFunc func(format string, v ...interface{})

type Msg

type Msg interface {
	// MsgType is the unique designator for the type of message content (such
	// as a pull replication request, a read request, etc.). Message types just
	// need to be unique values; usually picking 64 bits of a UUID is fine.
	MsgType() uint64
	// MsgLength returns the number of bytes for the content of the message
	// (the amount that will be written with a call to WriteContent).
	MsgLength() uint64
	// WriteContent will send the contents of the message to the given writer.
	//
	// Note that WriteContent may be called multiple times and may be called
	// concurrently.
	//
	// Also note that the content should be written as quickly as possible as
	// any delays may cause the message transmission to be aborted and dropped.
	// In other words, any significant processing to build the message content
	// should be done before the Msg is given to the MsgRing for delivery.
	WriteContent(io.Writer) (uint64, error)
	// Free will be called when the MsgRing no longer has any references to the
	// message and allows the message to free any resources it may have, or be
	// reused, etc. The call will be given the number of times the message was
	// successfully sent and/or failed.
	Free(successes int, failures int)
}

Msg is a single message to be sent to another node or nodes.

type MsgRing

type MsgRing interface {
	// Ring returns the ring information used to determine messaging endpoints;
	// note that this method may return nil if no ring information is yet
	// available.
	Ring() ring.Ring
	// MaxMsgLength indicates the maximum number of bytes the content of a
	// message may contain to be handled by this MsgRing.
	MaxMsgLength() uint64
	// SetMsgHandler associates a message type with a handler; any incoming
	// messages with the type will be delivered to the handler. Message types
	// just need to be unique uint64 values; usually picking 64 bits of a UUID
	// is fine.
	SetMsgHandler(msgType uint64, handler MsgUnmarshaller)
	// MsgToNode queues the message for delivery to the indicated node; the
	// timeout should be considered for queueing, not for actual delivery.
	//
	// When the msg has actually been sent or has been discarded due to
	// delivery errors or delays, msg.Free will be called.
	MsgToNode(msg Msg, nodeID uint64, timeout time.Duration)
	// MsgToNode queues the message for delivery to all other replicas of a
	// partition; the timeout should be considered for queueing, not for actual
	// delivery.
	//
	// If the ring is not bound to a specific node (LocalNode() returns nil)
	// then the delivery attempts will be to all replicas.
	//
	// When the msg has actually been sent or has been discarded due to
	// delivery errors or delays, msg.Free will be called.
	MsgToOtherReplicas(msg Msg, partition uint32, timeout time.Duration)
}

MsgRing will send and receive Msg instances to and from ring nodes. See TCPMsgRing for a concrete implementation.

The design is such that messages are not guaranteed delivery, or even transmission. Acknowledgement and retry logic is left outside to the producers and consumers of the messages themselves.

For example, if a connection to a node is too slow to keep up with the messages wanting to be delivered to it, many of the messages will simply be dropped.

For another example, if a message is ready to be delivered to a node that currently has no established connection, the connection process will be initiated (for future messages) but that current message will be dropped.

This design is because much ring-related messaging is based on passes over the entire ring with many messages being sent constantly and conditions can change rapidly, making messages less useful as they age. It is better for the distributed system overall if a single message is just dropped if it can't be sent immediately as a similar message will be generated and attempted later on in the next pass.

For messages that require guaranteed delivery, the sender's node ID can be embedded in the message and receiver would send an acknowledgement message back upon receipt. The exact retry logic could vary greatly and depends on the sender's implementation (e.g. background passes recording acks or dedicated goroutines for each message).

In previous systems we've written, distributed algorithms could easily get hung up trying to communicate to one faulty node (for example) making the rest of the system suffer as well.

type MsgUnmarshaller

type MsgUnmarshaller func(reader io.Reader, desiredBytesToRead uint64) (actualBytesRead uint64, err error)

MsgUnmarshaller will attempt to read desiredBytesToRead from the reader and will return the number of bytes actually read as well as any error that may have occurred. If error is nil then actualBytesRead must equal desiredBytesToRead.

Note that the message content should be read as quickly as possible as any delays may cause the message transmission to be aborted. In other words, any significant processing of the message should be done after the contents are read and this reader function returns.

type TCPMsgRing

type TCPMsgRing struct {
	// contains filtered or unexported fields
}

func NewTCPMsgRing

func NewTCPMsgRing(c *TCPMsgRingConfig) (*TCPMsgRing, error)

NewTCPMsgRing creates a new MsgRing that will use TCP to send and receive Msg instances.

func (*TCPMsgRing) Listen

func (t *TCPMsgRing) Listen()

Listen on the configured TCP port, accepting new connections and processing messages from those connections; this function will not return until t.Shutdown() is called.

func (*TCPMsgRing) MaxMsgLength

func (t *TCPMsgRing) MaxMsgLength() uint64

MaxMsgLength indicates the maximum number of bytes the content of a message may contain to be handled by this TCPMsgRing.

func (*TCPMsgRing) MsgHandler

func (t *TCPMsgRing) MsgHandler(msgType uint64) MsgUnmarshaller

MsgHandler returns the handler for the given message type, if there is any set.

func (*TCPMsgRing) MsgToNode

func (t *TCPMsgRing) MsgToNode(msg Msg, nodeID uint64, timeout time.Duration)

MsgToNode queues the message for delivery to the indicated node; the timeout should be considered for queueing, not for actual delivery.

When the msg has actually been sent or has been discarded due to delivery errors or delays, msg.Free() will be called.

func (*TCPMsgRing) MsgToOtherReplicas

func (t *TCPMsgRing) MsgToOtherReplicas(msg Msg, partition uint32, timeout time.Duration)

MsgToNode queues the message for delivery to all other replicas of a partition; the timeout should be considered for queueing, not for actual delivery.

If the ring is not bound to a specific node (LocalNode() returns nil) then the delivery attempts will be to all replicas.

When the msg has actually been sent or has been discarded due to delivery errors or delays, msg.Free() will be called.

func (*TCPMsgRing) Ring

func (t *TCPMsgRing) Ring() ring.Ring

Ring returns the ring information used to determine messaging endpoints; note that this method may return nil if no ring information is yet available.

func (*TCPMsgRing) SetChaosAddrDisconnect

func (t *TCPMsgRing) SetChaosAddrDisconnect(addr string, disconnect bool)

SetChaosAddrDisconnect will allow connections to and from addr but, after 10-70 seconds, it will abruptly close a connection.

func (*TCPMsgRing) SetChaosAddrOff

func (t *TCPMsgRing) SetChaosAddrOff(addr string, off bool)

SetChaosAddrOff will disable all outgoing connections to addr and immediately close any incoming connections from addr.

func (*TCPMsgRing) SetMsgHandler

func (t *TCPMsgRing) SetMsgHandler(msgType uint64, handler MsgUnmarshaller)

SetMsgHandler associates a message type with a handler; any incoming messages with the type will be delivered to the handler. Message types just need to be unique uint64 values; usually picking 64 bits of a UUID is fine.

func (*TCPMsgRing) SetRing

func (t *TCPMsgRing) SetRing(ring ring.Ring)

SetRing sets the ring whose information used to determine messaging endpoints.

func (*TCPMsgRing) Shutdown

func (t *TCPMsgRing) Shutdown()

Shutdown will signal the shutdown of all connections, listeners, etc. related to the TCPMsgRing; once Shutdown you must create a new TCPMsgRing to restart operations.

func (*TCPMsgRing) Stats

func (t *TCPMsgRing) Stats(debug bool) *TCPMsgRingStats

Stats returns the current stat counters and resets those counters. In other words, if Stats().Dials gives the value 10 and no more dials occur before Stats() is called again, that second Stats().Dials will have the value 0.

If debug=true, additional information (left undocumented because it is greatly subject to change) may be given when calling TCPMsgRingStats.String().

type TCPMsgRingConfig

type TCPMsgRingConfig struct {
	// LogCritical sets the func to use for critical messages; these are
	// messages about issues that render the TCPMsgRing ring inoperative.
	// Defaults logging to os.Stderr.
	LogCritical LogFunc
	// LogDebug sets the func to use for debug messages. Defaults not logging
	// debug messages.
	LogDebug LogFunc
	// AddressIndex set the index to use with Node.Address(index) to lookup a
	// Node's TCP address.
	AddressIndex int
	// BufferedMessagesPerAddress indicates how many outgoing Msg instances can
	// be buffered before dropping additional ones. Defaults to 8.
	BufferedMessagesPerAddress int
	// ConnectTimeout indicates how many seconds before giving up on a TCP
	// connection establishment. Defaults to 60 seconds.
	ConnectTimeout int
	// ReconnectInterval indicates how many seconds to wait between connection
	// tries. Defaults to 10 seconds.
	ReconnectInterval int
	// ChunkSize indicates how many bytes to attempt to read at once with each
	// network read. Defaults to 16,384 bytes.
	ChunkSize int
	// WithinMessageTimeout indicates how many seconds before giving up on
	// reading data within a message. Defaults to 5 seconds.
	WithinMessageTimeout int
	// Default port to listen on.
	DefaultPort int
	// UseTLS enables use of TLS for server and client comms
	UseTLS         bool
	MutualTLS      bool
	SkipVerify     bool
	CustomCertPool bool
	CertFile       string
	KeyFile        string
	CAFile         string
}

TCPMsgRingConfig represents the set of values for configuring a TCPMsgRing. Note that changing the values (shallow changes) in this structure will have no effect on existing TCPMsgRings; but deep changes (such as reconfiguring an existing Logger) will.

type TCPMsgRingStats

type TCPMsgRingStats struct {
	Shutdown                  bool
	RingChanges               int32
	RingChangeCloses          int32
	MsgToNodes                int32
	MsgToNodeNoRings          int32
	MsgToNodeNoNodes          int32
	MsgToOtherReplicas        int32
	MsgToOtherReplicasNoRings int32
	ListenErrors              int32
	IncomingConnections       int32
	Dials                     int32
	DialErrors                int32
	OutgoingConnections       int32
	MsgChanCreations          int32
	MsgToAddrs                int32
	MsgToAddrQueues           int32
	MsgToAddrTimeoutDrops     int32
	MsgToAddrShutdownDrops    int32
	MsgReads                  int32
	MsgReadErrors             int32
	MsgWrites                 int32
	MsgWriteErrors            int32
}

func (*TCPMsgRingStats) String

func (s *TCPMsgRingStats) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL