rsm

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package rsm implements State Machines used in Dragonboat.

This package is used internally by Dragonboat; applications are not expected to import this package.

Index

Constants

View Source
const (
	EEHeaderSize uint8 = 1
	EEVersion    uint8 = 0 << 4
	EEV0         uint8 = 0 << 4

	// for V0 format, entries with empty payload will cause panic as such
	// entries always have their TYPE value set to ApplicationEntry
	EENoCompression uint8 = 0 << 1
	EESnappy        uint8 = 1 << 1

	EENoSession  uint8 = 0
	EEHasSession uint8 = 1
	// uncompressed size is binary.Uvarint encoded
	EEV0SizeOffset int = 1
)

Entry Cmd format when Type = pb.EncodedEntry

-------------------------------------
|Version|CompressionFlag|SessionFlag|
| 4Bits |     3Bits     |   1Bit    |
-------------------------------------

View Source
const (
	// ChunkSize is the size of each snapshot chunk.
	ChunkSize = settings.SnapshotChunkSize
)
View Source
const (
	// EmptyClientSessionLength defines the length of an empty sessions instance.
	EmptyClientSessionLength uint64 = 16
)

Variables

View Source
var DefaultSSRequest = SSRequest{}

DefaultSSRequest is the default SSRequest.

View Source
var (
	// ErrRestoreSnapshot indicates there is error when trying to restore
	// from a snapshot
	ErrRestoreSnapshot = errors.New("failed to restore snapshot")
)
View Source
var (
	// ErrShardClosed indicates that the shard has been closed
	ErrShardClosed = errors.New("raft shard already closed")
)
View Source
var (
	// LRUMaxSessionCount is the largest number of client sessions that can be
	// concurrently managed by a LRUSession instance.
	LRUMaxSessionCount = settings.Hard.LRUMaxSessionCount
)

Functions

func GetDefaultChecksum

func GetDefaultChecksum() hash.Hash

GetDefaultChecksum returns the default hash.Hash instance.

func GetEmptyLRUSession

func GetEmptyLRUSession() []byte

GetEmptyLRUSession returns a marshaled empty sessions instance.

func GetEncoded

func GetEncoded(ct dio.CompressionType, cmd []byte, dst []byte) []byte

GetEncoded returns the encoded payload using the specified compression type and the default encoded entry version.

func GetMaxBlockSize

func GetMaxBlockSize(ct config.CompressionType) uint64

GetMaxBlockSize returns the maximum block length supported by the specified compression type.

func GetPayload

func GetPayload(e pb.Entry) ([]byte, error)

GetPayload returns the payload of the entry ready to be applied into the state machine.

func GetV2PayloadChecksum

func GetV2PayloadChecksum(fp string, fs vfs.IFS) (crc []byte, err error)

GetV2PayloadChecksum calculates the payload checksum of the specified snapshot file.

func GetV2PayloadSize

func GetV2PayloadSize(sz uint64) uint64

GetV2PayloadSize returns the actual on disk size for the input user payload size.

func GetWitnessSnapshot

func GetWitnessSnapshot(fs vfs.IFS) (result []byte, err error)

GetWitnessSnapshot returns the content of a witness snapshot.

func IsShrunkSnapshotFile

func IsShrunkSnapshotFile(fp string, fs vfs.IFS) (shrunk bool, err error)

IsShrunkSnapshotFile returns a boolean flag indicating whether the specified snapshot file is already shrunk.

func ReplaceSnapshot

func ReplaceSnapshot(newFp string, fp string, fs vfs.IFS) error

ReplaceSnapshot replaces the specified snapshot file with the shrunk version atomically.

func ShrinkSnapshot

func ShrinkSnapshot(fp string, newFp string, fs vfs.IFS) (err error)

ShrinkSnapshot shrinks the specified snapshot file and saves the generated shrunk version to the path specified by newFp.

func ToDioType

ToDioType converts the CompressionType type defined in the config package to the CompressionType value defined in the dio package.

Types

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter is a writer type that writes the input data to the underlying storage with checksum appended at the end of each block.

func NewBlockWriter

func NewBlockWriter(blockSize uint64,
	nb func(data []byte, crc []byte) error, t pb.ChecksumType) *BlockWriter

NewBlockWriter creates and returns a block writer.

func (*BlockWriter) Close

func (bw *BlockWriter) Close() error

Close closes the writer by passing all in memory buffered data to the underlying onNewBlock function.

func (*BlockWriter) GetPayloadChecksum

func (bw *BlockWriter) GetPayloadChecksum() []byte

GetPayloadChecksum returns the checksum for the entire payload.

func (*BlockWriter) Write

func (bw *BlockWriter) Write(bs []byte) (int, error)

Write writes the specified data using the block writer.

type ChunkWriter

type ChunkWriter struct {
	// contains filtered or unexported fields
}

ChunkWriter is an io.WriteCloser type that streams snapshot chunks to its intended remote nodes.

func NewChunkWriter

func NewChunkWriter(sink pb.IChunkSink, meta SSMeta) *ChunkWriter

NewChunkWriter creates and returns a chunk writer instance.

func (*ChunkWriter) Close

func (cw *ChunkWriter) Close() error

Close closes the chunk writer.

func (*ChunkWriter) Write

func (cw *ChunkWriter) Write(data []byte) (int, error)

Write writes the specified input data.

type ConcurrentStateMachine

type ConcurrentStateMachine struct {
	// contains filtered or unexported fields
}

ConcurrentStateMachine is an IStateMachine type capable of taking concurrent snapshots.

func NewConcurrentStateMachine

func NewConcurrentStateMachine(s sm.IConcurrentStateMachine) *ConcurrentStateMachine

NewConcurrentStateMachine creates a new ConcurrentStateMachine instance.

func (*ConcurrentStateMachine) Close

func (s *ConcurrentStateMachine) Close() error

Close closes the state machine.

func (*ConcurrentStateMachine) Concurrent

func (s *ConcurrentStateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*ConcurrentStateMachine) GetHash

func (s *ConcurrentStateMachine) GetHash() (uint64, error)

GetHash returns the uint64 hash value representing the state of a state machine.

func (*ConcurrentStateMachine) Lookup

func (s *ConcurrentStateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the state machine.

func (*ConcurrentStateMachine) NALookup

func (s *ConcurrentStateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the state machine.

func (*ConcurrentStateMachine) OnDisk

func (s *ConcurrentStateMachine) OnDisk() bool

OnDisk returns a boolean flag indicating whether this is an on disk state machine.

func (*ConcurrentStateMachine) Open

func (s *ConcurrentStateMachine) Open(stopc <-chan struct{}) (uint64, error)

Open opens the state machine.

func (*ConcurrentStateMachine) Prepare

func (s *ConcurrentStateMachine) Prepare() (interface{}, error)

Prepare makes preparations for taking concurrent snapshot.

func (*ConcurrentStateMachine) Recover

func (s *ConcurrentStateMachine) Recover(r io.Reader,
	fs []sm.SnapshotFile, stopc <-chan struct{}) error

Recover recovers the state machine from a snapshot.

func (*ConcurrentStateMachine) Save

func (s *ConcurrentStateMachine) Save(ctx interface{},
	w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error

Save saves the snapshot.

func (*ConcurrentStateMachine) Sync

func (s *ConcurrentStateMachine) Sync() error

Sync synchronizes all in-core state with that on disk.

func (*ConcurrentStateMachine) Type

Type returns the type of the state machine.

func (*ConcurrentStateMachine) Update

func (s *ConcurrentStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)

Update updates the state machine.

type Files

type Files struct {
	// contains filtered or unexported fields
}

Files is a collection of external files specified by the SaveSnapshot method of the state machine type.

func NewFileCollection

func NewFileCollection() *Files

NewFileCollection creates and returns a Files instance.

func (*Files) AddFile

func (fc *Files) AddFile(fileID uint64,
	path string, metadata []byte)

AddFile adds the specified file to the external file collection.

func (*Files) GetFileAt

func (fc *Files) GetFileAt(idx uint64) *pb.SnapshotFile

GetFileAt returns the specified file.

func (*Files) PrepareFiles

func (fc *Files) PrepareFiles(tmpdir string,
	finaldir string) ([]*pb.SnapshotFile, error)

PrepareFiles finalizes the external files added to the collection.

func (*Files) Size

func (fc *Files) Size() uint64

Size returns the number of external files already added to the external file collection.

type IBlockWriter

type IBlockWriter interface {
	io.WriteCloser
	GetPayloadChecksum() []byte
}

IBlockWriter is the interface for writing checksummed data blocks.

type ILoadable

type ILoadable interface {
	LoadSessions(io.Reader, SSVersion) error
}

ILoadable is the interface for types that can load client session state from a snapshot.

type IManagedStateMachine

type IManagedStateMachine interface {
	Open() (uint64, error)
	Update(sm.Entry) (sm.Result, error)
	BatchedUpdate([]sm.Entry) ([]sm.Entry, error)
	Lookup(interface{}) (interface{}, error)
	ConcurrentLookup(interface{}) (interface{}, error)
	NALookup([]byte) ([]byte, error)
	NAConcurrentLookup([]byte) ([]byte, error)
	Sync() error
	GetHash() (uint64, error)
	Prepare() (interface{}, error)
	Save(SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error)
	Recover(io.Reader, []sm.SnapshotFile) error
	Stream(interface{}, io.Writer) error
	Offloaded() bool
	Loaded()
	Close() error
	DestroyedC() <-chan struct{}
	Concurrent() bool
	OnDisk() bool
	Type() pb.StateMachineType
}

IManagedStateMachine is the interface used for managed state machine. A managed state machine contains a user state machine plus its engine state.

type INode

type INode interface {
	StepReady()
	RestoreRemotes(pb.Snapshot) error
	ApplyUpdate(pb.Entry, sm.Result, bool, bool, bool)
	ApplyConfigChange(pb.ConfigChange, uint64, bool) error
	ReplicaID() uint64
	ShardID() uint64
	ShouldStop() <-chan struct{}
}

INode is the interface of a dragonboat node.

type IRecoverable

type IRecoverable interface {
	Recover(io.Reader, []sm.SnapshotFile) error
}

IRecoverable is the interface for types that can have their state restored from snapshots.

type ISavable

type ISavable interface {
	Save(SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error)
}

ISavable is the interface for types that can have their content saved as snapshots.

type ISnapshotter

type ISnapshotter interface {
	GetSnapshot() (pb.Snapshot, error)
	Stream(IStreamable, SSMeta, pb.IChunkSink) error
	Shrunk(ss pb.Snapshot) (bool, error)
	Save(ISavable, SSMeta) (pb.Snapshot, SSEnv, error)
	Load(pb.Snapshot, ILoadable, IRecoverable) error
	IsNoSnapshotError(error) bool
}

ISnapshotter is the interface for the snapshotter object.

type IStateMachine

type IStateMachine interface {
	Open(<-chan struct{}) (uint64, error)
	Update(entries []sm.Entry) ([]sm.Entry, error)
	Lookup(query interface{}) (interface{}, error)
	NALookup(query []byte) ([]byte, error)
	Sync() error
	Prepare() (interface{}, error)
	Save(interface{},
		io.Writer, sm.ISnapshotFileCollection, <-chan struct{}) error
	Recover(io.Reader, []sm.SnapshotFile, <-chan struct{}) error
	Close() error
	GetHash() (uint64, error)
	Concurrent() bool
	OnDisk() bool
	Type() pb.StateMachineType
}

IStateMachine is an adapter interface for underlying sm.IStateMachine, sm.IConcurrentStateMachine and sm.IOnDiskStateMachine instances.

type IStreamable

type IStreamable interface {
	Stream(interface{}, io.Writer) error
}

IStreamable is the interface for types that can be snapshot streamed.

type ITestFS

type ITestFS interface {
	SetTestFS(fs config.IFS)
}

ITestFS is an interface implemented by test SMs.

type IVReader

type IVReader interface {
	Read(data []byte) (int, error)
	Sum() []byte
}

IVReader is the interface for versioned snapshot reader.

type IVValidator

type IVValidator interface {
	AddChunk(data []byte, chunkID uint64) bool
	Validate() bool
}

IVValidator is the interface for versioned validator.

type IVWriter

type IVWriter interface {
	io.WriteCloser
	GetVersion() SSVersion
	GetPayloadSum() []byte
	GetPayloadSize(uint64) uint64
}

IVWriter is the interface for versioned snapshot writer.

type InMemStateMachine

type InMemStateMachine struct {
	// contains filtered or unexported fields
}

InMemStateMachine is a regular state machine not capable of concurrent access from multiple goroutines.

func NewInMemStateMachine

func NewInMemStateMachine(s sm.IStateMachine) *InMemStateMachine

NewInMemStateMachine creates a new InMemStateMachine instance.

func (*InMemStateMachine) Close

func (i *InMemStateMachine) Close() error

Close closes the state machine.

func (*InMemStateMachine) Concurrent

func (i *InMemStateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*InMemStateMachine) GetHash

func (i *InMemStateMachine) GetHash() (uint64, error)

GetHash returns the uint64 hash value representing the state of a state machine.

func (*InMemStateMachine) Lookup

func (i *InMemStateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the state machine.

func (*InMemStateMachine) NALookup

func (i *InMemStateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the state machine.

func (*InMemStateMachine) OnDisk

func (i *InMemStateMachine) OnDisk() bool

OnDisk returns a boolean flag indicating whether this is an on disk state machine.

func (*InMemStateMachine) Open

func (i *InMemStateMachine) Open(stopc <-chan struct{}) (uint64, error)

Open opens the state machine.

func (*InMemStateMachine) Prepare

func (i *InMemStateMachine) Prepare() (interface{}, error)

Prepare makes preparations for taking concurrent snapshot.

func (*InMemStateMachine) Recover

func (i *InMemStateMachine) Recover(r io.Reader,
	fs []sm.SnapshotFile, stopc <-chan struct{}) error

Recover recovers the state machine from a snapshot.

func (*InMemStateMachine) Save

func (i *InMemStateMachine) Save(ctx interface{},
	w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error

Save saves the snapshot.

func (*InMemStateMachine) Sync

func (i *InMemStateMachine) Sync() error

Sync synchronizes all in-core state with that on disk.

func (*InMemStateMachine) Type

Type returns the type of the state machine.

func (*InMemStateMachine) Update

func (i *InMemStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)

Update updates the state machine.

type ManagedStateMachineFactory

type ManagedStateMachineFactory func(shardID uint64,
	replicaID uint64, stopc <-chan struct{}) IManagedStateMachine

ManagedStateMachineFactory is the factory function type for creating an IManagedStateMachine instance.

type NativeSM

type NativeSM struct {
	OffloadedStatus
	// contains filtered or unexported fields
}

NativeSM is the IManagedStateMachine object used to manage native data store in Golang.

func NewNativeSM

func NewNativeSM(config config.Config, ism IStateMachine,
	done <-chan struct{}) *NativeSM

NewNativeSM creates and returns a new NativeSM object.

func (*NativeSM) BatchedUpdate

func (ds *NativeSM) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error)

BatchedUpdate applies committed entries in a batch to hide latency.

func (*NativeSM) Close

func (ds *NativeSM) Close() error

Close closes the underlying user state machine and sets the destroyed flag.

func (*NativeSM) Concurrent

func (ds *NativeSM) Concurrent() bool

Concurrent returns a boolean flag to indicate whether the managed state machine instance is capable of doing concurrent snapshots.

func (*NativeSM) ConcurrentLookup

func (ds *NativeSM) ConcurrentLookup(query interface{}) (interface{}, error)

ConcurrentLookup queries the data store without obtaining the NativeSM.mu.

func (*NativeSM) DestroyedC

func (ds *NativeSM) DestroyedC() <-chan struct{}

DestroyedC returns a chan struct{} used to indicate whether the SM has been fully offloaded.

func (*NativeSM) GetHash

func (ds *NativeSM) GetHash() (uint64, error)

GetHash returns an integer value representing the state of the data store.

func (*NativeSM) Loaded

func (ds *NativeSM) Loaded()

Loaded marks the state machine as loaded by the specified component.

func (*NativeSM) Lookup

func (ds *NativeSM) Lookup(query interface{}) (interface{}, error)

Lookup queries the data store.

func (*NativeSM) NAConcurrentLookup

func (ds *NativeSM) NAConcurrentLookup(query []byte) ([]byte, error)

NAConcurrentLookup queries the data store without obtaining the NativeSM.mu.

func (*NativeSM) NALookup

func (ds *NativeSM) NALookup(query []byte) ([]byte, error)

NALookup queries the data store.

func (*NativeSM) Offloaded

func (ds *NativeSM) Offloaded() bool

Offloaded offloads the data store from a user component.

func (*NativeSM) OnDisk

func (ds *NativeSM) OnDisk() bool

OnDisk returns a boolean flag indicating whether the state machine is an on disk state machine.

func (*NativeSM) Open

func (ds *NativeSM) Open() (uint64, error)

Open opens the on disk state machine.

func (*NativeSM) Prepare

func (ds *NativeSM) Prepare() (interface{}, error)

Prepare makes preparation for concurrently taking snapshot.

func (*NativeSM) Recover

func (ds *NativeSM) Recover(r io.Reader, files []sm.SnapshotFile) error

Recover recovers the state of the data store from the specified reader.

func (*NativeSM) Save

func (ds *NativeSM) Save(meta SSMeta,
	w io.Writer, session []byte, c sm.ISnapshotFileCollection) (bool, error)

Save saves the state of the data store to the specified writer.

func (*NativeSM) Stream

func (ds *NativeSM) Stream(ctx interface{}, w io.Writer) error

Stream creates and streams snapshot to a remote node.

func (*NativeSM) Sync

func (ds *NativeSM) Sync() error

Sync synchronizes state machine's in-core state with that on disk.

func (*NativeSM) Type

func (ds *NativeSM) Type() pb.StateMachineType

Type returns the state machine type.

func (*NativeSM) Update

func (ds *NativeSM) Update(e sm.Entry) (sm.Result, error)

Update updates the data store.

type OffloadedStatus

type OffloadedStatus struct {
	DestroyedC chan struct{}
	// contains filtered or unexported fields
}

OffloadedStatus is used for tracking whether the managed data store has been offloaded from various system components.

func (*OffloadedStatus) Destroyed

func (o *OffloadedStatus) Destroyed() bool

Destroyed returns a boolean value indicating whether the belonging object has been destroyed.

func (*OffloadedStatus) SetDestroyed

func (o *OffloadedStatus) SetDestroyed()

SetDestroyed sets the destroyed flag to true.

func (*OffloadedStatus) SetLoaded

func (o *OffloadedStatus) SetLoaded()

SetLoaded marks the managed data store as loaded by a user component.

func (*OffloadedStatus) SetOffloaded

func (o *OffloadedStatus) SetOffloaded() uint64

SetOffloaded marks the managed data store as offloaded from a user component.

type OnDiskStateMachine

type OnDiskStateMachine struct {
	// contains filtered or unexported fields
}

OnDiskStateMachine is the type to represent an on disk state machine.

func NewOnDiskStateMachine

func NewOnDiskStateMachine(s sm.IOnDiskStateMachine) *OnDiskStateMachine

NewOnDiskStateMachine creates and returns an on disk state machine.

func (*OnDiskStateMachine) Close

func (s *OnDiskStateMachine) Close() error

Close closes the state machine.

func (*OnDiskStateMachine) Concurrent

func (s *OnDiskStateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*OnDiskStateMachine) GetHash

func (s *OnDiskStateMachine) GetHash() (uint64, error)

GetHash returns the uint64 hash value representing the state of a state machine.

func (*OnDiskStateMachine) Lookup

func (s *OnDiskStateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the state machine.

func (*OnDiskStateMachine) NALookup

func (s *OnDiskStateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the state machine.

func (*OnDiskStateMachine) OnDisk

func (s *OnDiskStateMachine) OnDisk() bool

OnDisk returns a boolean flag indicating whether this is an on disk state machine.

func (*OnDiskStateMachine) Open

func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error)

Open opens the state machine.

func (*OnDiskStateMachine) Prepare

func (s *OnDiskStateMachine) Prepare() (interface{}, error)

Prepare makes preparations for taking concurrent snapshot.

func (*OnDiskStateMachine) Recover

func (s *OnDiskStateMachine) Recover(r io.Reader,
	fs []sm.SnapshotFile, stopc <-chan struct{}) error

Recover recovers the state machine from a snapshot.

func (*OnDiskStateMachine) Save

func (s *OnDiskStateMachine) Save(ctx interface{},
	w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error

Save saves the snapshot.

func (*OnDiskStateMachine) SetTestFS

func (s *OnDiskStateMachine) SetTestFS(fs config.IFS)

SetTestFS injects the specified fs to the test SM.

func (*OnDiskStateMachine) Sync

func (s *OnDiskStateMachine) Sync() error

Sync synchronizes all in-core state with that on disk.

func (*OnDiskStateMachine) Type

Type returns the type of the state machine.

func (*OnDiskStateMachine) Update

func (s *OnDiskStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)

Update updates the state machine.

type RaftClientID

type RaftClientID uint64

RaftClientID is the type used as client id in sessions.

func (*RaftClientID) Compare

func (a *RaftClientID) Compare(b llrb.Comparable) int

Compare implements the llrb.Comparable interface.

type RaftSeriesID

type RaftSeriesID uint64

RaftSeriesID is the type used as series id in sessions.

type SMFactoryFunc

type SMFactoryFunc func(shardID uint64,
	replicaID uint64, done <-chan struct{}) IManagedStateMachine

SMFactoryFunc is the function type for creating an IManagedStateMachine instance.

type SSEnv

type SSEnv = server.SSEnv

SSEnv is the snapshot environment type.

type SSMeta

type SSMeta struct {
	Membership      pb.Membership
	Ctx             interface{}
	Session         *bytes.Buffer
	Request         SSRequest
	From            uint64
	OnDiskIndex     uint64
	Index           uint64
	Term            uint64
	Type            pb.StateMachineType
	CompressionType config.CompressionType
}

SSMeta is the metadata of a snapshot.

type SSReqType

type SSReqType uint64

SSReqType is the type of a snapshot request.

const (
	// Periodic is the value to indicate periodic snapshot.
	Periodic SSReqType = iota
	// UserRequested is the value to indicate user requested snapshot.
	UserRequested
	// Exported is the value to indicate exported snapshot.
	Exported
	// Streaming is the value to indicate snapshot streaming.
	Streaming
)

type SSRequest

type SSRequest struct {
	Path               string
	Type               SSReqType
	Key                uint64
	CompactionOverhead uint64
	CompactionIndex    uint64
	OverrideCompaction bool
}

SSRequest contains details of a snapshot request.

func (*SSRequest) Exported

func (r *SSRequest) Exported() bool

Exported returns a boolean value indicating whether the snapshot request is to create an exported snapshot.

func (*SSRequest) Streaming

func (r *SSRequest) Streaming() bool

Streaming returns a boolean value indicating whether the snapshot request is to stream snapshot.

type SSVersion

type SSVersion uint64

SSVersion is the snapshot version value type.

const (
	// V1 is the value of snapshot version 1.
	V1 SSVersion = 1
	// V2 is the value of snapshot version 2.
	V2 SSVersion = 2
	// DefaultVersion is the snapshot binary format version.
	DefaultVersion SSVersion = V2
	// HeaderSize is the size of the snapshot header in number of bytes.
	HeaderSize = settings.SnapshotHeaderSize

	// DefaultChecksumType is the default checksum type.
	DefaultChecksumType = defaultChecksumType
)

type Session

type Session struct {
	History       map[RaftSeriesID]sm.Result
	ClientID      RaftClientID
	RespondedUpTo RaftSeriesID
}

Session is the session object maintained on the raft side.

func (*Session) AddResponse

func (s *Session) AddResponse(id RaftSeriesID, result sm.Result)

AddResponse adds a response.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager is the wrapper struct that implements client session related functionalities used in the IManagedStateMachine interface.

func NewSessionManager

func NewSessionManager() *SessionManager

NewSessionManager returns a new SessionManager instance.

func (*SessionManager) AddResponse

func (ds *SessionManager) AddResponse(session *Session,
	seriesID uint64, result sm.Result)

AddResponse adds the specified result to the session.

func (*SessionManager) ClientRegistered

func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)

ClientRegistered returns whether the specified client exists in the system.

func (*SessionManager) GetSessionHash

func (ds *SessionManager) GetSessionHash() uint64

GetSessionHash returns a uint64 integer representing the state of the session manager.

func (*SessionManager) LoadSessions

func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error

LoadSessions loads and restores sessions from io.Reader.

func (*SessionManager) MustHaveClientSeries

func (ds *SessionManager) MustHaveClientSeries(session *Session,
	seriesID uint64)

MustHaveClientSeries checks whether the session manager contains a client session identified as clientID and whether it has seriesID responded.

func (*SessionManager) RegisterClientID

func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result

RegisterClientID registers a new client. It returns the input client id if it is previously unknown, or 0 when the client has already been registered.

func (*SessionManager) SaveSessions

func (ds *SessionManager) SaveSessions(writer io.Writer) error

SaveSessions saves the sessions to the provided io.Writer.

func (*SessionManager) UnregisterClientID

func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result

UnregisterClientID removes the specified client session from the system. It returns the client id if the client is successfully removed, or 0 if the client session does not exist.

func (*SessionManager) UpdateRequired

func (ds *SessionManager) UpdateRequired(session *Session,
	seriesID uint64) (sm.Result, bool, bool)

UpdateRequired returns a tuple of request result, responded before, update required.

func (*SessionManager) UpdateRespondedTo

func (ds *SessionManager) UpdateRespondedTo(session *Session,
	respondedTo uint64)

UpdateRespondedTo updates the responded to value of the specified client session.

type SnapshotReader

type SnapshotReader struct {
	// contains filtered or unexported fields
}

SnapshotReader is an io.Reader for reading from snapshot files.

func NewSnapshotReader

func NewSnapshotReader(fp string,
	fs vfs.IFS) (*SnapshotReader, pb.SnapshotHeader, error)

NewSnapshotReader creates a new snapshot reader instance.

func (*SnapshotReader) Close

func (sr *SnapshotReader) Close() error

Close closes the snapshot reader instance.

func (*SnapshotReader) Read

func (sr *SnapshotReader) Read(data []byte) (int, error)

Read reads up to len(data) bytes from the snapshot file.

type SnapshotValidator

type SnapshotValidator struct {
	// contains filtered or unexported fields
}

SnapshotValidator is the validator used to check incoming snapshot chunks.

func NewSnapshotValidator

func NewSnapshotValidator() *SnapshotValidator

NewSnapshotValidator creates and returns a new SnapshotValidator instance.

func (*SnapshotValidator) AddChunk

func (v *SnapshotValidator) AddChunk(data []byte, chunkID uint64) bool

AddChunk adds a new snapshot chunk to the validator.

func (*SnapshotValidator) Validate

func (v *SnapshotValidator) Validate() bool

Validate validates the added chunks and returns a boolean flag indicating whether the snapshot chunks are valid.

type SnapshotWriter

type SnapshotWriter struct {
	// contains filtered or unexported fields
}

SnapshotWriter is an io.Writer used to write snapshot file.

func NewSnapshotWriter

func NewSnapshotWriter(fp string,
	ct pb.CompressionType, fs vfs.IFS) (*SnapshotWriter, error)

NewSnapshotWriter creates a new snapshot writer instance.

func (*SnapshotWriter) Close

func (sw *SnapshotWriter) Close() (err error)

Close closes the snapshot writer instance.

func (*SnapshotWriter) GetPayloadChecksum

func (sw *SnapshotWriter) GetPayloadChecksum() []byte

GetPayloadChecksum returns the payload checksum.

func (*SnapshotWriter) GetPayloadSize

func (sw *SnapshotWriter) GetPayloadSize(sz uint64) uint64

GetPayloadSize returns the payload size.

func (*SnapshotWriter) Write

func (sw *SnapshotWriter) Write(data []byte) (int, error)

Write writes the specified data to the snapshot.

type StateMachine

type StateMachine struct {
	// contains filtered or unexported fields
}

StateMachine is the state machine component in the replicated state machine scheme.

func NewStateMachine

func NewStateMachine(sm IManagedStateMachine,
	snapshotter ISnapshotter,
	cfg config.Config, node INode, fs vfs.IFS) *StateMachine

NewStateMachine creates a new application state machine object.

func (*StateMachine) Close

func (s *StateMachine) Close() error

Close closes the state machine.

func (*StateMachine) Concurrent

func (s *StateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*StateMachine) DestroyedC

func (s *StateMachine) DestroyedC() <-chan struct{}

DestroyedC returns a chan struct{} used to indicate whether the SM has been fully unloaded.

func (*StateMachine) GetHash

func (s *StateMachine) GetHash() (uint64, error)

GetHash returns the state machine hash.

func (*StateMachine) GetLastApplied

func (s *StateMachine) GetLastApplied() uint64

GetLastApplied returns the last applied value.

func (*StateMachine) GetMembership

func (s *StateMachine) GetMembership() pb.Membership

GetMembership returns the membership info maintained by the state machine.

func (*StateMachine) GetMembershipHash

func (s *StateMachine) GetMembershipHash() uint64

GetMembershipHash returns the hash of the membership instance.

func (*StateMachine) GetSessionHash

func (s *StateMachine) GetSessionHash() uint64

GetSessionHash returns the session hash.

func (*StateMachine) GetSyncedIndex

func (s *StateMachine) GetSyncedIndex() uint64

GetSyncedIndex returns the index value that is known to have been synchronized.

func (*StateMachine) Handle

func (s *StateMachine) Handle(batch []Task, apply []sm.Entry) (Task, error)

Handle pulls the committed record and applies it if there is any available.

func (*StateMachine) Loaded

func (s *StateMachine) Loaded()

Loaded marks the state machine as loaded from the specified component.

func (*StateMachine) Lookup

func (s *StateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the local state machine.

func (*StateMachine) NALookup

func (s *StateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the local state machine.

func (*StateMachine) Offloaded

func (s *StateMachine) Offloaded() bool

Offloaded marks the state machine as offloaded from the specified component. It returns a boolean value indicating whether the node has been fully unloaded after unloading from the specified component.

func (*StateMachine) OnDiskStateMachine

func (s *StateMachine) OnDiskStateMachine() bool

OnDiskStateMachine returns a boolean flag indicating whether it is an on disk state machine.

func (*StateMachine) OpenOnDiskStateMachine

func (s *StateMachine) OpenOnDiskStateMachine() (uint64, error)

OpenOnDiskStateMachine opens the on disk state machine.

func (*StateMachine) ReadyToStream

func (s *StateMachine) ReadyToStream() bool

ReadyToStream returns a boolean flag to indicate whether the state machine is ready to stream snapshot. It cannot stream a full snapshot when membership state is catching up with the on disk SM state. Meta only snapshot can be taken at any time.

func (*StateMachine) Recover

func (s *StateMachine) Recover(t Task) (_ pb.Snapshot, err error)

Recover applies the snapshot.

func (*StateMachine) Save

func (s *StateMachine) Save(req SSRequest) (pb.Snapshot, SSEnv, error)

Save creates a snapshot.

func (*StateMachine) SetLastApplied

func (s *StateMachine) SetLastApplied(index uint64)

SetLastApplied sets the last applied index to the specified value. This method is only used in tests.

func (*StateMachine) Stream

func (s *StateMachine) Stream(sink pb.IChunkSink) error

Stream starts to stream snapshot from the current SM to a remote node targeted by the provided sink.

func (*StateMachine) Sync

func (s *StateMachine) Sync() error

Sync synchronizes state machine's in-core state with that on disk.

func (*StateMachine) TaskChanBusy

func (s *StateMachine) TaskChanBusy() bool

TaskChanBusy returns whether the TaskC chan is busy. Busy is defined as having more than half of its buffer occupied.

func (*StateMachine) TaskQ

func (s *StateMachine) TaskQ() *TaskQueue

TaskQ returns the task queue.

func (*StateMachine) Type

func (s *StateMachine) Type() pb.StateMachineType

Type returns the state machine type.

type Task

type Task struct {
	Entries      []pb.Entry
	SSRequest    SSRequest
	ShardID      uint64
	ReplicaID    uint64
	Index        uint64
	Save         bool
	Stream       bool
	PeriodicSync bool
	NewNode      bool
	Recover      bool
	Initial      bool
}

Task describes a task that needs to be handled by StateMachine.

func (*Task) IsSnapshotTask

func (t *Task) IsSnapshotTask() bool

IsSnapshotTask returns a boolean flag indicating whether it is a snapshot task.

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue is a queue of tasks to be processed by the state machine.

func NewTaskQueue

func NewTaskQueue() *TaskQueue

NewTaskQueue creates and returns a new task queue.

func (*TaskQueue) Add

func (tq *TaskQueue) Add(task Task)

Add adds a new task to the queue.

func (*TaskQueue) Get

func (tq *TaskQueue) Get() (Task, bool)

Get returns a task from the queue if there is any.

func (*TaskQueue) GetAll

func (tq *TaskQueue) GetAll() []Task

GetAll returns all tasks currently in the queue.

func (*TaskQueue) MoreEntryToApply

func (tq *TaskQueue) MoreEntryToApply() bool

MoreEntryToApply returns a boolean value indicating whether it is ok to queue more entries to apply.

func (*TaskQueue) Size

func (tq *TaskQueue) Size() uint64

Size returns the number of queued tasks.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL