Documentation ¶
Overview ¶
Package rsm implements State Machines used in Dragonboat.
This package is used internally by Dragonboat; applications are not expected to import it.
Index ¶
- Constants
- Variables
- func GetDefaultChecksum() hash.Hash
- func GetEmptyLRUSession() []byte
- func GetEncoded(ct dio.CompressionType, cmd []byte, dst []byte) []byte
- func GetMaxBlockSize(ct config.CompressionType) uint64
- func GetPayload(e pb.Entry) ([]byte, error)
- func GetV2PayloadChecksum(fp string, fs vfs.IFS) (crc []byte, err error)
- func GetV2PayloadSize(sz uint64) uint64
- func GetWitnessSnapshot(fs vfs.IFS) (result []byte, err error)
- func IsShrunkSnapshotFile(fp string, fs vfs.IFS) (shrunk bool, err error)
- func ReplaceSnapshot(newFp string, fp string, fs vfs.IFS) error
- func ShrinkSnapshot(fp string, newFp string, fs vfs.IFS) (err error)
- func ToDioType(ct config.CompressionType) dio.CompressionType
- type BlockWriter
- type ChunkWriter
- type ConcurrentStateMachine
- func (s *ConcurrentStateMachine) Close() error
- func (s *ConcurrentStateMachine) Concurrent() bool
- func (s *ConcurrentStateMachine) GetHash() (uint64, error)
- func (s *ConcurrentStateMachine) Lookup(query interface{}) (interface{}, error)
- func (s *ConcurrentStateMachine) NALookup(query []byte) ([]byte, error)
- func (s *ConcurrentStateMachine) OnDisk() bool
- func (s *ConcurrentStateMachine) Open(stopc <-chan struct{}) (uint64, error)
- func (s *ConcurrentStateMachine) Prepare() (interface{}, error)
- func (s *ConcurrentStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
- func (s *ConcurrentStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error
- func (s *ConcurrentStateMachine) Sync() error
- func (s *ConcurrentStateMachine) Type() pb.StateMachineType
- func (s *ConcurrentStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)
- type Files
- type IBlockWriter
- type ILoadable
- type IManagedStateMachine
- type INode
- type IRecoverable
- type ISavable
- type ISnapshotter
- type IStateMachine
- type IStreamable
- type ITestFS
- type IVReader
- type IVValidator
- type IVWriter
- type InMemStateMachine
- func (i *InMemStateMachine) Close() error
- func (i *InMemStateMachine) Concurrent() bool
- func (i *InMemStateMachine) GetHash() (uint64, error)
- func (i *InMemStateMachine) Lookup(query interface{}) (interface{}, error)
- func (i *InMemStateMachine) NALookup(query []byte) ([]byte, error)
- func (i *InMemStateMachine) OnDisk() bool
- func (i *InMemStateMachine) Open(stopc <-chan struct{}) (uint64, error)
- func (i *InMemStateMachine) Prepare() (interface{}, error)
- func (i *InMemStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
- func (i *InMemStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error
- func (i *InMemStateMachine) Sync() error
- func (i *InMemStateMachine) Type() pb.StateMachineType
- func (i *InMemStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)
- type ManagedStateMachineFactory
- type NativeSM
- func (ds *NativeSM) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error)
- func (ds *NativeSM) Close() error
- func (ds *NativeSM) Concurrent() bool
- func (ds *NativeSM) ConcurrentLookup(query interface{}) (interface{}, error)
- func (ds *NativeSM) DestroyedC() <-chan struct{}
- func (ds *NativeSM) GetHash() (uint64, error)
- func (ds *NativeSM) Loaded()
- func (ds *NativeSM) Lookup(query interface{}) (interface{}, error)
- func (ds *NativeSM) NAConcurrentLookup(query []byte) ([]byte, error)
- func (ds *NativeSM) NALookup(query []byte) ([]byte, error)
- func (ds *NativeSM) Offloaded() bool
- func (ds *NativeSM) OnDisk() bool
- func (ds *NativeSM) Open() (uint64, error)
- func (ds *NativeSM) Prepare() (interface{}, error)
- func (ds *NativeSM) Recover(r io.Reader, files []sm.SnapshotFile) error
- func (ds *NativeSM) Save(meta SSMeta, w io.Writer, session []byte, c sm.ISnapshotFileCollection) (bool, error)
- func (ds *NativeSM) Stream(ctx interface{}, w io.Writer) error
- func (ds *NativeSM) Sync() error
- func (ds *NativeSM) Type() pb.StateMachineType
- func (ds *NativeSM) Update(e sm.Entry) (sm.Result, error)
- type OffloadedStatus
- type OnDiskStateMachine
- func (s *OnDiskStateMachine) Close() error
- func (s *OnDiskStateMachine) Concurrent() bool
- func (s *OnDiskStateMachine) GetHash() (uint64, error)
- func (s *OnDiskStateMachine) Lookup(query interface{}) (interface{}, error)
- func (s *OnDiskStateMachine) NALookup(query []byte) ([]byte, error)
- func (s *OnDiskStateMachine) OnDisk() bool
- func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error)
- func (s *OnDiskStateMachine) Prepare() (interface{}, error)
- func (s *OnDiskStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
- func (s *OnDiskStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error
- func (s *OnDiskStateMachine) SetTestFS(fs config.IFS)
- func (s *OnDiskStateMachine) Sync() error
- func (s *OnDiskStateMachine) Type() pb.StateMachineType
- func (s *OnDiskStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)
- type RaftClientID
- type RaftSeriesID
- type SMFactoryFunc
- type SSEnv
- type SSMeta
- type SSReqType
- type SSRequest
- type SSVersion
- type Session
- type SessionManager
- func (ds *SessionManager) AddResponse(session *Session, seriesID uint64, result sm.Result)
- func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)
- func (ds *SessionManager) GetSessionHash() uint64
- func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error
- func (ds *SessionManager) MustHaveClientSeries(session *Session, seriesID uint64)
- func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result
- func (ds *SessionManager) SaveSessions(writer io.Writer) error
- func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result
- func (ds *SessionManager) UpdateRequired(session *Session, seriesID uint64) (sm.Result, bool, bool)
- func (ds *SessionManager) UpdateRespondedTo(session *Session, respondedTo uint64)
- type SnapshotReader
- type SnapshotValidator
- type SnapshotWriter
- type StateMachine
- func (s *StateMachine) Close() error
- func (s *StateMachine) Concurrent() bool
- func (s *StateMachine) DestroyedC() <-chan struct{}
- func (s *StateMachine) GetHash() (uint64, error)
- func (s *StateMachine) GetLastApplied() uint64
- func (s *StateMachine) GetMembership() pb.Membership
- func (s *StateMachine) GetMembershipHash() uint64
- func (s *StateMachine) GetSessionHash() uint64
- func (s *StateMachine) GetSyncedIndex() uint64
- func (s *StateMachine) Handle(batch []Task, apply []sm.Entry) (Task, error)
- func (s *StateMachine) Loaded()
- func (s *StateMachine) Lookup(query interface{}) (interface{}, error)
- func (s *StateMachine) NALookup(query []byte) ([]byte, error)
- func (s *StateMachine) Offloaded() bool
- func (s *StateMachine) OnDiskStateMachine() bool
- func (s *StateMachine) OpenOnDiskStateMachine() (uint64, error)
- func (s *StateMachine) ReadyToStream() bool
- func (s *StateMachine) Recover(t Task) (_ pb.Snapshot, err error)
- func (s *StateMachine) Save(req SSRequest) (pb.Snapshot, SSEnv, error)
- func (s *StateMachine) SetLastApplied(index uint64)
- func (s *StateMachine) Stream(sink pb.IChunkSink) error
- func (s *StateMachine) Sync() error
- func (s *StateMachine) TaskChanBusy() bool
- func (s *StateMachine) TaskQ() *TaskQueue
- func (s *StateMachine) Type() pb.StateMachineType
- type Task
- type TaskQueue
Constants ¶
const ( EEHeaderSize uint8 = 1 EEVersion uint8 = 0 << 4 EEV0 uint8 = 0 << 4 // for V0 format, entries with empty payload will cause panic as such // entries always have their TYPE value set to ApplicationEntry EENoCompression uint8 = 0 << 1 EESnappy uint8 = 1 << 1 EENoSession uint8 = 0 EEHasSession uint8 = 1 // uncompressed size is binary.Uvarint encoded EEV0SizeOffset int = 1 )
Entry Cmd format when Type = pb.EncodedEntry
-------------------------------------
|Version|CompressionFlag|SessionFlag|
| 4Bits |     3Bits     |   1Bit    |
-------------------------------------
const ( // ChunkSize is the size of each snapshot chunk. ChunkSize = settings.SnapshotChunkSize )
const ( // EmptyClientSessionLength defines the length of an empty sessions instance. EmptyClientSessionLength uint64 = 16 )
Variables ¶
var DefaultSSRequest = SSRequest{}
DefaultSSRequest is the default SSRequest.
var ( // ErrClusterClosed indicates that the cluster has been closed ErrClusterClosed = errors.New("raft cluster already closed") )
var ( // ErrRestoreSnapshot indicates there is error when trying to restore // from a snapshot ErrRestoreSnapshot = errors.New("failed to restore snapshot") )
var ( // LRUMaxSessionCount is the largest number of client sessions that can be // concurrently managed by a LRUSession instance. LRUMaxSessionCount = settings.Hard.LRUMaxSessionCount )
Functions ¶
func GetDefaultChecksum ¶
GetDefaultChecksum returns the default hash.Hash instance.
func GetEmptyLRUSession ¶
func GetEmptyLRUSession() []byte
GetEmptyLRUSession returns a marshaled empty sessions instance.
func GetEncoded ¶
func GetEncoded(ct dio.CompressionType, cmd []byte, dst []byte) []byte
GetEncoded returns the encoded payload using the specified compression type and the default encoded entry version.
func GetMaxBlockSize ¶
func GetMaxBlockSize(ct config.CompressionType) uint64
GetMaxBlockSize returns the maximum block length supported by the specified compression type.
func GetPayload ¶
GetPayload returns the payload of the entry ready to be applied into the state machine.
func GetV2PayloadChecksum ¶
GetV2PayloadChecksum calculates the payload checksum of the specified snapshot file.
func GetV2PayloadSize ¶
GetV2PayloadSize returns the actual on disk size for the input user payload size.
func GetWitnessSnapshot ¶
GetWitnessSnapshot returns the content of a witness snapshot.
func IsShrunkSnapshotFile ¶
IsShrunkSnapshotFile returns a boolean flag indicating whether the specified snapshot file is already shrunk.
func ReplaceSnapshot ¶
ReplaceSnapshot replaces the specified snapshot file with the shrunk version atomically.
func ShrinkSnapshot ¶
ShrinkSnapshot shrinks the specified snapshot file and saves the generated shrunk version to the path specified by newFp.
func ToDioType ¶
func ToDioType(ct config.CompressionType) dio.CompressionType
ToDioType converts the CompressionType type defined in the config package to the CompressionType value defined in the dio package.
Types ¶
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter is a writer type that writes the input data to the underlying storage with checksum appended at the end of each block.
func NewBlockWriter ¶
func NewBlockWriter(blockSize uint64, nb func(data []byte, crc []byte) error, t pb.ChecksumType) *BlockWriter
NewBlockWriter creates and returns a block writer.
func (*BlockWriter) Close ¶
func (bw *BlockWriter) Close() error
Close closes the writer by passing all in memory buffered data to the underlying onNewBlock function.
func (*BlockWriter) GetPayloadChecksum ¶
func (bw *BlockWriter) GetPayloadChecksum() []byte
GetPayloadChecksum returns the checksum for the entire payload.
type ChunkWriter ¶
type ChunkWriter struct {
// contains filtered or unexported fields
}
ChunkWriter is an io.WriteCloser type that streams snapshot chunks to its intended remote nodes.
func NewChunkWriter ¶
func NewChunkWriter(sink pb.IChunkSink, meta SSMeta) *ChunkWriter
NewChunkWriter creates and returns a chunk writer instance.
type ConcurrentStateMachine ¶
type ConcurrentStateMachine struct {
// contains filtered or unexported fields
}
ConcurrentStateMachine is an IStateMachine type capable of taking concurrent snapshots.
func NewConcurrentStateMachine ¶
func NewConcurrentStateMachine(s sm.IConcurrentStateMachine) *ConcurrentStateMachine
NewConcurrentStateMachine creates a new ConcurrentStateMachine instance.
func (*ConcurrentStateMachine) Close ¶
func (s *ConcurrentStateMachine) Close() error
Close closes the state machine.
func (*ConcurrentStateMachine) Concurrent ¶
func (s *ConcurrentStateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*ConcurrentStateMachine) GetHash ¶
func (s *ConcurrentStateMachine) GetHash() (uint64, error)
GetHash returns the uint64 hash value representing the state of a state machine.
func (*ConcurrentStateMachine) Lookup ¶
func (s *ConcurrentStateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the state machine.
func (*ConcurrentStateMachine) NALookup ¶
func (s *ConcurrentStateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the state machine.
func (*ConcurrentStateMachine) OnDisk ¶
func (s *ConcurrentStateMachine) OnDisk() bool
OnDisk returns a boolean flag indicating whether this is an on disk state machine.
func (*ConcurrentStateMachine) Open ¶
func (s *ConcurrentStateMachine) Open(stopc <-chan struct{}) (uint64, error)
Open opens the state machine.
func (*ConcurrentStateMachine) Prepare ¶
func (s *ConcurrentStateMachine) Prepare() (interface{}, error)
Prepare makes preparations for taking concurrent snapshot.
func (*ConcurrentStateMachine) Recover ¶
func (s *ConcurrentStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
Recover recovers the state machine from a snapshot.
func (*ConcurrentStateMachine) Save ¶
func (s *ConcurrentStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error
Save saves the snapshot.
func (*ConcurrentStateMachine) Sync ¶
func (s *ConcurrentStateMachine) Sync() error
Sync synchronizes all in-core state with that on disk.
func (*ConcurrentStateMachine) Type ¶
func (s *ConcurrentStateMachine) Type() pb.StateMachineType
Type returns the type of the state machine.
type Files ¶
type Files struct {
// contains filtered or unexported fields
}
Files is a collection of external files specified by the SaveSnapshot method of the state machine type.
func NewFileCollection ¶
func NewFileCollection() *Files
NewFileCollection creates and returns a Files instance.
func (*Files) GetFileAt ¶
func (fc *Files) GetFileAt(idx uint64) *pb.SnapshotFile
GetFileAt returns the specified file.
func (*Files) PrepareFiles ¶
PrepareFiles finalizes the external files added to the collection.
type IBlockWriter ¶
type IBlockWriter interface { io.WriteCloser GetPayloadChecksum() []byte }
IBlockWriter is the interface for writing checksummed data blocks.
type ILoadable ¶
ILoadable is the interface for types that can load client session state from a snapshot.
type IManagedStateMachine ¶
type IManagedStateMachine interface { Open() (uint64, error) Update(sm.Entry) (sm.Result, error) BatchedUpdate([]sm.Entry) ([]sm.Entry, error) Lookup(interface{}) (interface{}, error) ConcurrentLookup(interface{}) (interface{}, error) NALookup([]byte) ([]byte, error) NAConcurrentLookup([]byte) ([]byte, error) Sync() error GetHash() (uint64, error) Prepare() (interface{}, error) Save(SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error) Recover(io.Reader, []sm.SnapshotFile) error Stream(interface{}, io.Writer) error Offloaded() bool Loaded() Close() error DestroyedC() <-chan struct{} Concurrent() bool OnDisk() bool Type() pb.StateMachineType }
IManagedStateMachine is the interface used for managed state machine. A managed state machine contains a user state machine plus its engine state.
type INode ¶
type INode interface { StepReady() RestoreRemotes(pb.Snapshot) error ApplyUpdate(pb.Entry, sm.Result, bool, bool, bool) ApplyConfigChange(pb.ConfigChange, uint64, bool) error NodeID() uint64 ClusterID() uint64 ShouldStop() <-chan struct{} }
INode is the interface of a dragonboat node.
type IRecoverable ¶
type IRecoverable interface {
Recover(io.Reader, []sm.SnapshotFile) error
}
IRecoverable is the interface for types that can have its state restored from snapshots.
type ISavable ¶
type ISavable interface {
Save(SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error)
}
ISavable is the interface for types that can have their content saved as snapshots.
type ISnapshotter ¶
type ISnapshotter interface { GetSnapshot() (pb.Snapshot, error) Stream(IStreamable, SSMeta, pb.IChunkSink) error Shrunk(ss pb.Snapshot) (bool, error) Save(ISavable, SSMeta) (pb.Snapshot, SSEnv, error) Load(pb.Snapshot, ILoadable, IRecoverable) error IsNoSnapshotError(error) bool }
ISnapshotter is the interface for the snapshotter object.
type IStateMachine ¶
type IStateMachine interface { Open(<-chan struct{}) (uint64, error) Update(entries []sm.Entry) ([]sm.Entry, error) Lookup(query interface{}) (interface{}, error) NALookup(query []byte) ([]byte, error) Sync() error Prepare() (interface{}, error) Save(interface{}, io.Writer, sm.ISnapshotFileCollection, <-chan struct{}) error Recover(io.Reader, []sm.SnapshotFile, <-chan struct{}) error Close() error GetHash() (uint64, error) Concurrent() bool OnDisk() bool Type() pb.StateMachineType }
IStateMachine is an adapter interface for underlying sm.IStateMachine, sm.IConcurrentStateMachine and sm.IOnDiskStateMachine instances.
type IStreamable ¶
IStreamable is the interface for types that can be snapshot streamed.
type IVValidator ¶
IVValidator is the interface for versioned validator.
type IVWriter ¶
type IVWriter interface { io.WriteCloser GetVersion() SSVersion GetPayloadSum() []byte GetPayloadSize(uint64) uint64 }
IVWriter is the interface for versioned snapshot writer.
type InMemStateMachine ¶
type InMemStateMachine struct {
// contains filtered or unexported fields
}
InMemStateMachine is a regular state machine not capable of concurrent access from multiple goroutines.
func NewInMemStateMachine ¶
func NewInMemStateMachine(s sm.IStateMachine) *InMemStateMachine
NewInMemStateMachine creates a new InMemStateMachine instance.
func (*InMemStateMachine) Close ¶
func (i *InMemStateMachine) Close() error
Close closes the state machine.
func (*InMemStateMachine) Concurrent ¶
func (i *InMemStateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*InMemStateMachine) GetHash ¶
func (i *InMemStateMachine) GetHash() (uint64, error)
GetHash returns the uint64 hash value representing the state of a state machine.
func (*InMemStateMachine) Lookup ¶
func (i *InMemStateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the state machine.
func (*InMemStateMachine) NALookup ¶
func (i *InMemStateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the state machine.
func (*InMemStateMachine) OnDisk ¶
func (i *InMemStateMachine) OnDisk() bool
OnDisk returns a boolean flag indicating whether this is an on disk state machine.
func (*InMemStateMachine) Open ¶
func (i *InMemStateMachine) Open(stopc <-chan struct{}) (uint64, error)
Open opens the state machine.
func (*InMemStateMachine) Prepare ¶
func (i *InMemStateMachine) Prepare() (interface{}, error)
Prepare makes preparations for taking concurrent snapshot.
func (*InMemStateMachine) Recover ¶
func (i *InMemStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
Recover recovers the state machine from a snapshot.
func (*InMemStateMachine) Save ¶
func (i *InMemStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error
Save saves the snapshot.
func (*InMemStateMachine) Sync ¶
func (i *InMemStateMachine) Sync() error
Sync synchronizes all in-core state with that on disk.
func (*InMemStateMachine) Type ¶
func (i *InMemStateMachine) Type() pb.StateMachineType
Type returns the type of the state machine.
type ManagedStateMachineFactory ¶
type ManagedStateMachineFactory func(clusterID uint64, nodeID uint64, stopc <-chan struct{}) IManagedStateMachine
ManagedStateMachineFactory is the factory function type for creating an IManagedStateMachine instance.
type NativeSM ¶
type NativeSM struct { OffloadedStatus // contains filtered or unexported fields }
NativeSM is the IManagedStateMachine object used to manage native data store in Golang.
func NewNativeSM ¶
func NewNativeSM(config config.Config, ism IStateMachine, done <-chan struct{}) *NativeSM
NewNativeSM creates and returns a new NativeSM object.
func (*NativeSM) BatchedUpdate ¶
BatchedUpdate applies committed entries in a batch to hide latency.
func (*NativeSM) Concurrent ¶
Concurrent returns a boolean flag to indicate whether the managed state machine instance is capable of doing concurrent snapshots.
func (*NativeSM) ConcurrentLookup ¶
ConcurrentLookup queries the data store without obtaining the NativeSM.mu.
func (*NativeSM) DestroyedC ¶
func (ds *NativeSM) DestroyedC() <-chan struct{}
DestroyedC returns a chan struct{} used to indicate whether the SM has been fully offloaded.
func (*NativeSM) GetHash ¶
GetHash returns an integer value representing the state of the data store.
func (*NativeSM) Loaded ¶
func (ds *NativeSM) Loaded()
Loaded marks the state machine as loaded by the specified component.
func (*NativeSM) NAConcurrentLookup ¶
NAConcurrentLookup queries the data store without obtaining the NativeSM.mu.
func (*NativeSM) OnDisk ¶
OnDisk returns a boolean flag indicating whether the state machine is an on disk state machine.
func (*NativeSM) Save ¶
func (ds *NativeSM) Save(meta SSMeta, w io.Writer, session []byte, c sm.ISnapshotFileCollection) (bool, error)
Save saves the state of the data store to the specified writer.
func (*NativeSM) Type ¶
func (ds *NativeSM) Type() pb.StateMachineType
Type returns the state machine type.
type OffloadedStatus ¶
type OffloadedStatus struct { DestroyedC chan struct{} // contains filtered or unexported fields }
OffloadedStatus is used for tracking whether the managed data store has been offloaded from various system components.
func (*OffloadedStatus) Destroyed ¶
func (o *OffloadedStatus) Destroyed() bool
Destroyed returns a boolean value indicating whether the belonging object has been destroyed.
func (*OffloadedStatus) SetDestroyed ¶
func (o *OffloadedStatus) SetDestroyed()
SetDestroyed sets the destroyed flag to true.
func (*OffloadedStatus) SetLoaded ¶
func (o *OffloadedStatus) SetLoaded()
SetLoaded marks the managed data store as loaded by a user component.
func (*OffloadedStatus) SetOffloaded ¶
func (o *OffloadedStatus) SetOffloaded() uint64
SetOffloaded marks the managed data store as offloaded from a user component.
type OnDiskStateMachine ¶
type OnDiskStateMachine struct {
// contains filtered or unexported fields
}
OnDiskStateMachine is the type to represent an on disk state machine.
func NewOnDiskStateMachine ¶
func NewOnDiskStateMachine(s sm.IOnDiskStateMachine) *OnDiskStateMachine
NewOnDiskStateMachine creates and returns an on disk state machine.
func (*OnDiskStateMachine) Close ¶
func (s *OnDiskStateMachine) Close() error
Close closes the state machine.
func (*OnDiskStateMachine) Concurrent ¶
func (s *OnDiskStateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*OnDiskStateMachine) GetHash ¶
func (s *OnDiskStateMachine) GetHash() (uint64, error)
GetHash returns the uint64 hash value representing the state of a state machine.
func (*OnDiskStateMachine) Lookup ¶
func (s *OnDiskStateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the state machine.
func (*OnDiskStateMachine) NALookup ¶
func (s *OnDiskStateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the state machine.
func (*OnDiskStateMachine) OnDisk ¶
func (s *OnDiskStateMachine) OnDisk() bool
OnDisk returns a boolean flag indicating whether this is an on disk state machine.
func (*OnDiskStateMachine) Open ¶
func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error)
Open opens the state machine.
func (*OnDiskStateMachine) Prepare ¶
func (s *OnDiskStateMachine) Prepare() (interface{}, error)
Prepare makes preparations for taking concurrent snapshot.
func (*OnDiskStateMachine) Recover ¶
func (s *OnDiskStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error
Recover recovers the state machine from a snapshot.
func (*OnDiskStateMachine) Save ¶
func (s *OnDiskStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error
Save saves the snapshot.
func (*OnDiskStateMachine) SetTestFS ¶
func (s *OnDiskStateMachine) SetTestFS(fs config.IFS)
SetTestFS injects the specified fs to the test SM.
func (*OnDiskStateMachine) Sync ¶
func (s *OnDiskStateMachine) Sync() error
Sync synchronizes all in-core state with that on disk.
func (*OnDiskStateMachine) Type ¶
func (s *OnDiskStateMachine) Type() pb.StateMachineType
Type returns the type of the state machine.
type RaftClientID ¶
type RaftClientID uint64
RaftClientID is the type used as client id in sessions.
func (*RaftClientID) Compare ¶
func (a *RaftClientID) Compare(b llrb.Comparable) int
Compare implements the llrb.Comparable interface.
type RaftSeriesID ¶
type RaftSeriesID uint64
RaftSeriesID is the type used as series id in sessions.
type SMFactoryFunc ¶
type SMFactoryFunc func(clusterID uint64, nodeID uint64, done <-chan struct{}) IManagedStateMachine
SMFactoryFunc is the function type for creating an IManagedStateMachine instance.
type SSMeta ¶
type SSMeta struct { Membership pb.Membership Ctx interface{} Session *bytes.Buffer Request SSRequest From uint64 OnDiskIndex uint64 Index uint64 Term uint64 Type pb.StateMachineType CompressionType config.CompressionType }
SSMeta is the metadata of a snapshot.
type SSReqType ¶
type SSReqType uint64
SSReqType is the type of a snapshot request.
const ( // Periodic is the value to indicate periodic snapshot. Periodic SSReqType = iota // UserRequested is the value to indicate user requested snapshot. UserRequested // Exported is the value to indicate exported snapshot. Exported // Streaming is the value to indicate snapshot streaming. Streaming )
type SSRequest ¶
type SSRequest struct { Path string Type SSReqType Key uint64 CompactionOverhead uint64 OverrideCompaction bool }
SSRequest contains details of a snapshot request.
type SSVersion ¶
type SSVersion uint64
SSVersion is the snapshot version value type.
const ( // V1 is the value of snapshot version 1. V1 SSVersion = 1 // V2 is the value of snapshot version 2. V2 SSVersion = 2 // DefaultVersion is the snapshot binary format version. DefaultVersion SSVersion = V2 // HeaderSize is the size of the snapshot header in number of bytes. HeaderSize = settings.SnapshotHeaderSize // DefaultChecksumType is the default checksum type. DefaultChecksumType = defaultChecksumType )
type Session ¶
type Session struct { History map[RaftSeriesID]sm.Result ClientID RaftClientID RespondedUpTo RaftSeriesID }
Session is the session object maintained on the raft side.
func (*Session) AddResponse ¶
func (s *Session) AddResponse(id RaftSeriesID, result sm.Result)
AddResponse adds a response.
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager is the wrapper struct that implements client session related functionalities used in the IManagedStateMachine interface.
func NewSessionManager ¶
func NewSessionManager() *SessionManager
NewSessionManager returns a new SessionManager instance.
func (*SessionManager) AddResponse ¶
func (ds *SessionManager) AddResponse(session *Session, seriesID uint64, result sm.Result)
AddResponse adds the specified result to the session.
func (*SessionManager) ClientRegistered ¶
func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)
ClientRegistered returns whether the specified client exists in the system.
func (*SessionManager) GetSessionHash ¶
func (ds *SessionManager) GetSessionHash() uint64
GetSessionHash returns a uint64 integer representing the state of the session manager.
func (*SessionManager) LoadSessions ¶
func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error
LoadSessions loads and restores sessions from io.Reader.
func (*SessionManager) MustHaveClientSeries ¶
func (ds *SessionManager) MustHaveClientSeries(session *Session, seriesID uint64)
MustHaveClientSeries checks whether the session manager contains a client session identified as clientID and whether it has seriesID responded.
func (*SessionManager) RegisterClientID ¶
func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result
RegisterClientID registers a new client. It returns the input client id if it is previously unknown, or 0 when the client has already been registered.
func (*SessionManager) SaveSessions ¶
func (ds *SessionManager) SaveSessions(writer io.Writer) error
SaveSessions saves the sessions to the provided io.Writer.
func (*SessionManager) UnregisterClientID ¶
func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result
UnregisterClientID removes the specified client session from the system. It returns the client id if the client is successfully removed, or 0 if the client session does not exist.
func (*SessionManager) UpdateRequired ¶
UpdateRequired returns a tuple of request result, responded before, update required.
func (*SessionManager) UpdateRespondedTo ¶
func (ds *SessionManager) UpdateRespondedTo(session *Session, respondedTo uint64)
UpdateRespondedTo updates the responded to value of the specified client session.
type SnapshotReader ¶
type SnapshotReader struct {
// contains filtered or unexported fields
}
SnapshotReader is an io.Reader for reading from snapshot files.
func NewSnapshotReader ¶
func NewSnapshotReader(fp string, fs vfs.IFS) (*SnapshotReader, pb.SnapshotHeader, error)
NewSnapshotReader creates a new snapshot reader instance.
func (*SnapshotReader) Close ¶
func (sr *SnapshotReader) Close() error
Close closes the snapshot reader instance.
type SnapshotValidator ¶
type SnapshotValidator struct {
// contains filtered or unexported fields
}
SnapshotValidator is the validator used to check incoming snapshot chunks.
func NewSnapshotValidator ¶
func NewSnapshotValidator() *SnapshotValidator
NewSnapshotValidator creates and returns a new SnapshotValidator instance.
func (*SnapshotValidator) AddChunk ¶
func (v *SnapshotValidator) AddChunk(data []byte, chunkID uint64) bool
AddChunk adds a new snapshot chunk to the validator.
func (*SnapshotValidator) Validate ¶
func (v *SnapshotValidator) Validate() bool
Validate validates the added chunks and returns a boolean flag indicating whether the snapshot chunks are valid.
type SnapshotWriter ¶
type SnapshotWriter struct {
// contains filtered or unexported fields
}
SnapshotWriter is an io.Writer used to write snapshot file.
func NewSnapshotWriter ¶
func NewSnapshotWriter(fp string, ct pb.CompressionType, fs vfs.IFS) (*SnapshotWriter, error)
NewSnapshotWriter creates a new snapshot writer instance.
func (*SnapshotWriter) Close ¶
func (sw *SnapshotWriter) Close() (err error)
Close closes the snapshot writer instance.
func (*SnapshotWriter) GetPayloadChecksum ¶
func (sw *SnapshotWriter) GetPayloadChecksum() []byte
GetPayloadChecksum returns the payload checksum.
func (*SnapshotWriter) GetPayloadSize ¶
func (sw *SnapshotWriter) GetPayloadSize(sz uint64) uint64
GetPayloadSize returns the payload size.
type StateMachine ¶
type StateMachine struct {
// contains filtered or unexported fields
}
StateMachine is the state machine component in the replicated state machine scheme.
func NewStateMachine ¶
func NewStateMachine(sm IManagedStateMachine, snapshotter ISnapshotter, cfg config.Config, node INode, fs vfs.IFS) *StateMachine
NewStateMachine creates a new application state machine object.
func (*StateMachine) Concurrent ¶
func (s *StateMachine) Concurrent() bool
Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.
func (*StateMachine) DestroyedC ¶
func (s *StateMachine) DestroyedC() <-chan struct{}
DestroyedC returns a chan struct{} used to indicate whether the SM has been fully unloaded.
func (*StateMachine) GetHash ¶
func (s *StateMachine) GetHash() (uint64, error)
GetHash returns the state machine hash.
func (*StateMachine) GetLastApplied ¶
func (s *StateMachine) GetLastApplied() uint64
GetLastApplied returns the last applied value.
func (*StateMachine) GetMembership ¶
func (s *StateMachine) GetMembership() pb.Membership
GetMembership returns the membership info maintained by the state machine.
func (*StateMachine) GetMembershipHash ¶
func (s *StateMachine) GetMembershipHash() uint64
GetMembershipHash returns the hash of the membership instance.
func (*StateMachine) GetSessionHash ¶
func (s *StateMachine) GetSessionHash() uint64
GetSessionHash returns the session hash.
func (*StateMachine) GetSyncedIndex ¶
func (s *StateMachine) GetSyncedIndex() uint64
GetSyncedIndex returns the index value that is known to have been synchronized.
func (*StateMachine) Handle ¶
Handle pulls the committed record and applies it if there is any available.
func (*StateMachine) Loaded ¶
func (s *StateMachine) Loaded()
Loaded marks the state machine as loaded from the specified component.
func (*StateMachine) Lookup ¶
func (s *StateMachine) Lookup(query interface{}) (interface{}, error)
Lookup queries the local state machine.
func (*StateMachine) NALookup ¶
func (s *StateMachine) NALookup(query []byte) ([]byte, error)
NALookup queries the local state machine.
func (*StateMachine) Offloaded ¶
func (s *StateMachine) Offloaded() bool
Offloaded marks the state machine as offloaded from the specified component. It returns a boolean value indicating whether the node has been fully unloaded after unloading from the specified component.
func (*StateMachine) OnDiskStateMachine ¶
func (s *StateMachine) OnDiskStateMachine() bool
OnDiskStateMachine returns a boolean flag indicating whether it is an on disk state machine.
func (*StateMachine) OpenOnDiskStateMachine ¶
func (s *StateMachine) OpenOnDiskStateMachine() (uint64, error)
OpenOnDiskStateMachine opens the on disk state machine.
func (*StateMachine) ReadyToStream ¶
func (s *StateMachine) ReadyToStream() bool
ReadyToStream returns a boolean flag to indicate whether the state machine is ready to stream snapshot. It cannot stream a full snapshot when membership state is catching up with the on disk SM state. A meta only snapshot can be taken at any time.
func (*StateMachine) Recover ¶
func (s *StateMachine) Recover(t Task) (_ pb.Snapshot, err error)
Recover applies the snapshot.
func (*StateMachine) SetLastApplied ¶
func (s *StateMachine) SetLastApplied(index uint64)
SetLastApplied sets the last applied index to the specified value. This method is only used in tests.
func (*StateMachine) Stream ¶
func (s *StateMachine) Stream(sink pb.IChunkSink) error
Stream starts to stream snapshot from the current SM to a remote node targeted by the provided sink.
func (*StateMachine) Sync ¶
func (s *StateMachine) Sync() error
Sync synchronizes state machine's in-core state with that on disk.
func (*StateMachine) TaskChanBusy ¶
func (s *StateMachine) TaskChanBusy() bool
TaskChanBusy returns whether the TaskC chan is busy. Busy is defined as having more than half of its buffer occupied.
func (*StateMachine) TaskQ ¶
func (s *StateMachine) TaskQ() *TaskQueue
TaskQ returns the task queue.
func (*StateMachine) Type ¶
func (s *StateMachine) Type() pb.StateMachineType
Type returns the state machine type.
type Task ¶
type Task struct { Entries []pb.Entry SSRequest SSRequest ClusterID uint64 NodeID uint64 Index uint64 Save bool Stream bool PeriodicSync bool NewNode bool Recover bool Initial bool }
Task describes a task that needs to be handled by StateMachine.
func (*Task) IsSnapshotTask ¶
IsSnapshotTask returns a boolean flag indicating whether it is a snapshot task.
type TaskQueue ¶
type TaskQueue struct {
// contains filtered or unexported fields
}
TaskQueue is a queue of tasks to be processed by the state machine.
func NewTaskQueue ¶
func NewTaskQueue() *TaskQueue
NewTaskQueue creates and returns a new task queue.
func (*TaskQueue) MoreEntryToApply ¶
MoreEntryToApply returns a boolean value indicating whether it is ok to queue more entries to apply.