Documentation ¶
Overview ¶
Package rsm implements Replicated State Machines used in Dragonboat.
This package is internally used by Dragonboat, applications are not expected to import this package.
Index ¶
- Constants
- Variables
- type Commit
- type From
- type IManagedStateMachine
- type INodeProxy
- type ISnapshotter
- type ManagedStateMachineFactory
- type NativeStateMachine
- func (ds *NativeStateMachine) GetHash() uint64
- func (ds *NativeStateMachine) Loaded(from From)
- func (ds *NativeStateMachine) Lookup(data []byte) ([]byte, error)
- func (ds *NativeStateMachine) Offloaded(from From)
- func (ds *NativeStateMachine) RecoverFromSnapshot(fp string, files []statemachine.SnapshotFile) (err error)
- func (ds *NativeStateMachine) SaveSnapshot(fp string, collection statemachine.ISnapshotFileCollection) (uint64, error)
- func (ds *NativeStateMachine) Update(session *Session, seriesID uint64, data []byte) uint64
- type OffloadedStatus
- type RaftClientID
- type RaftSeriesID
- type SMFactoryFunc
- type Session
- type SessionManager
- func (ds *SessionManager) AddResponse(session *Session, seriesID uint64, result uint64)
- func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)
- func (ds *SessionManager) GetSessionHash() uint64
- func (ds *SessionManager) LoadSessions(reader io.Reader) error
- func (ds *SessionManager) MustHaveClientSeries(session *Session, seriesID uint64)
- func (ds *SessionManager) RegisterClientID(clientID uint64) uint64
- func (ds *SessionManager) SaveSessions(writer io.Writer) (uint64, error)
- func (ds *SessionManager) UnregisterClientID(clientID uint64) uint64
- func (ds *SessionManager) UpdateRequired(session *Session, seriesID uint64) (uint64, bool, bool)
- func (ds *SessionManager) UpdateRespondedTo(session *Session, respondedTo uint64)
- type SnapshotReader
- type SnapshotValidator
- type SnapshotWriter
- type StateMachine
- func (s *StateMachine) CommitC() chan Commit
- func (s *StateMachine) CommitChanBusy() bool
- func (s *StateMachine) GetBatchedLastApplied() uint64
- func (s *StateMachine) GetHash() uint64
- func (s *StateMachine) GetLastApplied() uint64
- func (s *StateMachine) GetMembership() (map[uint64]string, map[uint64]string, map[uint64]struct{}, uint64)
- func (s *StateMachine) GetMembershipHash() uint64
- func (s *StateMachine) GetSessionHash() uint64
- func (s *StateMachine) Handle(batch []Commit) (Commit, bool)
- func (s *StateMachine) Loaded(from From)
- func (s *StateMachine) Lookup(query []byte) ([]byte, error)
- func (s *StateMachine) Offloaded(from From)
- func (s *StateMachine) RecoverFromSnapshot(rec Commit) (uint64, error)
- func (s *StateMachine) SaveSnapshot() (*pb.Snapshot, *server.SnapshotEnv, error)
Constants ¶
const ( // SnapshotHeaderSize is the size of snapshot in number of bytes. SnapshotHeaderSize = settings.SnapshotHeaderSize )
Variables ¶
var ( // ErrSaveSnapshot indicates there is error when trying to save a snapshot ErrSaveSnapshot = errors.New("failed to save snapshot") // ErrRestoreSnapshot indicates there is error when trying to restore // from a snapshot ErrRestoreSnapshot = errors.New("failed to restore from snapshot") )
var ( // ErrClusterClosed indicates that the cluster has been closed ErrClusterClosed = errors.New("raft cluster already closed") )
var ( // LRUMaxSessionCount is the largest number of client sessions that can be // concurrently managed by a LRUSession instance. LRUMaxSessionCount = settings.Hard.LRUMaxSessionCount )
Functions ¶
This section is empty.
Types ¶
type Commit ¶
type Commit struct { Index uint64 SnapshotAvailable bool InitialSnapshot bool SnapshotRequested bool Entries []pb.Entry }
Commit is the processing units that can be handled by StateMachines.
type From ¶
type From uint64
From identifies a component in the system.
const ( // FromNodeHost indicates the data store has been loaded by or offloaded from // nodehost. FromNodeHost From = iota // FromStepWorker indicates that the data store has been loaded by or // offloaded from the step worker. FromStepWorker // FromCommitWorker indicates that the data store has been loaded by or // offloaded from the commit worker. FromCommitWorker // FromSnapshotWorker indicates that the data store has been loaded by or // offloaded from the snapshot worker. FromSnapshotWorker )
type IManagedStateMachine ¶
type IManagedStateMachine interface { GetSessionHash() uint64 UpdateRespondedTo(*Session, uint64) UnregisterClientID(clientID uint64) uint64 RegisterClientID(clientID uint64) uint64 ClientRegistered(clientID uint64) (*Session, bool) UpdateRequired(*Session, uint64) (uint64, bool, bool) Update(*Session, uint64, []byte) uint64 Lookup([]byte) ([]byte, error) GetHash() uint64 SaveSnapshot(string, statemachine.ISnapshotFileCollection) (uint64, error) RecoverFromSnapshot(string, []statemachine.SnapshotFile) error Offloaded(From) Loaded(From) }
IManagedStateMachine is the interface used to manage data store.
func NewNativeStateMachine ¶
func NewNativeStateMachine(ds statemachine.IStateMachine, done <-chan struct{}) IManagedStateMachine
NewNativeStateMachine creates and returns a new NativeStateMachine object.
type INodeProxy ¶
type INodeProxy interface { RestoreRemotes(pb.Snapshot) ApplyUpdate(pb.Entry, uint64, bool, bool, bool) ApplyConfigChange(pb.ConfigChange) ConfigChangeProcessed(uint64, bool) NodeID() uint64 ClusterID() uint64 }
INodeProxy is the interface used as proxy to a nodehost.
type ISnapshotter ¶
type ISnapshotter interface { GetSnapshot(uint64) (pb.Snapshot, error) GetMostRecentSnapshot() (pb.Snapshot, error) GetFilePath(uint64) string Save(uint64, uint64, IManagedStateMachine) (*pb.Snapshot, *server.SnapshotEnv, error) IsNoSnapshotError(error) bool }
ISnapshotter is the interface for the snapshotter object.
type ManagedStateMachineFactory ¶
type ManagedStateMachineFactory func(clusterID uint64, nodeID uint64, stopc <-chan struct{}) IManagedStateMachine
ManagedStateMachineFactory is the factory function type for creating an IManagedStateMachine instance.
type NativeStateMachine ¶
type NativeStateMachine struct { OffloadedStatus SessionManager // contains filtered or unexported fields }
NativeStateMachine is the IManagedStateMachine object used to manage native data store in Golang.
func (*NativeStateMachine) GetHash ¶
func (ds *NativeStateMachine) GetHash() uint64
GetHash returns an integer value representing the state of the data store.
func (*NativeStateMachine) Loaded ¶
func (ds *NativeStateMachine) Loaded(from From)
Loaded marks the statemachine as loaded by the specified component.
func (*NativeStateMachine) Lookup ¶
func (ds *NativeStateMachine) Lookup(data []byte) ([]byte, error)
Lookup queries the data store.
func (*NativeStateMachine) Offloaded ¶
func (ds *NativeStateMachine) Offloaded(from From)
Offloaded offloads the data store from the specified part of the system.
func (*NativeStateMachine) RecoverFromSnapshot ¶
func (ds *NativeStateMachine) RecoverFromSnapshot(fp string, files []statemachine.SnapshotFile) (err error)
RecoverFromSnapshot recovers the state of the data store from the snapshot file specified by the fp input string.
func (*NativeStateMachine) SaveSnapshot ¶
func (ds *NativeStateMachine) SaveSnapshot(fp string, collection statemachine.ISnapshotFileCollection) (uint64, error)
SaveSnapshot saves the state of the data store to the snapshot file specified by the fp input string.
type OffloadedStatus ¶
type OffloadedStatus struct {
// contains filtered or unexported fields
}
OffloadedStatus is used for tracking whether the managed data store has been offloaded from various system components.
func (*OffloadedStatus) Destroyed ¶
func (o *OffloadedStatus) Destroyed() bool
Destroyed returns a boolean value indicating whether the belonging object has been destroyed.
func (*OffloadedStatus) ReadyToDestroy ¶
func (o *OffloadedStatus) ReadyToDestroy() bool
ReadyToDestroy returns a boolean value indicating whether the the managed data store is ready to be destroyed.
func (*OffloadedStatus) SetDestroyed ¶
func (o *OffloadedStatus) SetDestroyed()
SetDestroyed set the destroyed flag to be true
func (*OffloadedStatus) SetLoaded ¶
func (o *OffloadedStatus) SetLoaded(from From)
SetLoaded marks the managed data store as loaded from the specified component.
func (*OffloadedStatus) SetOffloaded ¶
func (o *OffloadedStatus) SetOffloaded(from From)
SetOffloaded marks the managed data store as offloaded from the specified component.
type RaftClientID ¶
type RaftClientID uint64
RaftClientID is the type used as client id in sessions.
func (*RaftClientID) Compare ¶
func (a *RaftClientID) Compare(b llrb.Comparable) int
Compare implements the llrb.Comparable interface.
type RaftSeriesID ¶
type RaftSeriesID uint64
RaftSeriesID is the type used as series id in sessions.
type SMFactoryFunc ¶
type SMFactoryFunc func(clusterID uint64, nodeID uint64, done <-chan struct{}) IManagedStateMachine
SMFactoryFunc is the function type for creating an IStateMachine instance
type Session ¶
type Session struct { ClientID RaftClientID RespondedUpTo RaftSeriesID History map[RaftSeriesID]uint64 }
Session is the session object maintained on the raft side.
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager is the wrapper struct that implements client session related functionalites used in the IManagedStateMachine interface.
func NewSessionManager ¶
func NewSessionManager() SessionManager
NewSessionManager returns a new SessionManager instance.
func (*SessionManager) AddResponse ¶
func (ds *SessionManager) AddResponse(session *Session, seriesID uint64, result uint64)
AddResponse adds the specified result to the session.
func (*SessionManager) ClientRegistered ¶
func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)
ClientRegistered returns whether the specified client exists in the system.
func (*SessionManager) GetSessionHash ¶
func (ds *SessionManager) GetSessionHash() uint64
GetSessionHash returns an uint64 integer representing the state of the session manager.
func (*SessionManager) LoadSessions ¶
func (ds *SessionManager) LoadSessions(reader io.Reader) error
LoadSessions loads and restores sessions from io.Reader.
func (*SessionManager) MustHaveClientSeries ¶
func (ds *SessionManager) MustHaveClientSeries(session *Session, seriesID uint64)
MustHaveClientSeries checks whether the session manager contains a client session identified as clientID and whether it has seriesID responded.
func (*SessionManager) RegisterClientID ¶
func (ds *SessionManager) RegisterClientID(clientID uint64) uint64
RegisterClientID registers a new client, it returns the input client id if it is previously unknown, or 0 when the client has already been registered.
func (*SessionManager) SaveSessions ¶
func (ds *SessionManager) SaveSessions(writer io.Writer) (uint64, error)
SaveSessions saves the sessions to the provided io.writer.
func (*SessionManager) UnregisterClientID ¶
func (ds *SessionManager) UnregisterClientID(clientID uint64) uint64
UnregisterClientID removes the specified client session from the system. It returns the client id if the client is successfully removed, or 0 if the client session does not exist.
func (*SessionManager) UpdateRequired ¶
UpdateRequired return a tuple of request result, responded before, update required.
func (*SessionManager) UpdateRespondedTo ¶
func (ds *SessionManager) UpdateRespondedTo(session *Session, respondedTo uint64)
UpdateRespondedTo updates the responded to value of the specified client session.
type SnapshotReader ¶
type SnapshotReader struct {
// contains filtered or unexported fields
}
SnapshotReader is an io.Reader for reading from snapshot files.
func NewSnapshotReader ¶
func NewSnapshotReader(fp string) (*SnapshotReader, error)
NewSnapshotReader creates a new snapshot reader instance.
func (*SnapshotReader) Close ¶
func (sr *SnapshotReader) Close() error
Close closes the snapshot reader instance.
func (*SnapshotReader) GetHeader ¶
func (sr *SnapshotReader) GetHeader() (pb.SnapshotHeader, error)
GetHeader returns the snapshot header instance.
func (*SnapshotReader) Read ¶
func (sr *SnapshotReader) Read(data []byte) (int, error)
Read reads up to len(data) bytes from the snapshot file.
func (*SnapshotReader) ValidateHeader ¶
func (sr *SnapshotReader) ValidateHeader(header pb.SnapshotHeader)
ValidateHeader validates whether the header matches the header checksum recorded in the header.
func (*SnapshotReader) ValidatePayload ¶
func (sr *SnapshotReader) ValidatePayload(header pb.SnapshotHeader)
ValidatePayload validates whether the snapshot content matches the checksum recorded in the header.
type SnapshotValidator ¶
type SnapshotValidator struct {
// contains filtered or unexported fields
}
SnapshotValidator is the validator used to check incoming snapshot chunks.
func NewSnapshotValidator ¶
func NewSnapshotValidator() *SnapshotValidator
NewSnapshotValidator creates and returns a new SnapshotValidator instance.
func (*SnapshotValidator) AddChunk ¶
func (v *SnapshotValidator) AddChunk(data []byte, chunkID uint64) bool
AddChunk adds a new snapshot chunk to the validator.
func (*SnapshotValidator) Validate ¶
func (v *SnapshotValidator) Validate() bool
Validate validates the added chunks and return a boolean flag indicating whether the snapshot chunks are valid.
type SnapshotWriter ¶
type SnapshotWriter struct {
// contains filtered or unexported fields
}
SnapshotWriter is an io.Writer used to write snapshot file.
func NewSnapshotWriter ¶
func NewSnapshotWriter(fp string) (*SnapshotWriter, error)
NewSnapshotWriter creates a new snapshot writer instance.
func (*SnapshotWriter) Close ¶
func (sw *SnapshotWriter) Close() error
Close closes the snapshot writer instance.
func (*SnapshotWriter) SaveHeader ¶
func (sw *SnapshotWriter) SaveHeader(smsz uint64, sz uint64) error
SaveHeader saves the snapshot header to the snapshot.
type StateMachine ¶
type StateMachine struct {
// contains filtered or unexported fields
}
StateMachine is a manager class that manages application state machine
func NewStateMachine ¶
func NewStateMachine(sm IManagedStateMachine, snapshotter ISnapshotter, ordered bool, proxy INodeProxy) *StateMachine
NewStateMachine creates a new application state machine object.
func (*StateMachine) CommitC ¶
func (s *StateMachine) CommitC() chan Commit
CommitC returns the commit channel.
func (*StateMachine) CommitChanBusy ¶
func (s *StateMachine) CommitChanBusy() bool
CommitChanBusy returns whether the CommitC chan is busy. Busy is defined as having more than half of its buffer occupied.
func (*StateMachine) GetBatchedLastApplied ¶
func (s *StateMachine) GetBatchedLastApplied() uint64
GetBatchedLastApplied returns the batched last applied value.
func (*StateMachine) GetHash ¶
func (s *StateMachine) GetHash() uint64
GetHash returns the state machine hash.
func (*StateMachine) GetLastApplied ¶
func (s *StateMachine) GetLastApplied() uint64
GetLastApplied returns the last applied value.
func (*StateMachine) GetMembership ¶
func (s *StateMachine) GetMembership() (map[uint64]string, map[uint64]string, map[uint64]struct{}, uint64)
GetMembership returns the membership info maintained by the state machine.
func (*StateMachine) GetMembershipHash ¶
func (s *StateMachine) GetMembershipHash() uint64
GetMembershipHash returns the hash of the membership instance.
func (*StateMachine) GetSessionHash ¶
func (s *StateMachine) GetSessionHash() uint64
GetSessionHash returns the session hash.
func (*StateMachine) Handle ¶
func (s *StateMachine) Handle(batch []Commit) (Commit, bool)
Handle pulls the committed record and apply it if there is any available.
func (*StateMachine) Loaded ¶
func (s *StateMachine) Loaded(from From)
Loaded marks the state machine as loaded from the specified component.
func (*StateMachine) Lookup ¶
func (s *StateMachine) Lookup(query []byte) ([]byte, error)
Lookup performances local lookup on the data store.
func (*StateMachine) Offloaded ¶
func (s *StateMachine) Offloaded(from From)
Offloaded marks the state machine as offloaded from the specified component.
func (*StateMachine) RecoverFromSnapshot ¶
func (s *StateMachine) RecoverFromSnapshot(rec Commit) (uint64, error)
RecoverFromSnapshot applies the snapshot.
func (*StateMachine) SaveSnapshot ¶
func (s *StateMachine) SaveSnapshot() (*pb.Snapshot, *server.SnapshotEnv, error)
SaveSnapshot creates a snapshot.