funk

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2022 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Overview

Package funk contains the types needed to create a cluster using Raft and serf.

Examples

Index

Constants

View Source
const (
	TCPProtocol = ProtocolType(1)
	UDPProtocol = ProtocolType(2)
)

Protocol types exposed by the endpoints

View Source
const (
	// ZeroconfSerfKind is the type used to register serf endpoints in zeroconf.
	ZeroconfSerfKind = "serf"
	// ZeroconfManagementKind is the type used to register management endpoints
	// in zeroconf.
	ZeroconfManagementKind = "mgmt"

	// ZeroconfServiceKind is the type used to register (non-cluster) services
	ZeroconfServiceKind = "svc"
)
View Source
const (
	//These are
	SerfEndpoint           = "ep.serf"
	RaftEndpoint           = "ep.raft"
	ManagementEndpoint     = "ep.clusterfunk.management" //  gRPC endpoint for management
	SerfServiceName        = "meta.serviceName"
	MonitoringEndpointName = "ep.monitoring" // Typically Prometheus endpoint with /metrics and /healthz
)

The following is a list of well-known endpoints on nodes

View Source
const (
	AggressiveMode = TimingType("aggressive") // Aggressive mode, 5 ms commit timeout
	DefaultMode    = TimingType("default")    // Default timeout, 250 ms commit timeout
	MediumMode     = TimingType("medium")     // Medium, 50 ms commit timeout
)

Timing modes for the Raft cluster

View Source
const (
	// DefaultTimeout is the the default timeout for the watchdog
	DefaultTimeout = time.Second * 300
	// DefaultPollInterval is the polling interval for the watchdog
	DefaultPollInterval = time.Second
)
View Source
const (
	EndpointPrefix = "ep."
)

The following are internal tags and values for nodes

View Source
const (
	// SerfStatusKey is the key for the serf status
	SerfStatusKey = "serf.status"
)

Variables

View Source
var (
	// SerfLeft is the status of the serf node when it has left the cluster
	SerfLeft = serf.StatusLeft.String()
	// SerfAlive is the status of the serf node when it is alive and well
	SerfAlive = serf.StatusAlive.String()
	// SerfFailed is the status of the serf node when it has failed
	SerfFailed = serf.StatusFailed.String()
)

Functions

func GetBootstrapTime added in v0.0.19

func GetBootstrapTime(m LogMessage) int64

GetBootstrapTime reads the cluster bootstrap time from the log message. If the message can't be decoded it will return -1

func ToPublicEndpoint added in v0.0.7

func ToPublicEndpoint(listenHostPort string) (string, error)

ToPublicEndpoint converts an ip:port string into a format suitable for publishing as endpoints. If the listen address is ":1288" or "0.0.0.0:1288" it should be specified as an address reachable for external clients, i.e. "[public ip]:1288".

Types

type Cluster

type Cluster interface {
	// NodeID is the local cluster node's ID
	NodeID() string

	// Name returns the cluster's name
	Name() string

	// Start launches the cluster, ie joins a Serf cluster and announces its
	// presence
	Start() error

	// Stop stops the cluster
	Stop()

	// Role is the current role of the node
	Role() NodeRole

	// State is the current cluster state
	State() NodeState

	// Events returns an event channel for the cluster. The channel will
	// be closed when the cluster is stopped. Events are for information only
	Events() <-chan Event

	// Leader returns the node ID of the leader in the cluster. If no leader
	// is currently elected it will return a blank string.
	Leader() string

	// Nodes returns a list of the node IDs of each member
	Nodes() []string

	// GetEndpoint returns the endpoint for a particular node. If the node
	// or endpoint isn't found it will return a blank. The endpoint is
	// retrieved from the Serf cluster. Note that there's no guarantee
	// that the node will be responding on that endpoint.
	GetEndpoint(nodeID string, endpointName string) string

	// NewObserver returns a new endpoint observer for endpoints in and around
	// the cluster
	NewObserver() EndpointObserver

	// Created returns the time the cluster was created. This is set when the
	// cluster is bootstrapped.
	Created() time.Time

	EndpointRegistrator
}

Cluster is a wrapper for the Serf and Raft libraries. It will handle typical cluster operations.

func NewCluster

func NewCluster(params Parameters, shardManager sharding.ShardMap) (Cluster, error)

NewCluster returns a new cluster (client)

type ClusterWatchdog added in v0.1.1

type ClusterWatchdog struct {
	Callback     func()
	Timeout      time.Duration
	PollInterval time.Duration
}

ClusterWatchdog is a watchdog for the cluster nodes. On occasion the Raft library doesn't send an event to signal that the cluster is operational and from the outside it looks as if the node is stuck in a candidate state forever.

func NewClusterWatchdog added in v0.1.1

func NewClusterWatchdog(cb func()) *ClusterWatchdog

NewClusterWatchdog creates a new cluster watchdog with the default timeouts and poll intervals

func (*ClusterWatchdog) Start added in v0.1.1

func (wd *ClusterWatchdog) Start(c Cluster)

Start launches the watchdog. It will trigger if the cluster is not in an operational state for the time set in the Timeout field. Default is 5 minutes.

type Endpoint added in v0.0.7

type Endpoint struct {
	NodeID        string // NodeID is the ID of the node that registered the endpoint
	ListenAddress string // ListenAddress is the registered address for the endpoint
	Name          string // Name is the name of the endpoint
	Active        bool   // Active is set to true if the endpoint is from an active node
}

Endpoint stores information on endpoints

type EndpointObserver added in v0.0.7

type EndpointObserver interface {
	// Observe returns a channel that will send newly discovered endpoints. The
	// channel is closed when the observer shuts down. The consumers should read
	// the channel as soon as possible.
	Observe() <-chan Endpoint

	// Unobserve turns off observation for the channel
	Unobserve(<-chan Endpoint)

	// Shutdown closes all observer channels
	Shutdown()

	// Endpoints returns the entire list of endpoints. Inactive endpoints are
	// omitted.
	Endpoints() []Endpoint

	// Find returns matching endpoints
	Find(name string) []Endpoint

	// FindEndpoint returns the first partially or complete matching endpoint.
	FindFirst(name string) (Endpoint, error)
}

EndpointObserver observes the cluster and generates events when endpoints are registered and deregistered. Note that the endpoints might be registered but the service might not be available. The list of endpoints is only advisory. Nodes can't unregister endpoints; once it is registered it will stick around until the node goes away. The listen addresses can be changed.

func NewEndpointObserver added in v0.0.7

func NewEndpointObserver(localNodeID string, events <-chan NodeEvent, existing []Endpoint) EndpointObserver

NewEndpointObserver creates a new EndpointObserver instance.

type EndpointRegistrator added in v0.0.31

type EndpointRegistrator interface {
	// Register an endpoint.
	RegisterEndpoint(name string, listenAddress string, protocol ProtocolType) error
}

EndpointRegistrator is a type that can register endpoints; clients, services, or cluster nodes.

type Event

type Event struct {
	State NodeState // State is the state of the local cluster node
	Role  NodeRole  // Role is the role (leader, follower...) of the local cluster node
}

Event is the interface for cluster events that are triggered. The events are triggered by changes in the node state. The Role field is informational. When the cluster is in state Operational the shard map contains the current shard mapping. If the State field is different from Operational the shard map may contain an invalid or outdated mapping.

type GRPCServerParameters

type GRPCServerParameters struct {
	Endpoint string `kong:"help='Server endpoint'"`
	TLS      bool   `kong:"help='Enable TLS'"`
	CertFile string `kong:"help='Certificate file',type='existingfile'"`
	KeyFile  string `kong:"help='Certificate key file',type='existingfile'"`
}

GRPCServerParameters is a parameter struct for gRPC services The struct uses annotations from Kong (https://github.com/alecthomas/kong)

type LeaderChangedEvent

type LeaderChangedEvent struct {
}

LeaderChangedEvent is emitted when the leader of the cluster has changed.

type LeaderLostEvent

type LeaderLostEvent struct {
}

LeaderLostEvent is emitted when the leader is about to change.

type LocalNodeStoppedEvent

type LocalNodeStoppedEvent struct {
}

LocalNodeStoppedEvent is emitted when the local node is stopping

type LogMessage

type LogMessage struct {
	MessageType LogMessageType
	AckEndpoint string
	Index       uint64
	Data        []byte
}

LogMessage is log messages sent by the leader. The payload is a byte array that can be unmarshalled into another type of message.

func NewBootstrapMessage added in v0.0.19

func NewBootstrapMessage(time int64) LogMessage

NewBootstrapMessage creates a new cluster bootstrap message

func NewLogMessage

func NewLogMessage(t LogMessageType, endpoint string, data []byte) LogMessage

NewLogMessage creates a new LogMessage instance

func (*LogMessage) MarshalBinary

func (m *LogMessage) MarshalBinary() ([]byte, error)

MarshalBinary converts a Raft log message into a LogMessage struct.

func (*LogMessage) UnmarshalBinary

func (m *LogMessage) UnmarshalBinary(buf []byte) error

UnmarshalBinary unmarshals the byte array into this instance

type LogMessageType

type LogMessageType byte

LogMessageType is the log message type we're writing at the start of every log message

const (
	// ProposedShardMap is the new shard map the leader proposes (dictatorship-style)
	// to the followers. The payload for this message is a marshalled ShardManager
	// instance.
	ProposedShardMap LogMessageType = 1

	// ShardMapCommitted is a  message that will synchronize the
	// shard map distributed in the previous message.
	ShardMapCommitted LogMessageType = 2

	// Bootstrap is a message that the leader appends to the log when the
	// cluster is first bootstrapped. The message holds the (local) time stamp
	// for the leader and indicates that the cluster was restarted. A replicated
	// log might contain more than one ClusterBootstrap message.
	Bootstrap LogMessageType = 3
)

type NodeAddedEvent

type NodeAddedEvent struct {
}

NodeAddedEvent is emitted when a new node is added to the cluster.

type NodeEvent

type NodeEvent struct {
	Event SerfEventType
	Node  SerfMember
}

NodeEvent is used for channel notifications

type NodeRemovedEvent

type NodeRemovedEvent struct {
}

NodeRemovedEvent is emitted when a node is removed from the cluster

type NodeRetiredEvent

type NodeRetiredEvent struct {
}

NodeRetiredEvent is emitted when a node is about to be retired.

type NodeRole

type NodeRole int32

NodeRole is the roles the node can have in the cluster

const (
	Unknown   NodeRole = iota // Uknown state
	Follower                  // A follower in a cluster
	Leader                    // The current leader node
	NonVoter                  // Non voting role in cluster
	NonMember                 // NonMember nodes are part of the Serf cluster but not the Raft cluste
)

These are the roles the node might have in the cluster

func (NodeRole) String

func (n NodeRole) String() string

type NodeState

type NodeState int32

NodeState is the enumeration of different states a node can be in.

const (
	Invalid     NodeState = iota // Invalid or unknown state
	Joining                      // Joining the cluster
	Operational                  // Operational, normal operation
	Voting                       // Leader election in progress
	Resharding                   // Leader is elected, resharding in progress
	Starting                     // Starting the node
	Stopping                     // Stopping the node
)

These are the (local) states the cluster node can be in

func (NodeState) String

func (n NodeState) String() string

type Parameters

type Parameters struct {
	AutoJoin       bool                 `kong:"help='Auto join via SerfEvents',default='true'"`
	Name           string               `kong:"help='Cluster name',default='clusterfunk'"`
	Interface      string               `kong:"help='Interface address for services'"`
	Verbose        bool                 `kong:"help='Verbose logging for Serf and Raft'"`
	NodeID         string               `kong:"help='Node ID for Serf and Raft',default=''"`
	ZeroConf       bool                 `kong:"help='Zero-conf startup',default='true'"`
	NonVoting      bool                 `kong:"help='Nonvoting node',default='false'"`
	NonMember      bool                 `kong:"help='Non-member',default='false'"`
	AckTimeout     time.Duration        `kong:"help='Ack timeout for nodes in the cluster',default='500ms'"`
	Metrics        string               `kong:"help='Metrics sink to use',enum='blackhole,prometheus',default='prometheus'"`
	Raft           RaftParameters       `kong:"embed,prefix='raft-'"`
	Serf           SerfParameters       `kong:"embed,prefix='serf-'"`
	Management     GRPCServerParameters `kong:"embed,prefix='management-'"`
	LeaderEndpoint string               // This isn't a parameter, it's set by the service
}

Parameters is the parameters required for the cluster. The defaults are suitable for a development cluster but not for a production cluster.

func (*Parameters) Final

func (p *Parameters) Final()

Final sets the defaults for the parameters that haven't got a sensible value, f.e. endpoints and defaults. Defaults that are random values can't be set via the parameter library. Yet.

type ProtocolType added in v0.0.31

type ProtocolType int

ProtocolType is the type of protocol the endpoint supports.

type RaftEventType

type RaftEventType int

RaftEventType is the event type for events emitted by the RaftNode type

const (
	//RaftClusterSizeChanged is emitted when a new node is added
	RaftClusterSizeChanged RaftEventType = iota
	// RaftLeaderLost is emitted when the leader is lost, ie the node enters the candidate state
	RaftLeaderLost
	// RaftBecameLeader is emitted when the leader becomes the leader
	RaftBecameLeader
	// RaftBecameFollower is emitted when the node becomes a follower
	RaftBecameFollower
	// RaftReceivedLog is emitted when a log entry is receievd
	RaftReceivedLog
	// RaftShutdown is emitted when the node is shutting down
	RaftShutdown
	// RaftUndefinedEvent is the undefined event type
	RaftUndefinedEvent
)

func (RaftEventType) String

func (r RaftEventType) String() string

String is the string representation of the event

type RaftNode

type RaftNode struct {
	Nodes toolbox.StringSet
	// contains filtered or unexported fields
}

RaftNode is a wrapper for the Raft library. The raw events are coalesced into higher level events (particularly RaftClusterSizeChanged). Coalesced events introduce a small (millisecond) delay on the events but everything on top of this library will operate in the millisecond range.

In addition this type keeps track of the active nodes at all times via the raft events. There's no guarantee that the list of nodes in the cluster will be up to date or correct for the followers. The followers will only interact with the leader of the cluster.

func NewRaftNode

func NewRaftNode() *RaftNode

NewRaftNode creates a new RaftNode instance

func (*RaftNode) AddClusterNode

func (r *RaftNode) AddClusterNode(nodeID string, endpoint string) error

AddClusterNode adds a new node to the cluster. Must be leader to perform this operation.

func (*RaftNode) AppendLogEntry

func (r *RaftNode) AppendLogEntry(data []byte) (uint64, error)

AppendLogEntry appends a log entry to the log. The function returns when there's a quorum in the cluster

func (*RaftNode) Apply

func (r *RaftNode) Apply(l *raft.Log) interface{}

Apply log is invoked once a log entry is committed. It returns a value which will be made available in the ApplyFuture returned by Raft.Apply method if that method was called on the same Raft node as the FSM.

func (*RaftNode) DisableNode

func (r *RaftNode) DisableNode(id string)

DisableNode disables a node (in reality removing it from the local node list but it is still a member of the Raft cluster)

func (*RaftNode) EnableNode

func (r *RaftNode) EnableNode(id string)

EnableNode enables a node that has been disabled. The node might be a part of the cluster but not available.

func (*RaftNode) Endpoint

func (r *RaftNode) Endpoint() string

Endpoint returns the Raft endpoint (aka bind address)

func (*RaftNode) Events

func (r *RaftNode) Events() <-chan RaftEventType

Events returns the event channel. There is only one event channel so use multiple listeners at your own peril. You will get NodeAdded, NodeRemoved, LeaderLost and LeaderChanged events on this channel.

func (*RaftNode) GetLogMessages

func (r *RaftNode) GetLogMessages(startingIndex uint64) []LogMessage

GetLogMessages returns the replicated log message with the specified type ID

func (*RaftNode) LastLogIndex

func (r *RaftNode) LastLogIndex() uint64

LastLogIndex returns the last log index received

func (*RaftNode) Leader

func (r *RaftNode) Leader() bool

Leader returns true if this node is the leader. This will verify the leadership with the Raft library.

func (*RaftNode) LeaderNodeID

func (r *RaftNode) LeaderNodeID() string

LeaderNodeID returns the Node ID for the leader

func (*RaftNode) LocalNodeID

func (r *RaftNode) LocalNodeID() string

LocalNodeID returns the local NodeID

func (*RaftNode) RefreshNodes

func (r *RaftNode) RefreshNodes()

RefreshNodes refreshes the node list by reading the Raft configuration. If any events are skipped by the Raft library we'll get the appropriate events and an updated node list after this call.

func (*RaftNode) RemoveClusterNode

func (r *RaftNode) RemoveClusterNode(nodeID string, endpoint string) error

RemoveClusterNode removes a node from the cluster. Must be leader to perform this operation.

func (*RaftNode) Restore

func (r *RaftNode) Restore(io.ReadCloser) error

Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state.

func (*RaftNode) Snapshot

func (r *RaftNode) Snapshot() (raft.FSMSnapshot, error)

Snapshot is used to support log compaction. This call should return an FSMSnapshot which can be used to save a point-in-time snapshot of the FSM. Apply and Snapshot are not called in multiple threads, but Apply will be called concurrently with Persist. This means the FSM should be implemented in a fashion that allows for concurrent updates while a snapshot is happening.

func (*RaftNode) Start

func (r *RaftNode) Start(nodeID string, cfg RaftParameters) (bool, error)

Start launches the node. The return value is set to true if the cluster is bootstrapped

func (*RaftNode) StepDown

func (r *RaftNode) StepDown() error

StepDown steps down as a leader if this is the leader node

func (*RaftNode) Stop

func (r *RaftNode) Stop(removeWhenStopping bool) error

Stop stops the node. If the removeWhenStopping flag is set and the server is the leader it will remove itself.

type RaftParameters

type RaftParameters struct {
	Endpoint   string     `kong:"help='Endpoint for Raft',default=''"`
	DiskStore  string     `kong:"help='Disk-based store',default=''"`
	Bootstrap  bool       `kong:"help='Bootstrap a new Raft cluster',default='true'"`
	Verbose    bool       `kong:"help='Verbose Raft logging',default='false'"`
	TimingMode TimingType `kong:"help='Timing mode',default='default',enum='aggressive,medium,default'"`
	DebugLog   bool       `kong:"help='Show debug log messages for Raft',default='false'"`
}

RaftParameters is the configuration for the Raft cluster

type SerfEventType

type SerfEventType int

SerfEventType is the type of events the SerfNode emits

const (
	SerfNodeJoined  SerfEventType = iota // A node joins the cluster
	SerfNodeLeft                         // A node has left the cluster
	SerfNodeUpdated                      // A node's tags are updated
)

Serf event types.

func (SerfEventType) String

func (s SerfEventType) String() string

type SerfMember

type SerfMember struct {
	NodeID string
	State  string
	Tags   map[string]string
}

SerfMember holds information on members in the Serf cluster.

type SerfNode

type SerfNode struct {
	// contains filtered or unexported fields
}

SerfNode is a wrapper around the Serf library

func NewSerfNode

func NewSerfNode() *SerfNode

NewSerfNode creates a new SerfNode instance

func (*SerfNode) Endpoints added in v0.0.7

func (s *SerfNode) Endpoints() []Endpoint

Endpoints returns all the endpoints in the cluster

func (*SerfNode) Events

func (s *SerfNode) Events() <-chan NodeEvent

Events returns a notification channel. If the client isn't reading the events will be dropped.

func (*SerfNode) GetClusterTag added in v0.0.8

func (s *SerfNode) GetClusterTag(name string) string

GetClusterTag returns the first tag in the cluster that matches the name. if no matching tag is found an empty string is returned.

func (*SerfNode) ID added in v0.0.7

func (s *SerfNode) ID() string

ID returns the

func (*SerfNode) LoadMembers

func (s *SerfNode) LoadMembers() []SerfMember

LoadMembers reads the list of existing members in the Serf cluster

func (*SerfNode) Node

func (s *SerfNode) Node(nodeID string) SerfMember

Node returns information on a particular node. If the node isn't found the node returned will be empty

func (*SerfNode) Nodes

func (s *SerfNode) Nodes() []SerfMember

Nodes returns a list of known member nodes

func (*SerfNode) PublishTags

func (s *SerfNode) PublishTags() error

PublishTags publishes the tags to the other members of the cluster

func (*SerfNode) SetTag

func (s *SerfNode) SetTag(name, value string)

SetTag sets a tag on the serf node. The tags are not updated until PublishTags are called by the client

func (*SerfNode) Size

func (s *SerfNode) Size() int

Size returns the size of the member list

func (*SerfNode) Start

func (s *SerfNode) Start(nodeID string, serviceName string, cfg SerfParameters) error

Start launches the serf node. The service name can be left blank for clients.

func (*SerfNode) Stop

func (s *SerfNode) Stop() error

Stop shuts down the node

type SerfParameters

type SerfParameters struct {
	Endpoint     string   `kong:"help='Endpoint for Serf',default=''"`
	JoinAddress  []string `kong:"help='Join address and port for Serf cluster'"`
	Verbose      bool     `kong:"help='Verbose logging for Serf'"`
	SnapshotPath string   `kong:"help='Snapshot path (for persistence)'"`
}

SerfParameters holds parameters for the Serf client

func (*SerfParameters) Final

func (s *SerfParameters) Final()

Final populates empty fields with default values

type ServiceNode added in v0.0.7

type ServiceNode interface {
	RegisterServiceEndpoint(endpointName string, listenAddress string) error
	Stop()
}

ServiceNode is a node for services that will be discoverable but not a member of the cluster. Nonvoting nodes requires a fair bit of capacity in the cluster but service nodes are more loosely related and may provide services to the cluster itself.

func NewServiceNode added in v0.0.7

func NewServiceNode(serviceName string, params ServiceParameters) (ServiceNode, error)

NewServiceNode creates a new ServiceNode instance

type ServiceParameters added in v0.0.7

type ServiceParameters struct {
	Name     string         `kong:"help='Cluster name',default='clusterfunk'"`
	NodeID   string         `kong:"help='Node ID for Serf node',default=''"`
	Verbose  bool           `kong:"help='Verbose logging for Serf and Raft'"`
	ZeroConf bool           `kong:"help='Zero-conf startup',default='true'"`
	Serf     SerfParameters `kong:"embed,prefix='serf-'"`
}

ServiceParameters is configuration parameters for service nodes, ie nodes that aren't part of the cluster but provides discoverable services to the cluster (and cluster clients)

func (*ServiceParameters) Final added in v0.0.7

func (s *ServiceParameters) Final()

Final assigns default values to the fields that have defaults if they're not asigned yet (like the NodeID)

type TimingType added in v0.0.33

type TimingType string

TimingType is the timing for the Raft cluster. Aggressive mode works well on the loopback adapter and on nodes connected directly by a switch. Default mode is the default values in the Raft library and might work better on nodes with high load (and possibly on burstable AWS instances).

Directories

Path Synopsis
Package clusterpb contains the generate gRPC code
Package clusterpb contains the generate gRPC code
Package managepb contains the protobuf-generate code for the cluster management interface
Package managepb contains the protobuf-generate code for the cluster management interface
Package metrics handles metrics for the cluster.
Package metrics handles metrics for the cluster.
shardpb
Package shardpb contains the protobuf-generated code for the internal cluster communication
Package shardpb contains the protobuf-generated code for the internal cluster communication

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL