rsm

package
v3.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2020 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Overview

Package rsm implements Replicated State Machines used in Dragonboat.

This package is internally used by Dragonboat, applications are not expected to import this package.

Index

Constants

View Source
const (
	EEHeaderSize uint8 = 1
	EEVersion    uint8 = 0 << 4
	EEV0         uint8 = 0 << 4

	// for V0 format, entries with empty payload will cause panic as such
	// entries always have their TYPE value set to ApplicationEntry
	EENoCompression uint8 = 0 << 1
	EESnappy        uint8 = 1 << 1

	EENoSession  uint8 = 0
	EEHasSession uint8 = 1
	// uncompressed size is binary.Uvarint encoded
	EEV0SizeOffset int = 1
)

Entry Cmd format when Type = pb.EncodedEntry

|Version|CompressionFlag|SessionFlag| | 4Bits | 3Bits | 1Bit |

View Source
const (
	// ChunkSize is the size of each snapshot chunk.
	ChunkSize = settings.SnapshotChunkSize
)
View Source
const (
	// EmptyClientSessionLength defines the length of an empty sessions instance.
	EmptyClientSessionLength uint64 = 16
)

Variables

View Source
var (
	// ErrTestKnobReturn is the error returned when returned earlier due to test
	// knob.
	ErrTestKnobReturn = errors.New("returned earlier due to test knob")
	// ErrRestoreSnapshot indicates there is error when trying to restore
	// from a snapshot
	ErrRestoreSnapshot = errors.New("failed to restore from snapshot")
)
View Source
var (
	// ErrClusterClosed indicates that the cluster has been closed
	ErrClusterClosed = errors.New("raft cluster already closed")
)
View Source
var (
	// LRUMaxSessionCount is the largest number of client sessions that can be
	// concurrently managed by a LRUSession instance.
	LRUMaxSessionCount = settings.Hard.LRUMaxSessionCount
)

Functions

func GetDefaultChecksum

func GetDefaultChecksum() hash.Hash

GetDefaultChecksum returns the default hash.Hash instance.

func GetEmptyLRUSession

func GetEmptyLRUSession() []byte

GetEmptyLRUSession returns an marshaled empty sessions instance.

func GetEncodedPayload

func GetEncodedPayload(ct dio.CompressionType, cmd []byte, dst []byte) []byte

GetEncodedPayload returns the encoded payload using the specified compression type and the default encoded entry version.

func GetEntryPayload

func GetEntryPayload(e pb.Entry) []byte

GetEntryPayload returns the payload of the entry ready to be applied into the state machine.

func GetMaxBlockSize

func GetMaxBlockSize(ct config.CompressionType) uint64

GetMaxBlockSize returns the maximum block length supported by the specified compression type.

func GetV2PayloadChecksum

func GetV2PayloadChecksum(fp string, fs vfs.IFS) (crc []byte, err error)

GetV2PayloadChecksum calculates the payload checksum of the specified snapshot file.

func GetV2PayloadSize

func GetV2PayloadSize(sz uint64) uint64

GetV2PayloadSize returns the actual on disk size for the input user payload size.

func GetWitnessSnapshot

func GetWitnessSnapshot(fs vfs.IFS) ([]byte, error)

GetWitnessSnapshot returns the content of a witness snapshot.

func IsShrinkedSnapshotFile

func IsShrinkedSnapshotFile(fp string, fs vfs.IFS) (shrunk bool, err error)

IsShrinkedSnapshotFile returns a boolean flag indicating whether the specified snapshot file is already shrunk.

func ReplaceSnapshotFile

func ReplaceSnapshotFile(newFp string, fp string, fs vfs.IFS) error

ReplaceSnapshotFile replace the specified snapshot file with the shrunk version atomically.

func ShrinkSnapshot

func ShrinkSnapshot(fp string, newFp string, fs vfs.IFS) (err error)

ShrinkSnapshot shrinks the specified snapshot file and save the generated shrunk version to the path specified by newFp.

func ToDioCompressionType

func ToDioCompressionType(ct config.CompressionType) dio.CompressionType

ToDioCompressionType converts the CompressionType type defined in the config package to the CompressionType value defined in the dio package.

Types

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter is a writer type that writes the input data to the underlying storage with checksum appended at the end of each block.

func NewBlockWriter

func NewBlockWriter(blockSize uint64,
	onNewBlock func(data []byte, crc []byte) error,
	t pb.ChecksumType) *BlockWriter

NewBlockWriter creates and returns a block writer.

func (*BlockWriter) Flush

func (bw *BlockWriter) Flush() error

Flush writes all in memory buffered data.

func (*BlockWriter) GetPayloadChecksum

func (bw *BlockWriter) GetPayloadChecksum() []byte

GetPayloadChecksum returns the checksum for the entire payload.

func (*BlockWriter) Write

func (bw *BlockWriter) Write(bs []byte) (int, error)

Write writes the specified data using the block writer.

type ChunkWriter

type ChunkWriter struct {
	// contains filtered or unexported fields
}

ChunkWriter is an io.WriteCloser type that streams snapshot chunks to its intended remote nodes.

func NewChunkWriter

func NewChunkWriter(sink pb.IChunkSink, meta *SSMeta) *ChunkWriter

NewChunkWriter creates and returns a chunk writer instance.

func (*ChunkWriter) Close

func (cw *ChunkWriter) Close() error

Close closes the chunk writer.

func (*ChunkWriter) Write

func (cw *ChunkWriter) Write(data []byte) (int, error)

Write writes the specified input data.

type ConcurrentStateMachine

type ConcurrentStateMachine struct {
	// contains filtered or unexported fields
}

ConcurrentStateMachine is an IStateMachine type capable of taking concurrent snapshots.

func NewConcurrentStateMachine

func NewConcurrentStateMachine(s sm.IConcurrentStateMachine) *ConcurrentStateMachine

NewConcurrentStateMachine creates a new ConcurrentStateMachine instance.

func (*ConcurrentStateMachine) Close

func (s *ConcurrentStateMachine) Close() error

Close closes the state machine.

func (*ConcurrentStateMachine) Concurrent

func (s *ConcurrentStateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*ConcurrentStateMachine) GetHash

func (s *ConcurrentStateMachine) GetHash() (uint64, error)

GetHash returns the uint64 hash value representing the state of a state machine.

func (*ConcurrentStateMachine) Lookup

func (s *ConcurrentStateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the state machine.

func (*ConcurrentStateMachine) NALookup

func (s *ConcurrentStateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the state machine.

func (*ConcurrentStateMachine) OnDisk

func (s *ConcurrentStateMachine) OnDisk() bool

OnDisk returns a boolean flag indicating whether this is a on disk state machine.

func (*ConcurrentStateMachine) Open

func (s *ConcurrentStateMachine) Open(stopc <-chan struct{}) (uint64, error)

Open opens the state machine.

func (*ConcurrentStateMachine) Prepare

func (s *ConcurrentStateMachine) Prepare() (interface{}, error)

Prepare makes preparations for taking concurrent snapshot.

func (*ConcurrentStateMachine) Recover

func (s *ConcurrentStateMachine) Recover(r io.Reader,
	fs []sm.SnapshotFile, stopc <-chan struct{}) error

Recover recovers the state machine from a snapshot.

func (*ConcurrentStateMachine) Save

func (s *ConcurrentStateMachine) Save(ctx interface{},
	w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error

Save saves the snapshot.

func (*ConcurrentStateMachine) Sync

func (s *ConcurrentStateMachine) Sync() error

Sync synchronizes all in-core state with that on disk.

func (*ConcurrentStateMachine) Type

StateMachineType returns the type of the state machine.

func (*ConcurrentStateMachine) Update

func (s *ConcurrentStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)

Update updates the state machine.

type Files

type Files struct {
	// contains filtered or unexported fields
}

Files is a collection of external files specified by the SaveSnapshot method of the state machine type.

func NewFileCollection

func NewFileCollection() *Files

NewFileCollection creates and returns a Files instance.

func (*Files) AddFile

func (fc *Files) AddFile(fileID uint64,
	path string, metadata []byte)

AddFile adds the specified file to the external file collection.

func (*Files) GetFileAt

func (fc *Files) GetFileAt(idx uint64) *pb.SnapshotFile

GetFileAt returns the specified file.

func (*Files) PrepareFiles

func (fc *Files) PrepareFiles(tmpdir string,
	finaldir string) ([]*pb.SnapshotFile, error)

PrepareFiles finalize the external files added to the collection.

func (*Files) Size

func (fc *Files) Size() uint64

Size returns the number of external files already added to the external file collection.

type From

type From uint64

From identifies a component in the system.

const (
	// FromNodeHost indicates the data store has been loaded by or offloaded from
	// nodehost.
	FromNodeHost From = iota
	// FromStepWorker indicates that the data store has been loaded by or
	// offloaded from the step worker.
	FromStepWorker
	// FromCommitWorker indicates that the data store has been loaded by or
	// offloaded from the commit worker.
	FromCommitWorker
	// FromApplyWorker indicates that the data store has been loaded by or
	// offloaded from the apply worker.
	FromApplyWorker
	// FromSnapshotWorker indicates that the data store has been loaded by or
	// offloaded from the snapshot worker.
	FromSnapshotWorker
)

func (From) String

func (f From) String() string

type IBlockWriter

type IBlockWriter interface {
	Write(bs []byte) (int, error)
	Flush() error
	GetPayloadChecksum() []byte
}

IBlockWriter is the interface for writing checksumed data blocks.

type ILoadable

type ILoadable interface {
	LoadSessions(io.Reader, SSVersion) error
}

ILoadable is the interface for types that can load client session state from a snapshot.

type IManagedStateMachine

type IManagedStateMachine interface {
	Open() (uint64, error)
	Update(sm.Entry) (sm.Result, error)
	BatchedUpdate([]sm.Entry) ([]sm.Entry, error)
	Lookup(interface{}) (interface{}, error)
	NALookup([]byte) ([]byte, error)
	Sync() error
	GetHash() (uint64, error)
	Prepare() (interface{}, error)
	Save(*SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error)
	Recover(io.Reader, []sm.SnapshotFile) error
	Stream(interface{}, io.Writer) error
	Offloaded(From) bool
	Loaded(From)
	DestroyedC() <-chan struct{}
	Concurrent() bool
	OnDisk() bool
	Type() pb.StateMachineType
}

IManagedStateMachine is the interface used to manage data store.

func NewNativeSM

func NewNativeSM(config config.Config, ism IStateMachine,
	done <-chan struct{}) IManagedStateMachine

NewNativeSM creates and returns a new NativeSM object.

type INode

type INode interface {
	StepReady()
	RestoreRemotes(pb.Snapshot)
	ApplyUpdate(pb.Entry, sm.Result, bool, bool, bool)
	ApplyConfigChange(pb.ConfigChange)
	ConfigChangeProcessed(uint64, bool)
	NodeID() uint64
	ClusterID() uint64
	ShouldStop() <-chan struct{}
}

INode is the interface of a dragonboat node.

type IRecoverable

type IRecoverable interface {
	Recover(io.Reader, []sm.SnapshotFile) error
}

IRecoverable is the interface for types that can have its state restored from snapshots.

type ISavable

type ISavable interface {
	Save(*SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error)
}

ISavable is the interface for types that can its content saved as snapshots.

type ISnapshotter

type ISnapshotter interface {
	GetSnapshot(uint64) (pb.Snapshot, error)
	GetMostRecentSnapshot() (pb.Snapshot, error)
	GetFilePath(uint64) string
	Stream(IStreamable, *SSMeta, pb.IChunkSink) error
	Save(ISavable, *SSMeta) (*pb.Snapshot, *server.SSEnv, error)
	Load(ILoadable, IRecoverable, string, []sm.SnapshotFile) error
	IsNoSnapshotError(error) bool
}

ISnapshotter is the interface for the snapshotter object.

type IStateMachine

type IStateMachine interface {
	Open(<-chan struct{}) (uint64, error)
	Update(entries []sm.Entry) ([]sm.Entry, error)
	Lookup(query interface{}) (interface{}, error)
	NALookup(query []byte) ([]byte, error)
	Sync() error
	Prepare() (interface{}, error)
	Save(interface{},
		io.Writer, sm.ISnapshotFileCollection, <-chan struct{}) error
	Recover(io.Reader, []sm.SnapshotFile, <-chan struct{}) error
	Close() error
	GetHash() (uint64, error)
	Concurrent() bool
	OnDisk() bool
	Type() pb.StateMachineType
}

IStateMachine is an adapter interface for underlying IStateMachine or IConcurrentStateMachine instances.

type IStreamable

type IStreamable interface {
	Stream(interface{}, io.Writer) error
}

IStreamable is the interface for types that can be snapshot streamed.

type ITestFS

type ITestFS interface {
	SetTestFS(fs config.IFS)
}

ITestFS is an interface implemented by test SMs.

type IVReader

type IVReader interface {
	Read(data []byte) (int, error)
	Sum() []byte
}

IVReader is the interface for versioned snapshot reader.

type IVValidator

type IVValidator interface {
	AddChunk(data []byte, chunkID uint64) bool
	Validate() bool
}

IVValidator is the interface for versioned validator.

type IVWriter

type IVWriter interface {
	Write(data []byte) (int, error)
	GetVersion() SSVersion
	GetPayloadSum() []byte
	GetPayloadSize(uint64) uint64
	Flush() error
}

IVWriter is the interface for versioned snapshot writer.

type ManagedStateMachineFactory

type ManagedStateMachineFactory func(clusterID uint64,
	nodeID uint64, stopc <-chan struct{}) IManagedStateMachine

ManagedStateMachineFactory is the factory function type for creating an IManagedStateMachine instance.

type NativeSM

type NativeSM struct {
	OffloadedStatus
	// contains filtered or unexported fields
}

NativeSM is the IManagedStateMachine object used to manage native data store in Golang.

func (*NativeSM) BatchedUpdate

func (ds *NativeSM) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error)

BatchedUpdate applies committed entries in a batch to hide latency.

func (*NativeSM) Concurrent

func (ds *NativeSM) Concurrent() bool

Concurrent returns a boolean flag to indicate whether the managed state machine instance is capable of doing concurrent snapshots.

func (*NativeSM) DestroyedC

func (ds *NativeSM) DestroyedC() <-chan struct{}

DestroyedC returns a chan struct{} used to indicate whether the SM has been fully offloaded.

func (*NativeSM) GetHash

func (ds *NativeSM) GetHash() (uint64, error)

GetHash returns an integer value representing the state of the data store.

func (*NativeSM) Loaded

func (ds *NativeSM) Loaded(from From)

Loaded marks the statemachine as loaded by the specified component.

func (*NativeSM) Lookup

func (ds *NativeSM) Lookup(query interface{}) (interface{}, error)

Lookup queries the data store.

func (*NativeSM) NALookup

func (ds *NativeSM) NALookup(query []byte) ([]byte, error)

NALookup queries the data store.

func (*NativeSM) Offloaded

func (ds *NativeSM) Offloaded(from From) bool

Offloaded offloads the data store from the specified part of the system.

func (*NativeSM) OnDisk

func (ds *NativeSM) OnDisk() bool

OnDisk returns a boolean flag indicating whether the state machine is an on disk state machine.

func (*NativeSM) Open

func (ds *NativeSM) Open() (uint64, error)

Open opens on disk state machine.

func (*NativeSM) Prepare

func (ds *NativeSM) Prepare() (interface{}, error)

Prepare makes preparation for concurrently taking snapshot.

func (*NativeSM) Recover

func (ds *NativeSM) Recover(r io.Reader, files []sm.SnapshotFile) error

Recover recovers the state of the data store from the specified reader.

func (*NativeSM) Save

func (ds *NativeSM) Save(meta *SSMeta,
	w io.Writer, session []byte, c sm.ISnapshotFileCollection) (bool, error)

Save saves the state of the data store to the specified writer.

func (*NativeSM) Stream

func (ds *NativeSM) Stream(ssctx interface{}, w io.Writer) error

Stream creates and streams snapshot to a remote node.

func (*NativeSM) Sync

func (ds *NativeSM) Sync() error

Sync synchronizes state machine's in-core state with that on disk.

func (*NativeSM) Type

func (ds *NativeSM) Type() pb.StateMachineType

Type returns the state machine type.

func (*NativeSM) Update

func (ds *NativeSM) Update(e sm.Entry) (sm.Result, error)

Update updates the data store.

type OffloadedStatus

type OffloadedStatus struct {
	DestroyedC chan struct{}
	// contains filtered or unexported fields
}

OffloadedStatus is used for tracking whether the managed data store has been offloaded from various system components.

func (*OffloadedStatus) Destroyed

func (o *OffloadedStatus) Destroyed() bool

Destroyed returns a boolean value indicating whether the belonging object has been destroyed.

func (*OffloadedStatus) ReadyToDestroy

func (o *OffloadedStatus) ReadyToDestroy() bool

ReadyToDestroy returns a boolean value indicating whether the the managed data store is ready to be destroyed.

func (*OffloadedStatus) SetDestroyed

func (o *OffloadedStatus) SetDestroyed()

SetDestroyed set the destroyed flag to be true

func (*OffloadedStatus) SetLoaded

func (o *OffloadedStatus) SetLoaded(from From)

SetLoaded marks the managed data store as loaded from the specified component.

func (*OffloadedStatus) SetOffloaded

func (o *OffloadedStatus) SetOffloaded(from From)

SetOffloaded marks the managed data store as offloaded from the specified component.

type OnDiskStateMachine

type OnDiskStateMachine struct {
	// contains filtered or unexported fields
}

OnDiskStateMachine is the type to represent an on disk state machine.

func NewOnDiskStateMachine

func NewOnDiskStateMachine(s sm.IOnDiskStateMachine) *OnDiskStateMachine

NewOnDiskStateMachine creates and returns an on disk state machine.

func (*OnDiskStateMachine) Close

func (s *OnDiskStateMachine) Close() error

Close closes the state machine.

func (*OnDiskStateMachine) Concurrent

func (s *OnDiskStateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*OnDiskStateMachine) GetHash

func (s *OnDiskStateMachine) GetHash() (uint64, error)

GetHash returns the uint64 hash value representing the state of a state machine.

func (*OnDiskStateMachine) Lookup

func (s *OnDiskStateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the state machine.

func (*OnDiskStateMachine) NALookup

func (s *OnDiskStateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the state machine.

func (*OnDiskStateMachine) OnDisk

func (s *OnDiskStateMachine) OnDisk() bool

OnDisk returns a boolean flag indicating whether this is an on disk state machine.

func (*OnDiskStateMachine) Open

func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error)

Open opens the state machine.

func (*OnDiskStateMachine) Prepare

func (s *OnDiskStateMachine) Prepare() (interface{}, error)

Prepare makes preparations for taking concurrent snapshot.

func (*OnDiskStateMachine) Recover

func (s *OnDiskStateMachine) Recover(r io.Reader,
	fs []sm.SnapshotFile, stopc <-chan struct{}) error

Recover recovers the state machine from a snapshot.

func (*OnDiskStateMachine) Save

func (s *OnDiskStateMachine) Save(ctx interface{},
	w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error

Save saves the snapshot.

func (*OnDiskStateMachine) SetTestFS

func (s *OnDiskStateMachine) SetTestFS(fs config.IFS)

SetTestFS injects the specified fs to the test SM.

func (*OnDiskStateMachine) Sync

func (s *OnDiskStateMachine) Sync() error

Sync synchronizes all in-core state with that on disk.

func (*OnDiskStateMachine) Type

Type returns the type of the state machine.

func (*OnDiskStateMachine) Update

func (s *OnDiskStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)

Update updates the state machine.

type RaftClientID

type RaftClientID uint64

RaftClientID is the type used as client id in sessions.

func (*RaftClientID) Compare

func (a *RaftClientID) Compare(b llrb.Comparable) int

Compare implements the llrb.Comparable interface.

type RaftSeriesID

type RaftSeriesID uint64

RaftSeriesID is the type used as series id in sessions.

type RegularStateMachine

type RegularStateMachine struct {
	// contains filtered or unexported fields
}

RegularStateMachine is a regular state machine not capable of taking concurrent snapshots.

func NewRegularStateMachine

func NewRegularStateMachine(s sm.IStateMachine) *RegularStateMachine

NewRegularStateMachine creates a new RegularStateMachine instance.

func (*RegularStateMachine) Close

func (s *RegularStateMachine) Close() error

Close closes the state machine.

func (*RegularStateMachine) Concurrent

func (s *RegularStateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*RegularStateMachine) GetHash

func (s *RegularStateMachine) GetHash() (uint64, error)

GetHash returns the uint64 hash value representing the state of a state machine.

func (*RegularStateMachine) Lookup

func (s *RegularStateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the state machine.

func (*RegularStateMachine) NALookup

func (s *RegularStateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the state machine.

func (*RegularStateMachine) OnDisk

func (s *RegularStateMachine) OnDisk() bool

OnDisk returns a boolean flag indicating whether this is an on disk state machine.

func (*RegularStateMachine) Open

func (s *RegularStateMachine) Open(stopc <-chan struct{}) (uint64, error)

Open opens the state machine.

func (*RegularStateMachine) Prepare

func (s *RegularStateMachine) Prepare() (interface{}, error)

Prepare makes preparations for taking concurrent snapshot.

func (*RegularStateMachine) Recover

func (s *RegularStateMachine) Recover(r io.Reader,
	fs []sm.SnapshotFile, stopc <-chan struct{}) error

Recover recovers the state machine from a snapshot.

func (*RegularStateMachine) Save

func (s *RegularStateMachine) Save(ctx interface{},
	w io.Writer, fc sm.ISnapshotFileCollection, stopc <-chan struct{}) error

Save saves the snapshot.

func (*RegularStateMachine) Sync

func (s *RegularStateMachine) Sync() error

Sync synchronizes all in-core state with that on disk.

func (*RegularStateMachine) Type

Type returns the type of the state machine.

func (*RegularStateMachine) Update

func (s *RegularStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)

Update updates the state machine.

type SMFactoryFunc

type SMFactoryFunc func(clusterID uint64,
	nodeID uint64, done <-chan struct{}) IManagedStateMachine

SMFactoryFunc is the function type for creating an IStateMachine instance

type SSMeta

type SSMeta struct {
	From            uint64
	Index           uint64
	Term            uint64
	OnDiskIndex     uint64 // applied index of IOnDiskStateMachine
	Request         SSRequest
	Membership      pb.Membership
	Type            pb.StateMachineType
	Session         *bytes.Buffer
	Ctx             interface{}
	CompressionType config.CompressionType
}

SSMeta is the metadata of a snapshot.

type SSReqType

type SSReqType uint64

SSReqType is the type of a snapshot request.

const (
	// Periodic is the value to indicate periodic snapshot.
	Periodic SSReqType = iota
	// UserRequested is the value to indicate user requested snapshot.
	UserRequested
	// Exported is the value to indicate exported snapshot.
	Exported
	// Streaming is the value to indicate snapshot streaming.
	Streaming
)

type SSRequest

type SSRequest struct {
	Type               SSReqType
	Key                uint64
	Path               string
	OverrideCompaction bool
	CompactionOverhead uint64
}

SSRequest is the type for describing the details of a snapshot request.

func (*SSRequest) Exported

func (r *SSRequest) Exported() bool

Exported returns a boolean value indicating whether the snapshot request is to create an exported snapshot.

func (*SSRequest) Streaming

func (r *SSRequest) Streaming() bool

Streaming returns a boolean value indicating whether the snapshot request is to stream snapshot.

type SSVersion

type SSVersion uint64

SSVersion is the snapshot version value type.

const (
	// V1SnapshotVersion is the value of snapshot version 1.
	V1SnapshotVersion SSVersion = 1
	// V2SnapshotVersion is the value of snapshot version 2.
	V2SnapshotVersion SSVersion = 2
	// SnapshotVersion is the snapshot binary format version.
	SnapshotVersion SSVersion = V2SnapshotVersion
	// SnapshotHeaderSize is the size of snapshot in number of bytes.
	SnapshotHeaderSize = settings.SnapshotHeaderSize

	// DefaultChecksumType is the default checksum type.
	DefaultChecksumType = defaultChecksumType
)

type Session

type Session struct {
	ClientID      RaftClientID
	RespondedUpTo RaftSeriesID
	History       map[RaftSeriesID]sm.Result
}

Session is the session object maintained on the raft side.

func (*Session) AddResponse

func (s *Session) AddResponse(id RaftSeriesID, result sm.Result)

AddResponse adds a response.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager is the wrapper struct that implements client session related functionalites used in the IManagedStateMachine interface.

func NewSessionManager

func NewSessionManager() *SessionManager

NewSessionManager returns a new SessionManager instance.

func (*SessionManager) AddResponse

func (ds *SessionManager) AddResponse(session *Session,
	seriesID uint64, result sm.Result)

AddResponse adds the specified result to the session.

func (*SessionManager) ClientRegistered

func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool)

ClientRegistered returns whether the specified client exists in the system.

func (*SessionManager) GetSessionHash

func (ds *SessionManager) GetSessionHash() uint64

GetSessionHash returns an uint64 integer representing the state of the session manager.

func (*SessionManager) LoadSessions

func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error

LoadSessions loads and restores sessions from io.Reader.

func (*SessionManager) MustHaveClientSeries

func (ds *SessionManager) MustHaveClientSeries(session *Session,
	seriesID uint64)

MustHaveClientSeries checks whether the session manager contains a client session identified as clientID and whether it has seriesID responded.

func (*SessionManager) RegisterClientID

func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result

RegisterClientID registers a new client, it returns the input client id if it is previously unknown, or 0 when the client has already been registered.

func (*SessionManager) SaveSessions

func (ds *SessionManager) SaveSessions(writer io.Writer) error

SaveSessions saves the sessions to the provided io.writer.

func (*SessionManager) UnregisterClientID

func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result

UnregisterClientID removes the specified client session from the system. It returns the client id if the client is successfully removed, or 0 if the client session does not exist.

func (*SessionManager) UpdateRequired

func (ds *SessionManager) UpdateRequired(session *Session,
	seriesID uint64) (sm.Result, bool, bool)

UpdateRequired return a tuple of request result, responded before, update required.

func (*SessionManager) UpdateRespondedTo

func (ds *SessionManager) UpdateRespondedTo(session *Session,
	respondedTo uint64)

UpdateRespondedTo updates the responded to value of the specified client session.

type SnapshotReader

type SnapshotReader struct {
	// contains filtered or unexported fields
}

SnapshotReader is an io.Reader for reading from snapshot files.

func NewSnapshotReader

func NewSnapshotReader(fp string, fs vfs.IFS) (*SnapshotReader, error)

NewSnapshotReader creates a new snapshot reader instance.

func (*SnapshotReader) Close

func (sr *SnapshotReader) Close() error

Close closes the snapshot reader instance.

func (*SnapshotReader) GetHeader

func (sr *SnapshotReader) GetHeader() (pb.SnapshotHeader, error)

GetHeader returns the snapshot header instance.

func (*SnapshotReader) Read

func (sr *SnapshotReader) Read(data []byte) (int, error)

Read reads up to len(data) bytes from the snapshot file.

func (*SnapshotReader) ValidatePayload

func (sr *SnapshotReader) ValidatePayload(header pb.SnapshotHeader)

ValidatePayload validates whether the snapshot content matches the checksum recorded in the header.

type SnapshotValidator

type SnapshotValidator struct {
	// contains filtered or unexported fields
}

SnapshotValidator is the validator used to check incoming snapshot chunks.

func NewSnapshotValidator

func NewSnapshotValidator() *SnapshotValidator

NewSnapshotValidator creates and returns a new SnapshotValidator instance.

func (*SnapshotValidator) AddChunk

func (v *SnapshotValidator) AddChunk(data []byte, chunkID uint64) bool

AddChunk adds a new snapshot chunk to the validator.

func (*SnapshotValidator) Validate

func (v *SnapshotValidator) Validate() bool

Validate validates the added chunks and return a boolean flag indicating whether the snapshot chunks are valid.

type SnapshotWriter

type SnapshotWriter struct {
	// contains filtered or unexported fields
}

SnapshotWriter is an io.Writer used to write snapshot file.

func NewSnapshotWriter

func NewSnapshotWriter(fp string,
	v SSVersion, ct pb.CompressionType, fs vfs.IFS) (*SnapshotWriter, error)

NewSnapshotWriter creates a new snapshot writer instance.

func (*SnapshotWriter) Close

func (sw *SnapshotWriter) Close() error

Close closes the snapshot writer instance.

func (*SnapshotWriter) GetPayloadChecksum

func (sw *SnapshotWriter) GetPayloadChecksum() []byte

GetPayloadChecksum returns the payload checksum.

func (*SnapshotWriter) GetPayloadSize

func (sw *SnapshotWriter) GetPayloadSize(sz uint64) uint64

GetPayloadSize returns the payload size.

func (*SnapshotWriter) Write

func (sw *SnapshotWriter) Write(data []byte) (int, error)

Write writes the specified data to the snapshot.

type StateMachine

type StateMachine struct {
	// contains filtered or unexported fields
}

StateMachine is a manager class that manages application state machine

func NewStateMachine

func NewStateMachine(sm IManagedStateMachine,
	snapshotter ISnapshotter,
	cfg config.Config, node INode, fs vfs.IFS) *StateMachine

NewStateMachine creates a new application state machine object.

func (*StateMachine) Concurrent

func (s *StateMachine) Concurrent() bool

Concurrent returns a boolean flag indicating whether the state machine is capable of taking concurrent snapshot.

func (*StateMachine) DestroyedC

func (s *StateMachine) DestroyedC() <-chan struct{}

DestroyedC return a chan struct{} used to indicate whether the SM has been fully unloaded.

func (*StateMachine) GetBatchedLastApplied

func (s *StateMachine) GetBatchedLastApplied() uint64

GetBatchedLastApplied returns the batched last applied value.

func (*StateMachine) GetHash

func (s *StateMachine) GetHash() (uint64, error)

GetHash returns the state machine hash.

func (*StateMachine) GetLastApplied

func (s *StateMachine) GetLastApplied() uint64

GetLastApplied returns the last applied value.

func (*StateMachine) GetMembership

func (s *StateMachine) GetMembership() pb.Membership

GetMembership returns the membership info maintained by the state machine.

func (*StateMachine) GetMembershipHash

func (s *StateMachine) GetMembershipHash() uint64

GetMembershipHash returns the hash of the membership instance.

func (*StateMachine) GetSessionHash

func (s *StateMachine) GetSessionHash() uint64

GetSessionHash returns the session hash.

func (*StateMachine) GetSyncedIndex

func (s *StateMachine) GetSyncedIndex() uint64

GetSyncedIndex returns the index value that is known to have been synchronized.

func (*StateMachine) Handle

func (s *StateMachine) Handle(batch []Task, apply []sm.Entry) (Task, error)

Handle pulls the committed record and apply it if there is any available.

func (*StateMachine) Loaded

func (s *StateMachine) Loaded(from From)

Loaded marks the state machine as loaded from the specified compone.

func (*StateMachine) Lookup

func (s *StateMachine) Lookup(query interface{}) (interface{}, error)

Lookup queries the local state machine.

func (*StateMachine) NALookup

func (s *StateMachine) NALookup(query []byte) ([]byte, error)

NALookup queries the local state machine.

func (*StateMachine) Offloaded

func (s *StateMachine) Offloaded(from From) bool

Offloaded marks the state machine as offloaded from the specified compone. It returns a boolean value indicating whether the node has been fully unloaded after unloading from the specified compone.

func (*StateMachine) OnDiskStateMachine

func (s *StateMachine) OnDiskStateMachine() bool

OnDiskStateMachine returns a boolean flag indicating whether it is an on disk state machine.

func (*StateMachine) OpenOnDiskStateMachine

func (s *StateMachine) OpenOnDiskStateMachine() (uint64, error)

OpenOnDiskStateMachine opens the on disk state machine.

func (*StateMachine) ReadyToStreamSnapshot

func (s *StateMachine) ReadyToStreamSnapshot() bool

ReadyToStreamSnapshot returns a boolean flag to indicate whether the state machine is ready to stream snapshot. It can not stream a full snapshot when membership state is catching up with the all disk SM state. however, meta only snapshot can be taken at any time.

func (*StateMachine) RecoverFromSnapshot

func (s *StateMachine) RecoverFromSnapshot(t Task) (uint64, error)

RecoverFromSnapshot applies the snapshot.

func (*StateMachine) SaveSnapshot

func (s *StateMachine) SaveSnapshot(req SSRequest) (*pb.Snapshot,
	*server.SSEnv, error)

SaveSnapshot creates a snapshot.

func (*StateMachine) SetBatchedLastApplied

func (s *StateMachine) SetBatchedLastApplied(index uint64)

SetBatchedLastApplied sets the batched last applied value. This method is mostly used in tests.

func (*StateMachine) StreamSnapshot

func (s *StateMachine) StreamSnapshot(sink pb.IChunkSink) error

StreamSnapshot starts to stream snapshot from the current SM to a remote node targeted by the provided sink.

func (*StateMachine) Sync

func (s *StateMachine) Sync() error

Sync synchronizes state machine's in-core state with that on disk.

func (*StateMachine) TaskChanBusy

func (s *StateMachine) TaskChanBusy() bool

TaskChanBusy returns whether the TaskC chan is busy. Busy is defined as having more than half of its buffer occupied.

func (*StateMachine) TaskQ

func (s *StateMachine) TaskQ() *TaskQueue

TaskQ returns the task queue.

type Task

type Task struct {
	ClusterID         uint64
	NodeID            uint64
	Index             uint64
	SnapshotAvailable bool
	InitialSnapshot   bool
	SnapshotRequested bool
	StreamSnapshot    bool
	PeriodicSync      bool
	NewNode           bool
	SSRequest         SSRequest
	Entries           []pb.Entry
}

Task describes a task that need to be handled by StateMachine.

func (*Task) IsSnapshotTask

func (t *Task) IsSnapshotTask() bool

IsSnapshotTask returns a boolean flag indicating whether it is a snapshot task.

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue is a queue of tasks to be processed by the state machine.

func NewTaskQueue

func NewTaskQueue() *TaskQueue

NewTaskQueue creates and returns a new task queue.

func (*TaskQueue) Add

func (tq *TaskQueue) Add(task Task)

Add adds a new task to the queue.

func (*TaskQueue) Get

func (tq *TaskQueue) Get() (Task, bool)

Get returns a task from the queue if there is any.

func (*TaskQueue) GetAll

func (tq *TaskQueue) GetAll() []Task

GetAll returns all tasks currently in the queue.

func (*TaskQueue) MoreEntryToApply

func (tq *TaskQueue) MoreEntryToApply() bool

MoreEntryToApply returns a boolean value indicating whether it is ok to queue more entries to apply.

func (*TaskQueue) Size

func (tq *TaskQueue) Size() uint64

Size returns the number of queued tasks.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL