Documentation ¶
Overview ¶
Package manager contains the management code for EliasDB's clustering feature.
The management code deals with cluster building, general communication between cluster members, verification of communicating peers and monitoring of members.
The cluster structure is a pure peer-to-peer design with no single point of failure. All members of the cluster share a versioned cluster state which is persisted. Members must be added to or removed from the cluster manually. Each member also has a member info object in which the application using the cluster can store additional member-related information.
Temporary failures are detected automatically. Every member of the cluster monitors the state of all its peers by sending ping requests to them on a regular schedule.
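The monitoring round described above can be illustrated with a small, self-contained sketch. It is not the package's implementation: `checkPeers`, `fakePing` and `errUnreachable` are hypothetical names, and the ping function stands in for a real RPC call.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnreachable is a hypothetical error returned by the fake ping below.
var errUnreachable = errors.New("connection refused")

// fakePing stands in for a real RPC ping; it fails for "member3" only.
func fakePing(peer string) error {
	if peer == "member3" {
		return errUnreachable
	}
	return nil
}

// checkPeers pings every peer once and returns a map from each failed
// peer to its error message. Peers that answer are not included.
func checkPeers(peers []string, ping func(peer string) error) map[string]string {
	failed := make(map[string]string)
	for _, peer := range peers {
		if err := ping(peer); err != nil {
			failed[peer] = err.Error()
		}
	}
	return failed
}

func main() {
	failed := checkPeers([]string{"member2", "member3"}, fakePing)
	fmt.Println(len(failed), failed["member3"])
}
```

Running such a round on a regular schedule (for example from a `time.Ticker` loop) would give each member an up-to-date view of which peers are currently unreachable.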
Index ¶
- Constants
- Variables
- type Client
- func (mc *Client) FailedPeerErrors() []string
- func (mc *Client) FailedPeers() []string
- func (mc *Client) FailedTotal() int
- func (mc *Client) IsFailed(name string) bool
- func (mc *Client) OperationalPeers() ([]string, error)
- func (mc *Client) SendAcquireClusterLock(lockName string) error
- func (mc *Client) SendDataRequest(member string, reqdata interface{}) (interface{}, error)
- func (mc *Client) SendEjectMember(member string, memberToEject string) error
- func (mc *Client) SendJoinCluster(targetMember string, targetMemberRPC string) (map[string]interface{}, error)
- func (mc *Client) SendMemberInfoRequest(member string) (map[string]interface{}, error)
- func (mc *Client) SendPing(member string, rpc string) ([]string, error)
- func (mc *Client) SendReleaseClusterLock(lockName string) error
- func (mc *Client) SendRequest(member string, remoteCall RPCFunction, args map[RequestArgument]interface{}) (interface{}, error)
- func (mc *Client) SendStateInfoRequest(member string) (map[string]interface{}, error)
- type DefaultStateInfo
- type Error
- type Logger
- type MemStateInfo
- type MemberManager
- func (mm *MemberManager) EjectMember(memberToEject string) error
- func (mm *MemberManager) HousekeepingWorker()
- func (mm *MemberManager) JoinCluster(newMemberName string, newMemberRPC string) error
- func (mm *MemberManager) JoinNewMember(newMemberName string, newMemberRPC string) error
- func (mm *MemberManager) LogInfo(v ...interface{})
- func (mm *MemberManager) MemberInfo() map[string]interface{}
- func (mm *MemberManager) MemberInfoCluster() map[string]map[string]interface{}
- func (mm *MemberManager) Members() []string
- func (mm *MemberManager) Name() string
- func (mm *MemberManager) NetAddr() string
- func (mm *MemberManager) SetEventHandler(notifyStateUpdate func(), notifyHouseKeeping func())
- func (mm *MemberManager) SetHandleDataRequest(handleDataRequest func(interface{}, *interface{}) error)
- func (mm *MemberManager) Shutdown() error
- func (mm *MemberManager) Start() error
- func (mm *MemberManager) StateInfo() StateInfo
- func (mm *MemberManager) UpdateClusterStateInfo() error
- type MemberToken
- type RPCFunction
- type RequestArgument
- type Server
- func (ms *Server) AcquireLock(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) AddMember(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) DataRequest(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) EjectMember(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) JoinCluster(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) MemberInfoRequest(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) Ping(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) ReleaseLock(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) StateInfoRequest(request map[RequestArgument]interface{}, response *interface{}) error
- func (ms *Server) UpdateStateInfo(request map[RequestArgument]interface{}, response *interface{}) error
- type StateInfo
Constants ¶
const (
	StateInfoTS      = "ts"          // Timestamp of state info
	StateInfoTSOLD   = "tsold"       // Previous timestamp of state info
	StateInfoMEMBERS = "members"     // List of known cluster members
	StateInfoFAILED  = "failed"      // List of failed peers
	StateInfoREPFAC  = "replication" // Replication factor of the cluster
)
Known StateInfo entries
const (
	MemberInfoError   = "error"   // Error message if a member was not reachable
	MemberInfoTermURL = "termurl" // URL to the cluster terminal of the member
)
Known MemberInfo entries
const (
	RPCPing            RPCFunction = "Ping"
	RPCSIRequest                   = "StateInfoRequest"
	RPCMIRequest                   = "MemberInfoRequest"
	RPCAcquireLock                 = "AcquireLock"
	RPCReleaseLock                 = "ReleaseLock"
	RPCJoinCluster                 = "JoinCluster"
	RPCAddMember                   = "AddMember"
	RPCEjectMember                 = "EjectMember"
	RPCUpdateStateInfo             = "UpdateStateInfo"
	RPCDataRequest                 = "DataRequest"
)
List of all possible RPC functions. The list includes all RPC callable functions in this file.
const (
ClusterLockUpdateStateInfo = "ClusterLockUpdateStateInfo"
)
Known cluster locks
const ConfigClusterSecret = "ClusterSecret"
ConfigClusterSecret is the secret which authorizes a cluster member (the secret must never be sent directly over the network)
const ConfigMemberName = "ClusterMemberName"
ConfigMemberName is the name of the cluster member
const ConfigRPC = "ClusterMemberRPC"
ConfigRPC is the RPC network interface for the local cluster manager
const ConfigReplicationFactor = "ReplicationFactor"
ConfigReplicationFactor is the number of times a given datum must be stored redundantly. With a replication factor of n, the cluster can suffer n-1 member losses before it becomes inoperable. The value is set once in the configuration and afterwards becomes part of the global cluster state info (once it is there, the config value is ignored).
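The n-1 rule can be written down as a short sketch. The helper names `maxMemberLosses` and `isOperational` are hypothetical and only restate the rule from the description above; they are not part of the package.

```go
package main

import "fmt"

// maxMemberLosses returns how many members may fail before the cluster
// stops being operational, following the n-1 rule stated above.
func maxMemberLosses(replicationFactor int) int {
	if replicationFactor < 1 {
		return 0
	}
	return replicationFactor - 1
}

// isOperational reports whether the cluster can still serve requests
// with the given number of failed members.
func isOperational(failedMembers, replicationFactor int) bool {
	return failedMembers <= maxMemberLosses(replicationFactor)
}

func main() {
	for _, failed := range []int{0, 1, 2, 3} {
		fmt.Printf("replication=3 failed=%d operational=%v\n",
			failed, isOperational(failed, 3))
	}
}
```

With a replication factor of 1 (the default), this rule means that a single member failure already makes the cluster inoperable.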
Variables ¶
var (
	ErrMemberComm    = errors.New("Network error")
	ErrMemberError   = errors.New("Member error")
	ErrClusterConfig = errors.New("Cluster configuration error")
	ErrClusterState  = errors.New("Cluster state error")
	ErrUnknownPeer   = errors.New("Unknown peer member")
	ErrUnknownTarget = errors.New("Unknown target member")
	ErrInvalidToken  = errors.New("Invalid member token")
	ErrNotMember     = errors.New("Client is not a cluster member")
	ErrLockTaken     = errors.New("Requested lock is already taken")
	ErrLockNotOwned  = errors.New("Requested lock not owned")
)
Cluster related error types
var DefaultConfig = map[string]interface{}{
	ConfigRPC:               "127.0.0.1:9030",
	ConfigMemberName:        "member1",
	ConfigClusterSecret:     "secret123",
	ConfigReplicationFactor: 1.0,
}
DefaultConfig is the default configuration
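A common way to use such a default map is to copy it and override individual keys. The following sketch reproduces the keys and defaults listed above so that it is self-contained; `withOverrides` is a hypothetical helper, not a function of this package.

```go
package main

import "fmt"

// Keys and default values copied from the declarations listed above.
const (
	ConfigRPC               = "ClusterMemberRPC"
	ConfigMemberName        = "ClusterMemberName"
	ConfigClusterSecret     = "ClusterSecret"
	ConfigReplicationFactor = "ReplicationFactor"
)

var DefaultConfig = map[string]interface{}{
	ConfigRPC:               "127.0.0.1:9030",
	ConfigMemberName:        "member1",
	ConfigClusterSecret:     "secret123",
	ConfigReplicationFactor: 1.0,
}

// withOverrides (hypothetical helper) returns a copy of base with the
// given overrides applied. The base map itself is left untouched.
func withOverrides(base, overrides map[string]interface{}) map[string]interface{} {
	cfg := make(map[string]interface{}, len(base)+len(overrides))
	for k, v := range base {
		cfg[k] = v
	}
	for k, v := range overrides {
		cfg[k] = v
	}
	return cfg
}

func main() {
	cfg := withOverrides(DefaultConfig, map[string]interface{}{
		ConfigMemberName: "member2",
		ConfigRPC:        "127.0.0.1:9031",
	})
	fmt.Println(cfg[ConfigMemberName], cfg[ConfigRPC])
}
```

Note that the default replication factor is written as `1.0`, so it is stored as a `float64`; reading it back from the map requires a type assertion to `float64`.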
var DialTimeout = 10 * time.Second
DialTimeout is the dial timeout for RPC connections
var FreqHousekeeping float64 = 1000
FreqHousekeeping is the frequency of running housekeeping tasks (ms)
var LogDebug = Logger(LogNull)
LogDebug is called if a debug message is logged in the cluster code (by default disabled)
var LogInfo = Logger(log.Print)
LogInfo is called if an info message is logged in the cluster code
var LogNull = func(v ...interface{}) {
}
LogNull is a discarding logger to be used for disabling loggers
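Because the loggers are plain package variables of type `Logger`, they can be swapped at runtime. The sketch below redeclares `Logger`, `LogNull`, `LogDebug` and `LogInfo` locally so that it runs on its own; `captureLogger` and `captured` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"log"
)

// Logger, LogNull, LogDebug and LogInfo mirror the declarations above.
type Logger func(v ...interface{})

var LogNull = func(v ...interface{}) {}

var LogDebug = Logger(LogNull)
var LogInfo = Logger(log.Print)

// captured collects log messages; it exists only for this sketch.
var captured []string

// captureLogger is a hypothetical Logger which stores messages in memory.
func captureLogger(v ...interface{}) {
	captured = append(captured, fmt.Sprint(v...))
}

func main() {
	LogDebug("dropped") // Discarded, debug logging is disabled by default

	LogDebug = Logger(captureLogger) // Enable debug logging
	LogDebug("member2 joined")

	LogInfo = Logger(LogNull) // Silence info messages
	LogInfo("not printed")

	fmt.Println(captured)
}
```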
var MemberErrorExceptions map[string][]string
MemberErrorExceptions map to exclude members from simulated member errors (only used for testing)
var MemberErrors map[string]error
MemberErrors map for simulated member errors (only used for testing)
var MsiRetFlush error
MsiRetFlush nil or the error which should be returned by a Flush call
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the client for the RPC cluster API of a cluster member.
func (*Client) FailedPeerErrors ¶
FailedPeerErrors returns the same list as FailedPeers but with error messages.
func (*Client) FailedPeers ¶
FailedPeers returns a list of failed members.
func (*Client) FailedTotal ¶
FailedTotal returns the total number of failed members.
func (*Client) OperationalPeers ¶
OperationalPeers returns all operational peers and an error if too many cluster members have failed.
func (*Client) SendAcquireClusterLock ¶
SendAcquireClusterLock tries to acquire a named lock on all members of the cluster. It fails if the lock is already acquired or if not enough cluster members can be reached.
func (*Client) SendDataRequest ¶
SendDataRequest sends a data request to a member and returns its response.
func (*Client) SendEjectMember ¶
SendEjectMember sends a request to eject a member from the cluster.
func (*Client) SendJoinCluster ¶
func (mc *Client) SendJoinCluster(targetMember string, targetMemberRPC string) (map[string]interface{}, error)
SendJoinCluster sends a request to a cluster member to join the caller to the cluster. Pure clients cannot use this function as this call requires the Client.rpc field to be set.
func (*Client) SendMemberInfoRequest ¶
SendMemberInfoRequest requests the static member info of a member and returns it.
func (*Client) SendPing ¶
SendPing sends a ping to a member and returns the result. The second argument (the RPC address) is only needed if the target member is not a known peer; it should be an empty string in all other cases.
func (*Client) SendReleaseClusterLock ¶
SendReleaseClusterLock tries to release a named lock on all members of the cluster. It is not an error if a lock is not taken (or has expired) on this member or any other target member.
func (*Client) SendRequest ¶
func (mc *Client) SendRequest(member string, remoteCall RPCFunction, args map[RequestArgument]interface{}) (interface{}, error)
SendRequest sends a request to another cluster member. Unreachable members get an entry in the failed map and the returned error is ErrMemberComm. All other returned errors should be considered serious errors.
type DefaultStateInfo ¶
type DefaultStateInfo struct {
	*datautil.PersistentMap
	// contains filtered or unexported fields
}
DefaultStateInfo is the default state info which uses a file to persist its data.
func (*DefaultStateInfo) Flush ¶
func (dsi *DefaultStateInfo) Flush() error
Flush persists the state info.
func (*DefaultStateInfo) Get ¶
func (dsi *DefaultStateInfo) Get(key string) (interface{}, bool)
Get retrieves some data from the state info.
func (*DefaultStateInfo) Map ¶
func (dsi *DefaultStateInfo) Map() map[string]interface{}
Map returns the state info as a map.
func (*DefaultStateInfo) Put ¶
func (dsi *DefaultStateInfo) Put(key string, value interface{})
Put stores some data in the state info.
type Error ¶
type Error struct {
	Type   error  // Error type (to be used for equal checks)
	Detail string // Details of this error
}
Error is a cluster related error
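The Type field is meant for equality checks against the error variables listed earlier, for example to tell a communication failure (ErrMemberComm) apart from other errors. The sketch below redeclares the struct and two of the error variables locally. The `Error()` method and its message format are assumptions made so that the sketch compiles on its own, and `isCommError` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Error values copied from the variable listing of this package.
var (
	ErrMemberComm = errors.New("Network error")
	ErrLockTaken  = errors.New("Requested lock is already taken")
)

// Error mirrors the struct shown above.
type Error struct {
	Type   error  // Error type (to be used for equal checks)
	Detail string // Details of this error
}

// Error makes the sketch type usable as a Go error. The message format
// is an assumption and not taken from the package.
func (e *Error) Error() string {
	if e.Detail != "" {
		return fmt.Sprintf("%v (%v)", e.Type, e.Detail)
	}
	return e.Type.Error()
}

// isCommError (hypothetical helper) reports whether err is a cluster
// Error whose Type equals ErrMemberComm.
func isCommError(err error) bool {
	ce, ok := err.(*Error)
	return ok && ce != nil && ce.Type == ErrMemberComm
}

func main() {
	var err error = &Error{Type: ErrMemberComm, Detail: "member2 unreachable"}
	if isCommError(err) {
		fmt.Println("temporary failure:", err)
	} else {
		fmt.Println("serious error:", err)
	}
}
```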
type Logger ¶
type Logger func(v ...interface{})
Logger is a function which processes log messages from the cluster
type MemStateInfo ¶
type MemStateInfo struct {
// contains filtered or unexported fields
}
MemStateInfo is a state info object which does not persist its data.
func (*MemStateInfo) Get ¶
func (msi *MemStateInfo) Get(key string) (interface{}, bool)
Get retrieves some data from the state info.
func (*MemStateInfo) Map ¶
func (msi *MemStateInfo) Map() map[string]interface{}
Map returns the state info as a map.
func (*MemStateInfo) Put ¶
func (msi *MemStateInfo) Put(key string, value interface{})
Put stores some data in the state info.
type MemberManager ¶
type MemberManager struct {
	StopHousekeeping bool    // Flag to temporarily stop housekeeping
	Client           *Client // RPC client object
	// contains filtered or unexported fields
}
MemberManager is the management object for a cluster member.
This is the main object of the clustering code and contains the main API. A member registers itself with the RPC server, which is the global ManagerServer (server) object. Each cluster member needs a unique name. Communication between members is secured by a secret string, which is never exchanged over the network, and a hash-generated token which identifies a member.
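The idea of authenticating with a token derived from a shared secret, without ever sending the secret itself, can be sketched as follows. The exact token construction used by this package is not documented here; the sketch swaps in HMAC-SHA256 from the Go standard library as one reasonable choice, and `makeToken` and `verifyToken` are hypothetical names.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// makeToken derives an authentication token for a member name from the
// shared cluster secret. Only the token travels over the network.
func makeToken(memberName, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(memberName))
	return hex.EncodeToString(mac.Sum(nil))
}

// verifyToken recomputes the token with the locally known secret and
// compares it to the received one in constant time.
func verifyToken(memberName, token, secret string) bool {
	expected := makeToken(memberName, secret)
	return hmac.Equal([]byte(expected), []byte(token))
}

func main() {
	token := makeToken("member1", "secret123")
	fmt.Println(verifyToken("member1", token, "secret123")) // Same secret on both sides
	fmt.Println(verifyToken("member1", token, "other"))     // Receiver has a different secret
}
```

A receiving member that knows the same secret can verify the token, while a member configured with a different secret rejects it.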
Each MemberManager object contains a Client object which can be used to communicate with other cluster members. This object should be used by pure clients - code which should communicate with the cluster without running an actual member.
func NewMemberManager ¶
func NewMemberManager(rpcInterface string, name string, secret string, stateInfo StateInfo) *MemberManager
NewMemberManager creates a new MemberManager object.
func (*MemberManager) EjectMember ¶
func (mm *MemberManager) EjectMember(memberToEject string) error
EjectMember ejects a member from the current cluster. Trying to remove a non-existent member has no effect.
func (*MemberManager) HousekeepingWorker ¶
func (mm *MemberManager) HousekeepingWorker()
HousekeepingWorker is the background thread which handles various tasks to provide "eventual" consistency for the cluster.
func (*MemberManager) JoinCluster ¶
func (mm *MemberManager) JoinCluster(newMemberName string, newMemberRPC string) error
JoinCluster lets this member try to join an existing cluster. The secret must be correct, otherwise the member will be rejected.
func (*MemberManager) JoinNewMember ¶
func (mm *MemberManager) JoinNewMember(newMemberName string, newMemberRPC string) error
JoinNewMember joins a new member to the current cluster. It is assumed that the new member's token has already been verified.
func (*MemberManager) LogInfo ¶
func (mm *MemberManager) LogInfo(v ...interface{})
LogInfo logs a member related message at info level.
func (*MemberManager) MemberInfo ¶
func (mm *MemberManager) MemberInfo() map[string]interface{}
MemberInfo returns the current static member info. Clients may modify the returned map. Member info can be used to store additional information on every member (e.g. a member specific URL).
func (*MemberManager) MemberInfoCluster ¶
func (mm *MemberManager) MemberInfoCluster() map[string]map[string]interface{}
MemberInfoCluster returns the current static member info for every known cluster member. This calls every member in the cluster.
func (*MemberManager) Members ¶
func (mm *MemberManager) Members() []string
Members returns a list of all cluster members.
func (*MemberManager) NetAddr ¶
func (mm *MemberManager) NetAddr() string
NetAddr returns the network address of the member.
func (*MemberManager) SetEventHandler ¶
func (mm *MemberManager) SetEventHandler(notifyStateUpdate func(), notifyHouseKeeping func())
SetEventHandler sets event handler functions which are called when the state info is updated or when housekeeping has been done.
func (*MemberManager) SetHandleDataRequest ¶
func (mm *MemberManager) SetHandleDataRequest(handleDataRequest func(interface{}, *interface{}) error)
SetHandleDataRequest sets the data request handler.
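A handler with the signature shown above receives the request data and writes its answer through the response pointer. The following sketch only demonstrates that calling convention; `upperCaseHandler` is a hypothetical example and the request types a real cluster exchanges are not specified here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// upperCaseHandler has the signature expected by SetHandleDataRequest.
// It answers string requests with their upper-case form and rejects
// everything else.
func upperCaseHandler(request interface{}, response *interface{}) error {
	s, ok := request.(string)
	if !ok {
		return errors.New("unsupported request type")
	}
	*response = strings.ToUpper(s) // The answer is returned via the pointer
	return nil
}

func main() {
	var response interface{}
	if err := upperCaseHandler("ping", &response); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(response)
}
```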
func (*MemberManager) Shutdown ¶
func (mm *MemberManager) Shutdown() error
Shutdown shuts down the member manager RPC server for this cluster member.
func (*MemberManager) Start ¶
func (mm *MemberManager) Start() error
Start starts the manager process for this cluster member.
func (*MemberManager) StateInfo ¶
func (mm *MemberManager) StateInfo() StateInfo
StateInfo returns the current state info.
func (*MemberManager) UpdateClusterStateInfo ¶
func (mm *MemberManager) UpdateClusterStateInfo() error
UpdateClusterStateInfo updates the member's state info and sends it to all members in the cluster.
type MemberToken ¶
MemberToken is used to authenticate a member in the cluster
type RPCFunction ¶
type RPCFunction string
RPCFunction is used to identify the called function in a RPC call
type RequestArgument ¶
type RequestArgument int
RequestArgument is used to identify arguments in a RPC call
const (
	RequestTARGET       RequestArgument = iota // Required argument which identifies the target cluster member
	RequestTOKEN                               // Client token which is used for authorization checks
	RequestLOCK                                // Lock name which a member requests to take
	RequestMEMBERNAME                          // Name for a member
	RequestMEMBERRPC                           // Rpc address and port for a member
	RequestSTATEINFOMAP                        // StateInfo object as a map
	RequestDATA                                // Data request object
)
List of all possible arguments in a RPC request. There are usually no checks which give back an error if a required argument is missing. The RPC API is an internal API and might change without backwards compatibility.
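Since missing arguments are usually not reported, code that builds request maps by hand may want to check them itself. The sketch below redeclares the argument constants locally; `missingArgs` is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// RequestArgument and its values mirror the declarations above.
type RequestArgument int

const (
	RequestTARGET RequestArgument = iota
	RequestTOKEN
	RequestLOCK
	RequestMEMBERNAME
	RequestMEMBERRPC
	RequestSTATEINFOMAP
	RequestDATA
)

// missingArgs (hypothetical helper) returns all required arguments which
// are absent from the given request map, in the order they were asked for.
func missingArgs(request map[RequestArgument]interface{}, required ...RequestArgument) []RequestArgument {
	var missing []RequestArgument
	for _, arg := range required {
		if _, ok := request[arg]; !ok {
			missing = append(missing, arg)
		}
	}
	return missing
}

func main() {
	request := map[RequestArgument]interface{}{
		RequestTARGET: "member2",
		RequestLOCK:   "ClusterLockUpdateStateInfo",
	}
	// RequestTOKEN was not set and is reported as missing
	fmt.Println(missingArgs(request, RequestTARGET, RequestTOKEN, RequestLOCK))
}
```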
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the RPC exposed cluster API of a cluster member. Server is a singleton and will route incoming (authenticated) requests to registered MemberManagers. The calling member is referred to as source member and the called member is referred to as target member.
func (*Server) AcquireLock ¶
func (ms *Server) AcquireLock(request map[RequestArgument]interface{}, response *interface{}) error
AcquireLock tries to acquire a named lock for the source member on the target member. It fails if the lock is already acquired by a different member. The lock can only be held for a limited amount of time.
func (*Server) AddMember ¶
func (ms *Server) AddMember(request map[RequestArgument]interface{}, response *interface{}) error
AddMember adds a new member on the target member.
func (*Server) DataRequest ¶
func (ms *Server) DataRequest(request map[RequestArgument]interface{}, response *interface{}) error
DataRequest handles a data request.
func (*Server) EjectMember ¶
func (ms *Server) EjectMember(request map[RequestArgument]interface{}, response *interface{}) error
EjectMember can be called by a cluster member to eject itself or another cluster member.
func (*Server) JoinCluster ¶
func (ms *Server) JoinCluster(request map[RequestArgument]interface{}, response *interface{}) error
JoinCluster is used by a new member if it wants to join the cluster.
func (*Server) MemberInfoRequest ¶
func (ms *Server) MemberInfoRequest(request map[RequestArgument]interface{}, response *interface{}) error
MemberInfoRequest answers with the member's static info.
func (*Server) Ping ¶
func (ms *Server) Ping(request map[RequestArgument]interface{}, response *interface{}) error
Ping answers with a Pong if the given client token was verified and the local cluster member exists.
func (*Server) ReleaseLock ¶
func (ms *Server) ReleaseLock(request map[RequestArgument]interface{}, response *interface{}) error
ReleaseLock releases a lock. Only the member which holds the lock can release it.
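The rules described for AcquireLock and ReleaseLock (a lock belongs to one member, expires after a limited time, and can only be released by its holder) can be sketched with a small in-memory lock table. This is an illustration under assumptions, not the package's implementation: `lockTable`, its methods and the use of plain seconds for time are hypothetical, and letting the holder refresh its own lock is a choice made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Error values copied from the variable listing of this package.
var (
	ErrLockTaken    = errors.New("Requested lock is already taken")
	ErrLockNotOwned = errors.New("Requested lock not owned")
)

// lockEntry records who holds a lock and when it expires (in seconds).
type lockEntry struct {
	owner   string
	expires int64
}

// lockTable is a hypothetical table of named locks with a fixed lifetime.
type lockTable struct {
	mu    sync.Mutex
	ttl   int64
	locks map[string]lockEntry
}

func newLockTable(ttl int64) *lockTable {
	return &lockTable{ttl: ttl, locks: make(map[string]lockEntry)}
}

// acquire takes the lock for member unless another member holds an
// unexpired lock. The current holder may refresh its own lock.
func (t *lockTable) acquire(name, member string, now int64) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if e, ok := t.locks[name]; ok && now < e.expires && e.owner != member {
		return ErrLockTaken
	}
	t.locks[name] = lockEntry{owner: member, expires: now + t.ttl}
	return nil
}

// release frees the lock. Releasing a lock which is not taken or has
// expired is not an error; releasing someone else's active lock is.
func (t *lockTable) release(name, member string, now int64) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if e, ok := t.locks[name]; ok && now < e.expires && e.owner != member {
		return ErrLockNotOwned
	}
	delete(t.locks, name)
	return nil
}

func main() {
	t := newLockTable(30)
	fmt.Println(t.acquire("ClusterLockUpdateStateInfo", "member1", 0))
	fmt.Println(t.acquire("ClusterLockUpdateStateInfo", "member2", 10))
	fmt.Println(t.acquire("ClusterLockUpdateStateInfo", "member2", 40))
}
```

Passing the current time explicitly keeps the sketch deterministic; real code would more likely read a clock such as `time.Now()`.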
func (*Server) StateInfoRequest ¶
func (ms *Server) StateInfoRequest(request map[RequestArgument]interface{}, response *interface{}) error
StateInfoRequest answers with the member's state info.
func (*Server) UpdateStateInfo ¶
func (ms *Server) UpdateStateInfo(request map[RequestArgument]interface{}, response *interface{}) error
UpdateStateInfo updates the state info of the target member.
type StateInfo ¶
type StateInfo interface {

	/*
	   Put stores some data in the state info.
	*/
	Put(key string, value interface{})

	/*
	   Get retrieves some data from the state info.
	*/
	Get(key string) (interface{}, bool)

	/*
	   Map returns the state info as a map.
	*/
	Map() map[string]interface{}

	/*
	   Flush persists the state info.
	*/
	Flush() error
}
StateInfo models a state object which stores cluster related data. This information is exchanged between cluster members. It is not expected that the info changes frequently.
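Any type with these four methods can serve as a state info object. As a minimal sketch, the following map-backed implementation keeps everything in memory and treats Flush as a no-op. `mapStateInfo` and `newMapStateInfo` are hypothetical names; returning a copy from Map and guarding the data with a mutex are choices of this sketch, not documented behavior of the package's own implementations.

```go
package main

import (
	"fmt"
	"sync"
)

// StateInfo mirrors the interface shown above.
type StateInfo interface {
	Put(key string, value interface{})
	Get(key string) (interface{}, bool)
	Map() map[string]interface{}
	Flush() error
}

// mapStateInfo is a hypothetical in-memory StateInfo implementation.
type mapStateInfo struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newMapStateInfo() *mapStateInfo {
	return &mapStateInfo{data: make(map[string]interface{})}
}

// Put stores a value under the given key.
func (s *mapStateInfo) Put(key string, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// Get returns the value for key and whether it was present.
func (s *mapStateInfo) Get(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Map returns a shallow copy so callers cannot modify the internal map.
func (s *mapStateInfo) Map() map[string]interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]interface{}, len(s.data))
	for k, v := range s.data {
		out[k] = v
	}
	return out
}

// Flush does nothing because this implementation is not persistent.
func (s *mapStateInfo) Flush() error { return nil }

// Compile-time check that mapStateInfo satisfies StateInfo.
var _ StateInfo = (*mapStateInfo)(nil)

func main() {
	var si StateInfo = newMapStateInfo()
	si.Put("replication", 1)
	v, ok := si.Get("replication")
	fmt.Println(v, ok, si.Flush())
}
```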
func NewDefaultStateInfo ¶
NewDefaultStateInfo creates a new DefaultStateInfo.