cluster

package
v0.0.0-...-17edc22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package cluster implements a interface for clustered services. It allows you to interace to a cluster node, and messages sent across will be received by the connected cluster nodes. You can interface freely between these N nodes. Additionaly there is also an API interface for passing commands to the cluster.

Interfacing

You can interface through the Cluster Manager using channels. Messages that are not used specificly by the cluster are forwarded to the client application using this cluster library.

manager.ToCluster <- interface{} // send data to the cluster
package := <-manager.FromCluster // recieve Package{} from the cluster containing the sent data (required)

This interface allows you to send data to the cluster, which will be broadcasted across the connected nodes.

state := <- manager.QuorumState // bool returning current quorum state, will update on node join/leave
node := <- manager.NodeJoin  // string of node joining the cluster
node := <- manager.NodeLeave // string of node leaving the cluster

These channels are available to read additional cluster status updates

request := <-manager.FromClusterApi // recieve APIRequest{} send via the API interface by a client

With APIEnabled you can recieve API requests though an authenticated web interface

log := <-manager.Log // recieve Logging from the debug package

Read the Log channel to receive cluster wide logging

Example

Example shows a single node able to send and recieve messages You can send any type of data to the cluster

// start cluster 1
manager := NewManager("node1", "secret")
manager.AddNode("node2", "127.0.0.1:9655")
err := manager.ListenAndServe("127.0.0.1:9654")
if err != nil {
	log.Fatal(err)
}

// start cluster 2
manager2 := NewManager("node2", "secret")
manager2.AddNode("node1", "127.0.0.1:9654")
err = manager2.ListenAndServe("127.0.0.1:9655")
if err != nil {
	log.Fatal(err)
}

// wait for cluster join to be complete
<-manager2.NodeJoin

// send message to all nodes of manager2
manager2.ToCluster <- "Hello World!"

// process all channels
for {
	select {
	// case logentry := <-manager2.Log:
	// log.Printf("manager2.log: %s\n", logentry)
	case p := <-manager.FromCluster:
		var cm string
		err := p.Message(&cm)
		if err != nil {
			log.Printf("Unable to get message from package: %s\n", err)
		}
		//log.Printf("we received a custom message: %s\n", cm)
		return
	}
}
Output:

Index

Examples

Constants

View Source
const (
	// StatusOffline is a new node, starting in offline state
	StatusOffline = "Offline"
	// StatusAuthenticating is a node doing authentication
	StatusAuthenticating = "Authenticating"
	// StatusShutdown is a node stopping
	StatusShutdown = "Stopping"
	// StatusOnline is a node online
	StatusOnline = "Online"
	// StatusLeaving is a node leaving
	StatusLeaving = "Leaving"
)

Variables

View Source
var (
	// APITokenSigningKey is key used to sign jtw tokens
	APITokenSigningKey = rndKey()
	// APITokenDuration is how long the jwt token is valid
	APITokenDuration = 1 * time.Hour
	// APIEnabled defines wether or not the API is enabled
	APIEnabled = true
)
View Source
var (
	// ChannelBufferSize the size of the channel buffer
	ChannelBufferSize = 100
)
View Source
var (
	// LogTraffic true/false to log all traffic sent to and received from nodes
	LogTraffic = false
)

Functions

This section is empty.

Types

type APIClusterNode

type APIClusterNode struct {
	Name     string        `json:"name"`
	Addr     string        `json:"addr"`
	Status   string        `json:"status"`
	Error    string        `json:"error"`
	JoinTime time.Time     `json:"jointime"`
	Lag      time.Duration `json:"lag"`
	Packets  int64         `json:"packets"`
}

APIClusterNode contains details of a node we might connect to used for the API

type APIClusterNodeList

type APIClusterNodeList struct {
	Nodes map[string]APIClusterNode `json:"nodes"`
}

APIClusterNodeList contains a list of configured/connected nodes used for the API

type APIRequest

type APIRequest struct {
	Action  string `json:"action"`
	Manager string `json:"manager"`
	Node    string `json:"node"`
	Data    string `json:"data"`
}

APIRequest is used to pass requests done to the cluster API to the client application

type Manager

type Manager struct {
	sync.RWMutex

	FromCluster    chan Packet      // data received from cluster
	FromClusterAPI chan APIRequest  // data received from cluster via API interface
	ToCluster      chan interface{} // data send to cluster
	ToNode         chan NodeMessage // data send to specific node
	Log            chan string      // logging messages go here
	NodeJoin       chan string      // returns string of the node joining
	NodeLeave      chan string      // returns string of the node leaving
	QuorumState    chan bool        // returns the current quorum state
	// contains filtered or unexported fields
}

Manager is the main cluster manager

func NewManager

func NewManager(name, authKey string) *Manager

NewManager creates a new cluster manager

func (*Manager) AddNode

func (m *Manager) AddNode(nodeName, nodeAddr string)

AddNode adds a cluster node to the cluster to be connected to

func (*Manager) ListenAndServe

func (m *Manager) ListenAndServe(addr string) (err error)

ListenAndServe starts the listener and serves connections to clients

func (*Manager) ListenAndServeTLS

func (m *Manager) ListenAndServeTLS(addr string, tlsConfig *tls.Config) (err error)

ListenAndServeTLS starts the TLS listener and serves connections to clients

func (*Manager) Name

func (m *Manager) Name() string

Name returns the name of a cluster node

func (*Manager) NodeConfigured

func (m *Manager) NodeConfigured(nodeName string) bool

NodeConfigured returns true or false if a node is configured in the manager

func (*Manager) NodesConfigured

func (m *Manager) NodesConfigured() map[string]bool

NodesConfigured returns all nodes configured to be part of the cluster

func (*Manager) RemoveNode

func (m *Manager) RemoveNode(nodeName string)

RemoveNode remove a cluster node from the list of servers to connect to, and close its connections

func (*Manager) Shutdown

func (m *Manager) Shutdown()

Shutdown stops the cluster node

func (*Manager) StateDump

func (m *Manager) StateDump()

StateDump dumps the current state of the cluster to the log

func (*Manager) UpdateSettings

func (m *Manager) UpdateSettings(settings Settings)

UpdateSettings allows you to update a running cluster node with new settings

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node defines a node of the cluster

type NodeMessage

type NodeMessage struct {
	Node    string      // node to send message to
	Message interface{} // message to send to node
}

NodeMessage is used for sending private messages between cluster nodes

type Packet

type Packet struct {
	Name        string    `json:"name"`
	DataType    string    `json:"datatype"`
	DataMessage string    `json:"datamessage"`
	Time        time.Time `json:"time"`
}

Packet is a cluster communication packet

func UnpackPacket

func UnpackPacket(data []byte) (packet *Packet, err error)

UnpackPacket unpacks a packet and returns its structure

func (*Packet) Message

func (packet *Packet) Message(message interface{}) error

Message returns the message of a packet

type Settings

type Settings struct {
	PingInterval    time.Duration // how over to ping a node
	JoinDelay       time.Duration // delay before announcing node (done to prevent duplicate join messages on simultainious connects) (must be shorter than ping timeout)
	ReadTimeout     time.Duration // timeout when to discard a node as broken if not read anything before this
	ConnectInterval time.Duration // how often we try to reconnect to lost cluster nodes
	ConnectTimeout  time.Duration // how long to try to connect to a node
}

Settings contains the adjustable setting for the cluster

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL