Documentation ¶
Overview ¶
Package cluster implements an interface for clustered services. It allows you to interface with a cluster node, and messages sent across will be received by the connected cluster nodes. You can interface freely between these N nodes. Additionally, there is an API interface for passing commands to the cluster.
Interfacing ¶
You can interface through the Cluster Manager using channels. Messages that are not used specifically by the cluster are forwarded to the client application using this cluster library.
manager.ToCluster <- data       // send data of any type (interface{}) to the cluster
packet := <-manager.FromCluster // receive a Packet{} from the cluster containing the sent data (required)
This interface allows you to send data to the cluster, which will be broadcast across the connected nodes.
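The round trip described above can be sketched with local stand-in types instead of the real package. The `Packet` fields mirror the documented struct and `Message` appears in the package example, but its body here is an assumption: the sketch assumes the payload in `DataMessage` is JSON. The `pack` helper and the way `DataType` is filled are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Packet is a local stand-in mirroring the documented Packet fields.
type Packet struct {
	Name        string    `json:"name"`
	DataType    string    `json:"datatype"`
	DataMessage string    `json:"datamessage"`
	Time        time.Time `json:"time"`
}

// Message decodes the payload into v.
// Assumption: the payload is JSON; the real package may encode it differently.
func (p Packet) Message(v interface{}) error {
	return json.Unmarshal([]byte(p.DataMessage), v)
}

// pack is a hypothetical helper that wraps any value in a Packet.
func pack(sender string, v interface{}) (Packet, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return Packet{}, err
	}
	return Packet{
		Name:        sender,
		DataType:    fmt.Sprintf("%T", v), // assumption: DataType holds the Go type name
		DataMessage: string(raw),
		Time:        time.Now(),
	}, nil
}

func main() {
	// fromCluster plays the role of manager.FromCluster.
	fromCluster := make(chan Packet, 1)

	p, err := pack("node2", "Hello World!")
	if err != nil {
		panic(err)
	}
	fromCluster <- p

	received := <-fromCluster
	var msg string
	if err := received.Message(&msg); err != nil {
		panic(err)
	}
	fmt.Println(received.Name, received.DataType, msg)
}
```

Because the receiver decodes into a typed variable, both sides need to agree on the type that was sent.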
state := <-manager.QuorumState // bool returning current quorum state, will update on node join/leave
node := <-manager.NodeJoin     // string of node joining the cluster
node := <-manager.NodeLeave    // string of node leaving the cluster
These channels are available to read additional cluster status updates.
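One way to consume the status channels is a single `select` loop that keeps a local view of the cluster. The sketch below uses stand-in channels with the same names and element types as the Manager fields; `clusterEvents`, `clusterView`, and `watch` are hypothetical names, not part of the package.

```go
package main

import "fmt"

// clusterEvents is a stand-in holding the three status channels that the
// Manager exposes (NodeJoin, NodeLeave, QuorumState).
type clusterEvents struct {
	NodeJoin    chan string
	NodeLeave   chan string
	QuorumState chan bool
}

// clusterView is a hypothetical summary kept by the application.
type clusterView struct {
	Members map[string]bool
	Quorum  bool
}

// watch applies status updates to view until done is closed.
func watch(ev clusterEvents, view *clusterView, done <-chan struct{}) {
	for {
		select {
		case name := <-ev.NodeJoin:
			view.Members[name] = true
		case name := <-ev.NodeLeave:
			delete(view.Members, name)
		case q := <-ev.QuorumState:
			view.Quorum = q
		case <-done:
			return
		}
	}
}

func main() {
	ev := clusterEvents{
		NodeJoin:    make(chan string),
		NodeLeave:   make(chan string),
		QuorumState: make(chan bool),
	}
	view := &clusterView{Members: map[string]bool{}}
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		watch(ev, view, done)
		close(finished)
	}()

	// Simulated events; in a real program the Manager would send these.
	ev.NodeJoin <- "node2"
	ev.NodeJoin <- "node3"
	ev.QuorumState <- true
	ev.NodeLeave <- "node3"
	close(done)
	<-finished

	fmt.Println(len(view.Members), view.Quorum)
}
```

Reading all three channels in one goroutine keeps the view free of locks, since only `watch` mutates it while it runs.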
request := <-manager.FromClusterAPI // receive an APIRequest{} sent via the API interface by a client
With APIEnabled you can receive API requests through an authenticated web interface.
log := <-manager.Log // receive logging from the debug package
Read the Log channel to receive cluster-wide logging.
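A minimal sketch of draining the log channel in the application follows. The channel here is a local stand-in for `manager.Log`, `drainLog` is a hypothetical helper, and the log entries are made up. The real channel may never be closed, in which case the loop would run for the life of the program, typically in its own goroutine.

```go
package main

import "log"

// drainLog passes every entry from logCh to sink until logCh is closed.
func drainLog(logCh <-chan string, sink func(entry string)) {
	for entry := range logCh {
		sink(entry)
	}
}

func main() {
	// logCh plays the role of manager.Log; the entries are invented.
	logCh := make(chan string, 2)
	logCh <- "node2 joined"
	logCh <- "quorum reached"
	close(logCh)

	drainLog(logCh, func(entry string) {
		log.Printf("cluster: %s", entry)
	})
}
```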
Example ¶
This example shows a node sending and receiving messages. You can send any type of data to the cluster.
// start cluster 1
manager := NewManager("node1", "secret")
manager.AddNode("node2", "127.0.0.1:9655")
err := manager.ListenAndServe("127.0.0.1:9654")
if err != nil {
	log.Fatal(err)
}
// start cluster 2
manager2 := NewManager("node2", "secret")
manager2.AddNode("node1", "127.0.0.1:9654")
err = manager2.ListenAndServe("127.0.0.1:9655")
if err != nil {
	log.Fatal(err)
}
// wait for cluster join to be complete
<-manager2.NodeJoin
// send message to all nodes of manager2
manager2.ToCluster <- "Hello World!"
// process all channels
for {
	select {
	// case logentry := <-manager2.Log:
	// 	log.Printf("manager2.log: %s\n", logentry)
	case p := <-manager.FromCluster:
		var cm string
		err := p.Message(&cm)
		if err != nil {
			log.Printf("Unable to get message from package: %s\n", err)
		}
		//log.Printf("we received a custom message: %s\n", cm)
		return
	}
}
Index ¶
- Constants
- Variables
- type APIClusterNode
- type APIClusterNodeList
- type APIRequest
- type Manager
- func (m *Manager) AddNode(nodeName, nodeAddr string)
- func (m *Manager) ListenAndServe(addr string) (err error)
- func (m *Manager) ListenAndServeTLS(addr string, tlsConfig *tls.Config) (err error)
- func (m *Manager) Name() string
- func (m *Manager) NodeConfigured(nodeName string) bool
- func (m *Manager) NodesConfigured() map[string]bool
- func (m *Manager) RemoveNode(nodeName string)
- func (m *Manager) Shutdown()
- func (m *Manager) StateDump()
- func (m *Manager) UpdateSettings(settings Settings)
- type Node
- type NodeMessage
- type Packet
- type Settings
Examples ¶
Constants ¶
const (
	// StatusOffline is a new node, starting in offline state
	StatusOffline = "Offline"
	// StatusAuthenticating is a node doing authentication
	StatusAuthenticating = "Authenticating"
	// StatusShutdown is a node stopping
	StatusShutdown = "Stopping"
	// StatusOnline is a node online
	StatusOnline = "Online"
	// StatusLeaving is a node leaving
	StatusLeaving = "Leaving"
)
Variables ¶
var (
	// APITokenSigningKey is the key used to sign jwt tokens
	APITokenSigningKey = rndKey()
	// APITokenDuration is how long the jwt token is valid
	APITokenDuration = 1 * time.Hour
	// APIEnabled defines whether or not the API is enabled
	APIEnabled = true
)
var (
// ChannelBufferSize the size of the channel buffer
ChannelBufferSize = 100
)
var (
	// LogTraffic true/false to log all traffic sent to and received from nodes
	LogTraffic = false
)
Functions ¶
This section is empty.
Types ¶
type APIClusterNode ¶
type APIClusterNode struct {
	Name     string        `json:"name"`
	Addr     string        `json:"addr"`
	Status   string        `json:"status"`
	Error    string        `json:"error"`
	JoinTime time.Time     `json:"jointime"`
	Lag      time.Duration `json:"lag"`
	Packets  int64         `json:"packets"`
}
APIClusterNode contains details of a node we might connect to, used for the API
type APIClusterNodeList ¶
type APIClusterNodeList struct {
Nodes map[string]APIClusterNode `json:"nodes"`
}
APIClusterNodeList contains a list of configured/connected nodes, used for the API
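The struct tags suggest how a node list would look as JSON. The sketch below decodes such a document with local copies of the two API types and picks out the nodes whose status is "Online". The sample JSON is invented for illustration, keying the map by node name is an assumption, and `onlineNodes` is a hypothetical helper. With the standard `encoding/json` package, `JoinTime` is expected in RFC 3339 form and `Lag` as an integer number of nanoseconds.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"time"
)

// Local copies of the documented API types.
type APIClusterNode struct {
	Name     string        `json:"name"`
	Addr     string        `json:"addr"`
	Status   string        `json:"status"`
	Error    string        `json:"error"`
	JoinTime time.Time     `json:"jointime"`
	Lag      time.Duration `json:"lag"`
	Packets  int64         `json:"packets"`
}

type APIClusterNodeList struct {
	Nodes map[string]APIClusterNode `json:"nodes"`
}

// onlineNodes parses a JSON node list and returns the sorted names of the
// nodes whose status equals "Online" (the value of StatusOnline).
func onlineNodes(raw []byte) ([]string, error) {
	var list APIClusterNodeList
	if err := json.Unmarshal(raw, &list); err != nil {
		return nil, err
	}
	var names []string
	for name, node := range list.Nodes {
		if node.Status == "Online" {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names, nil
}

func main() {
	// Invented sample data; not captured from a real cluster.
	raw := []byte(`{"nodes":{
		"node1":{"name":"node1","addr":"127.0.0.1:9654","status":"Online","error":"","jointime":"2024-01-01T00:00:00Z","lag":1500000,"packets":42},
		"node2":{"name":"node2","addr":"127.0.0.1:9655","status":"Offline","error":"","jointime":"0001-01-01T00:00:00Z","lag":0,"packets":0}
	}}`)

	names, err := onlineNodes(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(names)
}
```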
type APIRequest ¶
type APIRequest struct {
	Action  string `json:"action"`
	Manager string `json:"manager"`
	Node    string `json:"node"`
	Data    string `json:"data"`
}
APIRequest is used to pass requests done to the cluster API to the client application
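A client application might route incoming requests by their `Action` field. The following is a sketch only: the channel stands in for `manager.FromClusterAPI`, `dispatch` is a hypothetical helper, and the action names are invented because the source does not list which actions exist. The real channel may stay open indefinitely, so in practice the loop would usually run in its own goroutine.

```go
package main

import "fmt"

// APIRequest is a local copy of the documented type.
type APIRequest struct {
	Action  string `json:"action"`
	Manager string `json:"manager"`
	Node    string `json:"node"`
	Data    string `json:"data"`
}

// dispatch routes each request to the handler registered for its Action and
// returns the actions that had no handler. It stops when requests is closed.
func dispatch(requests <-chan APIRequest, handlers map[string]func(APIRequest)) []string {
	var unknown []string
	for req := range requests {
		h, ok := handlers[req.Action]
		if !ok {
			unknown = append(unknown, req.Action)
			continue
		}
		h(req)
	}
	return unknown
}

func main() {
	// requests plays the role of manager.FromClusterAPI.
	requests := make(chan APIRequest, 2)
	requests <- APIRequest{Action: "reload", Manager: "node1", Node: "node2"} // hypothetical action
	requests <- APIRequest{Action: "bogus"}
	close(requests)

	unknown := dispatch(requests, map[string]func(APIRequest){
		"reload": func(r APIRequest) {
			fmt.Printf("reload requested for %s via %s\n", r.Node, r.Manager)
		},
	})
	fmt.Println("unhandled actions:", unknown)
}
```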
type Manager ¶
type Manager struct {
	sync.RWMutex
	FromCluster    chan Packet      // data received from cluster
	FromClusterAPI chan APIRequest  // data received from cluster via API interface
	ToCluster      chan interface{} // data sent to cluster
	ToNode         chan NodeMessage // data sent to specific node
	Log            chan string      // logging messages go here
	NodeJoin       chan string      // returns string of the node joining
	NodeLeave      chan string      // returns string of the node leaving
	QuorumState    chan bool        // returns the current quorum state
	// contains filtered or unexported fields
}
Manager is the main cluster manager
func NewManager ¶
NewManager creates a new cluster manager
func (*Manager) ListenAndServe ¶
ListenAndServe starts the listener and serves connections to clients
func (*Manager) ListenAndServeTLS ¶
ListenAndServeTLS starts the TLS listener and serves connections to clients
func (*Manager) NodeConfigured ¶
NodeConfigured reports whether a node is configured in the manager
func (*Manager) NodesConfigured ¶
NodesConfigured returns all nodes configured to be part of the cluster
func (*Manager) RemoveNode ¶
RemoveNode removes a cluster node from the list of servers to connect to, and closes its connections
func (*Manager) StateDump ¶
func (m *Manager) StateDump()
StateDump dumps the current state of the cluster to the log
func (*Manager) UpdateSettings ¶
UpdateSettings allows you to update a running cluster node with new settings
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node defines a node of the cluster
type NodeMessage ¶
type NodeMessage struct {
	Node    string      // node to send message to
	Message interface{} // message to send to node
}
NodeMessage is used for sending private messages between cluster nodes
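A private message is addressed by filling in `Node` and sending the value on the Manager's `ToNode` channel. The sketch below uses a local copy of `NodeMessage` and a stand-in channel. `sendPrivate` is a hypothetical helper that gives up instead of blocking when the buffer is full, which is one possible policy and not something the package prescribes.

```go
package main

import "fmt"

// NodeMessage is a local copy of the documented type.
type NodeMessage struct {
	Node    string      // node to send message to
	Message interface{} // message to send to node
}

// sendPrivate queues msg for a single node without blocking.
// It reports false when the channel cannot accept the message right now.
func sendPrivate(toNode chan<- NodeMessage, node string, msg interface{}) bool {
	select {
	case toNode <- NodeMessage{Node: node, Message: msg}:
		return true
	default:
		return false
	}
}

func main() {
	// toNode plays the role of manager.ToNode; the buffer of 1 is chosen
	// only to demonstrate the "full" case.
	toNode := make(chan NodeMessage, 1)

	fmt.Println(sendPrivate(toNode, "node2", "hello node2"))
	fmt.Println(sendPrivate(toNode, "node2", "dropped while buffer is full"))

	m := <-toNode
	fmt.Println(m.Node, m.Message)
}
```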
type Packet ¶
type Packet struct {
	Name        string    `json:"name"`
	DataType    string    `json:"datatype"`
	DataMessage string    `json:"datamessage"`
	Time        time.Time `json:"time"`
}
Packet is a cluster communication packet
func UnpackPacket ¶
UnpackPacket unpacks a packet and returns its structure
type Settings ¶
type Settings struct {
	PingInterval    time.Duration // how often to ping a node
	JoinDelay       time.Duration // delay before announcing node (done to prevent duplicate join messages on simultaneous connects) (must be shorter than ping timeout)
	ReadTimeout     time.Duration // timeout after which a node is discarded as broken if nothing has been read from it
	ConnectInterval time.Duration // how often we try to reconnect to lost cluster nodes
	ConnectTimeout  time.Duration // how long to try to connect to a node
}
Settings contains the adjustable settings for the cluster
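The sketch below builds a `Settings` value and checks the one constraint stated in the field comments before the value would be handed to `UpdateSettings`. It uses a local copy of the struct. All durations are made-up values, not package defaults, and `validate` and `exampleSettings` are hypothetical helpers. The source says only that `JoinDelay` must be shorter than the "ping timeout"; treating `ReadTimeout` as that timeout is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Settings is a local copy of the documented type.
type Settings struct {
	PingInterval    time.Duration
	JoinDelay       time.Duration
	ReadTimeout     time.Duration
	ConnectInterval time.Duration
	ConnectTimeout  time.Duration
}

// exampleSettings returns illustrative values only.
func exampleSettings() Settings {
	return Settings{
		PingInterval:    5 * time.Second,
		JoinDelay:       500 * time.Millisecond,
		ReadTimeout:     11 * time.Second,
		ConnectInterval: 5 * time.Second,
		ConnectTimeout:  10 * time.Second,
	}
}

// validate checks that JoinDelay is shorter than ReadTimeout.
// Assumption: ReadTimeout is the "ping timeout" the field comment refers to.
func validate(s Settings) error {
	if s.JoinDelay >= s.ReadTimeout {
		return errors.New("JoinDelay must be shorter than ReadTimeout")
	}
	return nil
}

func main() {
	s := exampleSettings()
	if err := validate(s); err != nil {
		fmt.Println("invalid settings:", err)
		return
	}
	// With the real package, this is where manager.UpdateSettings(s) would go.
	fmt.Println("settings look consistent")
}
```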