raft

package
v0.0.0-...-e36a8a6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2024 License: MIT Imports: 36 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotImplemented        = errors.New("Not Implemented")
	ErrTimeoutLeaderTransfer = errors.New("request timed out, leader transfer took too long")
)
View Source
var CtxExpectedTermKey = ctxExpectedTerm{}
View Source
var CtxReqDoneCallback = ctxReqDoneCallback{}
View Source
var ErrStopped = errors.New("stopped")

Functions

func ErrSkipEntry

func ErrSkipEntry() error

func NewMemberUnaryInterceptor

func NewMemberUnaryInterceptor(cs ClusterServer) grpc.UnaryServerInterceptor

func WithExpectedTerm

func WithExpectedTerm(ctx context.Context, term uint64) context.Context

func WithReqDoneCallback

func WithReqDoneCallback(ctx context.Context, cb ReqDoneCallback) context.Context

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(storage Storage) *Channel

func (*Channel) Close

func (c *Channel) Close()

func (*Channel) Describe

func (c *Channel) Describe(w io.Writer)

func (*Channel) Invoke

func (c *Channel) Invoke(ctx context.Context, method string, req, resp interface{}) error

func (*Channel) ResetConn

func (c *Channel) ResetConn()

func (*Channel) Update

func (c *Channel) Update(lead PeerID)

type ClusterServer

type ClusterServer = *clusterServer

func NewClusterServer

func NewClusterServer(config ServerConfig) ClusterServer

type ConfChangeContext

type ConfChangeContext struct {
	refs.Member
	IsPromote bool
}

type Config

type Config = *config

func NewConfig

func NewConfig() Config

type DoingRequest

type DoingRequest interface {
	Done() <-chan struct{}
	TermIndex() (uint64, uint64)
	Err() error
}

type Entries

type Entries []pb.Entry

func (Entries) String

func (es Entries) String() string

type Entry

type Entry pb.Entry

func (*Entry) String

func (e *Entry) String() string

type Executor

type Executor interface {
	Runner

	OnLeaderStart(term uint64)
	OnLeaderStop()

	OnEntry(entry *pb.Entry) error
	OnConfState(index uint64, state pb.ConfState, members []refs.Member, opType pb.ConfChangeType) error

	OnSnapshot(snapsnot pb.Snapshot, srcId PeerID) error
}

func StartExecutor

func StartExecutor(
	raft Raft,
	sm StateMachine,
) (Executor, error)

type InitialState

type InitialState struct {
	AppliedIndex uint64

	ConfState pb.ConfState
	ConfIndex uint64

	LocalID uint64
	Members []refs.Member
}

type Key

type Key = plumbing.ReferenceName

type MaintenanceServer

type MaintenanceServer = *maintenanceServer

func NewMaintenanceServer

func NewMaintenanceServer(config ServerConfig) MaintenanceServer

type MapLocker

type MapLocker = *mapLocker

func NewMapLocker

func NewMapLocker() MapLocker

type MemberStatus

type MemberStatus struct {
	ID        string              `json:"id"`
	Lead      string              `json:"lead"`
	Commit    uint64              `json:"commit"`
	RaftState string              `json:"raftState"`
	Progress  map[string]Tracker  `json:"progress"`
	Members   map[string][]string `json:"members"`
}

func (MemberStatus) String

func (s MemberStatus) String() string

type Message

type Message pb.Message

func (*Message) String

func (m *Message) String() string

type MockStorage

type MockStorage struct {
	mock.Mock
}

MockStorage is an autogenerated mock type for the Storage type

func (*MockStorage) Describe

func (_m *MockStorage) Describe(w io.Writer)

Describe provides a mock function with given fields: w

func (*MockStorage) Entries

func (_m *MockStorage) Entries(lo uint64, hi uint64, maxSize uint64) ([]raftpb.Entry, error)

Entries provides a mock function with given fields: lo, hi, maxSize

func (*MockStorage) FirstIndex

func (_m *MockStorage) FirstIndex() (uint64, error)

FirstIndex provides a mock function that takes no arguments.

func (*MockStorage) GetAllMemberURLs

func (_m *MockStorage) GetAllMemberURLs() (map[refs.PeerID][]string, error)

GetAllMemberURLs provides a mock function that takes no arguments.

func (*MockStorage) GetAllRefs

func (_m *MockStorage) GetAllRefs() (map[string]refs.Hash, error)

GetAllRefs provides a mock function that takes no arguments.

func (*MockStorage) GetConfState

func (_m *MockStorage) GetConfState() (*raftpb.ConfState, error)

GetConfState provides a mock function that takes no arguments.

func (*MockStorage) GetInitState

func (_m *MockStorage) GetInitState() (*InitialState, error)

GetInitState provides a mock function that takes no arguments.

func (*MockStorage) GetLeaderTerm

func (_m *MockStorage) GetLeaderTerm() uint64

GetLeaderTerm provides a mock function that takes no arguments.

func (*MockStorage) GetURLsByMemberID

func (_m *MockStorage) GetURLsByMemberID(id refs.PeerID) ([]string, error)

GetURLsByMemberID provides a mock function with given fields: id

func (*MockStorage) InitialState

func (_m *MockStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error)

InitialState provides a mock function that takes no arguments.

func (*MockStorage) LastIndex

func (_m *MockStorage) LastIndex() (uint64, error)

LastIndex provides a mock function that takes no arguments.

func (*MockStorage) OnConfIndexChange

func (_m *MockStorage) OnConfIndexChange(confIndex uint64)

OnConfIndexChange provides a mock function with given fields: confIndex

func (*MockStorage) Save

func (_m *MockStorage) Save(hardState raftpb.HardState, entries []raftpb.Entry) error

Save provides a mock function with given fields: hardState, entries

func (*MockStorage) Snapshot

func (_m *MockStorage) Snapshot() (raftpb.Snapshot, error)

Snapshot provides a mock function that takes no arguments.

func (*MockStorage) Term

func (_m *MockStorage) Term(i uint64) (uint64, error)

Term provides a mock function with given fields: i

type Node

type Node interface {
	Handler() http.Handler
	ClusterService() ClusterServer
	MaintenanceService() MaintenanceServer
	InitRouter(mux *http.ServeMux)
	GetStatus(ctx context.Context) (*Status, error)
	ReadIndex(ctx context.Context) (uint64, error)
	Propose(ctx context.Context, cmds []*packp.Command, pack []byte, handle refs.ReqHandle) (DoingRequest, error)
	BeginTx(initer TxIniter) (*Tx, error)
}

Node is a key-value stream backed by raft.

func RunNode

func RunNode(c Config,
	opts ...NodeOptions,
) (Node, error)

type NodeOptions

type NodeOptions interface {
	// contains filtered or unexported methods
}

func WithDialOptions

func WithDialOptions(options ...grpc.DialOption) NodeOptions

func WithNewMemberID

func WithNewMemberID(fn func(peerURLs []string) refs.PeerID) NodeOptions

func WithReadIndexTimeout

func WithReadIndexTimeout(d time.Duration) NodeOptions

func WithTLSInfo

func WithTLSInfo(tlsInfo transport.TLSInfo) NodeOptions

func WithTxnLocker

func WithTxnLocker(txnLocker MapLocker) NodeOptions

type OpEvent

type OpEvent struct {
	Term  uint64
	Index uint64
	Ops   []*refs.Oplog_Op
}

type PeerID

type PeerID = refs.PeerID

type Raft

type Raft interface {
	rafthttp.Raft

	Runner
	Start(node *raft.RawNode, readyHandler ReadyHandler, d time.Duration)

	InitRouter(mux *http.ServeMux)

	Propose(ctx context.Context, cmds []*packp.Command, pack []byte, handle refs.ReqHandle) (DoingRequest, error)

	Describe(w io.Writer)
	// contains filtered or unexported methods
}

func NewRaft

func NewRaft(config Config,
	opts ...NodeOptions) Raft

type ReadyHandler

type ReadyHandler interface {
	Runner

	InitRouter(mux *http.ServeMux)

	ReadyC() chan<- <-chan *raft.Ready
	AdvanceC() <-chan struct{}
}

type ReqDoneCallback

type ReqDoneCallback func(err error)

type RequestContextManager

type RequestContextManager = *requestContextManager

func NewRequestContextManager

func NewRequestContextManager() RequestContextManager

type RpcChannel

type RpcChannel interface {
	Invoke(ctx context.Context, method string, req, resp interface{}) error
	Update(lead PeerID)
	ResetConn()
	Describe(w io.Writer)
	Close()
}

type Runner

type Runner interface {
	Stop()
	Done() <-chan struct{}
	Error() error
}

func StartRunner

func StartRunner(fn RunnerFunc) Runner

type RunnerFunc

type RunnerFunc = func(stopC <-chan struct{}) error

type ServerConfig

type ServerConfig struct {
	// contains filtered or unexported fields
}

type Snapshot

type Snapshot pb.Snapshot

func (*Snapshot) String

func (m *Snapshot) String() string

type StateMachine

type StateMachine interface {
	OnStart() error

	OnLeaderStart(term uint64)
	OnLeaderStop()

	OnApply(term, index uint64, oplog *refs.Oplog, handle refs.ReqHandle) error
	OnConfState(index uint64, confState pb.ConfState, members []refs.Member, opType pb.ConfChangeType) error

	OnSnapshot(snapshot pb.Snapshot, srcId PeerID) error

	StartSubscriber(ctx context.Context, appliedIndex uint64, eventCh chan<- OpEvent) error

	WaitForApplyIndex(ctx context.Context, index uint64) error
}

type Status

type Status raft.Status

func (Status) MarshalJSON

func (s Status) MarshalJSON() ([]byte, error)

func (Status) MemberStatus

func (s Status) MemberStatus(
	getMemberURLs func() (map[PeerID][]string, error),
) (*MemberStatus, error)

func (Status) String

func (s Status) String() string

type Storage

type Storage interface {
	raft.Storage

	GetInitState() (*InitialState, error)
	Save(hardState pb.HardState, entries []pb.Entry) error

	GetLeaderTerm() uint64
	GetAllRefs() (map[string]refs.Hash, error)
	GetAllMemberURLs() (map[PeerID][]string, error)
	GetURLsByMemberID(id PeerID) ([]string, error)

	GetConfState() (*pb.ConfState, error)

	Describe(w io.Writer)

	OnConfIndexChange(confIndex uint64)
}

type Tracker

type Tracker struct {
	Match     uint64 `json:"match"`
	Next      uint64 `json:"next"`
	State     string `json:"state"`
	IsLearner bool   `json:"isLearner"`
}

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

func (*Tx) Close

func (t *Tx) Close()

func (*Tx) Commit

func (t *Tx) Commit(ctx context.Context, pack []byte, handle refs.ReqHandle) (DoingRequest, error)

func (*Tx) Get

func (t *Tx) Get(refName plumbing.ReferenceName) *plumbing.Hash

func (*Tx) Set

func (t *Tx) Set(refName plumbing.ReferenceName, hash plumbing.Hash) bool

type TxIniter

type TxIniter func(
	txnLocker MapLocker,
	storage Storage,
) (
	map[plumbing.ReferenceName]plumbing.Hash,
	bool,
	Unlocker,
	error,
)

type Unlocker

type Unlocker func()

func LockGlobal

func LockGlobal(
	ctx context.Context,
	txnLocker MapLocker,
	lockErr error,
	storage Storage) (
	map[plumbing.ReferenceName]plumbing.Hash,
	bool,
	Unlocker,
	error)

func LockRefList

func LockRefList(
	ctx context.Context,
	txnLocker MapLocker,
	storage Storage,
	refName plumbing.ReferenceName, refNames ...plumbing.ReferenceName) (
	map[plumbing.ReferenceName]plumbing.Hash,
	bool,
	Unlocker,
	error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL