checkpoint

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PrefixIncremental = "incremental"
	PrefixGlobal      = "global"
	PrefixMetadata    = "meta"
	CheckpointDir     = "ckp/"
)
View Source
const (
	CheckpointAttr_StartTS      = "start_ts"
	CheckpointAttr_EndTS        = "end_ts"
	CheckpointAttr_MetaLocation = "meta_location"
	CheckpointAttr_EntryType    = "entry_type"
)

Variables

View Source
var (
	CheckpointSchema *catalog.Schema
)

Functions

func NewRunner added in v0.6.0

func NewRunner(
	fs *objectio.ObjectFS,
	catalog *catalog.Catalog,
	scheduler tasks.TaskScheduler,
	source logtail.Collector,
	wal wal.Driver,
	opts ...Option) *runner

Types

type CheckpointEntry added in v0.6.0

type CheckpointEntry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewCheckpointEntry added in v0.6.0

func NewCheckpointEntry(start, end types.TS, typ EntryType) *CheckpointEntry

func (*CheckpointEntry) GCEntry added in v0.7.0

func (e *CheckpointEntry) GCEntry(fs *objectio.ObjectFS) error

func (*CheckpointEntry) GCMetadata added in v0.7.0

func (e *CheckpointEntry) GCMetadata(fs *objectio.ObjectFS) error

func (*CheckpointEntry) GetByTableID added in v0.6.0

func (e *CheckpointEntry) GetByTableID(fs *objectio.ObjectFS, tid uint64) (ins, del, cnIns *api.Batch, err error)

func (*CheckpointEntry) GetEnd added in v0.6.0

func (e *CheckpointEntry) GetEnd() types.TS

func (*CheckpointEntry) GetLocation added in v0.6.0

func (e *CheckpointEntry) GetLocation() string

func (*CheckpointEntry) GetStart added in v0.6.0

func (e *CheckpointEntry) GetStart() types.TS

func (*CheckpointEntry) GetState added in v0.6.0

func (e *CheckpointEntry) GetState() State

func (*CheckpointEntry) HasOverlap added in v0.6.0

func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool

func (*CheckpointEntry) IsCommitted added in v0.6.0

func (e *CheckpointEntry) IsCommitted() bool

func (*CheckpointEntry) IsFinished added in v0.6.0

func (e *CheckpointEntry) IsFinished() bool

func (*CheckpointEntry) IsIncremental added in v0.6.0

func (e *CheckpointEntry) IsIncremental() bool

func (*CheckpointEntry) IsPendding added in v0.6.0

func (e *CheckpointEntry) IsPendding() bool

func (*CheckpointEntry) IsRunning added in v0.6.0

func (e *CheckpointEntry) IsRunning() bool

func (*CheckpointEntry) LessEq added in v0.7.0

func (e *CheckpointEntry) LessEq(ts types.TS) bool

func (*CheckpointEntry) Read added in v0.6.0

func (e *CheckpointEntry) Read(
	ctx context.Context,
	scheduler tasks.JobScheduler,
	fs *objectio.ObjectFS,
) (data *logtail.CheckpointData, err error)

func (*CheckpointEntry) Replay added in v0.6.0

func (e *CheckpointEntry) Replay(
	ctx context.Context,
	c *catalog.Catalog,
	fs *objectio.ObjectFS,
	dataFactory catalog.DataFactory) (readDuration, applyDuration time.Duration, err error)

func (*CheckpointEntry) SetLocation added in v0.6.0

func (e *CheckpointEntry) SetLocation(location string)

func (*CheckpointEntry) SetState added in v0.6.0

func (e *CheckpointEntry) SetState(state State) (ok bool)

func (*CheckpointEntry) String added in v0.6.0

func (e *CheckpointEntry) String() string

type DirtyCtx added in v0.6.0

type DirtyCtx struct {
	// contains filtered or unexported fields
}

type EntryType added in v0.7.0

type EntryType int8
const (
	ET_Global EntryType = iota
	ET_Incremental
)

type Observer added in v0.6.0

type Observer interface {
	OnNewCheckpoint(ts types.TS)
}

type Option added in v0.6.0

type Option func(*runner)

func WithCollectInterval added in v0.6.0

func WithCollectInterval(interval time.Duration) Option

func WithFlushInterval added in v0.6.0

func WithFlushInterval(interval time.Duration) Option

func WithForceFlushCheckInterval added in v0.6.0

func WithForceFlushCheckInterval(interval time.Duration) Option

func WithForceFlushTimeout added in v0.6.0

func WithForceFlushTimeout(to time.Duration) Option

func WithGlobalMinCount added in v0.7.0

func WithGlobalMinCount(count int) Option

func WithGlobalVersionInterval added in v0.7.0

func WithGlobalVersionInterval(interval time.Duration) Option

func WithMinCount added in v0.6.0

func WithMinCount(count int) Option

func WithMinIncrementalInterval added in v0.6.0

func WithMinIncrementalInterval(interval time.Duration) Option

func WithObserver added in v0.6.0

func WithObserver(o Observer) Option

type Runner added in v0.6.0

type Runner interface {
	TestRunner
	RunnerReader
	Start()
	Stop()
	EnqueueWait(any) error
	Replay(catalog.DataFactory) (types.TS, error)

	FlushTable(dbID, tableID uint64, ts types.TS) error
	GCByTS(ctx context.Context, ts types.TS) error

	// for test, delete in next phase
	DebugUpdateOptions(opts ...Option)
}

type RunnerReader added in v0.7.0

type RunnerReader interface {
	GetAllIncrementalCheckpoints() []*CheckpointEntry
	GetAllGlobalCheckpoints() []*CheckpointEntry
	GetPenddingIncrementalCount() int
	GetGlobalCheckpointCount() int
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
	ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry
	MaxLSN() uint64
}

type State added in v0.6.0

type State int8
const (
	ST_Running State = iota
	ST_Pending
	ST_Finished
)

type TestRunner added in v0.7.0

type TestRunner interface {
	EnableCheckpoint()
	DisableCheckpoint()

	CleanPenddingCheckpoint()
	ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error
	ForceIncrementalCheckpoint(end types.TS) error
	IsAllChangesFlushed(start, end types.TS, printTree bool) bool
	MaxLSNInRange(end types.TS) uint64

	ExistPendingEntryToGC() bool
	MaxGlobalCheckpoint() *CheckpointEntry
	ForceFlush(ts types.TS, ctx context.Context, duration time.Duration) (err error)
	GetDirtyCollector() logtail.Collector
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL