raft

package
v0.6.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2018 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	KB = 1 << (10 * iota)
	MB
)
View Source
const NoLeader uint64 = 0

NoLeader is a placeholder nodeID used when there is no leader.

Variables

View Source
var (
	ErrCompacted     = errors.New("requested index is unavailable due to compaction.")
	ErrRaftExists    = errors.New("raft already exists.")
	ErrRaftNotExists = errors.New("raft not exists.")
	ErrNotLeader     = errors.New("raft is not the leader.")
	ErrStopped       = errors.New("raft is already shutdown.")
	ErrSnapping      = errors.New("raft is doing snapshot.")
	ErrCanceled      = errors.New("raft request canceled by caller")
)

Functions

This section is empty.

Types

type AppPanicError

type AppPanicError string

AppPanicError is panic error when repl occurred fatal error. The server will recover this panic and stop the shard repl.

func (*AppPanicError) Error

func (pe *AppPanicError) Error() string

type Config

type Config struct {
	TransportConfig
	// NodeID is the identity of the local node. NodeID cannot be 0.
	// This parameter is required.
	NodeID uint64
	// TickInterval is the interval of timer which check heartbeat and election timeout.
	// The default value is 2s.
	TickInterval time.Duration
	// HeartbeatTick is the heartbeat interval. A leader sends heartbeat
	// message to maintain the leadership every heartbeat interval.
	// The default value is 2s.
	HeartbeatTick int
	// ElectionTick is the election timeout. If a follower does not receive any message
	// from the leader of current term during ElectionTick, it will become candidate and start an election.
	// ElectionTick must be greater than HeartbeatTick.
	// We suggest to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary leader switching.
	// The default value is 10s.
	ElectionTick int
	// MaxSizePerMsg limits the max size of each append message.
	// The default value is 1M.
	MaxSizePerMsg uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase.
	// The application transportation layer usually has its own sending buffer over TCP/UDP.
	// Setting MaxInflightMsgs to avoid overflowing that sending buffer.
	// The default value is 128.
	MaxInflightMsgs int
	// ReqBufferSize limits the max number of recive request chan buffer.
	// The default value is 1024.
	ReqBufferSize int
	// AppBufferSize limits the max number of apply chan buffer.
	// The default value is 2048.
	AppBufferSize int
	// RetainLogs controls how many logs we leave after truncate.
	// This is used so that we can quickly replay logs on a follower instead of being forced to send an entire snapshot.
	// The default value is 20000.
	RetainLogs uint64
	// LeaseCheck whether to use the lease mechanism.
	// The default value is false.
	LeaseCheck bool
	// contains filtered or unexported fields
}

Config contains the parameters to start a raft server. Default: Do not use lease mechanism. NOTE: NodeID and Resolver must be required.Other parameter has default value.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with usable defaults.

type DownReplica

type DownReplica struct {
	NodeID      uint64
	DownSeconds int
}

DownReplica down replica

type FatalError

type FatalError struct {
	ID  uint64
	Err error
}

type Future

type Future struct {
	// contains filtered or unexported fields
}

func (*Future) Response

func (f *Future) Response() (resp interface{}, err error)

type MultiTransport

type MultiTransport struct {
	// contains filtered or unexported fields
}

func (*MultiTransport) Send

func (t *MultiTransport) Send(m *proto.Message)

func (*MultiTransport) SendSnapshot

func (t *MultiTransport) SendSnapshot(m *proto.Message, rs *snapshotStatus)

func (*MultiTransport) Stop

func (t *MultiTransport) Stop()

type RaftConfig

type RaftConfig struct {
	ID           uint64
	Term         uint64
	Leader       uint64
	Applied      uint64
	Peers        []proto.Peer
	Storage      storage.Storage
	StateMachine StateMachine
}

ReplConfig contains the parameters to create a replication.

type RaftServer

type RaftServer struct {
	// contains filtered or unexported fields
}

func NewRaftServer

func NewRaftServer(config *Config) (*RaftServer, error)

func (*RaftServer) AppliedIndex

func (rs *RaftServer) AppliedIndex(id uint64) uint64

func (*RaftServer) ChangeMember

func (rs *RaftServer) ChangeMember(ctx context.Context, id uint64, changeType proto.ConfChangeType, peer proto.Peer, context []byte) (future *Future)

func (*RaftServer) CreateRaft

func (rs *RaftServer) CreateRaft(raftConfig *RaftConfig) error

func (*RaftServer) GetDownReplicas

func (rs *RaftServer) GetDownReplicas(id uint64) (downReplicas []DownReplica)

GetDownReplicas 获取down的副本

func (*RaftServer) GetPendingReplica

func (rs *RaftServer) GetPendingReplica(id uint64) (peers []uint64)

GetPendingReplica get snapshot pending followers

func (*RaftServer) GetUnreachable

func (rs *RaftServer) GetUnreachable(id uint64) (nodes []uint64)

func (*RaftServer) IsLeader

func (rs *RaftServer) IsLeader(id uint64) bool

func (*RaftServer) LeaderTerm

func (rs *RaftServer) LeaderTerm(id uint64) (leader, term uint64)

func (*RaftServer) RemoveRaft

func (rs *RaftServer) RemoveRaft(id uint64) error

func (*RaftServer) Status

func (rs *RaftServer) Status(id uint64) (status *Status)

func (*RaftServer) Stop

func (rs *RaftServer) Stop()

func (*RaftServer) Submit

func (rs *RaftServer) Submit(ctx context.Context, id uint64, cmd []byte) (future *Future)

func (*RaftServer) Truncate

func (rs *RaftServer) Truncate(id uint64, index uint64)

func (*RaftServer) TryToLeader

func (rs *RaftServer) TryToLeader(ctx context.Context, id uint64) (future *Future)

type ReplicaStatus

type ReplicaStatus struct {
	Match       uint64 // 复制进度
	Commit      uint64 // commmit位置
	Next        uint64
	State       string
	Snapshoting bool
	Paused      bool
	Active      bool
	LastActive  time.Time
	Inflight    int
}

ReplicaStatus replica status

type SocketResolver

type SocketResolver interface {
	NodeAddress(nodeID uint64, stype SocketType) (addr string, err error)
}

The SocketResolver interface is supplied by the application to resolve NodeID to net.Addr addresses.

type SocketType

type SocketType byte
const (
	HeartBeat SocketType = 0
	Replicate SocketType = 1
)

func (SocketType) String

func (t SocketType) String() string

type StateMachine

type StateMachine interface {
	Apply(command []byte, index uint64) (interface{}, error)
	ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
	Snapshot() (proto.Snapshot, error)
	ApplySnapshot(peers []proto.Peer, iter proto.SnapIterator) error
	HandleFatalEvent(err *FatalError)
	HandleLeaderChange(leader uint64)
}

The StateMachine interface is supplied by the application to persist/snapshot data of application.

type Status

type Status struct {
	ID                uint64
	NodeID            uint64
	Leader            uint64
	Term              uint64
	Index             uint64
	Commit            uint64
	Applied           uint64
	Vote              uint64
	PendQueue         int
	RecvQueue         int
	AppQueue          int
	Stopped           bool
	RestoringSnapshot bool
	State             string // leader、follower、candidate
	Replicas          map[uint64]*ReplicaStatus
}

Status raft status

func (*Status) String

func (s *Status) String() string

type Transport

type Transport interface {
	Send(m *proto.Message)
	SendSnapshot(m *proto.Message, rs *snapshotStatus)
	Stop()
}

func NewMultiTransport

func NewMultiTransport(raft *RaftServer, config *TransportConfig) (Transport, error)

type TransportConfig

type TransportConfig struct {
	// HeartbeatAddr is the Heartbeat port.
	// The default value is 3016.
	HeartbeatAddr string
	// ReplicateAddr is the Replation port.
	// The default value is 2015.
	ReplicateAddr string
	// 发送队列大小
	SendBufferSize int
	//复制并发数(node->node)
	MaxReplConcurrency int
	// MaxSnapConcurrency limits the max number of snapshot concurrency.
	// The default value is 10.
	MaxSnapConcurrency int
	// This parameter is required.
	Resolver SocketResolver
}

Directories

Path Synopsis
wal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL