Documentation ¶
Index ¶
- Constants
- Variables
- type AppPanicError
- type Config
- type DownReplica
- type FatalError
- type Future
- type MultiTransport
- type RaftConfig
- type RaftServer
- func (rs *RaftServer) AppliedIndex(id uint64) uint64
- func (rs *RaftServer) ChangeMember(ctx context.Context, id uint64, changeType proto.ConfChangeType, ...) (future *Future)
- func (rs *RaftServer) CreateRaft(raftConfig *RaftConfig) error
- func (rs *RaftServer) GetDownReplicas(id uint64) (downReplicas []DownReplica)
- func (rs *RaftServer) GetPendingReplica(id uint64) (peers []uint64)
- func (rs *RaftServer) GetUnreachable(id uint64) (nodes []uint64)
- func (rs *RaftServer) IsLeader(id uint64) bool
- func (rs *RaftServer) LeaderTerm(id uint64) (leader, term uint64)
- func (rs *RaftServer) RemoveRaft(id uint64) error
- func (rs *RaftServer) Status(id uint64) (status *Status)
- func (rs *RaftServer) Stop()
- func (rs *RaftServer) Submit(ctx context.Context, id uint64, cmd []byte) (future *Future)
- func (rs *RaftServer) Truncate(id uint64, index uint64)
- func (rs *RaftServer) TryToLeader(ctx context.Context, id uint64) (future *Future)
- type ReplicaStatus
- type SocketResolver
- type SocketType
- type StateMachine
- type Status
- type Transport
- type TransportConfig
Constants ¶
const (
	KB = 1 << (10 * iota)
	MB
)
const NoLeader uint64 = 0
NoLeader is a placeholder nodeID used when there is no leader.
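A minimal sketch of how a caller might test for the placeholder. It assumes that the `leader` value returned by `LeaderTerm` equals `NoLeader` when no leader is known, which this page does not state explicitly. `describeLeader` is a hypothetical helper, and `NoLeader` is redeclared locally so the example compiles without the package.

```go
package main

import "fmt"

// NoLeader is copied from the constant documented above.
const NoLeader uint64 = 0

// describeLeader is a hypothetical helper. leader and term stand in for
// the two values returned by LeaderTerm (assumption: leader is NoLeader
// when no leader is known).
func describeLeader(leader, term uint64) string {
	if leader == NoLeader {
		return fmt.Sprintf("term %d: no leader", term)
	}
	return fmt.Sprintf("term %d: leader is node %d", term, leader)
}

func main() {
	fmt.Println(describeLeader(NoLeader, 3))
	fmt.Println(describeLeader(2, 4))
}
```

Comparing against the named constant rather than a literal 0 keeps the intent visible at the call site.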
Variables ¶
var (
	ErrCompacted     = errors.New("requested index is unavailable due to compaction.")
	ErrRaftExists    = errors.New("raft already exists.")
	ErrRaftNotExists = errors.New("raft not exists.")
	ErrNotLeader     = errors.New("raft is not the leader.")
	ErrStopped       = errors.New("raft is already shutdown.")
	ErrSnapping      = errors.New("raft is doing snapshot.")
	ErrCanceled      = errors.New("raft request canceled by caller")
)
Functions ¶
This section is empty.
Types ¶
type AppPanicError ¶
type AppPanicError string
AppPanicError is the panic error raised when replication encounters a fatal error. The server recovers from this panic and stops the shard's replication.
func (*AppPanicError) Error ¶
func (pe *AppPanicError) Error() string
type Config ¶
type Config struct {
	TransportConfig

	// NodeID is the identity of the local node. NodeID cannot be 0.
	// This parameter is required.
	NodeID uint64
	// TickInterval is the interval of the timer that checks heartbeat and election timeouts.
	// The default value is 2s.
	TickInterval time.Duration
	// HeartbeatTick is the heartbeat interval. A leader sends a heartbeat
	// message every heartbeat interval to maintain its leadership.
	// The default value is 2s.
	HeartbeatTick int
	// ElectionTick is the election timeout. If a follower does not receive any message
	// from the leader of the current term during ElectionTick, it becomes a candidate and starts an election.
	// ElectionTick must be greater than HeartbeatTick.
	// We suggest using ElectionTick = 10 * HeartbeatTick to avoid unnecessary leader switching.
	// The default value is 10s.
	ElectionTick int
	// MaxSizePerMsg limits the max size of each append message.
	// The default value is 1M.
	MaxSizePerMsg uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during the optimistic replication phase.
	// The application transport layer usually has its own sending buffer over TCP/UDP.
	// Set MaxInflightMsgs to avoid overflowing that sending buffer.
	// The default value is 128.
	MaxInflightMsgs int
	// ReqBufferSize limits the size of the receive-request channel buffer.
	// The default value is 1024.
	ReqBufferSize int
	// AppBufferSize limits the size of the apply channel buffer.
	// The default value is 2048.
	AppBufferSize int
	// RetainLogs controls how many logs are kept after a truncate.
	// This lets a follower quickly replay logs instead of being sent an entire snapshot.
	// The default value is 20000.
	RetainLogs uint64
	// LeaseCheck controls whether the lease mechanism is used.
	// The default value is false.
	LeaseCheck bool
	// contains filtered or unexported fields
}
Config contains the parameters to start a raft server. By default, the lease mechanism is not used. NOTE: NodeID and Resolver are required. All other parameters have default values.
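The field comments state two constraints: NodeID cannot be 0, and ElectionTick must be greater than HeartbeatTick. The following sketch checks them on a local stand-in struct. The `config` type and `validate` function are hypothetical and are not part of the package, and the extra check that HeartbeatTick is positive is an assumption. Whether `NewRaftServer` performs similar checks is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// config mirrors only the fields needed here. It is a local stand-in,
// not the package's Config type.
type config struct {
	NodeID        uint64
	HeartbeatTick int
	ElectionTick  int
}

// validate is a hypothetical check of the documented constraints.
func validate(c config) error {
	if c.NodeID == 0 {
		return errors.New("NodeID is required and cannot be 0")
	}
	// Assumption: a non-positive heartbeat tick is never meaningful.
	if c.HeartbeatTick <= 0 {
		return errors.New("HeartbeatTick must be positive")
	}
	if c.ElectionTick <= c.HeartbeatTick {
		return errors.New("ElectionTick must be greater than HeartbeatTick")
	}
	return nil
}

func main() {
	// Follows the suggested ratio ElectionTick = 10 * HeartbeatTick.
	c := config{NodeID: 1, HeartbeatTick: 1, ElectionTick: 10}
	fmt.Println("valid config:", validate(c) == nil)

	c.ElectionTick = 1
	fmt.Println("after lowering ElectionTick:", validate(c))
}
```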
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with usable defaults.
type DownReplica ¶
DownReplica describes a replica that is down.
type FatalError ¶
type MultiTransport ¶
type MultiTransport struct {
// contains filtered or unexported fields
}
func (*MultiTransport) Send ¶
func (t *MultiTransport) Send(m *proto.Message)
func (*MultiTransport) SendSnapshot ¶
func (t *MultiTransport) SendSnapshot(m *proto.Message, rs *snapshotStatus)
func (*MultiTransport) Stop ¶
func (t *MultiTransport) Stop()
type RaftConfig ¶
type RaftConfig struct {
	ID           uint64
	Term         uint64
	Leader       uint64
	Applied      uint64
	Peers        []proto.Peer
	Storage      storage.Storage
	StateMachine StateMachine
}
RaftConfig contains the parameters to create a raft replication group.
type RaftServer ¶
type RaftServer struct {
// contains filtered or unexported fields
}
func NewRaftServer ¶
func NewRaftServer(config *Config) (*RaftServer, error)
func (*RaftServer) AppliedIndex ¶
func (rs *RaftServer) AppliedIndex(id uint64) uint64
func (*RaftServer) ChangeMember ¶
func (rs *RaftServer) ChangeMember(ctx context.Context, id uint64, changeType proto.ConfChangeType, peer proto.Peer, context []byte) (future *Future)
func (*RaftServer) CreateRaft ¶
func (rs *RaftServer) CreateRaft(raftConfig *RaftConfig) error
func (*RaftServer) GetDownReplicas ¶
func (rs *RaftServer) GetDownReplicas(id uint64) (downReplicas []DownReplica)
GetDownReplicas returns the replicas that are down.
func (*RaftServer) GetPendingReplica ¶
func (rs *RaftServer) GetPendingReplica(id uint64) (peers []uint64)
GetPendingReplica returns the followers that have a snapshot pending.
func (*RaftServer) GetUnreachable ¶
func (rs *RaftServer) GetUnreachable(id uint64) (nodes []uint64)
func (*RaftServer) IsLeader ¶
func (rs *RaftServer) IsLeader(id uint64) bool
func (*RaftServer) LeaderTerm ¶
func (rs *RaftServer) LeaderTerm(id uint64) (leader, term uint64)
func (*RaftServer) RemoveRaft ¶
func (rs *RaftServer) RemoveRaft(id uint64) error
func (*RaftServer) Status ¶
func (rs *RaftServer) Status(id uint64) (status *Status)
func (*RaftServer) Stop ¶
func (rs *RaftServer) Stop()
func (*RaftServer) Truncate ¶
func (rs *RaftServer) Truncate(id uint64, index uint64)
func (*RaftServer) TryToLeader ¶
func (rs *RaftServer) TryToLeader(ctx context.Context, id uint64) (future *Future)
type ReplicaStatus ¶
type ReplicaStatus struct {
	Match       uint64 // replication progress
	Commit      uint64 // commit position
	Next        uint64
	State       string
	Snapshoting bool
	Paused      bool
	Active      bool
	LastActive  time.Time
	Inflight    int
}
ReplicaStatus describes the status of a replica.
type SocketResolver ¶
type SocketResolver interface {
NodeAddress(nodeID uint64, stype SocketType) (addr string, err error)
}
The SocketResolver interface is supplied by the application to resolve a NodeID to a network address for the given SocketType.
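As an illustration, a resolver can be backed by a fixed table of node addresses. The sketch below copies the `SocketType` and `SocketResolver` declarations from this page so it compiles on its own. `nodeAddrs`, `staticResolver`, `newStaticResolver`, and the addresses are all hypothetical.

```go
package main

import "fmt"

// SocketType and SocketResolver are copied from the declarations on
// this page so the sketch compiles without the raft package.
type SocketType byte

const (
	HeartBeat SocketType = 0
	Replicate SocketType = 1
)

type SocketResolver interface {
	NodeAddress(nodeID uint64, stype SocketType) (addr string, err error)
}

// nodeAddrs holds one address per socket type (hypothetical type).
type nodeAddrs struct {
	heartbeat string
	replicate string
}

// staticResolver resolves node IDs from a fixed in-memory table.
type staticResolver struct {
	nodes map[uint64]nodeAddrs
}

// NodeAddress returns the address of nodeID for the requested socket type.
func (r *staticResolver) NodeAddress(nodeID uint64, stype SocketType) (string, error) {
	n, ok := r.nodes[nodeID]
	if !ok {
		return "", fmt.Errorf("unknown node %d", nodeID)
	}
	switch stype {
	case HeartBeat:
		return n.heartbeat, nil
	case Replicate:
		return n.replicate, nil
	default:
		return "", fmt.Errorf("unknown socket type %d", stype)
	}
}

// newStaticResolver builds a resolver with two illustrative nodes.
func newStaticResolver() SocketResolver {
	return &staticResolver{nodes: map[uint64]nodeAddrs{
		1: {heartbeat: "10.0.0.1:3016", replicate: "10.0.0.1:2015"},
		2: {heartbeat: "10.0.0.2:3016", replicate: "10.0.0.2:2015"},
	}}
}

func main() {
	r := newStaticResolver()
	addr, err := r.NodeAddress(1, Replicate)
	fmt.Println(addr, err)
	_, err = r.NodeAddress(9, HeartBeat)
	fmt.Println(err)
}
```

A production resolver would more likely consult cluster membership that can change at runtime. The fixed map only keeps the example short.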
type SocketType ¶
type SocketType byte
const (
	HeartBeat SocketType = 0
	Replicate SocketType = 1
)
func (SocketType) String ¶
func (t SocketType) String() string
type StateMachine ¶
type StateMachine interface {
	Apply(command []byte, index uint64) (interface{}, error)
	ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
	Snapshot() (proto.Snapshot, error)
	ApplySnapshot(peers []proto.Peer, iter proto.SnapIterator) error
	HandleFatalEvent(err *FatalError)
	HandleLeaderChange(leader uint64)
}
The StateMachine interface is supplied by the application to persist and snapshot application data.
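The sketch below illustrates only the `Apply` method, using an in-memory key-value store. It is not a full `StateMachine`: the other five methods depend on `proto` types that are not reproduced here. The `kvStore` type, the `key=value` command encoding, and the decision to skip indexes that were already applied are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// kvStore is a hypothetical in-memory store driven by applied commands.
type kvStore struct {
	data    map[string]string
	applied uint64 // highest index applied so far
}

func newKVStore() *kvStore {
	return &kvStore{data: make(map[string]string)}
}

// Apply has the same signature as StateMachine.Apply. Commands use a
// made-up "key=value" encoding.
func (s *kvStore) Apply(command []byte, index uint64) (interface{}, error) {
	// Assumption: entries may be replayed, so ignore indexes at or
	// below the last applied one.
	if index <= s.applied {
		return nil, nil
	}
	parts := strings.SplitN(string(command), "=", 2)
	if len(parts) != 2 || parts[0] == "" {
		return nil, errors.New("malformed command")
	}
	s.data[parts[0]] = parts[1]
	s.applied = index
	return parts[1], nil
}

func main() {
	s := newKVStore()
	v, err := s.Apply([]byte("color=blue"), 1)
	fmt.Println(v, err)
	_, err = s.Apply([]byte("no-separator"), 2)
	fmt.Println(err)
}
```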
type Status ¶
type Status struct {
	ID                uint64
	NodeID            uint64
	Leader            uint64
	Term              uint64
	Index             uint64
	Commit            uint64
	Applied           uint64
	Vote              uint64
	PendQueue         int
	RecvQueue         int
	AppQueue          int
	Stopped           bool
	RestoringSnapshot bool
	State             string // leader, follower, or candidate
	Replicas          map[uint64]*ReplicaStatus
}
Status describes the status of a raft instance.
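One possible use of `Status` is spotting replicas that are inactive or far behind. The sketch below works on local copies of a few fields from `Status` and `ReplicaStatus`. `laggingReplicas` and its `maxLag` threshold are hypothetical, and treating the gap between `Commit` and a replica's `Match` as a lag measure is an assumption based on the field comments.

```go
package main

import (
	"fmt"
	"sort"
)

// ReplicaStatus and Status copy only the fields used here. They are
// local stand-ins for the package types.
type ReplicaStatus struct {
	Match  uint64
	Active bool
}

type Status struct {
	Commit   uint64
	Replicas map[uint64]*ReplicaStatus
}

// laggingReplicas is a hypothetical helper. It returns, in ascending
// order, the IDs of replicas that are inactive or whose Match index
// trails st.Commit by more than maxLag entries.
func laggingReplicas(st *Status, maxLag uint64) []uint64 {
	var ids []uint64
	for id, r := range st.Replicas {
		if r == nil {
			continue
		}
		behind := st.Commit > r.Match && st.Commit-r.Match > maxLag
		if !r.Active || behind {
			ids = append(ids, id)
		}
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	st := &Status{
		Commit: 100,
		Replicas: map[uint64]*ReplicaStatus{
			1: {Match: 100, Active: true},
			2: {Match: 40, Active: true},
			3: {Match: 100, Active: false},
		},
	}
	fmt.Println(laggingReplicas(st, 10))
}
```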
type Transport ¶
type Transport interface {
	Send(m *proto.Message)
	SendSnapshot(m *proto.Message, rs *snapshotStatus)
	Stop()
}
func NewMultiTransport ¶
func NewMultiTransport(raft *RaftServer, config *TransportConfig) (Transport, error)
type TransportConfig ¶
type TransportConfig struct {
	// HeartbeatAddr is the heartbeat port.
	// The default value is 3016.
	HeartbeatAddr string
	// ReplicateAddr is the replication port.
	// The default value is 2015.
	ReplicateAddr string
	// SendBufferSize is the size of the send queue.
	SendBufferSize int
	// MaxReplConcurrency is the replication concurrency (node->node).
	MaxReplConcurrency int
	// MaxSnapConcurrency limits the max number of concurrent snapshots.
	// The default value is 10.
	MaxSnapConcurrency int
	// This parameter is required.
	Resolver SocketResolver
}
Source Files ¶
- config.go
- errors.go
- future.go
- pool.go
- raft.go
- raft_fsm.go
- raft_fsm_candidate.go
- raft_fsm_follower.go
- raft_fsm_leader.go
- raft_fsm_state.go
- raft_log.go
- raft_log_unstable.go
- raft_replica.go
- raft_snapshot.go
- server.go
- statemachine.go
- status.go
- transport.go
- transport_heartbeat.go
- transport_multi.go
- transport_replicate.go
- transport_sender.go