net

package
v0.0.0-...-7c6133f Latest
Warning

This package is not in the latest version of its module.

Published: May 30, 2016 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package net is responsible for abstracting away all of the inter-replica communication in the system. This module doesn't handle communication between replica and client.

This package provides a Demuxer and a Sender. The Demuxer is responsible for handling incoming connections, from which it receives messages and passes them on to the interested parties. The Sender is responsible for establishing connections to other replicas, as well as the sending of messages to other replicas.

The Demuxer learns which parties are interested in each type of message through registration. Every interested party must register before the networking subsystem starts up:

prepareChan := make(chan px.PrepareMsg, 8)
a.dmx.RegisterChannel(prepareChan)

Communication from the Demuxer to an actor happens over channels, as shown above. The actor first creates a channel, here one for receiving PrepareMsg messages, and then passes it to the RegisterChannel method. From then on, whenever the Demuxer receives a message of that type, it sends the message over the registered channel. Multiple channels of the same type may be registered with the Demuxer.

NB: Register all channels before the networking subsystem starts up, and none afterwards. Once the network is up, the GxConnection goroutines are already passing messages to the Demuxer, so registering a channel at that point is unsafe.
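The registration scheme described above can be sketched with the reflect package. This is a minimal illustration and not the package's implementation: the demuxer type, its map layout, and the local PrepareMsg struct are all assumptions made so the example compiles on its own.

```go
package main

import (
	"fmt"
	"reflect"
)

// PrepareMsg is a hypothetical stand-in for px.PrepareMsg.
type PrepareMsg struct {
	Round int
}

// demuxer is a hypothetical type that maps a message type to the
// channels registered for it.
type demuxer struct {
	chans map[reflect.Type][]reflect.Value
}

func newDemuxer() *demuxer {
	return &demuxer{chans: make(map[reflect.Type][]reflect.Value)}
}

// RegisterChannel records ch under its element type. It panics if ch is
// not a channel that can be sent on.
func (d *demuxer) RegisterChannel(ch interface{}) {
	v := reflect.ValueOf(ch)
	if v.Kind() != reflect.Chan || v.Type().ChanDir()&reflect.SendDir == 0 {
		panic("RegisterChannel: argument must be a sendable channel")
	}
	t := v.Type().Elem()
	d.chans[t] = append(d.chans[t], v)
}

// HandleMessage sends msg to every channel registered for its dynamic
// type and reports how many channels received it.
func (d *demuxer) HandleMessage(msg interface{}) int {
	if msg == nil {
		return 0
	}
	v := reflect.ValueOf(msg)
	n := 0
	for _, ch := range d.chans[v.Type()] {
		ch.Send(v) // blocks while the channel is full
		n++
	}
	return n
}

func main() {
	dmx := newDemuxer()
	prepareChan := make(chan PrepareMsg, 8)
	dmx.RegisterChannel(prepareChan)

	dmx.HandleMessage(PrepareMsg{Round: 3})
	dmx.HandleMessage("no channel registered for strings") // dropped

	fmt.Println((<-prepareChan).Round)
}
```

The map in this sketch is not protected by a lock, which mirrors the rule that all registration has to finish before any message is handled.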

Constants

This section is empty.

Variables

var (
	ErrIDIsEqual     = errors.New("id is equal to this node")
	ErrIDOutOfBounds = errors.New("id is out of bounds for current cluster size")
)
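Callers can compare against these values with errors.Is, which also works when the error has been wrapped. The sketch below redeclares the two variables locally so that it compiles alone; checkPeerID and its plain int ids are hypothetical and only illustrate when each error might apply.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's error values, so the sketch is self-contained.
var (
	ErrIDIsEqual     = errors.New("id is equal to this node")
	ErrIDOutOfBounds = errors.New("id is out of bounds for current cluster size")
)

// checkPeerID is a hypothetical validator. Ids are plain ints here,
// which is a simplification of grp.ID.
func checkPeerID(self, peer, clusterSize int) error {
	switch {
	case peer < 0 || peer >= clusterSize:
		return ErrIDOutOfBounds
	case peer == self:
		return ErrIDIsEqual
	}
	return nil
}

func main() {
	// Wrapping with %w keeps the sentinel visible to errors.Is.
	err := fmt.Errorf("handshake: %w", checkPeerID(1, 1, 3))
	fmt.Println(errors.Is(err, ErrIDIsEqual))
}
```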

Functions

func AddToConnections

func AddToConnections(gc *GxConnection, lrArEnabled bool) error

Add a GxConnection to the connection map.

func CheckConnections

func CheckConnections(conf []grp.ID) (notconn []grp.ID)

func IsSocketClosed

func IsSocketClosed(err error) bool
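IsSocketClosed is undocumented, so its exact behavior is not known from this page. As a rough sketch of how such a check can be written with the current standard library, the hypothetical isClosed helper below matches net.ErrClosed (available since Go 1.16) and io.ErrClosedPipe; the package itself may use a different test.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
)

// isClosed is a hypothetical helper that reports whether err indicates
// an operation on an already closed connection.
func isClosed(err error) bool {
	return errors.Is(err, net.ErrClosed) || errors.Is(err, io.ErrClosedPipe)
}

// writeAfterClose closes one end of an in-memory pipe and returns the
// error produced by writing to that closed end.
func writeAfterClose() error {
	a, b := net.Pipe()
	defer b.Close()
	a.Close()
	_, err := a.Write([]byte("x"))
	return err
}

func main() {
	fmt.Println(isClosed(writeAfterClose()))
	fmt.Println(isClosed(io.EOF))
}
```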

func SetHeartbeatChan

func SetHeartbeatChan(hbChan chan<- grp.ID)

func UpdateConnID

func UpdateConnID(oldID grp.ID, newEpoch grp.Epoch) bool

Types

type Connection

type Connection struct {
	Dec *gob.Decoder
	Enc *gob.Encoder
	// contains filtered or unexported fields
}

A Connection represents a base connection between two replicas in Goxos.

func ConnectToAddr

func ConnectToAddr(addr string) (*Connection, error)

Connect to another replica based on the address of the replica in the form hostname:port. Returns a Connection.

func ConnectToNode

func ConnectToNode(node grp.Node) (*Connection, error)

Connect to another replica based on the address in the configuration file.

func GxConnectEphemeral

func GxConnectEphemeral(node grp.Node, callerID grp.ID) (*Connection, error)

func NewConnection

func NewConnection(conn net.Conn) *Connection

Creates a new base connection between two replicas. The required argument is a low-level connection to another replica.

func NewMockConnection

func NewMockConnection(conn io.ReadWriteCloser) *Connection

Create a new mock Connection, used for testing purposes.

func (*Connection) Close

func (c *Connection) Close() error

Close the connection.

func (*Connection) Read

func (c *Connection) Read(msg interface{}) error

Read a message off of the connection. The message is placed in the location passed to the method. Returns nil or an error.

func (Connection) String

func (c Connection) String() string

Returns a string-based representation of the Connection.

func (*Connection) Write

func (c *Connection) Write(msg interface{}) error

Write a message to the connection. Returns nil or an error.
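Because Connection exposes a gob.Decoder and a gob.Encoder, Read and Write presumably map onto Decode and Encode. The following sketch shows that pattern over an in-memory pipe. The conn type, the ping message, and roundTrip are hypothetical names used for illustration, not part of this package.

```go
package main

import (
	"encoding/gob"
	"fmt"
	"io"
	"net"
)

// conn is a hypothetical miniature of Connection: one gob encoder and
// one gob decoder sharing a single underlying stream.
type conn struct {
	dec *gob.Decoder
	enc *gob.Encoder
	rwc io.ReadWriteCloser
}

func newConn(rwc io.ReadWriteCloser) *conn {
	return &conn{dec: gob.NewDecoder(rwc), enc: gob.NewEncoder(rwc), rwc: rwc}
}

// Write encodes msg onto the stream.
func (c *conn) Write(msg interface{}) error { return c.enc.Encode(msg) }

// Read decodes the next message into msg, which must be a pointer.
func (c *conn) Read(msg interface{}) error { return c.dec.Decode(msg) }

// Close closes the underlying stream.
func (c *conn) Close() error { return c.rwc.Close() }

// ping is a hypothetical message type.
type ping struct {
	Seq int
}

// roundTrip writes p on one end of an in-memory pipe and reads it back
// on the other end.
func roundTrip(p ping) (ping, error) {
	a, b := net.Pipe()
	left, right := newConn(a), newConn(b)
	defer left.Close()
	defer right.Close()

	// net.Pipe is unbuffered, so the write has to run concurrently
	// with the read.
	errc := make(chan error, 1)
	go func() { errc <- left.Write(p) }()

	var got ping
	if err := right.Read(&got); err != nil {
		return ping{}, err
	}
	return got, <-errc
}

func main() {
	got, err := roundTrip(ping{Seq: 1})
	fmt.Println(got.Seq, err)
}
```

Accepting an io.ReadWriteCloser rather than a net.Conn is what makes a test double such as the one built by NewMockConnection possible.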

type Demuxer

type Demuxer interface {
	Start()
	Stop()
	RegisterChannel(ch interface{})
	HandleMessage(msg interface{})
}

type GxConnection

type GxConnection struct {
	*Connection
	// contains filtered or unexported fields
}

A GxConnection represents a connection between two replicas. A GxConnection is set up after the replica has been properly validated.

func GxConnectTo

func GxConnectTo(node grp.Node, callerID, calledID grp.ID,
	dmx Demuxer) (*GxConnection, error)

Connect to another replica, and verify the ids are correct. Returns a GxConnection.

func NewGxConnection

func NewGxConnection(conn *Connection, id grp.ID, dmx Demuxer) *GxConnection

Create a new GxConnection. The low-level connection as well as the Goxos id and Demuxer must be passed in as arguments.

func (*GxConnection) Outgoing

func (gc *GxConnection) Outgoing() chan<- interface{}

func (GxConnection) String

func (gc GxConnection) String() string

Returns a string-based representation of the GxConnection.

type IDExchange

type IDExchange struct {
	ID grp.ID
}

The IDExchange message is used for verifying a replica in the connection phase.

type IDResponse

type IDResponse struct {
	Accepted bool
	Error    string
}

The IDResponse message is used for verifying a replica in the connection phase.
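One plausible shape for the verification exchange is sketched below: the connecting side sends an IDExchange and the accepting side answers with an IDResponse. The message flow, the integer ID type, and the offerID, acceptID, and handshake functions are assumptions for illustration; the actual protocol steps are not documented on this page.

```go
package main

import (
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"net"
)

// ID is a hypothetical stand-in for grp.ID.
type ID int

type IDExchange struct {
	ID ID
}

type IDResponse struct {
	Accepted bool
	Error    string
}

// offerID sends our id and waits for the peer's verdict.
func offerID(rw io.ReadWriter, self ID) error {
	if err := gob.NewEncoder(rw).Encode(IDExchange{ID: self}); err != nil {
		return err
	}
	var resp IDResponse
	if err := gob.NewDecoder(rw).Decode(&resp); err != nil {
		return err
	}
	if !resp.Accepted {
		return errors.New(resp.Error)
	}
	return nil
}

// acceptID reads the peer's id, rejects it if it equals ours or falls
// outside [0, size), and sends the verdict back.
func acceptID(rw io.ReadWriter, self ID, size int) (ID, error) {
	var ex IDExchange
	if err := gob.NewDecoder(rw).Decode(&ex); err != nil {
		return 0, err
	}
	resp := IDResponse{Accepted: true}
	switch {
	case ex.ID == self:
		resp = IDResponse{Error: "id is equal to this node"}
	case ex.ID < 0 || int(ex.ID) >= size:
		resp = IDResponse{Error: "id is out of bounds for current cluster size"}
	}
	return ex.ID, gob.NewEncoder(rw).Encode(resp)
}

// handshake runs both roles over an in-memory pipe and returns the
// result seen by the connecting side.
func handshake(caller, callee ID, size int) error {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()
	go acceptID(b, callee, size)
	return offerID(a, caller)
}

func main() {
	fmt.Println(handshake(1, 2, 3))
	fmt.Println(handshake(2, 2, 3))
}
```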

type MockDemuxer

type MockDemuxer struct{}

func NewMockDemuxer

func NewMockDemuxer() *MockDemuxer

func (*MockDemuxer) HandleMessage

func (dmx *MockDemuxer) HandleMessage(msg interface{})

func (*MockDemuxer) RegisterChannel

func (dmx *MockDemuxer) RegisterChannel(ch interface{})

func (*MockDemuxer) RegisterFD

func (dmx *MockDemuxer) RegisterFD() <-chan interface{}

func (*MockDemuxer) Start

func (dmx *MockDemuxer) Start()

func (*MockDemuxer) Stop

func (dmx *MockDemuxer) Stop()

type Packet

type Packet struct {
	DestID grp.ID
	Data   interface{}
}

A Packet contains a message, as well as the destination id of the replica. Used to send a unicast message to a replica.
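The difference between a unicast Packet and a broadcast message can be illustrated as follows. The connection table, the integer ID type, and the unicast and broadcast helpers are hypothetical; they only show how a DestID selects a single outgoing channel.

```go
package main

import "fmt"

// ID is a hypothetical stand-in for grp.ID.
type ID int

type Packet struct {
	DestID ID
	Data   interface{}
}

// unicast delivers p.Data to the destination's outgoing channel only.
// It reports false if no connection exists for p.DestID.
func unicast(conns map[ID]chan<- interface{}, p Packet) bool {
	out, ok := conns[p.DestID]
	if !ok {
		return false
	}
	out <- p.Data
	return true
}

// broadcast delivers msg to every connection in the table.
func broadcast(conns map[ID]chan<- interface{}, msg interface{}) {
	for _, out := range conns {
		out <- msg
	}
}

func main() {
	c0 := make(chan interface{}, 2)
	c1 := make(chan interface{}, 2)
	conns := map[ID]chan<- interface{}{0: c0, 1: c1}

	unicast(conns, Packet{DestID: 1, Data: "promise"})
	broadcast(conns, "heartbeat")

	// Number of queued messages per connection.
	fmt.Println(len(c0), len(c1))
}
```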

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

A Sender is responsible for outward communication from one replica to all others. Goroutines in a replica hand messages to the Sender through channels, and the Sender sends those messages out onto the network.

func NewSender

func NewSender(id grp.ID, gm grp.GroupManager, outU <-chan Packet,
	outB, outP, outA, outL <-chan interface{},
	dmx Demuxer, stopCheckIn *sync.WaitGroup) (snd *Sender)

Create a new Sender for the given replica id. Also passed in are channels which the sender receives messages from.

func (*Sender) InitialConnect

func (snd *Sender) InitialConnect()

Start the initial connection phase, where the sender tries to connect to all other replicas in the system with a higher id than ours. We wait for connections from all other replicas with ids less than or equal to our own.

This function blocks until we establish connections to all replicas in the Goxos configuration.
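The id-ordering rule ensures that each pair of replicas is connected by exactly one side dialing. A small sketch of the partitioning step follows. The splitPeers function and the integer ID type are hypothetical, and the sketch treats an id equal to our own as this replica's own entry and skips it, which is a simplification.

```go
package main

import "fmt"

// ID is a hypothetical stand-in for grp.ID.
type ID int

// splitPeers partitions the configured ids into those we dial (higher
// than ours) and those we wait for (lower than ours).
func splitPeers(self ID, all []ID) (dial, await []ID) {
	for _, id := range all {
		switch {
		case id > self:
			dial = append(dial, id)
		case id < self:
			await = append(await, id)
		}
	}
	return dial, await
}

func main() {
	dial, await := splitPeers(1, []ID{0, 1, 2, 3})
	fmt.Println("dial:", dial, "await:", await)
}
```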

func (*Sender) ResetChan

func (snd *Sender) ResetChan() <-chan bool

func (*Sender) Start

func (snd *Sender) Start()

Start receiving messages from other Goxos modules.

func (*Sender) Stop

func (snd *Sender) Stop()

Stop the Sender goroutine.
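The Start and Stop pair suggests a goroutine that loops over its input channels until told to stop. The sketch below shows that pattern with one input channel. The sender type, its wire channel standing in for the network, and the use of the WaitGroup as a shutdown check-in are assumptions, not the package's actual design.

```go
package main

import (
	"fmt"
	"sync"
)

// sender is a hypothetical, single-input miniature of Sender.
type sender struct {
	in      <-chan interface{} // messages from other goroutines
	wire    chan interface{}   // stands in for the network
	stop    chan struct{}
	checkIn *sync.WaitGroup
}

func newSender(in <-chan interface{}, checkIn *sync.WaitGroup) *sender {
	return &sender{
		in:      in,
		wire:    make(chan interface{}, 8),
		stop:    make(chan struct{}),
		checkIn: checkIn,
	}
}

// Start launches the forwarding goroutine.
func (s *sender) Start() {
	s.checkIn.Add(1)
	go func() {
		defer s.checkIn.Done() // check in once the loop has exited
		for {
			select {
			case msg := <-s.in:
				s.wire <- msg
			case <-s.stop:
				return
			}
		}
	}()
}

// Stop asks the goroutine to exit. It must be called at most once.
func (s *sender) Stop() { close(s.stop) }

// forwardOne starts a sender, pushes msg through it, and shuts it down.
func forwardOne(msg interface{}) interface{} {
	var wg sync.WaitGroup
	in := make(chan interface{})
	s := newSender(in, &wg)
	s.Start()
	in <- msg
	got := <-s.wire
	s.Stop()
	wg.Wait() // returns only after the goroutine has checked in
	return got
}

func main() {
	fmt.Println(forwardOne("accept"))
}
```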

type TcpDemuxer

type TcpDemuxer struct {
	// contains filtered or unexported fields
}

A TcpDemuxer handles all of the incoming messages to a replica, not including the client communication.

func NewTcpDemuxer

func NewTcpDemuxer(id grp.ID, gm grp.GroupManager, stopCheckIn *sync.WaitGroup) *TcpDemuxer

NewTcpDemuxer creates a new TcpDemuxer for a replica. A valid id from the configuration must be passed in.

func (*TcpDemuxer) HandleMessage

func (dmx *TcpDemuxer) HandleMessage(msg interface{})

Delegate handling of specific messages to previously registered channels.

func (*TcpDemuxer) RegisterChannel

func (dmx *TcpDemuxer) RegisterChannel(ch interface{})

Register a channel for receiving messages of the channel's element type.

func (*TcpDemuxer) Start

func (dmx *TcpDemuxer) Start()

Start handling new connections from other replicas.

func (*TcpDemuxer) Stop

func (dmx *TcpDemuxer) Stop()

Shut the Demuxer down. Stops the main Demuxer goroutine.
