checkpoint

package
v1.0.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2023 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PrefetchData uint16 = iota
	PrefetchMetaIdx
	ReadMetaIdx
	ReadData
)
View Source
const (
	PrefixIncremental = "incremental"
	PrefixGlobal      = "global"
	PrefixMetadata    = "meta"
	CheckpointDir     = "ckp/"
)
View Source
const (
	CheckpointAttr_StartTS      = "start_ts"
	CheckpointAttr_EndTS        = "end_ts"
	CheckpointAttr_MetaLocation = "meta_location"
	CheckpointAttr_EntryType    = "entry_type"
	CheckpointAttr_Version      = "version"
	CheckpointAttr_AllLocations = "all_locations"

	CheckpointSchemaColumnCountV1 = 5 // start, end, loc, type, ver
)

Variables

View Source
var (
	CheckpointSchema *catalog.Schema
)

Functions

func NewRunner added in v0.6.0

func NewRunner(
	ctx context.Context,
	rt *dbutils.Runtime,
	catalog *catalog.Catalog,
	source logtail.Collector,
	wal wal.Driver,
	opts ...Option) *runner

Types

type CheckpointEntry added in v0.6.0

type CheckpointEntry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewCheckpointEntry added in v0.6.0

func NewCheckpointEntry(start, end types.TS, typ EntryType) *CheckpointEntry

func (*CheckpointEntry) CheckPrintTime added in v0.8.0

func (e *CheckpointEntry) CheckPrintTime() bool

func (*CheckpointEntry) GCEntry added in v0.7.0

func (e *CheckpointEntry) GCEntry(fs *objectio.ObjectFS) error

func (*CheckpointEntry) GCMetadata added in v0.7.0

func (e *CheckpointEntry) GCMetadata(fs *objectio.ObjectFS) error

func (*CheckpointEntry) GetByTableID added in v0.6.0

func (e *CheckpointEntry) GetByTableID(ctx context.Context, fs *objectio.ObjectFS, tid uint64) (ins, del, cnIns, segDel *api.Batch, err error)

func (*CheckpointEntry) GetEnd added in v0.6.0

func (e *CheckpointEntry) GetEnd() types.TS

func (*CheckpointEntry) GetLocation added in v0.6.0

func (e *CheckpointEntry) GetLocation() objectio.Location

func (*CheckpointEntry) GetStart added in v0.6.0

func (e *CheckpointEntry) GetStart() types.TS

func (*CheckpointEntry) GetState added in v0.6.0

func (e *CheckpointEntry) GetState() State

func (*CheckpointEntry) GetVersion added in v1.0.0

func (e *CheckpointEntry) GetVersion() uint32

func (*CheckpointEntry) HasOverlap added in v0.6.0

func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool

func (*CheckpointEntry) IsCommitted added in v0.6.0

func (e *CheckpointEntry) IsCommitted() bool

func (*CheckpointEntry) IsFinished added in v0.6.0

func (e *CheckpointEntry) IsFinished() bool

func (*CheckpointEntry) IsIncremental added in v0.6.0

func (e *CheckpointEntry) IsIncremental() bool

func (*CheckpointEntry) IsPendding added in v0.6.0

func (e *CheckpointEntry) IsPendding() bool

func (*CheckpointEntry) IsRunning added in v0.6.0

func (e *CheckpointEntry) IsRunning() bool

func (*CheckpointEntry) LessEq added in v0.7.0

func (e *CheckpointEntry) LessEq(ts types.TS) bool

func (*CheckpointEntry) Prefetch added in v0.8.0

func (e *CheckpointEntry) Prefetch(
	ctx context.Context,
	fs *objectio.ObjectFS,
	data *logtail.CheckpointData,
) (err error)

func (*CheckpointEntry) PrefetchMetaIdx added in v1.0.0

func (e *CheckpointEntry) PrefetchMetaIdx(
	ctx context.Context,
	fs *objectio.ObjectFS,
) (data *logtail.CheckpointData, err error)

func (*CheckpointEntry) Read added in v0.6.0

func (e *CheckpointEntry) Read(
	ctx context.Context,
	fs *objectio.ObjectFS,
	data *logtail.CheckpointData,
) (err error)

func (*CheckpointEntry) ReadMetaIdx added in v1.0.0

func (e *CheckpointEntry) ReadMetaIdx(
	ctx context.Context,
	fs *objectio.ObjectFS,
	data *logtail.CheckpointData,
) (err error)

func (*CheckpointEntry) SetLocation added in v0.6.0

func (e *CheckpointEntry) SetLocation(cn, tn objectio.Location)

func (*CheckpointEntry) SetPrintTime added in v0.8.0

func (e *CheckpointEntry) SetPrintTime()

func (*CheckpointEntry) SetState added in v0.6.0

func (e *CheckpointEntry) SetState(state State) (ok bool)

func (*CheckpointEntry) String added in v0.6.0

func (e *CheckpointEntry) String() string

type DirtyCtx added in v0.6.0

type DirtyCtx struct {
	// contains filtered or unexported fields
}

type EntryType added in v0.7.0

type EntryType int8
const (
	ET_Global EntryType = iota
	ET_Incremental
)

type Observer added in v0.6.0

type Observer interface {
	OnNewCheckpoint(ts types.TS)
}

type Option added in v0.6.0

type Option func(*runner)

func WithCheckpointBlockRows added in v1.0.0

func WithCheckpointBlockRows(rows int) Option

func WithCollectInterval added in v0.6.0

func WithCollectInterval(interval time.Duration) Option

func WithFlushInterval added in v0.6.0

func WithFlushInterval(interval time.Duration) Option

func WithForceFlushCheckInterval added in v0.6.0

func WithForceFlushCheckInterval(interval time.Duration) Option

func WithForceFlushTimeout added in v0.6.0

func WithForceFlushTimeout(to time.Duration) Option

func WithGlobalMinCount added in v0.7.0

func WithGlobalMinCount(count int) Option

func WithGlobalVersionInterval added in v0.7.0

func WithGlobalVersionInterval(interval time.Duration) Option

func WithMinCount added in v0.6.0

func WithMinCount(count int) Option

func WithMinIncrementalInterval added in v0.6.0

func WithMinIncrementalInterval(interval time.Duration) Option

func WithObserver added in v0.6.0

func WithObserver(o Observer) Option

type Runner added in v0.6.0

type Runner interface {
	TestRunner
	RunnerReader
	Start()
	Stop()
	String() string
	EnqueueWait(any) error
	Replay(catalog.DataFactory) (types.TS, error)

	FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error
	GCByTS(ctx context.Context, ts types.TS) error

	// for test, delete in next phase
	DebugUpdateOptions(opts ...Option)
	GetAllCheckpoints() []*CheckpointEntry
}

type RunnerReader added in v0.7.0

type RunnerReader interface {
	GetAllIncrementalCheckpoints() []*CheckpointEntry
	GetAllGlobalCheckpoints() []*CheckpointEntry
	GetPenddingIncrementalCount() int
	GetGlobalCheckpointCount() int
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
	ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry
	MaxLSN() uint64
}

type State added in v0.6.0

type State int8
const (
	ST_Running State = iota
	ST_Pending
	ST_Finished
)

type TestRunner added in v0.7.0

type TestRunner interface {
	EnableCheckpoint()
	DisableCheckpoint()

	CleanPenddingCheckpoint()
	ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error
	ForceIncrementalCheckpoint(end types.TS) error
	IsAllChangesFlushed(start, end types.TS, printTree bool) bool
	MaxLSNInRange(end types.TS) uint64

	ExistPendingEntryToGC() bool
	MaxGlobalCheckpoint() *CheckpointEntry
	MaxCheckpoint() *CheckpointEntry
	ForceFlush(ts types.TS, ctx context.Context, duration time.Duration) (err error)
	GetDirtyCollector() logtail.Collector
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL