graft

v0.0.0-...-3f8f888
Published: Apr 2, 2024 License: Apache-2.0 Imports: 19 Imported by: 10

README

Graft

A RAFT Election implementation in Go. More information on RAFT can be found in this research paper and this video.

Overview

RAFT is a consensus-based algorithm that produces consistent state through replicated logs and leader elections.

A typical use of the election algorithm is to guarantee a leader among a group of nodes, which provides N-wise scalability and eliminates a single point of failure (SPOF) within a system.

Example Usage


ci := graft.ClusterInfo{Name: "health_manager", Size: 3}
rpc, err := graft.NewNatsRpc(&nats.DefaultOptions)
if err != nil {
  // Handle the error before continuing.
}

// The handler delivers async errors and state changes on these channels.
errChan := make(chan error)
stateChangeChan := make(chan graft.StateChange)
handler := graft.NewChanHandler(stateChangeChan, errChan)

node, err := graft.New(ci, handler, rpc, "/tmp/graft.log")
if err != nil {
  // Handle the error before continuing.
}

// ...

if node.State() == graft.LEADER {
  // Process as a LEADER
}

select {
case sc := <-stateChangeChan:
  // Process a state change
case err := <-errChan:
  // Process an error, log etc.
}

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.

Documentation

Overview

Graft is a RAFT implementation. Currently only the election functionality is supported.

Index

Constants

const (
	VERSION = "0.7"

	// Election timeout MIN and MAX per RAFT spec suggestion.
	MIN_ELECTION_TIMEOUT = 500 * time.Millisecond
	MAX_ELECTION_TIMEOUT = 2 * MIN_ELECTION_TIMEOUT

	// Heartbeat tick for LEADERS.
	// Should be << MIN_ELECTION_TIMEOUT per RAFT spec.
	HEARTBEAT_INTERVAL = 100 * time.Millisecond

	NO_LEADER = ""
	NO_VOTE   = ""
)
const (
	NO_MEMBERSHIP = int32(iota)
	GRP_A
	GRP_B
)

Membership designations for split network simulations.

const (
	HEARTBEAT_SUB = "graft.%s.heartbeat"
	VOTE_REQ_SUB  = "graft.%s.vote_request"
	VOTE_RESP_SUB = "graft.%s.vote_response"
)

The subject space for the NATS RPC driver is based on the cluster name, which is substituted for %s in the heartbeat and vote request subjects. Vote responses are directed to a specific node by using its node.Id() instead.
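
As an illustration of how these templates expand, the sketch below fills them in with fmt.Sprintf. The helper `subjects`, the cluster name, and the node ID "node-1" are made up for the example. That the vote response template takes the node ID follows the description above rather than an inspection of the driver's code.

```go
package main

import "fmt"

// Local copies of the documented subject templates.
const (
	heartbeatSub = "graft.%s.heartbeat"
	voteReqSub   = "graft.%s.vote_request"
	voteRespSub  = "graft.%s.vote_response"
)

// subjects is a hypothetical helper that expands the three templates.
// Heartbeats and vote requests are keyed by cluster name, vote responses
// by the ID of the node that should receive them.
func subjects(cluster, nodeID string) (hb, vreq, vresp string) {
	hb = fmt.Sprintf(heartbeatSub, cluster)
	vreq = fmt.Sprintf(voteReqSub, cluster)
	vresp = fmt.Sprintf(voteRespSub, nodeID)
	return hb, vreq, vresp
}

func main() {
	hb, vreq, vresp := subjects("health_manager", "node-1")
	fmt.Println(hb)
	fmt.Println(vreq)
	fmt.Println(vresp)
}
```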

Variables

var (
	ErrClusterName  = errors.New("graft: Cluster name can not be empty")
	ErrClusterSize  = errors.New("graft: Cluster size can not be 0")
	ErrHandlerReq   = errors.New("graft: Handler is required")
	ErrRpcDriverReq = errors.New("graft: RPCDriver is required")
	ErrLogReq       = errors.New("graft: Log is required")
	ErrLogNoExist   = errors.New("graft: Log file does not exist")
	ErrLogNoState   = errors.New("graft: Log file does not have any state")
	ErrLogCorrupt   = errors.New("graft: Encountered corrupt log file")
	ErrNotImpl      = errors.New("graft: Not implemented")
)
var (
	ErrNotInitialized = errors.New("graft(nats_rpc): Driver is not properly initialized")
)

Functions

This section is empty.

Types

type ChanHandler

type ChanHandler struct {
	StateMachineHandler
	// contains filtered or unexported fields
}

ChanHandler is a convenience handler when a user wants to simply use channels for the async handling of errors and state changes.

func NewChanHandler

func NewChanHandler(scCh chan<- StateChange, errCh chan<- error) *ChanHandler

NewChanHandler returns a Handler implementation which uses channels for handling errors and state changes.

func NewChanHandlerWithStateMachine

func NewChanHandlerWithStateMachine(
	stateHandler StateMachineHandler,
	scCh chan<- StateChange,
	errCh chan<- error) *ChanHandler

NewChanHandlerWithStateMachine returns a Handler implementation which uses channels for handling errors and state changes and a StateMachineHandler for hooking into external state. The external state machine influences leader election votes.

func (*ChanHandler) AsyncError

func (chand *ChanHandler) AsyncError(err error)

AsyncError queues the error onto the error channel.

func (*ChanHandler) StateChange

func (chand *ChanHandler) StateChange(from, to State)

StateChange queues the state change onto the state change channel.

type ClusterInfo

type ClusterInfo struct {
	// The cluster's name
	Name string

	// Expected members
	Size int
}

ClusterInfo expresses the name and expected size of the cluster.
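
In Raft, a candidate wins an election once it holds votes from a majority of the cluster, so the expected size determines how many votes are needed. The sketch below shows the standard majority rule. The helper `majority` is hypothetical, and graft's own quorum calculation may treat small clusters differently.

```go
package main

import "fmt"

// clusterInfo is a local copy of the documented ClusterInfo type.
type clusterInfo struct {
	Name string
	Size int
}

// majority returns the smallest number of votes that is more than half of
// size. This is the textbook Raft rule, assumed here for illustration.
func majority(size int) int {
	return size/2 + 1
}

func main() {
	ci := clusterInfo{Name: "health_manager", Size: 3}
	fmt.Printf("%s needs %d of %d votes\n", ci.Name, majority(ci.Size), ci.Size)
}
```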

type Handler

type Handler interface {
	StateMachineHandler

	// Process async errors that are encountered by the node.
	AsyncError(error)

	// Process state changes.
	StateChange(from, to State)
}

A Handler can process async callbacks from a Graft node.

type MockRpcDriver

type MockRpcDriver struct {
	// contains filtered or unexported fields
}

func NewMockRpc

func NewMockRpc() *MockRpcDriver

func (*MockRpcDriver) Close

func (rpc *MockRpcDriver) Close()

func (*MockRpcDriver) HeartBeat

func (rpc *MockRpcDriver) HeartBeat(hb *pb.Heartbeat) error

func (*MockRpcDriver) Init

func (rpc *MockRpcDriver) Init(n *Node) error

func (*MockRpcDriver) RequestVote

func (rpc *MockRpcDriver) RequestVote(vr *pb.VoteRequest) error

func (*MockRpcDriver) SendVoteResponse

func (rpc *MockRpcDriver) SendVoteResponse(candidate string, vresp *pb.VoteResponse) error

type NatsRpcDriver

type NatsRpcDriver struct {
	sync.Mutex
	// contains filtered or unexported fields
}

NatsRpcDriver is an implementation of the RPCDriver using NATS.

func NewNatsRpc

func NewNatsRpc(opts *nats.Options) (*NatsRpcDriver, error)

NewNatsRpc creates a new instance of the driver. The NATS connection will use the options passed in.

func NewNatsRpcFromConn

func NewNatsRpcFromConn(nc *nats.Conn) (*NatsRpcDriver, error)

NewNatsRpcFromConn creates a new instance of the driver using an existing NATS connection.

func (*NatsRpcDriver) Close

func (rpc *NatsRpcDriver) Close()

Close shuts down the subscriptions and the NATS encoded connection, and sets the driver's internal references to nil.

func (*NatsRpcDriver) HeartBeat

func (rpc *NatsRpcDriver) HeartBeat(hb *pb.Heartbeat) error

HeartBeat is called from the Graft node to send out a heartbeat while it is a LEADER.

func (*NatsRpcDriver) HeartbeatCallback

func (rpc *NatsRpcDriver) HeartbeatCallback(hb *pb.Heartbeat)

HeartbeatCallback will place the heartbeat on the Graft node's appropriate channel.

func (*NatsRpcDriver) Init

func (rpc *NatsRpcDriver) Init(n *Node) (err error)

Init initializes the driver via the Graft node.

func (*NatsRpcDriver) RequestVote

func (rpc *NatsRpcDriver) RequestVote(vr *pb.VoteRequest) error

RequestVote is sent from the Graft node when it has become a candidate.

func (*NatsRpcDriver) SendVoteResponse

func (rpc *NatsRpcDriver) SendVoteResponse(id string, vresp *pb.VoteResponse) error

SendVoteResponse is called from the Graft node to respond to a vote request.

func (*NatsRpcDriver) VoteRequestCallback

func (rpc *NatsRpcDriver) VoteRequestCallback(vreq *pb.VoteRequest)

VoteRequestCallback will place the request on the Graft node's appropriate channel.

func (*NatsRpcDriver) VoteResponseCallback

func (rpc *NatsRpcDriver) VoteResponseCallback(vresp *pb.VoteResponse)

VoteResponseCallback will place the response on the Graft node's appropriate channel.

type Node

type Node struct {

	// Channel to receive VoteRequests.
	VoteRequests chan *pb.VoteRequest

	// Channel to receive the VoteResponses.
	VoteResponses chan *pb.VoteResponse

	// Channel to receive Heartbeats.
	HeartBeats chan *pb.Heartbeat
	// contains filtered or unexported fields
}

func New

func New(info ClusterInfo, handler Handler, rpc RPCDriver, logPath string) (*Node, error)

New will create a new Graft node. All arguments are required.

func (*Node) Close

func (n *Node) Close()

Close will shut down the Graft node and wait until the state is processed. It clears timers, channels, etc. and closes the log.

func (*Node) ClusterInfo

func (n *Node) ClusterInfo() ClusterInfo

Convenience function for accessing the ClusterInfo.

func (*Node) CurrentTerm

func (n *Node) CurrentTerm() uint64

func (*Node) CurrentVote

func (n *Node) CurrentVote() string

func (*Node) Id

func (n *Node) Id() string

Convenience function for accessing the node's Id().

func (*Node) Leader

func (n *Node) Leader() string

func (*Node) LogPath

func (n *Node) LogPath() string

func (*Node) State

func (n *Node) State() State

Return the current state.

type RPCDriver

type RPCDriver interface {
	// Used to initialize the driver
	Init(*Node) error
	// Used to close down any state
	Close()
	// Used to send VoteResponses to candidates
	SendVoteResponse(candidate string, vresp *pb.VoteResponse) error
	// Used by Candidate Nodes to issue a new vote for a leader.
	RequestVote(*pb.VoteRequest) error
	// Used by Leader Nodes to Heartbeat
	HeartBeat(*pb.Heartbeat) error
}

An RPCDriver allows multiple transports to be utilized for the RAFT protocol RPCs. An instance of RPCDriver will use the *Node passed to Init() to call back into the Node when VoteRequests, VoteResponses and Heartbeat RPCs are received. They will be placed on the appropriate node's channels.
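
The callback pattern described above can be sketched with a toy transport that delivers every outbound RPC straight back to the node it was initialized with. Everything here is a stand-in: the message structs and their fields, the trimmed-down Node, and `loopbackDriver` are hypothetical and exist only so the example compiles without graft or its pb package. The sketch is single-goroutine and not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the pb message types.
type (
	Heartbeat    struct{ Term uint64 }
	VoteRequest  struct{ Term uint64 }
	VoteResponse struct{ Granted bool }
)

// Node is reduced to the three documented receive channels.
type Node struct {
	VoteRequests  chan *VoteRequest
	VoteResponses chan *VoteResponse
	HeartBeats    chan *Heartbeat
}

// RPCDriver mirrors the documented interface using the stand-in types.
type RPCDriver interface {
	Init(*Node) error
	Close()
	SendVoteResponse(candidate string, vresp *VoteResponse) error
	RequestVote(*VoteRequest) error
	HeartBeat(*Heartbeat) error
}

var errNotInitialized = errors.New("loopback: driver is not initialized")

// loopbackDriver places each outbound RPC on the channels of its own node.
type loopbackDriver struct{ node *Node }

var _ RPCDriver = (*loopbackDriver)(nil) // compile-time interface check

func (d *loopbackDriver) Init(n *Node) error {
	d.node = n
	return nil
}

func (d *loopbackDriver) Close() { d.node = nil }

func (d *loopbackDriver) SendVoteResponse(_ string, v *VoteResponse) error {
	if d.node == nil {
		return errNotInitialized
	}
	d.node.VoteResponses <- v
	return nil
}

func (d *loopbackDriver) RequestVote(vr *VoteRequest) error {
	if d.node == nil {
		return errNotInitialized
	}
	d.node.VoteRequests <- vr
	return nil
}

func (d *loopbackDriver) HeartBeat(hb *Heartbeat) error {
	if d.node == nil {
		return errNotInitialized
	}
	d.node.HeartBeats <- hb
	return nil
}

func main() {
	n := &Node{
		VoteRequests:  make(chan *VoteRequest, 1),
		VoteResponses: make(chan *VoteResponse, 1),
		HeartBeats:    make(chan *Heartbeat, 1),
	}
	var rpc RPCDriver = &loopbackDriver{}
	if err := rpc.Init(n); err != nil {
		panic(err)
	}
	defer rpc.Close()

	if err := rpc.HeartBeat(&Heartbeat{Term: 1}); err != nil {
		panic(err)
	}
	fmt.Println("heartbeat term:", (<-n.HeartBeats).Term)
}
```

A real transport would publish to remote peers in the outbound methods and write to the node's channels only from its receive callbacks, as the NatsRpcDriver callbacks documented above do.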

type State

type State int8
const (
	FOLLOWER State = iota
	LEADER
	CANDIDATE
	CLOSED
)

Allowable states for a Graft node.

func (State) String

func (s State) String() string

Convenience for printing, etc.

type StateChange

type StateChange struct {
	// From is the previous state.
	From State

	// To is the new state.
	To State
}

StateChange captures "from" and "to" States for the ChanHandler.
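
An application usually cares about two transitions: gaining leadership and losing it. The sketch below shows one way to derive both from a stream of StateChange values. The function `watchLeadership` and its callbacks are hypothetical, and the example closes the channel itself to end the loop, which graft is not documented to do.

```go
package main

import "fmt"

// Local copies of the documented State and StateChange types.
type State int8

const (
	FOLLOWER State = iota
	LEADER
	CANDIDATE
	CLOSED
)

type StateChange struct {
	From State
	To   State
}

// watchLeadership calls onGain when a change ends in LEADER and onLoss when
// a change leaves LEADER. It returns when the channel is closed.
func watchLeadership(changes <-chan StateChange, onGain, onLoss func()) {
	for sc := range changes {
		switch {
		case sc.To == LEADER:
			onGain()
		case sc.From == LEADER:
			onLoss()
		}
	}
}

func main() {
	changes := make(chan StateChange, 3)
	changes <- StateChange{From: FOLLOWER, To: CANDIDATE}
	changes <- StateChange{From: CANDIDATE, To: LEADER}
	changes <- StateChange{From: LEADER, To: FOLLOWER}
	close(changes)

	watchLeadership(changes,
		func() { fmt.Println("became leader") },
		func() { fmt.Println("lost leadership") },
	)
}
```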

type StateMachineHandler

type StateMachineHandler interface {
	// CurrentState returns an opaque byte slice that represents the current
	// state of the state machine.
	CurrentState() []byte

	// GrantVote is called when a candidate peer has requested a vote. The
	// peer's state machine position is passed as an opaque byte slice as
	// returned by CurrentState. The returned bool determines if the vote
	// should be granted because the candidate's state machine is at least as
	// up-to-date as the receiver's state machine.
	GrantVote(position []byte) bool
}

StateMachineHandler is used to interrogate an external state machine.
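
Since the position is opaque to graft, the application decides how to encode it. A minimal sketch, assuming the state machine's progress can be summarized by a single applied index encoded as 8 big-endian bytes: the type `indexStateMachine` and this encoding are illustrative choices, not something the package prescribes.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// indexStateMachine is a hypothetical state machine whose position is the
// index of the last entry it has applied.
type indexStateMachine struct {
	applied uint64
}

// CurrentState encodes the applied index as 8 big-endian bytes.
func (m *indexStateMachine) CurrentState() []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, m.applied)
	return buf
}

// GrantVote grants the vote only if the candidate's index is at least as
// large as ours. Malformed positions are rejected.
func (m *indexStateMachine) GrantVote(position []byte) bool {
	if len(position) != 8 {
		return false
	}
	return binary.BigEndian.Uint64(position) >= m.applied
}

func main() {
	receiver := &indexStateMachine{applied: 10}
	behind := &indexStateMachine{applied: 7}
	ahead := &indexStateMachine{applied: 12}

	fmt.Println("grant to candidate at 7:", receiver.GrantVote(behind.CurrentState()))
	fmt.Println("grant to candidate at 12:", receiver.GrantVote(ahead.CurrentState()))
}
```

A handler like this could be passed as the stateHandler argument of NewChanHandlerWithStateMachine, so that nodes with stale state do not win elections.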
