Versions in this module Expand all Collapse all v1 v1.0.0 Jul 19, 2024 Changes in this version + const ChunkSize + const EEHasSession + const EEHeaderSize + const EENoCompression + const EENoSession + const EESnappy + const EEV0 + const EEV0SizeOffset + const EEVersion + const EmptyClientSessionLength + var DefaultSSRequest = SSRequest + var ErrRestoreSnapshot = errors.New("failed to restore snapshot") + var ErrShardClosed = errors.New("raft shard already closed") + var LRUMaxSessionCount = settings.Hard.LRUMaxSessionCount + func GetDefaultChecksum() hash.Hash + func GetEmptyLRUSession() []byte + func GetEncoded(ct dio.CompressionType, cmd []byte, dst []byte) []byte + func GetMaxBlockSize(ct config.CompressionType) uint64 + func GetPayload(e pb.Entry) ([]byte, error) + func GetV2PayloadChecksum(fp string, fs vfs.IFS) (crc []byte, err error) + func GetV2PayloadSize(sz uint64) uint64 + func GetWitnessSnapshot(fs vfs.IFS) (result []byte, err error) + func IsShrunkSnapshotFile(fp string, fs vfs.IFS) (shrunk bool, err error) + func ReplaceSnapshot(newFp string, fp string, fs vfs.IFS) error + func ShrinkSnapshot(fp string, newFp string, fs vfs.IFS) (err error) + func ToDioType(ct config.CompressionType) dio.CompressionType + type BlockWriter struct + func NewBlockWriter(blockSize uint64, nb func(data []byte, crc []byte) error, t pb.ChecksumType) *BlockWriter + func (bw *BlockWriter) Close() error + func (bw *BlockWriter) GetPayloadChecksum() []byte + func (bw *BlockWriter) Write(bs []byte) (int, error) + type ChunkWriter struct + func NewChunkWriter(sink pb.IChunkSink, meta SSMeta) *ChunkWriter + func (cw *ChunkWriter) Close() error + func (cw *ChunkWriter) Write(data []byte) (int, error) + type ConcurrentStateMachine struct + func NewConcurrentStateMachine(s sm.IConcurrentStateMachine) *ConcurrentStateMachine + func (s *ConcurrentStateMachine) Close() error + func (s *ConcurrentStateMachine) Concurrent() bool + func (s *ConcurrentStateMachine) GetHash() (uint64, error) + func (s *ConcurrentStateMachine) Lookup(query interface{}) (interface{}, error) + func (s *ConcurrentStateMachine) NALookup(query []byte) ([]byte, error) + func (s *ConcurrentStateMachine) OnDisk() bool + func (s *ConcurrentStateMachine) Open(stopc <-chan struct{}) (uint64, error) + func (s *ConcurrentStateMachine) Prepare() (interface{}, error) + func (s *ConcurrentStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error + func (s *ConcurrentStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error + func (s *ConcurrentStateMachine) Sync() error + func (s *ConcurrentStateMachine) Type() pb.StateMachineType + func (s *ConcurrentStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error) + type Files struct + func NewFileCollection() *Files + func (fc *Files) AddFile(fileID uint64, path string, metadata []byte) + func (fc *Files) GetFileAt(idx uint64) *pb.SnapshotFile + func (fc *Files) PrepareFiles(tmpdir string, finaldir string) ([]*pb.SnapshotFile, error) + func (fc *Files) Size() uint64 + type IBlockWriter interface + GetPayloadChecksum func() []byte + type ILoadable interface + LoadSessions func(io.Reader, SSVersion) error + type IManagedStateMachine interface + BatchedUpdate func([]sm.Entry) ([]sm.Entry, error) + Close func() error + Concurrent func() bool + ConcurrentLookup func(interface{}) (interface{}, error) + DestroyedC func() <-chan struct{} + GetHash func() (uint64, error) + Loaded func() + Lookup func(interface{}) (interface{}, error) + NAConcurrentLookup func([]byte) ([]byte, error) + NALookup func([]byte) ([]byte, error) + Offloaded func() bool + OnDisk func() bool + Open func() (uint64, error) + Prepare func() (interface{}, error) + Recover func(io.Reader, []sm.SnapshotFile) error + Save func(SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error) + Stream func(interface{}, io.Writer) error + Sync func() error + Type func() pb.StateMachineType + Update func(sm.Entry) (sm.Result, error) + type INode interface + ApplyConfigChange func(pb.ConfigChange, uint64, bool) error + ApplyUpdate func(pb.Entry, sm.Result, bool, bool, bool) + ReplicaID func() uint64 + RestoreRemotes func(pb.Snapshot) error + ShardID func() uint64 + ShouldStop func() <-chan struct{} + StepReady func() + type IRecoverable interface + Recover func(io.Reader, []sm.SnapshotFile) error + type ISavable interface + Save func(SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error) + type ISnapshotter interface + GetSnapshot func() (pb.Snapshot, error) + IsNoSnapshotError func(error) bool + Load func(pb.Snapshot, ILoadable, IRecoverable) error + Save func(ISavable, SSMeta) (pb.Snapshot, SSEnv, error) + Shrunk func(ss pb.Snapshot) (bool, error) + Stream func(IStreamable, SSMeta, pb.IChunkSink) error + type IStateMachine interface + Close func() error + Concurrent func() bool + GetHash func() (uint64, error) + Lookup func(query interface{}) (interface{}, error) + NALookup func(query []byte) ([]byte, error) + OnDisk func() bool + Open func(<-chan struct{}) (uint64, error) + Prepare func() (interface{}, error) + Recover func(io.Reader, []sm.SnapshotFile, <-chan struct{}) error + Save func(interface{}, io.Writer, sm.ISnapshotFileCollection, <-chan struct{}) error + Sync func() error + Type func() pb.StateMachineType + Update func(entries []sm.Entry) ([]sm.Entry, error) + type IStreamable interface + Stream func(interface{}, io.Writer) error + type ITestFS interface + SetTestFS func(fs config.IFS) + type IVReader interface + Read func(data []byte) (int, error) + Sum func() []byte + type IVValidator interface + AddChunk func(data []byte, chunkID uint64) bool + Validate func() bool + type IVWriter interface + GetPayloadSize func(uint64) uint64 + GetPayloadSum func() []byte + GetVersion func() SSVersion + type InMemStateMachine struct + func NewInMemStateMachine(s sm.IStateMachine) *InMemStateMachine + func (i *InMemStateMachine) Close() error + func (i *InMemStateMachine) Concurrent() bool + func (i *InMemStateMachine) GetHash() (uint64, error) + func (i *InMemStateMachine) Lookup(query interface{}) (interface{}, error) + func (i *InMemStateMachine) NALookup(query []byte) ([]byte, error) + func (i *InMemStateMachine) OnDisk() bool + func (i *InMemStateMachine) Open(stopc <-chan struct{}) (uint64, error) + func (i *InMemStateMachine) Prepare() (interface{}, error) + func (i *InMemStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error + func (i *InMemStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error + func (i *InMemStateMachine) Sync() error + func (i *InMemStateMachine) Type() pb.StateMachineType + func (i *InMemStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error) + type ManagedStateMachineFactory func(shardID uint64, replicaID uint64, stopc <-chan struct{}) IManagedStateMachine + type NativeSM struct + func NewNativeSM(config config.Config, ism IStateMachine, done <-chan struct{}) *NativeSM + func (ds *NativeSM) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error) + func (ds *NativeSM) Close() error + func (ds *NativeSM) Concurrent() bool + func (ds *NativeSM) ConcurrentLookup(query interface{}) (interface{}, error) + func (ds *NativeSM) DestroyedC() <-chan struct{} + func (ds *NativeSM) GetHash() (uint64, error) + func (ds *NativeSM) Loaded() + func (ds *NativeSM) Lookup(query interface{}) (interface{}, error) + func (ds *NativeSM) NAConcurrentLookup(query []byte) ([]byte, error) + func (ds *NativeSM) NALookup(query []byte) ([]byte, error) + func (ds *NativeSM) Offloaded() bool + func (ds *NativeSM) OnDisk() bool + func (ds *NativeSM) Open() (uint64, error) + func (ds *NativeSM) Prepare() (interface{}, error) + func (ds *NativeSM) Recover(r io.Reader, files []sm.SnapshotFile) error + func (ds *NativeSM) Save(meta SSMeta, w io.Writer, session []byte, c sm.ISnapshotFileCollection) (bool, error) + func (ds *NativeSM) Stream(ctx interface{}, w io.Writer) error + func (ds *NativeSM) Sync() error + func (ds *NativeSM) Type() pb.StateMachineType + func (ds *NativeSM) Update(e sm.Entry) (sm.Result, error) + type OffloadedStatus struct + DestroyedC chan struct{} + func (o *OffloadedStatus) Destroyed() bool + func (o *OffloadedStatus) SetDestroyed() + func (o *OffloadedStatus) SetLoaded() + func (o *OffloadedStatus) SetOffloaded() uint64 + type OnDiskStateMachine struct + func NewOnDiskStateMachine(s sm.IOnDiskStateMachine) *OnDiskStateMachine + func (s *OnDiskStateMachine) Close() error + func (s *OnDiskStateMachine) Concurrent() bool + func (s *OnDiskStateMachine) GetHash() (uint64, error) + func (s *OnDiskStateMachine) Lookup(query interface{}) (interface{}, error) + func (s *OnDiskStateMachine) NALookup(query []byte) ([]byte, error) + func (s *OnDiskStateMachine) OnDisk() bool + func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error) + func (s *OnDiskStateMachine) Prepare() (interface{}, error) + func (s *OnDiskStateMachine) Recover(r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error + func (s *OnDiskStateMachine) Save(ctx interface{}, w io.Writer, fc sm.ISnapshotFileCollection, ...) error + func (s *OnDiskStateMachine) SetTestFS(fs config.IFS) + func (s *OnDiskStateMachine) Sync() error + func (s *OnDiskStateMachine) Type() pb.StateMachineType + func (s *OnDiskStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error) + type RaftClientID uint64 + func (a *RaftClientID) Compare(b llrb.Comparable) int + type RaftSeriesID uint64 + type SMFactoryFunc func(shardID uint64, replicaID uint64, done <-chan struct{}) IManagedStateMachine + type SSEnv = server.SSEnv + type SSMeta struct + CompressionType config.CompressionType + Ctx interface{} + From uint64 + Index uint64 + Membership pb.Membership + OnDiskIndex uint64 + Request SSRequest + Session *bytes.Buffer + Term uint64 + Type pb.StateMachineType + type SSReqType uint64 + const Exported + const Periodic + const Streaming + const UserRequested + type SSRequest struct + CompactionIndex uint64 + CompactionOverhead uint64 + Key uint64 + OverrideCompaction bool + Path string + Type SSReqType + func (r *SSRequest) Exported() bool + func (r *SSRequest) Streaming() bool + type SSVersion uint64 + const DefaultChecksumType + const DefaultVersion + const HeaderSize + const V1 + const V2 + type Session struct + ClientID RaftClientID + History map[RaftSeriesID]sm.Result + RespondedUpTo RaftSeriesID + func (s *Session) AddResponse(id RaftSeriesID, result sm.Result) + type SessionManager struct + func NewSessionManager() *SessionManager + func (ds *SessionManager) AddResponse(session *Session, seriesID uint64, result sm.Result) + func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool) + func (ds *SessionManager) GetSessionHash() uint64 + func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error + func (ds *SessionManager) MustHaveClientSeries(session *Session, seriesID uint64) + func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result + func (ds *SessionManager) SaveSessions(writer io.Writer) error + func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result + func (ds *SessionManager) UpdateRequired(session *Session, seriesID uint64) (sm.Result, bool, bool) + func (ds *SessionManager) UpdateRespondedTo(session *Session, respondedTo uint64) + type SnapshotReader struct + func NewSnapshotReader(fp string, fs vfs.IFS) (*SnapshotReader, pb.SnapshotHeader, error) + func (sr *SnapshotReader) Close() error + func (sr *SnapshotReader) Read(data []byte) (int, error) + type SnapshotValidator struct + func NewSnapshotValidator() *SnapshotValidator + func (v *SnapshotValidator) AddChunk(data []byte, chunkID uint64) bool + func (v *SnapshotValidator) Validate() bool + type SnapshotWriter struct + func NewSnapshotWriter(fp string, ct pb.CompressionType, fs vfs.IFS) (*SnapshotWriter, error) + func (sw *SnapshotWriter) Close() (err error) + func (sw *SnapshotWriter) GetPayloadChecksum() []byte + func (sw *SnapshotWriter) GetPayloadSize(sz uint64) uint64 + func (sw *SnapshotWriter) Write(data []byte) (int, error) + type StateMachine struct + func NewStateMachine(sm IManagedStateMachine, snapshotter ISnapshotter, cfg config.Config, ...) *StateMachine + func (s *StateMachine) Close() error + func (s *StateMachine) Concurrent() bool + func (s *StateMachine) DestroyedC() <-chan struct{} + func (s *StateMachine) GetHash() (uint64, error) + func (s *StateMachine) GetLastApplied() uint64 + func (s *StateMachine) GetMembership() pb.Membership + func (s *StateMachine) GetMembershipHash() uint64 + func (s *StateMachine) GetSessionHash() uint64 + func (s *StateMachine) GetSyncedIndex() uint64 + func (s *StateMachine) Handle(batch []Task, apply []sm.Entry) (Task, error) + func (s *StateMachine) Loaded() + func (s *StateMachine) Lookup(query interface{}) (interface{}, error) + func (s *StateMachine) NALookup(query []byte) ([]byte, error) + func (s *StateMachine) Offloaded() bool + func (s *StateMachine) OnDiskStateMachine() bool + func (s *StateMachine) OpenOnDiskStateMachine() (uint64, error) + func (s *StateMachine) ReadyToStream() bool + func (s *StateMachine) Recover(t Task) (_ pb.Snapshot, err error) + func (s *StateMachine) Save(req SSRequest) (pb.Snapshot, SSEnv, error) + func (s *StateMachine) SetLastApplied(index uint64) + func (s *StateMachine) Stream(sink pb.IChunkSink) error + func (s *StateMachine) Sync() error + func (s *StateMachine) TaskChanBusy() bool + func (s *StateMachine) TaskQ() *TaskQueue + func (s *StateMachine) Type() pb.StateMachineType + type Task struct + Entries []pb.Entry + Index uint64 + Initial bool + NewNode bool + PeriodicSync bool + Recover bool + ReplicaID uint64 + SSRequest SSRequest + Save bool + ShardID uint64 + Stream bool + func (t *Task) IsSnapshotTask() bool + type TaskQueue struct + func NewTaskQueue() *TaskQueue + func (tq *TaskQueue) Add(task Task) + func (tq *TaskQueue) Get() (Task, bool) + func (tq *TaskQueue) GetAll() []Task + func (tq *TaskQueue) MoreEntryToApply() bool + func (tq *TaskQueue) Size() uint64 Other modules containing this package github.com/foreeest/dragonboat/v2