raftchunking

package
v0.0.0-...-52b58a3
Warning

This package is not in the latest version of its module.

Published: Jan 31, 2022 License: Apache-2.0, MPL-2.0 Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ChunkSize is the threshold used for breaking a large value into chunks.
	// Defaults to the suggested max data size for the raft library.
	ChunkSize = raft.SuggestedMaxDataSize
)

Functions

func ChunkingApply

func ChunkingApply(cmd, extensions []byte, timeout time.Duration, applyFunc ApplyFunc) raft.ApplyFuture

ChunkingApply takes a byte slice and splits it into chunks of ChunkSize bytes (the final chunk may be smaller), calling Apply on each. It requires a corresponding wrapper around the FSM to reconstruct the value on the other end. The timeout applies to each individual operation, not to the total. The return value is a future whose Error() returns only once Error() has returned on every underlying Apply future. Note that any error indicates that the entire operation will not be applied, assuming the correct FSM wrapper is used. If extensions is passed in, it is set as the Extensions value on the Apply once all chunks are received.
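
The splitting step described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: splitChunks and the tiny chunkSize constant are hypothetical names chosen for readability, and the real function additionally attaches per-chunk metadata and submits each piece through applyFunc.

```go
package main

import "fmt"

// chunkSize is a hypothetical stand-in for raftchunking.ChunkSize,
// kept tiny here so the result is easy to inspect.
const chunkSize = 4

// splitChunks breaks cmd into pieces of at most size bytes. Only the
// last piece may be shorter. The pieces alias cmd rather than copy it.
func splitChunks(cmd []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(cmd) > 0 {
		n := size
		if len(cmd) < n {
			n = len(cmd)
		}
		chunks = append(chunks, cmd[:n])
		cmd = cmd[n:]
	}
	return chunks
}

func main() {
	for i, c := range splitChunks([]byte("0123456789"), chunkSize) {
		fmt.Printf("chunk %d: %q\n", i, c)
	}
}
```

Because the timeout is applied per operation, a value split into n chunks may take up to roughly n times the timeout before the returned future resolves.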

Types

type ApplyFunc

type ApplyFunc func(raft.Log, time.Duration) raft.ApplyFuture

type ChunkInfo

type ChunkInfo struct {
	OpNum       uint64
	SequenceNum uint32
	NumChunks   uint32
	Term        uint64
	Data        []byte
}

ChunkInfo holds chunk information

type ChunkMap

type ChunkMap map[uint64][]*ChunkInfo

ChunkMap represents a set of data chunks. We use ChunkInfo with Data instead of bare []byte in case there is a need to extend this info later.
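
As an illustration of how a ChunkMap entry might be turned back into the original value, the sketch below concatenates the Data of each chunk for one op. The types are local copies of the documented ones so the example is self-contained. The reassemble helper is hypothetical, and it assumes that the slice is ordered by SequenceNum and that a nil slot means the chunk has not arrived yet.

```go
package main

import (
	"bytes"
	"fmt"
)

// ChunkInfo and ChunkMap are local copies of the documented types.
type ChunkInfo struct {
	OpNum       uint64
	SequenceNum uint32
	NumChunks   uint32
	Term        uint64
	Data        []byte
}

type ChunkMap map[uint64][]*ChunkInfo

// reassemble (hypothetical helper) joins the Data of every chunk tracked
// for opNum, in slice order. It fails if the op is unknown or if any
// slot is still nil.
func reassemble(m ChunkMap, opNum uint64) ([]byte, error) {
	chunks, ok := m[opNum]
	if !ok {
		return nil, fmt.Errorf("no chunks tracked for op %d", opNum)
	}
	var buf bytes.Buffer
	for i, c := range chunks {
		if c == nil {
			return nil, fmt.Errorf("op %d: chunk %d is missing", opNum, i)
		}
		buf.Write(c.Data)
	}
	return buf.Bytes(), nil
}

func main() {
	m := ChunkMap{
		1: []*ChunkInfo{
			{OpNum: 1, SequenceNum: 0, NumChunks: 2, Data: []byte("hello, ")},
			{OpNum: 1, SequenceNum: 1, NumChunks: 2, Data: []byte("raft")},
		},
	}
	data, err := reassemble(m, 1)
	fmt.Printf("%q %v\n", data, err)
}
```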

type ChunkStorage

type ChunkStorage interface {
	// StoreChunk stores Data from ChunkInfo according to the other metadata
	// (OpNum, SequenceNum). The returned bool reports whether or not all
	// chunks have been received, that is, whether the number of non-nil
	// chunks is the same as NumChunks.
	StoreChunk(*ChunkInfo) (bool, error)

	// FinalizeOp gets all chunks for an op number and then removes the chunk
	// info for that op from the store. It should only be called when
	// StoreChunk for a given op number returns true but should be safe to call
	// at any time; clearing an op can be accomplished by calling this function
	// and ignoring the non-error result.
	FinalizeOp(uint64) ([]*ChunkInfo, error)

	// GetChunks gets all currently tracked ops, for snapshotting
	GetChunks() (ChunkMap, error)

	// RestoreChunks restores the current FSM state from a map
	RestoreChunks(ChunkMap) error
}
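
To make the contract above concrete, here is a minimal sketch of a store that satisfies an interface of the same shape. It is not the package's InmemChunkStorage: mapStore and newMapStore are hypothetical names, the types are local copies, there is no locking, and the consistency checks on NumChunks are an assumption about what a careful implementation might do.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types, so the sketch is self-contained.
type ChunkInfo struct {
	OpNum       uint64
	SequenceNum uint32
	NumChunks   uint32
	Term        uint64
	Data        []byte
}

type ChunkMap map[uint64][]*ChunkInfo

type ChunkStorage interface {
	StoreChunk(*ChunkInfo) (bool, error)
	FinalizeOp(uint64) ([]*ChunkInfo, error)
	GetChunks() (ChunkMap, error)
	RestoreChunks(ChunkMap) error
}

// mapStore is a hypothetical store; it is not safe for concurrent use.
type mapStore struct{ chunks ChunkMap }

var _ ChunkStorage = (*mapStore)(nil)

func newMapStore() *mapStore { return &mapStore{chunks: make(ChunkMap)} }

// StoreChunk places the chunk in the slot given by SequenceNum and
// reports whether every slot for that op is now non-nil.
func (s *mapStore) StoreChunk(c *ChunkInfo) (bool, error) {
	if c == nil {
		return false, errors.New("nil chunk")
	}
	if c.SequenceNum >= c.NumChunks {
		return false, fmt.Errorf("sequence %d out of range for %d chunks", c.SequenceNum, c.NumChunks)
	}
	slots, ok := s.chunks[c.OpNum]
	if !ok {
		slots = make([]*ChunkInfo, c.NumChunks)
		s.chunks[c.OpNum] = slots
	}
	if len(slots) != int(c.NumChunks) {
		return false, fmt.Errorf("op %d: inconsistent NumChunks", c.OpNum)
	}
	slots[c.SequenceNum] = c
	for _, slot := range slots {
		if slot == nil {
			return false, nil
		}
	}
	return true, nil
}

// FinalizeOp returns the chunks for opNum and forgets the op.
func (s *mapStore) FinalizeOp(opNum uint64) ([]*ChunkInfo, error) {
	chunks := s.chunks[opNum]
	delete(s.chunks, opNum)
	return chunks, nil
}

func (s *mapStore) GetChunks() (ChunkMap, error) { return s.chunks, nil }

func (s *mapStore) RestoreChunks(m ChunkMap) error {
	if m == nil {
		m = make(ChunkMap)
	}
	s.chunks = m
	return nil
}

func main() {
	s := newMapStore()
	done, _ := s.StoreChunk(&ChunkInfo{OpNum: 7, SequenceNum: 1, NumChunks: 2, Data: []byte("b")})
	fmt.Println("after first chunk, complete:", done)
	done, _ = s.StoreChunk(&ChunkInfo{OpNum: 7, SequenceNum: 0, NumChunks: 2, Data: []byte("a")})
	fmt.Println("after second chunk, complete:", done)
}
```

Chunks may arrive in any order in this sketch, since completeness is judged by counting filled slots rather than by the order of arrival.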

type ChunkingBatchingFSM

type ChunkingBatchingFSM struct {
	*ChunkingFSM
	// contains filtered or unexported fields
}

func NewChunkingBatchingFSM

func NewChunkingBatchingFSM(underlying raft.BatchingFSM, store ChunkStorage) *ChunkingBatchingFSM

func (*ChunkingBatchingFSM) ApplyBatch

func (c *ChunkingBatchingFSM) ApplyBatch(logs []*raft.Log) []interface{}

ApplyBatch applies the logs, handling chunking as needed. The return value is a slice containing, for each log, either an error or whatever is returned from the underlying Apply.

type ChunkingConfigurationStore

type ChunkingConfigurationStore struct {
	*ChunkingFSM
	// contains filtered or unexported fields
}

func NewChunkingConfigurationStore

func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore

func (*ChunkingConfigurationStore) StoreConfiguration

func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration)

type ChunkingFSM

type ChunkingFSM struct {
	// contains filtered or unexported fields
}

ChunkingFSM is an FSM that implements chunking; it's the sister of ChunkingApply.

N.B.: If a term change happens, the final apply from the client will have a nil result and will not be passed through to the underlying FSM. To make this detectable, the result of the final apply to the underlying FSM is wrapped in ChunkingSuccess.

func NewChunkingFSM

func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM

func (*ChunkingFSM) Apply

func (c *ChunkingFSM) Apply(l *raft.Log) interface{}

Apply applies the log, handling chunking as needed. The return value will either be an error or whatever is returned from the underlying Apply.

func (*ChunkingFSM) CurrentState

func (c *ChunkingFSM) CurrentState() (*State, error)

func (*ChunkingFSM) Restore

func (c *ChunkingFSM) Restore(rc io.ReadCloser) error

func (*ChunkingFSM) RestoreState

func (c *ChunkingFSM) RestoreState(state *State) error

func (*ChunkingFSM) Snapshot

func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error)

func (*ChunkingFSM) Underlying

func (c *ChunkingFSM) Underlying() raft.FSM

Note: this is used in tests via the Raft package test helper functions, even if it is not used in client code.

type ChunkingSuccess

type ChunkingSuccess struct {
	Response interface{}
}

type InmemChunkStorage

type InmemChunkStorage struct {
	// contains filtered or unexported fields
}

InmemChunkStorage satisfies ChunkStorage using an in-memory-only tracking method.

func NewInmemChunkStorage

func NewInmemChunkStorage() *InmemChunkStorage

func (*InmemChunkStorage) FinalizeOp

func (i *InmemChunkStorage) FinalizeOp(opNum uint64) ([]*ChunkInfo, error)

func (*InmemChunkStorage) GetChunks

func (i *InmemChunkStorage) GetChunks() (ChunkMap, error)

func (*InmemChunkStorage) RestoreChunks

func (i *InmemChunkStorage) RestoreChunks(chunks ChunkMap) error

func (*InmemChunkStorage) StoreChunk

func (i *InmemChunkStorage) StoreChunk(chunk *ChunkInfo) (bool, error)

type State

type State struct {
	ChunkMap ChunkMap
}
