Documentation
¶
Index ¶
- Variables
- func CanonicalHostPort(hostport string, defaultPort int) (string, error)
- type LogFunc
- type Msg
- type MsgRing
- type MsgUnmarshaller
- type TCPMsgRing
- func (t *TCPMsgRing) Listen()
- func (t *TCPMsgRing) MaxMsgLength() uint64
- func (t *TCPMsgRing) MsgHandler(msgType uint64) MsgUnmarshaller
- func (t *TCPMsgRing) MsgToNode(msg Msg, nodeID uint64, timeout time.Duration)
- func (t *TCPMsgRing) MsgToOtherReplicas(msg Msg, partition uint32, timeout time.Duration)
- func (t *TCPMsgRing) Ring() ring.Ring
- func (t *TCPMsgRing) SetChaosAddrDisconnect(addr string, disconnect bool)
- func (t *TCPMsgRing) SetChaosAddrOff(addr string, off bool)
- func (t *TCPMsgRing) SetMsgHandler(msgType uint64, handler MsgUnmarshaller)
- func (t *TCPMsgRing) SetRing(ring ring.Ring)
- func (t *TCPMsgRing) Shutdown()
- func (t *TCPMsgRing) Stats(debug bool) *TCPMsgRingStats
- type TCPMsgRingConfig
- type TCPMsgRingStats
Constants ¶
This section is empty.
Variables ¶
var TCP_MSG_RING_VERSION = []byte("TCPMSGRINGv00001")
Functions ¶
Types ¶
type Msg ¶
type Msg interface { // MsgType is the unique designator for the type of message content (such // as a pull replication request, a read request, etc.). Message types just // need to be unique values; usually picking 64 bits of a UUID is fine. MsgType() uint64 // MsgLength returns the number of bytes for the content of the message // (the amount that will be written with a call to WriteContent). MsgLength() uint64 // WriteContent will send the contents of the message to the given writer. // // Note that WriteContent may be called multiple times and may be called // concurrently. // // Also note that the content should be written as quickly as possible as // any delays may cause the message transmission to be aborted and dropped. // In other words, any significant processing to build the message content // should be done before the Msg is given to the MsgRing for delivery. WriteContent(io.Writer) (uint64, error) // Free will be called when the MsgRing no longer has any references to the // message and allows the message to free any resources it may have, or be // reused, etc. The call will be given the number of times the message was // successfully sent and/or failed. Free(successes int, failures int) }
Msg is a single message to be sent to another node or nodes.
type MsgRing ¶
type MsgRing interface { // Ring returns the ring information used to determine messaging endpoints; // note that this method may return nil if no ring information is yet // available. Ring() ring.Ring // MaxMsgLength indicates the maximum number of bytes the content of a // message may contain to be handled by this MsgRing. MaxMsgLength() uint64 // SetMsgHandler associates a message type with a handler; any incoming // messages with the type will be delivered to the handler. Message types // just need to be unique uint64 values; usually picking 64 bits of a UUID // is fine. SetMsgHandler(msgType uint64, handler MsgUnmarshaller) // MsgToNode queues the message for delivery to the indicated node; the // timeout should be considered for queueing, not for actual delivery. // // When the msg has actually been sent or has been discarded due to // delivery errors or delays, msg.Free will be called. MsgToNode(msg Msg, nodeID uint64, timeout time.Duration) // MsgToNode queues the message for delivery to all other replicas of a // partition; the timeout should be considered for queueing, not for actual // delivery. // // If the ring is not bound to a specific node (LocalNode() returns nil) // then the delivery attempts will be to all replicas. // // When the msg has actually been sent or has been discarded due to // delivery errors or delays, msg.Free will be called. MsgToOtherReplicas(msg Msg, partition uint32, timeout time.Duration) }
MsgRing will send and receive Msg instances to and from ring nodes. See TCPMsgRing for a concrete implementation.
The design is such that messages are not guaranteed delivery, or even transmission. Acknowledgement and retry logic is left outside to the producers and consumers of the messages themselves.
For example, if a connection to a node is too slow to keep up with the messages wanting to be delivered to it, many of the messages will simply be dropped.
For another example, if a message is ready to be delivered to a node that currently has no established connection, the connection process will be initiated (for future messages) but that current message will be dropped.
This design is because much ring-related messaging is based on passes over the entire ring with many messages being sent constantly and conditions can change rapidly, making messages less useful as they age. It is better for the distributed system overall if a single message is just dropped if it can't be sent immediately as a similar message will be generated and attempted later on in the next pass.
For messages that require guaranteed delivery, the sender's node ID can be embedded in the message and receiver would send an acknowledgement message back upon receipt. The exact retry logic could vary greatly and depends on the sender's implementation (e.g. background passes recording acks or dedicated goroutines for each message).
In previous systems we've written, distributed algorithms could easily get hung up trying to communicate to one faulty node (for example) making the rest of the system suffer as well.
type MsgUnmarshaller ¶
type MsgUnmarshaller func(reader io.Reader, desiredBytesToRead uint64) (actualBytesRead uint64, err error)
MsgUnmarshaller will attempt to read desiredBytesToRead from the reader and will return the number of bytes actually read as well as any error that may have occurred. If error is nil then actualBytesRead must equal desiredBytesToRead.
Note that the message content should be read as quickly as possible as any delays may cause the message transmission to be aborted. In other words, any significant processing of the message should be done after the contents are read and this reader function returns.
type TCPMsgRing ¶
type TCPMsgRing struct {
// contains filtered or unexported fields
}
func NewTCPMsgRing ¶
func NewTCPMsgRing(c *TCPMsgRingConfig) (*TCPMsgRing, error)
NewTCPMsgRing creates a new MsgRing that will use TCP to send and receive Msg instances.
func (*TCPMsgRing) Listen ¶
func (t *TCPMsgRing) Listen()
Listen on the configured TCP port, accepting new connections and processing messages from those connections; this function will not return until t.Shutdown() is called.
func (*TCPMsgRing) MaxMsgLength ¶
func (t *TCPMsgRing) MaxMsgLength() uint64
MaxMsgLength indicates the maximum number of bytes the content of a message may contain to be handled by this TCPMsgRing.
func (*TCPMsgRing) MsgHandler ¶
func (t *TCPMsgRing) MsgHandler(msgType uint64) MsgUnmarshaller
MsgHandler returns the handler for the given message type, if there is any set.
func (*TCPMsgRing) MsgToNode ¶
func (t *TCPMsgRing) MsgToNode(msg Msg, nodeID uint64, timeout time.Duration)
MsgToNode queues the message for delivery to the indicated node; the timeout should be considered for queueing, not for actual delivery.
When the msg has actually been sent or has been discarded due to delivery errors or delays, msg.Free() will be called.
func (*TCPMsgRing) MsgToOtherReplicas ¶
func (t *TCPMsgRing) MsgToOtherReplicas(msg Msg, partition uint32, timeout time.Duration)
MsgToNode queues the message for delivery to all other replicas of a partition; the timeout should be considered for queueing, not for actual delivery.
If the ring is not bound to a specific node (LocalNode() returns nil) then the delivery attempts will be to all replicas.
When the msg has actually been sent or has been discarded due to delivery errors or delays, msg.Free() will be called.
func (*TCPMsgRing) Ring ¶
func (t *TCPMsgRing) Ring() ring.Ring
Ring returns the ring information used to determine messaging endpoints; note that this method may return nil if no ring information is yet available.
func (*TCPMsgRing) SetChaosAddrDisconnect ¶
func (t *TCPMsgRing) SetChaosAddrDisconnect(addr string, disconnect bool)
SetChaosAddrDisconnect will allow connections to and from addr but, after 10-70 seconds, it will abruptly close a connection.
func (*TCPMsgRing) SetChaosAddrOff ¶
func (t *TCPMsgRing) SetChaosAddrOff(addr string, off bool)
SetChaosAddrOff will disable all outgoing connections to addr and immediately close any incoming connections from addr.
func (*TCPMsgRing) SetMsgHandler ¶
func (t *TCPMsgRing) SetMsgHandler(msgType uint64, handler MsgUnmarshaller)
SetMsgHandler associates a message type with a handler; any incoming messages with the type will be delivered to the handler. Message types just need to be unique uint64 values; usually picking 64 bits of a UUID is fine.
func (*TCPMsgRing) SetRing ¶
func (t *TCPMsgRing) SetRing(ring ring.Ring)
SetRing sets the ring whose information used to determine messaging endpoints.
func (*TCPMsgRing) Shutdown ¶
func (t *TCPMsgRing) Shutdown()
Shutdown will signal the shutdown of all connections, listeners, etc. related to the TCPMsgRing; once Shutdown you must create a new TCPMsgRing to restart operations.
func (*TCPMsgRing) Stats ¶
func (t *TCPMsgRing) Stats(debug bool) *TCPMsgRingStats
Stats returns the current stat counters and resets those counters. In other words, if Stats().Dials gives the value 10 and no more dials occur before Stats() is called again, that second Stats().Dials will have the value 0.
If debug=true, additional information (left undocumented because it is greatly subject to change) may be given when calling TCPMsgRingStats.String().
type TCPMsgRingConfig ¶
type TCPMsgRingConfig struct { // LogCritical sets the func to use for critical messages; these are // messages about issues that render the TCPMsgRing ring inoperative. // Defaults logging to os.Stderr. LogCritical LogFunc // LogDebug sets the func to use for debug messages. Defaults not logging // debug messages. LogDebug LogFunc // AddressIndex set the index to use with Node.Address(index) to lookup a // Node's TCP address. AddressIndex int // BufferedMessagesPerAddress indicates how many outgoing Msg instances can // be buffered before dropping additional ones. Defaults to 8. BufferedMessagesPerAddress int // ConnectTimeout indicates how many seconds before giving up on a TCP // connection establishment. Defaults to 60 seconds. ConnectTimeout int // ReconnectInterval indicates how many seconds to wait between connection // tries. Defaults to 10 seconds. ReconnectInterval int // ChunkSize indicates how many bytes to attempt to read at once with each // network read. Defaults to 16,384 bytes. ChunkSize int // WithinMessageTimeout indicates how many seconds before giving up on // reading data within a message. Defaults to 5 seconds. WithinMessageTimeout int // Default port to listen on. DefaultPort int // UseTLS enables use of TLS for server and client comms UseTLS bool MutualTLS bool SkipVerify bool CustomCertPool bool CertFile string KeyFile string CAFile string }
TCPMsgRingConfig represents the set of values for configuring a TCPMsgRing. Note that changing the values (shallow changes) in this structure will have no effect on existing TCPMsgRings; but deep changes (such as reconfiguring an existing Logger) will.
type TCPMsgRingStats ¶
type TCPMsgRingStats struct { Shutdown bool RingChanges int32 RingChangeCloses int32 MsgToNodes int32 MsgToNodeNoRings int32 MsgToNodeNoNodes int32 MsgToOtherReplicas int32 MsgToOtherReplicasNoRings int32 ListenErrors int32 IncomingConnections int32 Dials int32 DialErrors int32 OutgoingConnections int32 MsgChanCreations int32 MsgToAddrs int32 MsgToAddrQueues int32 MsgToAddrTimeoutDrops int32 MsgToAddrShutdownDrops int32 MsgReads int32 MsgReadErrors int32 MsgWrites int32 MsgWriteErrors int32 }
func (*TCPMsgRingStats) String ¶
func (s *TCPMsgRingStats) String() string