Documentation ¶
Overview ¶
Package rsm implements Replicated State Machines used in Dragonboat.
This package is internally used by Dragonboat, applications are not expected to import this package.
Index ¶
- Constants
- Variables
- func GetDefaultChecksum() hash.Hash
- func GetEmptyLRUSession() []byte
- func GetEncodedPayload(ct dio.CompressionType, cmd []byte, dst []byte) []byte
- func GetEntryPayload(e pb.Entry) []byte
- func GetMaxBlockSize(ct config.CompressionType) uint64
- func GetV2PayloadChecksum(fp string, fs vfs.IFS) (crc []byte, err error)
- func GetV2PayloadSize(sz uint64) uint64
- func GetWitnessSnapshot(fs vfs.IFS) ([]byte, error)
- func IsShrinkedSnapshotFile(fp string, fs vfs.IFS) (shrunk bool, err error)
- func ReplaceSnapshotFile(newFp string, fp string, fs vfs.IFS) error
- func ShrinkSnapshot(fp string, newFp string, fs vfs.IFS) (err error)
- func ToDioCompressionType(ct config.CompressionType) dio.CompressionType
- type BlockWriter
- type ChunkWriter
- type ConcurrentStateMachine
- func (s *ConcurrentStateMachine) Close() error
- func (s *ConcurrentStateMachine) Concurrent() bool
- func (s *ConcurrentStateMachine) GetHash() (uint64, error)
- func (s *ConcurrentStateMachine) Lookup(query interface{}) (interface{}, error)
- func (s *ConcurrentStateMachine) NALookup(query []byte) ([]byte, error)
- func (s *ConcurrentStateMachine) OnDisk() bool
- func (s *ConcurrentStateMachine) Open(stopc <-chan struct{}) (uint64, error)
- func (s *ConcurrentStateMachine) Prepare() (interface{}, error)
- func (s *ConcurrentStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
- func (s *ConcurrentStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error
- func (s *ConcurrentStateMachine) Sync() error
- func (s *ConcurrentStateMachine) Type() pb.StateMachineType
- func (s *ConcurrentStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)
- type Files
- type From
- type IBlockWriter
- type ILoadable
- type IManagedStateMachine
- type INode
- type IRecoverable
- type ISavable
- type ISnapshotter
- type IStateMachine
- type IStreamable
- type ITestFS
- type IVReader
- type IVValidator
- type IVWriter
- type ManagedStateMachineFactory
- type NativeSM
- func (ds *NativeSM) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error)
- func (ds *NativeSM) Concurrent() bool
- func (ds *NativeSM) DestroyedC() <-chan struct{}
- func (ds *NativeSM) GetHash() (uint64, error)
- func (ds *NativeSM) Loaded(from From)
- func (ds *NativeSM) Lookup(query interface{}) (interface{}, error)
- func (ds *NativeSM) NALookup(query []byte) ([]byte, error)
- func (ds *NativeSM) Offloaded(from From) bool
- func (ds *NativeSM) OnDisk() bool
- func (ds *NativeSM) Open() (uint64, error)
- func (ds *NativeSM) Prepare() (interface{}, error)
- func (ds *NativeSM) Recover(r io.Reader, files []sm.SnapshotFile) error
- func (ds *NativeSM) Save(meta *SSMeta, w io.Writer, session []byte, c sm.ISnapshotFileCollection) (bool, error)
- func (ds *NativeSM) Stream(ssctx interface{}, w io.Writer) error
- func (ds *NativeSM) Sync() error
- func (ds *NativeSM) Type() pb.StateMachineType
- func (ds *NativeSM) Update(e sm.Entry) (sm.Result, error)
- type OffloadedStatus
- type OnDiskStateMachine
- func (s *OnDiskStateMachine) Close() error
- func (s *OnDiskStateMachine) Concurrent() bool
- func (s *OnDiskStateMachine) GetHash() (uint64, error)
- func (s *OnDiskStateMachine) Lookup(query interface{}) (interface{}, error)
- func (s *OnDiskStateMachine) NALookup(query []byte) ([]byte, error)
- func (s *OnDiskStateMachine) OnDisk() bool
- func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error)
- func (s *OnDiskStateMachine) Prepare() (interface{}, error)
- func (s *OnDiskStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
- func (s *OnDiskStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error
- func (s *OnDiskStateMachine) SetTestFS(fs config.IFS)
- func (s *OnDiskStateMachine) Sync() error
- func (s *OnDiskStateMachine) Type() pb.StateMachineType
- func (s *OnDiskStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)
- type RaftClientID
- type RaftSeriesID
- type RegularStateMachine
- func (s *RegularStateMachine) Close() error
- func (s *RegularStateMachine) Concurrent() bool
- func (s *RegularStateMachine) GetHash() (uint64, error)
- func (s *RegularStateMachine) Lookup(query interface{}) (interface{}, error)
- func (s *RegularStateMachine) NALookup(query []byte) ([]byte, error)
- func (s *RegularStateMachine) OnDisk() bool
- func (s *RegularStateMachine) Open(stopc <-chan struct{}) (uint64, error)
- func (s *RegularStateMachine) Prepare() (interface{}, error)
- func (s *RegularStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
- func (s *RegularStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error
- func (s *RegularStateMachine) Sync() error
- func (s *RegularStateMachine) Type() pb.StateMachineType
- func (s *RegularStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)
- type SMFactoryFunc
- type SSMeta
- type SSReqType
- type SSRequest
- type SSVersion
- type Session
- type SessionManager
- func (ds *SessionManager) AddResponse(session *Session, seriesID uint64, result sm.Result)
- func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)
- func (ds *SessionManager) GetSessionHash() uint64
- func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error
- func (ds *SessionManager) MustHaveClientSeries(session *Session, seriesID uint64)
- func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result
- func (ds *SessionManager) SaveSessions(writer io.Writer) error
- func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result
- func (ds *SessionManager) UpdateRequired(session *Session, seriesID uint64) (sm.Result, bool, bool)
- func (ds *SessionManager) UpdateRespondedTo(session *Session, respondedTo uint64)
- type SnapshotReader
- type SnapshotValidator
- type SnapshotWriter
- type StateMachine
- func (s *StateMachine) Concurrent() bool
- func (s *StateMachine) DestroyedC() <-chan struct{}
- func (s *StateMachine) GetBatchedLastApplied() uint64
- func (s *StateMachine) GetHash() (uint64, error)
- func (s *StateMachine) GetLastApplied() uint64
- func (s *StateMachine) GetMembership() pb.Membership
- func (s *StateMachine) GetMembershipHash() uint64
- func (s *StateMachine) GetSessionHash() uint64
- func (s *StateMachine) GetSyncedIndex() uint64
- func (s *StateMachine) Handle(batch []Task, apply []sm.Entry) (Task, error)
- func (s *StateMachine) Loaded(from From)
- func (s *StateMachine) Lookup(query interface{}) (interface{}, error)
- func (s *StateMachine) NALookup(query []byte) ([]byte, error)
- func (s *StateMachine) Offloaded(from From) bool
- func (s *StateMachine) OnDiskStateMachine() bool
- func (s *StateMachine) OpenOnDiskStateMachine() (uint64, error)
- func (s *StateMachine) ReadyToStreamSnapshot() bool
- func (s *StateMachine) RecoverFromSnapshot(t Task) (uint64, error)
- func (s *StateMachine) SaveSnapshot(req SSRequest) (*pb.Snapshot, *server.SSEnv, error)
- func (s *StateMachine) SetBatchedLastApplied(index uint64)
- func (s *StateMachine) StreamSnapshot(sink pb.IChunkSink) error
- func (s *StateMachine) Sync() error
- func (s *StateMachine) TaskChanBusy() bool
- func (s *StateMachine) TaskQ() *TaskQueue
- type Task
- type TaskQueue
Constants ¶
const ( EEHeaderSize uint8 = 1 EEVersion uint8 = 0 << 4 EEV0 uint8 = 0 << 4 // for V0 format, entries with empty payload will cause panic as such // entries always have their TYPE value set to ApplicationEntry EENoCompression uint8 = 0 << 1 EESnappy uint8 = 1 << 1 EENoSession uint8 = 0 EEHasSession uint8 = 1 // uncompressed size is binary.Uvarint encoded EEV0SizeOffset int = 1 )
Entry Cmd format when Type = pb.EncodedEntry
|Version|CompressionFlag|SessionFlag|
| 4Bits |     3Bits     |   1Bit    |
const ( // ChunkSize is the size of each snapshot chunk. ChunkSize = settings.SnapshotChunkSize )
const ( // EmptyClientSessionLength defines the length of an empty sessions instance. EmptyClientSessionLength uint64 = 16 )
Variables ¶
var ( // ErrTestKnobReturn is the error returned when returned earlier due to test // knob. ErrTestKnobReturn = errors.New("returned earlier due to test knob") // ErrRestoreSnapshot indicates there is error when trying to restore // from a snapshot ErrRestoreSnapshot = errors.New("failed to restore from snapshot") )
var ( // ErrClusterClosed indicates that the cluster has been closed ErrClusterClosed = errors.New("raft cluster already closed") )
var ( // LRUMaxSessionCount is the largest number of client sessions that can be // concurrently managed by a LRUSession instance. LRUMaxSessionCount = settings.Hard.LRUMaxSessionCount )
Functions ¶
func GetDefaultChecksum ¶
GetDefaultChecksum returns the default hash.Hash instance.
func GetEmptyLRUSession ¶
func GetEmptyLRUSession() []byte
GetEmptyLRUSession returns a marshaled empty sessions instance.
func GetEncodedPayload ¶
func GetEncodedPayload(ct dio.CompressionType, cmd []byte, dst []byte) []byte
GetEncodedPayload returns the encoded payload using the specified compression type and the default encoded entry version.
func GetEntryPayload ¶
GetEntryPayload returns the payload of the entry ready to be applied into the state machine.
func GetMaxBlockSize ¶
func GetMaxBlockSize(ct config.CompressionType) uint64
GetMaxBlockSize returns the maximum block length supported by the specified compression type.
func GetV2PayloadChecksum ¶
GetV2PayloadChecksum calculates the payload checksum of the specified snapshot file.
func GetV2PayloadSize ¶
GetV2PayloadSize returns the actual on disk size for the input user payload size.
func GetWitnessSnapshot ¶
GetWitnessSnapshot returns the content of a witness snapshot.
func IsShrinkedSnapshotFile ¶
IsShrinkedSnapshotFile returns a boolean flag indicating whether the specified snapshot file is already shrunk.
func ReplaceSnapshotFile ¶
ReplaceSnapshotFile replaces the specified snapshot file with the shrunk version atomically.
func ShrinkSnapshot ¶
ShrinkSnapshot shrinks the specified snapshot file and saves the generated shrunk version to the path specified by newFp.
func ToDioCompressionType ¶
func ToDioCompressionType(ct config.CompressionType) dio.CompressionType
ToDioCompressionType converts the CompressionType type defined in the config package to the CompressionType value defined in the dio package.
Types ¶
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter is a writer type that writes the input data to the underlying storage with checksum appended at the end of each block.
func NewBlockWriter ¶
func NewBlockWriter(blockSize uint64, onNewBlock func(data []byte, crc []byte) error, t pb.ChecksumType) *BlockWriter
NewBlockWriter creates and returns a block writer.
func (*BlockWriter) Flush ¶
func (bw *BlockWriter) Flush() error
Flush writes all in memory buffered data.
func (*BlockWriter) GetPayloadChecksum ¶
func (bw *BlockWriter) GetPayloadChecksum() []byte
GetPayloadChecksum returns the checksum for the entire payload.
type ChunkWriter ¶
type ChunkWriter struct {
// contains filtered or unexported fields
}
ChunkWriter is an io.WriteCloser type that streams snapshot chunks to its intended remote nodes.
func NewChunkWriter ¶
func NewChunkWriter(sink pb.IChunkSink, meta *SSMeta) *ChunkWriter
NewChunkWriter creates and returns a chunk writer instance.
type ConcurrentStateMachine ¶
type ConcurrentStateMachine struct {
// contains filtered or unexported fields
}
ConcurrentStateMachine is an IStateMachine type capable of taking concurrent snapshots.
func NewConcurrentStateMachine ¶
func NewConcurrentStateMachine(s sm.IConcurrentStateMachine) *ConcurrentStateMachine
NewConcurrentStateMachine creates a new ConcurrentStateMachine instance.
func (*ConcurrentStateMachine) Close ¶
func (s *ConcurrentStateMachine) Close() error
Close closes the state machine.
func (*ConcurrentStateMachine) Concurrent ¶
func (s *ConcurrentStateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*ConcurrentStateMachine) GetHash ¶
func (s *ConcurrentStateMachine) GetHash() (uint64, error)
GetHash returns the uint64 hash value representing the state of a state machine.
func (*ConcurrentStateMachine) Lookup ¶
func (s *ConcurrentStateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the state machine.
func (*ConcurrentStateMachine) NALookup ¶
func (s *ConcurrentStateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the state machine.
func (*ConcurrentStateMachine) OnDisk ¶
func (s *ConcurrentStateMachine) OnDisk() bool
OnDisk returns a boolean flag indicating whether this is an on disk state machine.
func (*ConcurrentStateMachine) Open ¶
func (s *ConcurrentStateMachine) Open(stopc <-chan struct{}) (uint64, error)
Open opens the state machine.
func (*ConcurrentStateMachine) Prepare ¶
func (s *ConcurrentStateMachine) Prepare() (interface{}, error)
Prepare makes preparations for taking concurrent snapshot.
func (*ConcurrentStateMachine) Recover ¶
func (s *ConcurrentStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
Recover recovers the state machine from a snapshot.
func (*ConcurrentStateMachine) Save ¶
func (s *ConcurrentStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error
Save saves the snapshot.
func (*ConcurrentStateMachine) Sync ¶
func (s *ConcurrentStateMachine) Sync() error
Sync synchronizes all in-core state with that on disk.
func (*ConcurrentStateMachine) Type ¶
func (s *ConcurrentStateMachine) Type() pb.StateMachineType
Type returns the type of the state machine.
type Files ¶
type Files struct {
// contains filtered or unexported fields
}
Files is a collection of external files specified by the SaveSnapshot method of the state machine type.
func NewFileCollection ¶
func NewFileCollection() *Files
NewFileCollection creates and returns a Files instance.
func (*Files) GetFileAt ¶
func (fc *Files) GetFileAt(idx uint64) *pb.SnapshotFile
GetFileAt returns the specified file.
func (*Files) PrepareFiles ¶
PrepareFiles finalizes the external files added to the collection.
type From ¶
type From uint64
From identifies a component in the system.
const ( // FromNodeHost indicates the data store has been loaded by or offloaded from // nodehost. FromNodeHost From = iota // FromStepWorker indicates that the data store has been loaded by or // offloaded from the step worker. FromStepWorker // FromCommitWorker indicates that the data store has been loaded by or // offloaded from the commit worker. FromCommitWorker // FromApplyWorker indicates that the data store has been loaded by or // offloaded from the apply worker. FromApplyWorker // FromSnapshotWorker indicates that the data store has been loaded by or // offloaded from the snapshot worker. FromSnapshotWorker )
type IBlockWriter ¶
type IBlockWriter interface { Write(bs []byte) (int, error) Flush() error GetPayloadChecksum() []byte }
IBlockWriter is the interface for writing checksummed data blocks.
type ILoadable ¶
ILoadable is the interface for types that can load client session state from a snapshot.
type IManagedStateMachine ¶
type IManagedStateMachine interface { Open() (uint64, error) Update(sm.Entry) (sm.Result, error) BatchedUpdate([]sm.Entry) ([]sm.Entry, error) Lookup(interface{}) (interface{}, error) NALookup([]byte) ([]byte, error) Sync() error GetHash() (uint64, error) Prepare() (interface{}, error) Save(*SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error) Recover(io.Reader, []sm.SnapshotFile) error Stream(interface{}, io.Writer) error Offloaded(From) bool Loaded(From) DestroyedC() <-chan struct{} Concurrent() bool OnDisk() bool Type() pb.StateMachineType }
IManagedStateMachine is the interface used to manage data store.
func NewNativeSM ¶
func NewNativeSM(config config.Config, ism IStateMachine, done <-chan struct{}) IManagedStateMachine
NewNativeSM creates and returns a new NativeSM object.
type INode ¶
type INode interface { StepReady() RestoreRemotes(pb.Snapshot) ApplyUpdate(pb.Entry, sm.Result, bool, bool, bool) ApplyConfigChange(pb.ConfigChange) ConfigChangeProcessed(uint64, bool) NodeID() uint64 ClusterID() uint64 ShouldStop() <-chan struct{} }
INode is the interface of a dragonboat node.
type IRecoverable ¶
type IRecoverable interface {
Recover(io.Reader, []sm.SnapshotFile) error
}
IRecoverable is the interface for types that can have their state restored from snapshots.
type ISavable ¶
type ISavable interface {
Save(*SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error)
}
ISavable is the interface for types that can have their content saved as snapshots.
type ISnapshotter ¶
type ISnapshotter interface { GetSnapshot(uint64) (pb.Snapshot, error) GetMostRecentSnapshot() (pb.Snapshot, error) GetFilePath(uint64) string Stream(IStreamable, *SSMeta, pb.IChunkSink) error Save(ISavable, *SSMeta) (*pb.Snapshot, *server.SSEnv, error) Load(ILoadable, IRecoverable, string, []sm.SnapshotFile) error IsNoSnapshotError(error) bool }
ISnapshotter is the interface for the snapshotter object.
type IStateMachine ¶
type IStateMachine interface { Open(<-chan struct{}) (uint64, error) Update(entries []sm.Entry) ([]sm.Entry, error) Lookup(query interface{}) (interface{}, error) NALookup(query []byte) ([]byte, error) Sync() error Prepare() (interface{}, error) Save(interface{}, io.Writer, sm.ISnapshotFileCollection, <-chan struct{}) error Recover(io.Reader, []sm.SnapshotFile, <-chan struct{}) error Close() error GetHash() (uint64, error) Concurrent() bool OnDisk() bool Type() pb.StateMachineType }
IStateMachine is an adapter interface for underlying IStateMachine or IConcurrentStateMachine instances.
type IStreamable ¶
IStreamable is the interface for types that can be snapshot streamed.
type IVValidator ¶
IVValidator is the interface for versioned validator.
type IVWriter ¶
type IVWriter interface { Write(data []byte) (int, error) GetVersion() SSVersion GetPayloadSum() []byte GetPayloadSize(uint64) uint64 Flush() error }
IVWriter is the interface for versioned snapshot writer.
type ManagedStateMachineFactory ¶
type ManagedStateMachineFactory func(clusterID uint64, nodeID uint64, stopc <-chan struct{}) IManagedStateMachine
ManagedStateMachineFactory is the factory function type for creating an IManagedStateMachine instance.
type NativeSM ¶
type NativeSM struct { OffloadedStatus // contains filtered or unexported fields }
NativeSM is the IManagedStateMachine object used to manage native data store in Golang.
func (*NativeSM) BatchedUpdate ¶
BatchedUpdate applies committed entries in a batch to hide latency.
func (*NativeSM) Concurrent ¶
Concurrent returns a boolean flag to indicate whether the managed state machine instance is capable of doing concurrent snapshots.
func (*NativeSM) DestroyedC ¶
func (ds *NativeSM) DestroyedC() <-chan struct{}
DestroyedC returns a chan struct{} used to indicate whether the SM has been fully offloaded.
func (*NativeSM) GetHash ¶
GetHash returns an integer value representing the state of the data store.
func (*NativeSM) Offloaded ¶
Offloaded offloads the data store from the specified part of the system.
func (*NativeSM) OnDisk ¶
OnDisk returns a boolean flag indicating whether the state machine is an on disk state machine.
func (*NativeSM) Save ¶
func (ds *NativeSM) Save(meta *SSMeta, w io.Writer, session []byte, c sm.ISnapshotFileCollection) (bool, error)
Save saves the state of the data store to the specified writer.
func (*NativeSM) Type ¶
func (ds *NativeSM) Type() pb.StateMachineType
Type returns the state machine type.
type OffloadedStatus ¶
type OffloadedStatus struct { DestroyedC chan struct{} // contains filtered or unexported fields }
OffloadedStatus is used for tracking whether the managed data store has been offloaded from various system components.
func (*OffloadedStatus) Destroyed ¶
func (o *OffloadedStatus) Destroyed() bool
Destroyed returns a boolean value indicating whether the belonging object has been destroyed.
func (*OffloadedStatus) ReadyToDestroy ¶
func (o *OffloadedStatus) ReadyToDestroy() bool
ReadyToDestroy returns a boolean value indicating whether the managed data store is ready to be destroyed.
func (*OffloadedStatus) SetDestroyed ¶
func (o *OffloadedStatus) SetDestroyed()
SetDestroyed sets the destroyed flag to true.
func (*OffloadedStatus) SetLoaded ¶
func (o *OffloadedStatus) SetLoaded(from From)
SetLoaded marks the managed data store as loaded from the specified component.
func (*OffloadedStatus) SetOffloaded ¶
func (o *OffloadedStatus) SetOffloaded(from From)
SetOffloaded marks the managed data store as offloaded from the specified component.
type OnDiskStateMachine ¶
type OnDiskStateMachine struct {
// contains filtered or unexported fields
}
OnDiskStateMachine is the type to represent an on disk state machine.
func NewOnDiskStateMachine ¶
func NewOnDiskStateMachine(s sm.IOnDiskStateMachine) *OnDiskStateMachine
NewOnDiskStateMachine creates and returns an on disk state machine.
func (*OnDiskStateMachine) Close ¶
func (s *OnDiskStateMachine) Close() error
Close closes the state machine.
func (*OnDiskStateMachine) Concurrent ¶
func (s *OnDiskStateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*OnDiskStateMachine) GetHash ¶
func (s *OnDiskStateMachine) GetHash() (uint64, error)
GetHash returns the uint64 hash value representing the state of a state machine.
func (*OnDiskStateMachine) Lookup ¶
func (s *OnDiskStateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the state machine.
func (*OnDiskStateMachine) NALookup ¶
func (s *OnDiskStateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the state machine.
func (*OnDiskStateMachine) OnDisk ¶
func (s *OnDiskStateMachine) OnDisk() bool
OnDisk returns a boolean flag indicating whether this is an on disk state machine.
func (*OnDiskStateMachine) Open ¶
func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error)
Open opens the state machine.
func (*OnDiskStateMachine) Prepare ¶
func (s *OnDiskStateMachine) Prepare() (interface{}, error)
Prepare makes preparations for taking concurrent snapshot.
func (*OnDiskStateMachine) Recover ¶
func (s *OnDiskStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
Recover recovers the state machine from a snapshot.
func (*OnDiskStateMachine) Save ¶
func (s *OnDiskStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error
Save saves the snapshot.
func (*OnDiskStateMachine) SetTestFS ¶
func (s *OnDiskStateMachine) SetTestFS(fs config.IFS)
SetTestFS injects the specified fs to the test SM.
func (*OnDiskStateMachine) Sync ¶
func (s *OnDiskStateMachine) Sync() error
Sync synchronizes all in-core state with that on disk.
func (*OnDiskStateMachine) Type ¶
func (s *OnDiskStateMachine) Type() pb.StateMachineType
Type returns the type of the state machine.
type RaftClientID ¶
type RaftClientID uint64
RaftClientID is the type used as client id in sessions.
func (*RaftClientID) Compare ¶
func (a *RaftClientID) Compare(b llrb.Comparable) int
Compare implements the llrb.Comparable interface.
type RaftSeriesID ¶
type RaftSeriesID uint64
RaftSeriesID is the type used as series id in sessions.
type RegularStateMachine ¶
type RegularStateMachine struct {
// contains filtered or unexported fields
}
RegularStateMachine is a regular state machine not capable of taking concurrent snapshots.
func NewRegularStateMachine ¶
func NewRegularStateMachine(s sm.IStateMachine) *RegularStateMachine
NewRegularStateMachine creates a new RegularStateMachine instance.
func (*RegularStateMachine) Close ¶
func (s *RegularStateMachine) Close() error
Close closes the state machine.
func (*RegularStateMachine) Concurrent ¶
func (s *RegularStateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*RegularStateMachine) GetHash ¶
func (s *RegularStateMachine) GetHash() (uint64, error)
GetHash returns the uint64 hash value representing the state of a state machine.
func (*RegularStateMachine) Lookup ¶
func (s *RegularStateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the state machine.
func (*RegularStateMachine) NALookup ¶
func (s *RegularStateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the state machine.
func (*RegularStateMachine) OnDisk ¶
func (s *RegularStateMachine) OnDisk() bool
OnDisk returns a boolean flag indicating whether this is an on disk state machine.
func (*RegularStateMachine) Open ¶
func (s *RegularStateMachine) Open(stopc <-chan struct{}) (uint64, error)
Open opens the state machine.
func (*RegularStateMachine) Prepare ¶
func (s *RegularStateMachine) Prepare() (interface{}, error)
Prepare makes preparations for taking concurrent snapshot.
func (*RegularStateMachine) Recover ¶
func (s *RegularStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
Recover recovers the state machine from a snapshot.
func (*RegularStateMachine) Save ¶
func (s *RegularStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error
Save saves the snapshot.
func (*RegularStateMachine) Sync ¶
func (s *RegularStateMachine) Sync() error
Sync synchronizes all in-core state with that on disk.
func (*RegularStateMachine) Type ¶
func (s *RegularStateMachine) Type() pb.StateMachineType
Type returns the type of the state machine.
type SMFactoryFunc ¶
type SMFactoryFunc func(clusterID uint64, nodeID uint64, done <-chan struct{}) IManagedStateMachine
SMFactoryFunc is the function type for creating an IManagedStateMachine instance.
type SSMeta ¶
type SSMeta struct { From uint64 Index uint64 Term uint64 OnDiskIndex uint64 // applied index of IOnDiskStateMachine Request SSRequest Membership pb.Membership Type pb.StateMachineType Session *bytes.Buffer Ctx interface{} CompressionType config.CompressionType }
SSMeta is the metadata of a snapshot.
type SSReqType ¶
type SSReqType uint64
SSReqType is the type of a snapshot request.
const ( // Periodic is the value to indicate periodic snapshot. Periodic SSReqType = iota // UserRequested is the value to indicate user requested snapshot. UserRequested // Exported is the value to indicate exported snapshot. Exported // Streaming is the value to indicate snapshot streaming. Streaming )
type SSRequest ¶
type SSRequest struct { Type SSReqType Key uint64 Path string OverrideCompaction bool CompactionOverhead uint64 }
SSRequest is the type for describing the details of a snapshot request.
type SSVersion ¶
type SSVersion uint64
SSVersion is the snapshot version value type.
const ( // V1SnapshotVersion is the value of snapshot version 1. V1SnapshotVersion SSVersion = 1 // V2SnapshotVersion is the value of snapshot version 2. V2SnapshotVersion SSVersion = 2 // SnapshotVersion is the snapshot binary format version. SnapshotVersion SSVersion = V2SnapshotVersion // SnapshotHeaderSize is the size of snapshot in number of bytes. SnapshotHeaderSize = settings.SnapshotHeaderSize // DefaultChecksumType is the default checksum type. DefaultChecksumType = defaultChecksumType )
type Session ¶
type Session struct { ClientID RaftClientID RespondedUpTo RaftSeriesID History map[RaftSeriesID]sm.Result }
Session is the session object maintained on the raft side.
func (*Session) AddResponse ¶
func (s *Session) AddResponse(id RaftSeriesID, result sm.Result)
AddResponse adds a response.
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager is the wrapper struct that implements client session related functionalities used in the IManagedStateMachine interface.
func NewSessionManager ¶
func NewSessionManager() *SessionManager
NewSessionManager returns a new SessionManager instance.
func (*SessionManager) AddResponse ¶
func (ds *SessionManager) AddResponse(session *Session, seriesID uint64, result sm.Result)
AddResponse adds the specified result to the session.
func (*SessionManager) ClientRegistered ¶
func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)
ClientRegistered returns whether the specified client exists in the system.
func (*SessionManager) GetSessionHash ¶
func (ds *SessionManager) GetSessionHash() uint64
GetSessionHash returns a uint64 integer representing the state of the session manager.
func (*SessionManager) LoadSessions ¶
func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error
LoadSessions loads and restores sessions from io.Reader.
func (*SessionManager) MustHaveClientSeries ¶
func (ds *SessionManager) MustHaveClientSeries(session *Session, seriesID uint64)
MustHaveClientSeries checks whether the session manager contains a client session identified as clientID and whether it has seriesID responded.
func (*SessionManager) RegisterClientID ¶
func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result
RegisterClientID registers a new client. It returns the input client id if it is previously unknown, or 0 when the client has already been registered.
func (*SessionManager) SaveSessions ¶
func (ds *SessionManager) SaveSessions(writer io.Writer) error
SaveSessions saves the sessions to the provided io.writer.
func (*SessionManager) UnregisterClientID ¶
func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result
UnregisterClientID removes the specified client session from the system. It returns the client id if the client is successfully removed, or 0 if the client session does not exist.
func (*SessionManager) UpdateRequired ¶
UpdateRequired returns a tuple of request result, responded before, update required.
func (*SessionManager) UpdateRespondedTo ¶
func (ds *SessionManager) UpdateRespondedTo(session *Session, respondedTo uint64)
UpdateRespondedTo updates the responded to value of the specified client session.
type SnapshotReader ¶
type SnapshotReader struct {
// contains filtered or unexported fields
}
SnapshotReader is an io.Reader for reading from snapshot files.
func NewSnapshotReader ¶
func NewSnapshotReader(fp string, fs vfs.IFS) (*SnapshotReader, error)
NewSnapshotReader creates a new snapshot reader instance.
func (*SnapshotReader) Close ¶
func (sr *SnapshotReader) Close() error
Close closes the snapshot reader instance.
func (*SnapshotReader) GetHeader ¶
func (sr *SnapshotReader) GetHeader() (pb.SnapshotHeader, error)
GetHeader returns the snapshot header instance.
func (*SnapshotReader) Read ¶
func (sr *SnapshotReader) Read(data []byte) (int, error)
Read reads up to len(data) bytes from the snapshot file.
func (*SnapshotReader) ValidatePayload ¶
func (sr *SnapshotReader) ValidatePayload(header pb.SnapshotHeader)
ValidatePayload validates whether the snapshot content matches the checksum recorded in the header.
type SnapshotValidator ¶
type SnapshotValidator struct {
// contains filtered or unexported fields
}
SnapshotValidator is the validator used to check incoming snapshot chunks.
func NewSnapshotValidator ¶
func NewSnapshotValidator() *SnapshotValidator
NewSnapshotValidator creates and returns a new SnapshotValidator instance.
func (*SnapshotValidator) AddChunk ¶
func (v *SnapshotValidator) AddChunk(data []byte, chunkID uint64) bool
AddChunk adds a new snapshot chunk to the validator.
func (*SnapshotValidator) Validate ¶
func (v *SnapshotValidator) Validate() bool
Validate validates the added chunks and returns a boolean flag indicating whether the snapshot chunks are valid.
type SnapshotWriter ¶
type SnapshotWriter struct {
// contains filtered or unexported fields
}
SnapshotWriter is an io.Writer used to write snapshot files.
func NewSnapshotWriter ¶
func NewSnapshotWriter(fp string, v SSVersion, ct pb.CompressionType, fs vfs.IFS) (*SnapshotWriter, error)
NewSnapshotWriter creates a new snapshot writer instance.
func (*SnapshotWriter) Close ¶
func (sw *SnapshotWriter) Close() error
Close closes the snapshot writer instance.
func (*SnapshotWriter) GetPayloadChecksum ¶
func (sw *SnapshotWriter) GetPayloadChecksum() []byte
GetPayloadChecksum returns the payload checksum.
func (*SnapshotWriter) GetPayloadSize ¶
func (sw *SnapshotWriter) GetPayloadSize(sz uint64) uint64
GetPayloadSize returns the payload size.
type StateMachine ¶
type StateMachine struct {
// contains filtered or unexported fields
}
StateMachine is a manager class that manages the application state machine.
func NewStateMachine ¶
func NewStateMachine(sm IManagedStateMachine, snapshotter ISnapshotter, cfg config.Config, node INode, fs vfs.IFS) *StateMachine
NewStateMachine creates a new application state machine object.
func (*StateMachine) Concurrent ¶
func (s *StateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*StateMachine) DestroyedC ¶
func (s *StateMachine) DestroyedC() <-chan struct{}
DestroyedC returns a chan struct{} used to indicate whether the SM has been fully unloaded.
func (*StateMachine) GetBatchedLastApplied ¶
func (s *StateMachine) GetBatchedLastApplied() uint64
GetBatchedLastApplied returns the batched last applied value.
func (*StateMachine) GetHash ¶
func (s *StateMachine) GetHash() (uint64, error)
GetHash returns the state machine hash.
func (*StateMachine) GetLastApplied ¶
func (s *StateMachine) GetLastApplied() uint64
GetLastApplied returns the last applied value.
func (*StateMachine) GetMembership ¶
func (s *StateMachine) GetMembership() pb.Membership
GetMembership returns the membership info maintained by the state machine.
func (*StateMachine) GetMembershipHash ¶
func (s *StateMachine) GetMembershipHash() uint64
GetMembershipHash returns the hash of the membership instance.
func (*StateMachine) GetSessionHash ¶
func (s *StateMachine) GetSessionHash() uint64
GetSessionHash returns the session hash.
func (*StateMachine) GetSyncedIndex ¶
func (s *StateMachine) GetSyncedIndex() uint64
GetSyncedIndex returns the index value that is known to have been synchronized.
func (*StateMachine) Handle ¶
Handle pulls the committed record and applies it if one is available.
func (*StateMachine) Loaded ¶
func (s *StateMachine) Loaded(from From)
Loaded marks the state machine as loaded from the specified component.
func (*StateMachine) Lookup ¶
func (s *StateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the local state machine.
func (*StateMachine) NALookup ¶
func (s *StateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the local state machine.
func (*StateMachine) Offloaded ¶
func (s *StateMachine) Offloaded(from From) bool
Offloaded marks the state machine as offloaded from the specified component. It returns a boolean value indicating whether the node has been fully unloaded after unloading from the specified component.
func (*StateMachine) OnDiskStateMachine ¶
func (s *StateMachine) OnDiskStateMachine() bool
OnDiskStateMachine returns a boolean flag indicating whether it is an on disk state machine.
func (*StateMachine) OpenOnDiskStateMachine ¶
func (s *StateMachine) OpenOnDiskStateMachine() (uint64, error)
OpenOnDiskStateMachine opens the on disk state machine.
func (*StateMachine) ReadyToStreamSnapshot ¶
func (s *StateMachine) ReadyToStreamSnapshot() bool
ReadyToStreamSnapshot returns a boolean flag to indicate whether the state machine is ready to stream snapshot. It cannot stream a full snapshot when membership state is catching up with the all disk SM state. However, a meta-only snapshot can be taken at any time.
func (*StateMachine) RecoverFromSnapshot ¶
func (s *StateMachine) RecoverFromSnapshot(t Task) (uint64, error)
RecoverFromSnapshot applies the snapshot.
func (*StateMachine) SaveSnapshot ¶
SaveSnapshot creates a snapshot.
func (*StateMachine) SetBatchedLastApplied ¶
func (s *StateMachine) SetBatchedLastApplied(index uint64)
SetBatchedLastApplied sets the batched last applied value. This method is mostly used in tests.
func (*StateMachine) StreamSnapshot ¶
func (s *StateMachine) StreamSnapshot(sink pb.IChunkSink) error
StreamSnapshot starts to stream snapshot from the current SM to a remote node targeted by the provided sink.
func (*StateMachine) Sync ¶
func (s *StateMachine) Sync() error
Sync synchronizes state machine's in-core state with that on disk.
func (*StateMachine) TaskChanBusy ¶
func (s *StateMachine) TaskChanBusy() bool
TaskChanBusy returns whether the TaskC chan is busy. Busy is defined as having more than half of its buffer occupied.
func (*StateMachine) TaskQ ¶
func (s *StateMachine) TaskQ() *TaskQueue
TaskQ returns the task queue.
type Task ¶
type Task struct { ClusterID uint64 NodeID uint64 Index uint64 SnapshotAvailable bool InitialSnapshot bool SnapshotRequested bool StreamSnapshot bool PeriodicSync bool NewNode bool SSRequest SSRequest Entries []pb.Entry }
Task describes a task that needs to be handled by StateMachine.
func (*Task) IsSnapshotTask ¶
IsSnapshotTask returns a boolean flag indicating whether it is a snapshot task.
type TaskQueue ¶
type TaskQueue struct {
// contains filtered or unexported fields
}
TaskQueue is a queue of tasks to be processed by the state machine.
func NewTaskQueue ¶
func NewTaskQueue() *TaskQueue
NewTaskQueue creates and returns a new task queue.
func (*TaskQueue) MoreEntryToApply ¶
MoreEntryToApply returns a boolean value indicating whether it is ok to queue more entries to apply.