Documentation ¶
Index ¶
- Constants
- Variables
- func LogLevel() int
- func RegisterCommand(command Command)
- func SetLogLevel(level int)
- type AppendEntriesRequest
- type AppendEntriesResponse
- func (aer *AppendEntriesResponse) CommitIndex() uint64
- func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)
- func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)
- func (aer *AppendEntriesResponse) Index() uint64
- func (aer *AppendEntriesResponse) Success() bool
- func (aer *AppendEntriesResponse) Term() uint64
- type Command
- type CommandApply
- type CommandEncoder
- type Config
- type Context
- type DefaultJoinCommand
- type DefaultLeaveCommand
- type Event
- type EventListener
- type GrpcServer
- func (t *GrpcServer) OnSendAppendEntriesRequest(ctx context2.Context, pbReq *protobuf.AppendEntriesRequest) (*protobuf.AppendEntriesResponse, error)
- func (t *GrpcServer) OnSendSnapshotRecoveryRequest(ctx context2.Context, pbReq *protobuf.SnapshotRecoveryRequest) (*protobuf.SnapshotRecoveryResponse, error)
- func (t *GrpcServer) OnSendSnapshotRequest(ctx context2.Context, pbReq *protobuf.SnapshotRequest) (*protobuf.SnapshotResponse, error)
- func (t *GrpcServer) OnSendVoteRequest(ctx context2.Context, pbReq *protobuf.RequestVoteRequest) (*protobuf.RequestVoteResponse, error)
- type GrpcTransporter
- func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) (ret *AppendEntriesResponse)
- func (t *GrpcTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) (ret *SnapshotRecoveryResponse)
- func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) (ret *SnapshotResponse)
- func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) (ret *RequestVoteResponse)
- type HTTPMuxer
- type HTTPTransporter
- func (t *HTTPTransporter) AppendEntriesPath() string
- func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer)
- func (t *HTTPTransporter) Prefix() string
- func (t *HTTPTransporter) RequestVotePath() string
- func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
- func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
- func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
- func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
- func (t *HTTPTransporter) SnapshotPath() string
- func (t *HTTPTransporter) SnapshotRecoveryPath() string
- type JoinCommand
- type LeaveCommand
- type Log
- type LogEntry
- type NOPCommand
- type Peer
- type RequestVoteRequest
- type RequestVoteResponse
- type Server
- type Snapshot
- type SnapshotRecoveryRequest
- type SnapshotRecoveryResponse
- type SnapshotRequest
- type SnapshotResponse
- type StateMachine
- type Transporter
Constants ¶
const ( Debug = 1 Trace = 2 )
const ( StateChangeEventType = "stateChange" LeaderChangeEventType = "leaderChange" TermChangeEventType = "termChange" CommitEventType = "commit" AddPeerEventType = "addPeer" RemovePeerEventType = "removePeer" HeartbeatIntervalEventType = "heartbeatInterval" ElectionTimeoutThresholdEventType = "electionTimeoutThreshold" HeartbeatEventType = "heartbeat" )
const ( Stopped = "stopped" Initialized = "initialized" Follower = "follower" Candidate = "candidate" Leader = "leader" Snapshotting = "snapshotting" )
const ( MaxLogEntriesPerRequest = 2000 NumberOfLogEntriesAfterSnapshot = 200 )
const ( // DefaultHeartbeatInterval is the interval that the leader will send // AppendEntriesRequests to followers to maintain leadership. DefaultHeartbeatInterval = 50 * time.Millisecond DefaultElectionTimeout = 150 * time.Millisecond )
const ElectionTimeoutThresholdPercent = 0.8
ElectionTimeoutThresholdPercent specifies the threshold at which the server will dispatch warning events that the heartbeat RTT is too close to the election timeout.
Variables ¶
var CommandTimeoutError = errors.New("raft: Command timeout")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var NotLeaderError = errors.New("raft.Server: Not current leader")
var StopError = errors.New("raft: Has been stopped")
Functions ¶
func RegisterCommand ¶
func RegisterCommand(command Command)
Registers a command by storing a reference to an instance of it.
func SetLogLevel ¶
func SetLogLevel(level int)
Types ¶
type AppendEntriesRequest ¶
type AppendEntriesRequest struct { Term uint64 PrevLogIndex uint64 PrevLogTerm uint64 CommitIndex uint64 LeaderName string Entries []*protobuf.LogEntry }
The request sent to a server to append entries to the log.
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
// contains filtered or unexported fields
}
The response returned from a server appending entries to the log.
func (*AppendEntriesResponse) CommitIndex ¶
func (aer *AppendEntriesResponse) CommitIndex() uint64
func (*AppendEntriesResponse) Decode ¶
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)
Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and any error that occurs.
func (*AppendEntriesResponse) Encode ¶
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)
Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes written and any error that may have occurred.
func (*AppendEntriesResponse) Index ¶
func (aer *AppendEntriesResponse) Index() uint64
func (*AppendEntriesResponse) Success ¶
func (aer *AppendEntriesResponse) Success() bool
func (*AppendEntriesResponse) Term ¶
func (aer *AppendEntriesResponse) Term() uint64
type Command ¶
type Command interface {
CommandName() string
}
Command represents an action to be taken on the replicated state machine.
type CommandApply ¶
CommandApply represents the interface to apply a command to the server.
type CommandEncoder ¶
type Context ¶
type Context interface { Server() Server CurrentTerm() uint64 CurrentIndex() uint64 CommitIndex() uint64 }
Context represents the current state of the server. It is passed into a command when the command is being applied since the server methods are locked.
type DefaultJoinCommand ¶
type DefaultJoinCommand struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` }
Join command
func (*DefaultJoinCommand) Apply ¶
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error)
func (*DefaultJoinCommand) CommandName ¶
func (c *DefaultJoinCommand) CommandName() string
The name of the Join command in the log
func (*DefaultJoinCommand) NodeName ¶
func (c *DefaultJoinCommand) NodeName() string
type DefaultLeaveCommand ¶
type DefaultLeaveCommand struct {
Name string `json:"name"`
}
Leave command
func (*DefaultLeaveCommand) Apply ¶
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error)
func (*DefaultLeaveCommand) CommandName ¶
func (c *DefaultLeaveCommand) CommandName() string
The name of the Leave command in the log
func (*DefaultLeaveCommand) NodeName ¶
func (c *DefaultLeaveCommand) NodeName() string
type Event ¶
type Event interface { Type() string Source() interface{} Value() interface{} PrevValue() interface{} }
Event represents an action that occurred within the Raft library. Listeners can subscribe to event types by using the Server.AddEventListener() function.
type EventListener ¶
type EventListener func(Event)
EventListener is a function that can receive event notifications.
type GrpcServer ¶
type GrpcServer struct { protobuf.UnimplementedRaftServer // contains filtered or unexported fields }
func NewGrpcServer ¶
func NewGrpcServer(server Server) *GrpcServer
Creates a new GrpcServer that serves incoming Raft RPCs for the given server.
func (*GrpcServer) OnSendAppendEntriesRequest ¶
func (t *GrpcServer) OnSendAppendEntriesRequest(ctx context2.Context, pbReq *protobuf.AppendEntriesRequest) (*protobuf.AppendEntriesResponse, error)
Handles incoming AppendEntries requests.
func (*GrpcServer) OnSendSnapshotRecoveryRequest ¶
func (t *GrpcServer) OnSendSnapshotRecoveryRequest(ctx context2.Context, pbReq *protobuf.SnapshotRecoveryRequest) (*protobuf.SnapshotRecoveryResponse, error)
Handles incoming SnapshotRecovery requests.
func (*GrpcServer) OnSendSnapshotRequest ¶
func (t *GrpcServer) OnSendSnapshotRequest(ctx context2.Context, pbReq *protobuf.SnapshotRequest) (*protobuf.SnapshotResponse, error)
Handles incoming Snapshot requests.
func (*GrpcServer) OnSendVoteRequest ¶
func (t *GrpcServer) OnSendVoteRequest(ctx context2.Context, pbReq *protobuf.RequestVoteRequest) (*protobuf.RequestVoteResponse, error)
Handles incoming RequestVote requests.
type GrpcTransporter ¶
type GrpcTransporter struct {
// contains filtered or unexported fields
}
A GrpcTransporter is a default transport layer used to communicate between multiple servers.
func NewGrpcTransporter ¶
func NewGrpcTransporter(grpcDialOptions ...grpc.DialOption) *GrpcTransporter
Creates a new gRPC transporter with the given gRPC dial options.
func (*GrpcTransporter) SendAppendEntriesRequest ¶
func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) (ret *AppendEntriesResponse)
Sends an AppendEntries RPC to a peer.
func (*GrpcTransporter) SendSnapshotRecoveryRequest ¶
func (t *GrpcTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) (ret *SnapshotRecoveryResponse)
Sends a SnapshotRecoveryRequest RPC to a peer.
func (*GrpcTransporter) SendSnapshotRequest ¶
func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) (ret *SnapshotResponse)
Sends a SnapshotRequest RPC to a peer.
func (*GrpcTransporter) SendVoteRequest ¶
func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) (ret *RequestVoteResponse)
Sends a RequestVote RPC to a peer.
type HTTPMuxer ¶
type HTTPMuxer interface {
HandleFunc(string, func(http.ResponseWriter, *http.Request))
}
type HTTPTransporter ¶
type HTTPTransporter struct { DisableKeepAlives bool Transport *http.Transport // contains filtered or unexported fields }
An HTTPTransporter is a default transport layer used to communicate between multiple servers.
func NewHTTPTransporter ¶
func NewHTTPTransporter(prefix string, timeout time.Duration) *HTTPTransporter
Creates a new HTTP transporter with the given path prefix.
func (*HTTPTransporter) AppendEntriesPath ¶
func (t *HTTPTransporter) AppendEntriesPath() string
Retrieves the AppendEntries path.
func (*HTTPTransporter) Install ¶
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer)
Applies Raft routes to an HTTP router for a given server.
func (*HTTPTransporter) Prefix ¶
func (t *HTTPTransporter) Prefix() string
Retrieves the path prefix used by the transporter.
func (*HTTPTransporter) RequestVotePath ¶
func (t *HTTPTransporter) RequestVotePath() string
Retrieves the RequestVote path.
func (*HTTPTransporter) SendAppendEntriesRequest ¶
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
Sends an AppendEntries RPC to a peer.
func (*HTTPTransporter) SendSnapshotRecoveryRequest ¶
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
Sends a SnapshotRecoveryRequest RPC to a peer.
func (*HTTPTransporter) SendSnapshotRequest ¶
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
Sends a SnapshotRequest RPC to a peer.
func (*HTTPTransporter) SendVoteRequest ¶
func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
Sends a RequestVote RPC to a peer.
func (*HTTPTransporter) SnapshotPath ¶
func (t *HTTPTransporter) SnapshotPath() string
Retrieves the Snapshot path.
func (*HTTPTransporter) SnapshotRecoveryPath ¶
func (t *HTTPTransporter) SnapshotRecoveryPath() string
Retrieves the SnapshotRecovery path.
type LeaveCommand ¶
Leave command interface
type Log ¶
type Log struct { ApplyFunc func(*LogEntry, Command) (interface{}, error) // contains filtered or unexported fields }
A log is a collection of log entries that are persisted to durable storage.
type LogEntry ¶
type LogEntry struct { Position int64 // position in the log file // contains filtered or unexported fields }
A log entry stores a single item in the log.
func (*LogEntry) CommandName ¶
func (*LogEntry) Decode ¶
Decodes the log entry from a buffer. Returns the number of bytes read and any error that occurs.
type NOPCommand ¶
type NOPCommand struct { }
NOP command
func (NOPCommand) Apply ¶
func (c NOPCommand) Apply(server Server) (interface{}, error)
func (NOPCommand) CommandName ¶
func (c NOPCommand) CommandName() string
The name of the NOP command in the log
type Peer ¶
type Peer struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` sync.RWMutex AppendEntryRequestChan chan bool `json:"-"` // contains filtered or unexported fields }
A peer is a reference to another server involved in the consensus protocol.
func (*Peer) LastActivity ¶
LastActivity returns the last time any response was received from the peer.
type RequestVoteRequest ¶
type RequestVoteRequest struct { Term uint64 LastLogIndex uint64 LastLogTerm uint64 CandidateName string // contains filtered or unexported fields }
The request sent to a server to vote for a candidate to become a leader.
type RequestVoteResponse ¶
type RequestVoteResponse struct { Term uint64 VoteGranted bool // contains filtered or unexported fields }
The response returned from a server after a vote for a candidate to become a leader.
type Server ¶
type Server interface { Name() string Context() interface{} StateMachine() StateMachine Leader() string State() string Path() string LogPath() string SnapshotPath(lastIndex uint64, lastTerm uint64) string Term() uint64 CommitIndex() uint64 VotedFor() string MemberCount() int QuorumSize() int IsLogEmpty() bool LogEntries() []*LogEntry LastCommandName() string GetState() string ElectionTimeout() time.Duration SetElectionTimeout(duration time.Duration) HeartbeatInterval() time.Duration SetHeartbeatInterval(duration time.Duration) Transporter() Transporter SetTransporter(t Transporter) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse RequestVote(req *RequestVoteRequest) *RequestVoteResponse RequestSnapshot(req *SnapshotRequest) *SnapshotResponse SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse AddPeer(name string, connectiongString string) error RemovePeer(name string) error Peers() map[string]*Peer Init() error Start() error Stop() Running() bool Do(command Command) (interface{}, error) TakeSnapshot() error LoadSnapshot() error AddEventListener(string, EventListener) FlushCommitIndex() }
A server is involved in the consensus protocol and can act as a follower, candidate or a leader.
func NewServer ¶
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error)
Creates a new server with a log at the given path. transporter must not be nil. stateMachine can be nil if snapshotting and log compaction are to be disabled. ctx can be anything (including nil) and is not used by the raft package except that it is returned by Server.Context(). connectionString can be anything.
type Snapshot ¶
type Snapshot struct { LastIndex uint64 `json:"lastIndex"` LastTerm uint64 `json:"lastTerm"` // Cluster configuration. Peers []*Peer `json:"peers"` State []byte `json:"state"` Path string `json:"path"` }
Snapshot is an in-memory representation of the current state of the system.
type SnapshotRecoveryRequest ¶
type SnapshotRecoveryRequest struct { LeaderName string LastIndex uint64 LastTerm uint64 Peers []*Peer State []byte }
The request sent to a server to start from the snapshot.
type SnapshotRecoveryResponse ¶
The response returned from a server to a snapshot recovery request.
type SnapshotRequest ¶
The request sent to a server asking it to enter the snapshotting state so that it can start from the snapshot.
type SnapshotResponse ¶
type SnapshotResponse struct {
Success bool `json:"success"`
}
The response returned if the follower entered snapshot state
type StateMachine ¶
StateMachine is the interface for allowing the host application to save and recover the state machine. This makes it possible to make snapshots and compact the log.
type Transporter ¶
type Transporter interface { SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse }
Transporter is the interface for allowing the host application to transport requests to other nodes.