Documentation ¶
Index ¶
- Constants
- Variables
- func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
- func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch
- func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string
- func GlobalCheckpointDataFactory(end types.TS, versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error)
- func HandleSyncLogTailReq(ctx context.Context, ckpClient CheckpointClient, mgr *Manager, ...) (resp api.SyncLogTailResp, closeCB func(), err error)
- func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)
- func LoadBlkColumnsByMeta(cxt context.Context, colTypes []types.Type, colNames []string, id uint16, ...) (*containers.Batch, error)
- func LoadCNBlkColumnsByMeta(cxt context.Context, colTypes []types.Type, colNames []string, id uint16, ...) (*batch.Batch, error)
- func LoadCheckpointEntries(ctx context.Context, metLoc string, tableID uint64, tableName string, ...) ([]*api.Entry, error)
- func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error
- func NewDirtyCollector(sourcer *Manager, clock clock.Clock, catalog *catalog.Catalog, ...) *dirtyCollector
- func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
- type BaseCollector
- func (collector *BaseCollector) Close()
- func (collector *BaseCollector) OrphanData() *CheckpointData
- func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
- func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
- func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
- type BlockT
- type BoundTableOperator
- type CNCheckpointData
- func (data *CNCheckpointData) GetTableData(tid uint64) (ins, del, cnIns, segDel *api.Batch, err error)
- func (data *CNCheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)
- func (data *CNCheckpointData) PrefetchFrom(ctx context.Context, service fileservice.FileService, key objectio.Location) (err error)
- func (data *CNCheckpointData) ReadFrom(ctx context.Context, reader *blockio.BlockReader, m *mpool.MPool) (err error)
- type CatalogLogtailRespBuilder
- type CheckpointClient
- type CheckpointData
- func (data *CheckpointData) ApplyReplayTo(c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
- func (data *CheckpointData) Close()
- func (data *CheckpointData) GetBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDBBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDNBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetSegBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetTblBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, ...)
- func (data *CheckpointData) PrefetchFrom(ctx context.Context, service fileservice.FileService, key objectio.Location) (err error)
- func (data *CheckpointData) PrintData()
- func (data *CheckpointData) ReadFrom(ctx context.Context, reader *blockio.BlockReader, m *mpool.MPool) (err error)
- func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
- func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)
- func (data *CheckpointData) WriteTo(writer *blockio.BlockWriter) (blks []objectio.BlockObject, err error)
- type CheckpointMeta
- type Collector
- type DirtyEntryInterceptor
- type DirtyTreeEntry
- type GlobalCollector
- func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
- func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error
- func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
- type IncrementalCollector
- type Logtailer
- type LogtailerImpl
- func (l *LogtailerImpl) Now() (timestamp.Timestamp, timestamp.Timestamp)
- func (l *LogtailerImpl) RangeLogtail(ctx context.Context, from, to timestamp.Timestamp) ([]logtail.TableLogtail, []func(), error)
- func (l *LogtailerImpl) RegisterCallback(...)
- func (l *LogtailerImpl) TableLogtail(ctx context.Context, table api.TableID, from, to timestamp.Timestamp) (logtail.TableLogtail, func(), error)
- type Manager
- func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)
- func (mgr *Manager) GetReader(from, to types.TS) *Reader
- func (mgr *Manager) GetTableOperator(from, to types.TS, catalog *catalog.Catalog, dbID, tableID uint64, scope Scope, ...) *BoundTableOperator
- func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)
- func (mgr *Manager) OnEndPrepareWAL(txn txnif.AsyncTxn)
- func (mgr *Manager) RegisterCallback(...) error
- func (mgr *Manager) Start()
- func (mgr *Manager) Stop()
- type Reader
- type RespBuilder
- type RowT
- type Scope
- type TableLogtailRespBuilder
- type TableRespKind
- type TxnLogtailRespBuilder
- type TxnTable
Constants ¶
const ( SnapshotAttr_TID = catalog.SnapshotAttr_TID SnapshotAttr_DBID = catalog.SnapshotAttr_DBID SegmentAttr_ID = catalog.SegmentAttr_ID SegmentAttr_CreateAt = catalog.SegmentAttr_CreateAt SegmentAttr_SegNode = catalog.SegmentAttr_SegNode SnapshotAttr_BlockMaxRow = catalog.SnapshotAttr_BlockMaxRow SnapshotAttr_SegmentMaxBlock = catalog.SnapshotAttr_SegmentMaxBlock SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start" SnapshotMetaAttr_BlockInsertBatchEnd = "block_insert_batch_end" SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start" SnapshotMetaAttr_BlockDeleteBatchEnd = "block_delete_batch_end" SnapshotMetaAttr_SegDeleteBatchStart = "seg_delete_batch_start" SnapshotMetaAttr_SegDeleteBatchEnd = "seg_delete_batch_end" SnapshotAttr_SchemaExtra = catalog.SnapshotAttr_SchemaExtra )
const ( MetaIDX uint16 = iota DBInsertIDX DBInsertTxnIDX DBDeleteIDX DBDeleteTxnIDX TBLInsertIDX TBLInsertTxnIDX TBLDeleteIDX TBLDeleteTxnIDX TBLColInsertIDX TBLColDeleteIDX SEGInsertIDX SEGInsertTxnIDX SEGDeleteIDX SEGDeleteTxnIDX BLKMetaInsertIDX BLKMetaInsertTxnIDX BLKMetaDeleteIDX BLKMetaDeleteTxnIDX BLKDNMetaInsertIDX BLKDNMetaInsertTxnIDX BLKDNMetaDeleteIDX BLKDNMetaDeleteTxnIDX BLKCNMetaInsertIDX BLKInsertIDX BLKInsertTxnIDX BLKDeleteIDX BLKDeleteTxnIDX )
const ( Checkpoint_Meta_TID_IDX = 2 Checkpoint_Meta_Insert_Block_Start_IDX = 3 Checkpoint_Meta_Insert_Block_End_IDX = 4 Checkpoint_Meta_Delete_Block_Start_IDX = 5 Checkpoint_Meta_Delete_Block_End_IDX = 6 Checkpoint_Meta_Segment_Start_IDX = 7 Checkpoint_Meta_Segment_End_IDX = 8 )
const (
LogtailHeartbeatDuration = time.Millisecond * 2
)
const MaxIDX = BLKCNMetaInsertIDX + 1
const Size90M = 90 * 1024 * 1024
Variables ¶
var ( // for blk meta response BlkMetaSchema *catalog.Schema DelSchema *catalog.Schema SegSchema *catalog.Schema TxnNodeSchema *catalog.Schema DBDNSchema *catalog.Schema TblDNSchema *catalog.Schema SegDNSchema *catalog.Schema BlkDNSchema *catalog.Schema MetaSchema *catalog.Schema )
var ( SegmentSchemaAttr = []string{ SegmentAttr_ID, SegmentAttr_CreateAt, SegmentAttr_SegNode, } SegmentSchemaTypes = []types.Type{ types.New(types.T_uuid, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_blob, 0, 0), } TxnNodeSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, } TxnNodeSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), } DBDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } DBDNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), } TblDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, SnapshotAttr_BlockMaxRow, SnapshotAttr_SegmentMaxBlock, SnapshotAttr_SchemaExtra, } TblDNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint16, 0, 0), types.New(types.T_varchar, 0, 0), } SegmentDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } SegmentDNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), } BlockDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, pkgcatalog.BlockMeta_MetaLoc, pkgcatalog.BlockMeta_DeltaLoc, } BlockDNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), } MetaSchemaAttr = []string{ SnapshotAttr_TID, SnapshotMetaAttr_BlockInsertBatchStart, SnapshotMetaAttr_BlockInsertBatchEnd, SnapshotMetaAttr_BlockDeleteBatchStart, SnapshotMetaAttr_BlockDeleteBatchEnd, SnapshotMetaAttr_SegDeleteBatchStart, SnapshotMetaAttr_SegDeleteBatchEnd, } MetaShcemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), } BaseAttr = []string{ catalog.AttrRowID, catalog.AttrCommitTs, } BaseTypes = []types.Type{ types.T_Rowid.ToType(), types.T_TS.ToType(), } )
Functions ¶
func BatchToString ¶
func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
func DataChangeToLogtailBatch ¶ added in v0.8.0
func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch
GetDataWindowForLogtail returns the batch according to the writeSchema. columns are sorted by seqnum and vacancy is filled with zero value
func DebugBatchToString ¶
func GlobalCheckpointDataFactory ¶ added in v0.7.0
func HandleSyncLogTailReq ¶
func HandleSyncLogTailReq( ctx context.Context, ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog, req api.SyncLogTailReq, canRetry bool) (resp api.SyncLogTailResp, closeCB func(), err error)
func LoadBlkColumnsByMeta ¶ added in v0.8.0
func LoadCNBlkColumnsByMeta ¶ added in v0.8.0
func LoadCheckpointEntries ¶
func MockCallback ¶ added in v0.8.0
func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error
func NewDirtyCollector ¶
func ToStringTemplate ¶
func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
Types ¶
type BaseCollector ¶ added in v0.7.0
type BaseCollector struct { *catalog.LoopProcessor // contains filtered or unexported fields }
func (*BaseCollector) Close ¶ added in v0.7.0
func (collector *BaseCollector) Close()
func (*BaseCollector) OrphanData ¶ added in v0.7.0
func (collector *BaseCollector) OrphanData() *CheckpointData
func (*BaseCollector) VisitBlk ¶ added in v0.7.0
func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
func (*BaseCollector) VisitDB ¶ added in v0.7.0
func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
func (*BaseCollector) VisitSeg ¶ added in v0.7.0
func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
func (*BaseCollector) VisitTable ¶ added in v0.7.0
func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
type BoundTableOperator ¶
type BoundTableOperator struct {
// contains filtered or unexported fields
}
BoundTableOperator holds a read only reader, knows how to iterate catalog entries.
func NewBoundTableOperator ¶
func (*BoundTableOperator) Run ¶
func (c *BoundTableOperator) Run() error
Run takes a RespBuilder to visit every table/segment/block touched by all txn in the Reader. During the visiting, RespBuiler will fetch information to return logtail entry
type CNCheckpointData ¶ added in v0.8.0
type CNCheckpointData struct {
// contains filtered or unexported fields
}
func NewCNCheckpointData ¶ added in v0.8.0
func NewCNCheckpointData() *CNCheckpointData
func (*CNCheckpointData) GetTableData ¶ added in v0.8.0
func (data *CNCheckpointData) GetTableData(tid uint64) (ins, del, cnIns, segDel *api.Batch, err error)
func (*CNCheckpointData) GetTableMeta ¶ added in v0.8.0
func (data *CNCheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)
func (*CNCheckpointData) PrefetchFrom ¶ added in v0.8.0
func (data *CNCheckpointData) PrefetchFrom( ctx context.Context, service fileservice.FileService, key objectio.Location) (err error)
func (*CNCheckpointData) ReadFrom ¶ added in v0.8.0
func (data *CNCheckpointData) ReadFrom( ctx context.Context, reader *blockio.BlockReader, m *mpool.MPool) (err error)
type CatalogLogtailRespBuilder ¶
type CatalogLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
CatalogLogtailRespBuilder knows how to make api-entry from db and table entry. impl catalog.Processor interface, driven by BoundTableOperator
func (*CatalogLogtailRespBuilder) BuildResp ¶
func (b *CatalogLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*CatalogLogtailRespBuilder) Close ¶
func (b *CatalogLogtailRespBuilder) Close()
func (*CatalogLogtailRespBuilder) VisitDB ¶
func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error
VisitDB = catalog.Processor.OnDatabase
func (*CatalogLogtailRespBuilder) VisitTbl ¶
func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error
VisitTbl = catalog.Processor.OnTable
type CheckpointClient ¶
type CheckpointData ¶
type CheckpointData struct {
// contains filtered or unexported fields
}
func NewCheckpointData ¶
func NewCheckpointData() *CheckpointData
func (*CheckpointData) ApplyReplayTo ¶
func (data *CheckpointData) ApplyReplayTo( c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
func (*CheckpointData) Close ¶
func (data *CheckpointData) Close()
func (*CheckpointData) GetBlkBatchs ¶
func (data *CheckpointData) GetBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDBBatchs ¶
func (data *CheckpointData) GetDBBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDNBlkBatchs ¶
func (data *CheckpointData) GetDNBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetSegBatchs ¶
func (data *CheckpointData) GetSegBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetTblBatchs ¶
func (data *CheckpointData) GetTblBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) PrefetchFrom ¶ added in v0.8.0
func (data *CheckpointData) PrefetchFrom( ctx context.Context, service fileservice.FileService, key objectio.Location) (err error)
func (*CheckpointData) PrintData ¶
func (data *CheckpointData) PrintData()
func (*CheckpointData) ReadFrom ¶
func (data *CheckpointData) ReadFrom( ctx context.Context, reader *blockio.BlockReader, m *mpool.MPool) (err error)
TODO: There need a global io pool
func (*CheckpointData) UpdateBlkMeta ¶
func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
func (*CheckpointData) UpdateSegMeta ¶ added in v0.8.0
func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)
func (*CheckpointData) WriteTo ¶
func (data *CheckpointData) WriteTo( writer *blockio.BlockWriter) (blks []objectio.BlockObject, err error)
type CheckpointMeta ¶
type CheckpointMeta struct {
// contains filtered or unexported fields
}
func NewCheckpointMeta ¶
func NewCheckpointMeta() *CheckpointMeta
type Collector ¶
type Collector interface { String() string Run() ScanInRange(from, to types.TS) (*DirtyTreeEntry, int) ScanInRangePruned(from, to types.TS) *DirtyTreeEntry IsCommitted(from, to types.TS) bool GetAndRefreshMerged() *DirtyTreeEntry Merge() *DirtyTreeEntry GetMaxLSN(from, to types.TS) uint64 Init(maxts types.TS) }
type DirtyEntryInterceptor ¶
type DirtyTreeEntry ¶
func NewDirtyTreeEntry ¶
func NewDirtyTreeEntry(start, end types.TS, tree *model.Tree) *DirtyTreeEntry
func NewEmptyDirtyTreeEntry ¶
func NewEmptyDirtyTreeEntry() *DirtyTreeEntry
func (*DirtyTreeEntry) GetTimeRange ¶
func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)
func (*DirtyTreeEntry) GetTree ¶
func (entry *DirtyTreeEntry) GetTree() (tree *model.Tree)
func (*DirtyTreeEntry) IsEmpty ¶
func (entry *DirtyTreeEntry) IsEmpty() bool
func (*DirtyTreeEntry) Merge ¶
func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)
func (*DirtyTreeEntry) String ¶
func (entry *DirtyTreeEntry) String() string
type GlobalCollector ¶ added in v0.7.0
type GlobalCollector struct { *BaseCollector // contains filtered or unexported fields }
func NewGlobalCollector ¶ added in v0.7.0
func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector
func (*GlobalCollector) VisitBlk ¶ added in v0.7.0
func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
func (*GlobalCollector) VisitDB ¶ added in v0.7.0
func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
func (*GlobalCollector) VisitSeg ¶ added in v0.7.0
func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error
func (*GlobalCollector) VisitTable ¶ added in v0.7.0
func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
type IncrementalCollector ¶
type IncrementalCollector struct {
*BaseCollector
}
func NewIncrementalCollector ¶
func NewIncrementalCollector(start, end types.TS) *IncrementalCollector
type Logtailer ¶ added in v0.7.0
type Logtailer interface { // RangeLogtail returns logtail for all tables within the range (from, to]. // NOTE: caller should keep time range monotonous, or there would be a checkpoint. RangeLogtail( ctx context.Context, from, to timestamp.Timestamp, ) ([]logtail.TableLogtail, []func(), error) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error) // TableLogtail returns logtail for the specified table. // // NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value. TableLogtail( ctx context.Context, table api.TableID, from, to timestamp.Timestamp, ) (logtail.TableLogtail, func(), error) // Now is a time getter from TxnManager. Users of Logtailer should get a timestamp // from Now and use the timestamp to collect logtail, in that case, all txn prepared // before it are visible. Now() (timestamp.Timestamp, timestamp.Timestamp) }
Logtailer provides logtail for the specified table.
type LogtailerImpl ¶ added in v0.7.0
type LogtailerImpl struct {
// contains filtered or unexported fields
}
func NewLogtailer ¶ added in v0.7.0
func NewLogtailer( ctx context.Context, ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog) *LogtailerImpl
func (*LogtailerImpl) Now ¶ added in v0.8.0
func (l *LogtailerImpl) Now() (timestamp.Timestamp, timestamp.Timestamp)
Now is a time getter from TxnManager. Users of Logtailer should get a timestamp from Now and use the timestamp to collect logtail, in that case, all txn prepared before it are visible.
func (*LogtailerImpl) RangeLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) RangeLogtail( ctx context.Context, from, to timestamp.Timestamp, ) ([]logtail.TableLogtail, []func(), error)
RangeLogtail returns logtail for all tables that are modified within the range (from, to]. Check out all dirty tables in the time window and collect logtails for every table
func (*LogtailerImpl) RegisterCallback ¶ added in v0.8.0
func (l *LogtailerImpl) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)
func (*LogtailerImpl) TableLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) TableLogtail( ctx context.Context, table api.TableID, from, to timestamp.Timestamp, ) (logtail.TableLogtail, func(), error)
TableLogtail returns logtail for the specified table. It boils down to calling `HandleSyncLogTailReq`
type Manager ¶ added in v0.7.0
type Manager struct { txnbase.NoopCommitListener // contains filtered or unexported fields }
Logtail manager holds sorted txn handles. Its main jobs:
- Insert new txn handle - Efficiently iterate over arbitrary range of txn handles on a snapshot - Truncate unneceessary txn handles according to GC timestamp
func NewManager ¶ added in v0.7.0
func (*Manager) GetReader ¶ added in v0.7.0
GetReader get a snapshot of all txn prepared between from and to.
func (*Manager) GetTableOperator ¶ added in v0.7.0
func (*Manager) OnEndPrePrepare ¶ added in v0.7.0
OnEndPrePrepare is a listener for TxnManager. When a txn completes PrePrepare, add it to the logtail manager
func (*Manager) OnEndPrepareWAL ¶ added in v0.8.0
func (*Manager) RegisterCallback ¶ added in v0.8.0
type Reader ¶ added in v0.7.0
type Reader struct {
// contains filtered or unexported fields
}
Reader is a snapshot of all txn prepared between from and to. Dirty tables/segments/blocks can be queried based on those txn
func (*Reader) GetDirtyByTable ¶ added in v0.7.0
Merge all dirty table/segment/block of **a table** into one tree
func (*Reader) HasCatalogChanges ¶ added in v0.7.0
HasCatalogChanges returns true if any txn in the reader modified the Catalog
func (*Reader) IsCommitted ¶ added in v0.8.0
type RespBuilder ¶
type RespBuilder interface { catalog.Processor BuildResp() (api.SyncLogTailResp, error) Close() }
type TableLogtailRespBuilder ¶
type TableLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
CatalogLogtailRespBuilder knows how to make api-entry from block entry. impl catalog.Processor interface, driven by BoundTableOperator
func NewTableLogtailRespBuilder ¶
func NewTableLogtailRespBuilder(ctx context.Context, ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder
func (*TableLogtailRespBuilder) BuildResp ¶
func (b *TableLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*TableLogtailRespBuilder) Close ¶
func (b *TableLogtailRespBuilder) Close()
func (*TableLogtailRespBuilder) VisitBlk ¶
func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error
VisitBlk = catalog.Processor.OnBlock
func (*TableLogtailRespBuilder) VisitSeg ¶ added in v0.8.0
func (b *TableLogtailRespBuilder) VisitSeg(e *catalog.SegmentEntry) error
type TableRespKind ¶ added in v0.8.0
type TableRespKind int
const ( TableRespKind_Data TableRespKind = iota TableRespKind_Blk TableRespKind_Seg )
type TxnLogtailRespBuilder ¶ added in v0.8.0
type TxnLogtailRespBuilder struct {
// contains filtered or unexported fields
}
func NewTxnLogtailRespBuilder ¶ added in v0.8.0
func NewTxnLogtailRespBuilder(rt *dbutils.Runtime) *TxnLogtailRespBuilder
func (*TxnLogtailRespBuilder) BuildResp ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) BuildResp()
func (*TxnLogtailRespBuilder) Close ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) Close()
func (*TxnLogtailRespBuilder) CollectLogtail ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func())