Documentation ¶
Index ¶
- Constants
- Variables
- func NewRunner(fs *objectio.ObjectFS, catalog *catalog.Catalog, scheduler tasks.TaskScheduler, ...) *runner
- type CheckpointEntry
- func (e *CheckpointEntry) GCEntry(fs *objectio.ObjectFS) error
- func (e *CheckpointEntry) GCMetadata(fs *objectio.ObjectFS) error
- func (e *CheckpointEntry) GetByTableID(fs *objectio.ObjectFS, tid uint64) (ins, del, cnIns *api.Batch, err error)
- func (e *CheckpointEntry) GetEnd() types.TS
- func (e *CheckpointEntry) GetLocation() string
- func (e *CheckpointEntry) GetStart() types.TS
- func (e *CheckpointEntry) GetState() State
- func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool
- func (e *CheckpointEntry) IsCommitted() bool
- func (e *CheckpointEntry) IsFinished() bool
- func (e *CheckpointEntry) IsIncremental() bool
- func (e *CheckpointEntry) IsPendding() bool
- func (e *CheckpointEntry) IsRunning() bool
- func (e *CheckpointEntry) LessEq(ts types.TS) bool
- func (e *CheckpointEntry) Read(ctx context.Context, scheduler tasks.JobScheduler, fs *objectio.ObjectFS) (data *logtail.CheckpointData, err error)
- func (e *CheckpointEntry) Replay(ctx context.Context, c *catalog.Catalog, fs *objectio.ObjectFS, ...) (readDuration, applyDuration time.Duration, err error)
- func (e *CheckpointEntry) SetLocation(location string)
- func (e *CheckpointEntry) SetState(state State) (ok bool)
- func (e *CheckpointEntry) String() string
- type DirtyCtx
- type EntryType
- type Observer
- type Option
- func WithCollectInterval(interval time.Duration) Option
- func WithFlushInterval(interval time.Duration) Option
- func WithForceFlushCheckInterval(interval time.Duration) Option
- func WithForceFlushTimeout(to time.Duration) Option
- func WithGlobalMinCount(count int) Option
- func WithGlobalVersionInterval(interval time.Duration) Option
- func WithMinCount(count int) Option
- func WithMinIncrementalInterval(interval time.Duration) Option
- func WithObserver(o Observer) Option
- type Runner
- type RunnerReader
- type State
- type TestRunner
Constants ¶
View Source
const (
	PrefixIncremental = "incremental"
	PrefixGlobal      = "global"
	PrefixMetadata    = "meta"
	CheckpointDir     = "ckp/"
)
View Source
const (
	CheckpointAttr_StartTS      = "start_ts"
	CheckpointAttr_EndTS        = "end_ts"
	CheckpointAttr_MetaLocation = "meta_location"
	CheckpointAttr_EntryType    = "entry_type"
)
Variables ¶
View Source
var (
	CheckpointSchemaAttr = []string{
		CheckpointAttr_StartTS,
		CheckpointAttr_EndTS,
		CheckpointAttr_MetaLocation,
		CheckpointAttr_EntryType,
	}
	CheckpointSchemaTypes = []types.Type{
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0, 0),
		types.New(types.T_bool, 0, 0, 0),
	}
)
View Source
var (
CheckpointSchema *catalog.Schema
)
Functions ¶
Types ¶
type CheckpointEntry ¶ added in v0.6.0
func NewCheckpointEntry ¶ added in v0.6.0
func NewCheckpointEntry(start, end types.TS, typ EntryType) *CheckpointEntry
func (*CheckpointEntry) GCEntry ¶ added in v0.7.0
func (e *CheckpointEntry) GCEntry(fs *objectio.ObjectFS) error
func (*CheckpointEntry) GCMetadata ¶ added in v0.7.0
func (e *CheckpointEntry) GCMetadata(fs *objectio.ObjectFS) error
func (*CheckpointEntry) GetByTableID ¶ added in v0.6.0
func (*CheckpointEntry) GetEnd ¶ added in v0.6.0
func (e *CheckpointEntry) GetEnd() types.TS
func (*CheckpointEntry) GetLocation ¶ added in v0.6.0
func (e *CheckpointEntry) GetLocation() string
func (*CheckpointEntry) GetStart ¶ added in v0.6.0
func (e *CheckpointEntry) GetStart() types.TS
func (*CheckpointEntry) GetState ¶ added in v0.6.0
func (e *CheckpointEntry) GetState() State
func (*CheckpointEntry) HasOverlap ¶ added in v0.6.0
func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool
func (*CheckpointEntry) IsCommitted ¶ added in v0.6.0
func (e *CheckpointEntry) IsCommitted() bool
func (*CheckpointEntry) IsFinished ¶ added in v0.6.0
func (e *CheckpointEntry) IsFinished() bool
func (*CheckpointEntry) IsIncremental ¶ added in v0.6.0
func (e *CheckpointEntry) IsIncremental() bool
func (*CheckpointEntry) IsPendding ¶ added in v0.6.0
func (e *CheckpointEntry) IsPendding() bool
func (*CheckpointEntry) IsRunning ¶ added in v0.6.0
func (e *CheckpointEntry) IsRunning() bool
func (*CheckpointEntry) Read ¶ added in v0.6.0
func (e *CheckpointEntry) Read( ctx context.Context, scheduler tasks.JobScheduler, fs *objectio.ObjectFS, ) (data *logtail.CheckpointData, err error)
func (*CheckpointEntry) SetLocation ¶ added in v0.6.0
func (e *CheckpointEntry) SetLocation(location string)
func (*CheckpointEntry) SetState ¶ added in v0.6.0
func (e *CheckpointEntry) SetState(state State) (ok bool)
func (*CheckpointEntry) String ¶ added in v0.6.0
func (e *CheckpointEntry) String() string
type Option ¶ added in v0.6.0
type Option func(*runner)
func WithCollectInterval ¶ added in v0.6.0
func WithFlushInterval ¶ added in v0.6.0
func WithForceFlushCheckInterval ¶ added in v0.6.0
func WithForceFlushTimeout ¶ added in v0.6.0
func WithGlobalMinCount ¶ added in v0.7.0
func WithGlobalVersionInterval ¶ added in v0.7.0
func WithMinCount ¶ added in v0.6.0
func WithMinIncrementalInterval ¶ added in v0.6.0
func WithObserver ¶ added in v0.6.0
type Runner ¶ added in v0.6.0
type Runner interface {
	TestRunner
	RunnerReader
	Start()
	Stop()
	EnqueueWait(any) error
	Replay(catalog.DataFactory) (types.TS, error)
	FlushTable(dbID, tableID uint64, ts types.TS) error
	GCByTS(ctx context.Context, ts types.TS) error

	// for test, delete in next phase
	DebugUpdateOptions(opts ...Option)
}
type RunnerReader ¶ added in v0.7.0
type RunnerReader interface {
	GetAllIncrementalCheckpoints() []*CheckpointEntry
	GetAllGlobalCheckpoints() []*CheckpointEntry
	GetPenddingIncrementalCount() int
	GetGlobalCheckpointCount() int
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
	ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry
	MaxLSN() uint64
}
type TestRunner ¶ added in v0.7.0
type TestRunner interface {
	EnableCheckpoint()
	DisableCheckpoint()
	CleanPenddingCheckpoint()
	ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error
	ForceIncrementalCheckpoint(end types.TS) error
	IsAllChangesFlushed(start, end types.TS, printTree bool) bool
	MaxLSNInRange(end types.TS) uint64
	ExistPendingEntryToGC() bool
	MaxGlobalCheckpoint() *CheckpointEntry
	ForceFlush(ts types.TS, ctx context.Context, duration time.Duration) (err error)
	GetDirtyCollector() logtail.Collector
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.