Documentation
¶
Index ¶
- Constants
- Variables
- func NewRunner(fs *objectio.ObjectFS, catalog *catalog.Catalog, scheduler tasks.TaskScheduler, ...) *runner
- type CheckpointEntry
- func (e *CheckpointEntry) GetByTableID(fs *objectio.ObjectFS, tid uint64) (ins, del, cnIns *api.Batch, err error)
- func (e *CheckpointEntry) GetEnd() types.TS
- func (e *CheckpointEntry) GetLocation() string
- func (e *CheckpointEntry) GetStart() types.TS
- func (e *CheckpointEntry) GetState() State
- func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool
- func (e *CheckpointEntry) IsCommitted() bool
- func (e *CheckpointEntry) IsFinished() bool
- func (e *CheckpointEntry) IsIncremental() bool
- func (e *CheckpointEntry) IsPendding() bool
- func (e *CheckpointEntry) IsRunning() bool
- func (e *CheckpointEntry) Read(fs *objectio.ObjectFS) (data *logtail.CheckpointData, err error)
- func (e *CheckpointEntry) Replay(c *catalog.Catalog, fs *objectio.ObjectFS, dataFactory catalog.DataFactory) (readDuration, applyDuration time.Duration, err error)
- func (e *CheckpointEntry) SetLocation(location string)
- func (e *CheckpointEntry) SetState(state State) (ok bool)
- func (e *CheckpointEntry) String() string
- type DirtyCtx
- type Observer
- type Option
- func WithCollectInterval(interval time.Duration) Option
- func WithFlushInterval(interval time.Duration) Option
- func WithForceFlushCheckInterval(interval time.Duration) Option
- func WithForceFlushTimeout(to time.Duration) Option
- func WithMinCount(count int) Option
- func WithMinGlobalInterval(interval time.Duration) Option
- func WithMinIncrementalInterval(interval time.Duration) Option
- func WithObserver(o Observer) Option
- type Runner
- type State
Constants ¶
View Source
const (
	PrefixIncremental = "incremental"
	PrefixGlobal      = "global"
	PrefixMetadata    = "meta"
	CheckpointDir     = "ckp/"
)
View Source
const (
	CheckpointAttr_StartTS      = "start_ts"
	CheckpointAttr_EndTS        = "end_ts"
	CheckpointAttr_MetaLocation = "meta_location"
)
Variables ¶
View Source
var (
	CheckpointSchemaAttr = []string{
		CheckpointAttr_StartTS,
		CheckpointAttr_EndTS,
		CheckpointAttr_MetaLocation,
	}
	CheckpointSchemaTypes = []types.Type{
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_varchar, 0, 0, 0),
	}
)
View Source
var (
CheckpointSchema *catalog.Schema
)
Functions ¶
Types ¶
type CheckpointEntry ¶ added in v0.6.0
func NewCheckpointEntry ¶ added in v0.6.0
func NewCheckpointEntry(start, end types.TS) *CheckpointEntry
func (*CheckpointEntry) GetByTableID ¶ added in v0.6.0
func (*CheckpointEntry) GetEnd ¶ added in v0.6.0
func (e *CheckpointEntry) GetEnd() types.TS
func (*CheckpointEntry) GetLocation ¶ added in v0.6.0
func (e *CheckpointEntry) GetLocation() string
func (*CheckpointEntry) GetStart ¶ added in v0.6.0
func (e *CheckpointEntry) GetStart() types.TS
func (*CheckpointEntry) GetState ¶ added in v0.6.0
func (e *CheckpointEntry) GetState() State
func (*CheckpointEntry) HasOverlap ¶ added in v0.6.0
func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool
func (*CheckpointEntry) IsCommitted ¶ added in v0.6.0
func (e *CheckpointEntry) IsCommitted() bool
func (*CheckpointEntry) IsFinished ¶ added in v0.6.0
func (e *CheckpointEntry) IsFinished() bool
func (*CheckpointEntry) IsIncremental ¶ added in v0.6.0
func (e *CheckpointEntry) IsIncremental() bool
func (*CheckpointEntry) IsPendding ¶ added in v0.6.0
func (e *CheckpointEntry) IsPendding() bool
func (*CheckpointEntry) IsRunning ¶ added in v0.6.0
func (e *CheckpointEntry) IsRunning() bool
func (*CheckpointEntry) Read ¶ added in v0.6.0
func (e *CheckpointEntry) Read(fs *objectio.ObjectFS) (data *logtail.CheckpointData, err error)
func (*CheckpointEntry) Replay ¶ added in v0.6.0
func (e *CheckpointEntry) Replay(c *catalog.Catalog, fs *objectio.ObjectFS, dataFactory catalog.DataFactory) (readDuration, applyDuration time.Duration, err error)
func (*CheckpointEntry) SetLocation ¶ added in v0.6.0
func (e *CheckpointEntry) SetLocation(location string)
func (*CheckpointEntry) SetState ¶ added in v0.6.0
func (e *CheckpointEntry) SetState(state State) (ok bool)
func (*CheckpointEntry) String ¶ added in v0.6.0
func (e *CheckpointEntry) String() string
type Option ¶ added in v0.6.0
type Option func(*runner)
func WithCollectInterval ¶ added in v0.6.0
func WithFlushInterval ¶ added in v0.6.0
func WithForceFlushCheckInterval ¶ added in v0.6.0
func WithForceFlushTimeout ¶ added in v0.6.0
func WithMinCount ¶ added in v0.6.0
func WithMinGlobalInterval ¶ added in v0.6.0
func WithMinIncrementalInterval ¶ added in v0.6.0
func WithObserver ¶ added in v0.6.0
type Runner ¶ added in v0.6.0
type Runner interface {
	Start()
	Stop()
	EnqueueWait(any) error
	Replay(catalog.DataFactory) (types.TS, error)
	MaxLSN() uint64
	MockCheckpoint(end types.TS)
	FlushTable(dbID, tableID uint64, ts types.TS) error
	// for test, delete in next phase
	TestCheckpoint(entry *CheckpointEntry)
	DebugUpdateOptions(opts ...Option)
	GetAllCheckpoints() []*CheckpointEntry
	CollectCheckpointsInRange(start, end types.TS) (ckpLoc string, lastEnd types.TS)
}
Click to show internal directories.
Click to hide internal directories.