Documentation ¶
Index ¶
- Constants
- Variables
- func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
- func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch
- func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string
- func GetMetaIdxesByVersion(ver uint32) []uint16
- func GlobalCheckpointDataFactory(end types.TS, versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error)
- func HandleSyncLogTailReq(ctx context.Context, ckpClient CheckpointClient, mgr *Manager, ...) (resp api.SyncLogTailResp, closeCB func(), err error)
- func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)
- func LoadBlkColumnsByMeta(version uint32, cxt context.Context, colTypes []types.Type, colNames []string, ...) ([]*containers.Batch, error)
- func LoadCNSubBlkColumnsByMeta(version uint32, cxt context.Context, colTypes []types.Type, colNames []string, ...) ([]*batch.Batch, error)
- func LoadCNSubBlkColumnsByMetaWithId(cxt context.Context, colTypes []types.Type, colNames []string, dataType uint16, ...) (ioResult *batch.Batch, err error)
- func LoadCheckpointEntries(ctx context.Context, metLoc string, tableID uint64, tableName string, ...) ([]*api.Entry, []func(), error)
- func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error
- func NewDirtyCollector(sourcer *Manager, clock clock.Clock, catalog *catalog.Catalog, ...) *dirtyCollector
- func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
- type BaseCollector
- func (collector *BaseCollector) Close()
- func (collector *BaseCollector) OrphanData() *CheckpointData
- func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
- func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
- func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
- type BlockLocation
- func (l BlockLocation) Contains(i common.ClosedInterval) bool
- func (l BlockLocation) GetEndOffset() uint64
- func (l BlockLocation) GetID() uint16
- func (l BlockLocation) GetLocation() objectio.Location
- func (l BlockLocation) GetStartOffset() uint64
- func (l BlockLocation) SetEndOffset(end uint64)
- func (l BlockLocation) SetID(id uint16)
- func (l BlockLocation) SetLocation(location objectio.Location)
- func (l BlockLocation) SetStartOffset(start uint64)
- func (l BlockLocation) String() string
- type BlockLocations
- type BlockLocationsIterator
- type BlockT
- type BoundTableOperator
- type CNCheckpointData
- func (data *CNCheckpointData) GetCloseCB(version uint32, m *mpool.MPool) func()
- func (data *CNCheckpointData) GetTableDataFromBats(tid uint64, bats []*batch.Batch) (ins, del, cnIns, segDel *api.Batch, err error)
- func (data *CNCheckpointData) GetTableMeta(tableID uint64, version uint32, loc objectio.Location) (meta *CheckpointMeta)
- func (data *CNCheckpointData) InitMetaIdx(ctx context.Context, version uint32, reader *blockio.BlockReader, ...) error
- func (data *CNCheckpointData) PrefetchFrom(ctx context.Context, version uint32, service fileservice.FileService, ...) (err error)
- func (data *CNCheckpointData) PrefetchMetaFrom(ctx context.Context, version uint32, location objectio.Location, ...) (err error)
- func (data *CNCheckpointData) PrefetchMetaIdx(ctx context.Context, version uint32, idxes []uint16, key objectio.Location, ...) (err error)
- func (data *CNCheckpointData) ReadFromData(ctx context.Context, tableID uint64, location objectio.Location, ...) (dataBats []*batch.Batch, err error)
- func (data *CNCheckpointData) ReadFromDataWithKey(ctx context.Context, location objectio.Location, fs fileservice.FileService, ...) (cnBatch *batch.Batch, err error)
- type CatalogLogtailRespBuilder
- type CheckpointClient
- type CheckpointData
- func (data *CheckpointData) ApplyReplayTo(c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
- func (data *CheckpointData) Close()
- func (data *CheckpointData) CloseWhenLoadFromCache(version uint32)
- func (data *CheckpointData) GetBatches() []*containers.Batch
- func (data *CheckpointData) GetBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDBBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetSegBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetTNBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetTblBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, ...)
- func (data *CheckpointData) PrefetchFrom(ctx context.Context, version uint32, service fileservice.FileService, ...) (err error)
- func (data *CheckpointData) PrefetchMeta(ctx context.Context, version uint32, service fileservice.FileService, ...) (err error)
- func (data *CheckpointData) PrintData()
- func (data *CheckpointData) ReadFrom(ctx context.Context, version uint32, location objectio.Location, ...) (err error)
- func (data *CheckpointData) ReadTNMetaBatch(ctx context.Context, version uint32, location objectio.Location, ...) (err error)
- func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
- func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)
- func (data *CheckpointData) WriteTo(fs fileservice.FileService, blockRows int, checkpointSize int) (CNLocation, TNLocation objectio.Location, err error)
- type CheckpointMeta
- type Collector
- type DirtyEntryInterceptor
- type DirtyTreeEntry
- type GlobalCollector
- func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
- func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error
- func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
- type IncrementalCollector
- type Logtailer
- type LogtailerImpl
- func (l *LogtailerImpl) Now() (timestamp.Timestamp, timestamp.Timestamp)
- func (l *LogtailerImpl) RangeLogtail(ctx context.Context, from, to timestamp.Timestamp) ([]logtail.TableLogtail, []func(), error)
- func (l *LogtailerImpl) RegisterCallback(...)
- func (l *LogtailerImpl) TableLogtail(ctx context.Context, table api.TableID, from, to timestamp.Timestamp) (logtail.TableLogtail, func(), error)
- type Manager
- func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)
- func (mgr *Manager) GetReader(from, to types.TS) *Reader
- func (mgr *Manager) GetTableOperator(from, to types.TS, catalog *catalog.Catalog, dbID, tableID uint64, scope Scope, ...) *BoundTableOperator
- func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)
- func (mgr *Manager) OnEndPrepareWAL(txn txnif.AsyncTxn)
- func (mgr *Manager) RegisterCallback(...) error
- func (mgr *Manager) Start()
- func (mgr *Manager) Stop()
- func (mgr *Manager) TryCompactTable()
- type Reader
- type RespBuilder
- type RowT
- type Scope
- type TableLogtailRespBuilder
- type TableMeta
- type TableRespKind
- type TxnLogtailRespBuilder
- type TxnTable
- func (table *TxnTable) AddTxn(txn txnif.AsyncTxn) (err error)
- func (table *TxnTable) ForeachRowInBetween(from, to types.TS, skipBlkOp func(blk BlockT) bool, ...) (readRows int)
- func (table *TxnTable) TruncateByTimeStamp(ts types.TS) (cnt int)
- func (table *TxnTable) TryCompact(from types.TS, rt *dbutils.Runtime) (to types.TS)
Constants ¶
const ( SnapshotAttr_TID = catalog.SnapshotAttr_TID SnapshotAttr_DBID = catalog.SnapshotAttr_DBID SegmentAttr_ID = catalog.SegmentAttr_ID SegmentAttr_CreateAt = catalog.SegmentAttr_CreateAt SegmentAttr_SegNode = catalog.SegmentAttr_SegNode SnapshotAttr_BlockMaxRow = catalog.SnapshotAttr_BlockMaxRow SnapshotAttr_SegmentMaxBlock = catalog.SnapshotAttr_SegmentMaxBlock SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start" SnapshotMetaAttr_BlockInsertBatchEnd = "block_insert_batch_end" SnapshotMetaAttr_BlockInsertBatchLocation = "block_insert_batch_location" SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start" SnapshotMetaAttr_BlockDeleteBatchEnd = "block_delete_batch_end" SnapshotMetaAttr_BlockDeleteBatchLocation = "block_delete_batch_location" SnapshotMetaAttr_BlockCNInsertBatchLocation = "block_cn_insert_batch_location" SnapshotMetaAttr_SegDeleteBatchStart = "seg_delete_batch_start" SnapshotMetaAttr_SegDeleteBatchEnd = "seg_delete_batch_end" SnapshotMetaAttr_SegDeleteBatchLocation = "seg_delete_batch_location" CheckpointMetaAttr_BlockLocation = "checkpoint_meta_block_location" CheckpointMetaAttr_SchemaType = "checkpoint_meta_schema_type" AccountIDDbNameTblName = catalog.AccountIDDbNameTblName AccountIDDbName = catalog.AccountIDDbName SnapshotAttr_SchemaExtra = catalog.SnapshotAttr_SchemaExtra )
const ( CheckpointVersion1 uint32 = 1 CheckpointVersion2 uint32 = 2 CheckpointVersion3 uint32 = 3 CheckpointVersion4 uint32 = 4 CheckpointVersion5 uint32 = 5 CheckpointVersion6 uint32 = 6 CheckpointVersion7 uint32 = 7 CheckpointVersion8 uint32 = 8 CheckpointCurrentVersion = CheckpointVersion8 )
const ( MetaIDX uint16 = iota DBInsertIDX DBInsertTxnIDX DBDeleteIDX DBDeleteTxnIDX TBLInsertIDX TBLInsertTxnIDX TBLDeleteIDX TBLDeleteTxnIDX TBLColInsertIDX TBLColDeleteIDX SEGInsertIDX SEGInsertTxnIDX SEGDeleteIDX SEGDeleteTxnIDX BLKMetaInsertIDX BLKMetaInsertTxnIDX BLKMetaDeleteIDX BLKMetaDeleteTxnIDX BLKTNMetaInsertIDX BLKTNMetaInsertTxnIDX BLKTNMetaDeleteIDX BLKTNMetaDeleteTxnIDX BLKCNMetaInsertIDX TNMetaIDX )
const ( Checkpoint_Meta_TID_IDX = 2 Checkpoint_Meta_Insert_Block_LOC_IDX = 3 Checkpoint_Meta_CN_Delete_Block_LOC_IDX = 4 Checkpoint_Meta_Delete_Block_LOC_IDX = 5 Checkpoint_Meta_Segment_LOC_IDX = 6 )
const ( Checkpoint_Meta_Insert_Block_Start_IDX = 3 Checkpoint_Meta_Insert_Block_End_IDX = 4 Checkpoint_Meta_Delete_Block_Start_IDX = 5 Checkpoint_Meta_Delete_Block_End_IDX = 6 Checkpoint_Meta_Segment_Start_IDX = 7 Checkpoint_Meta_Segment_End_IDX = 8 )
for ver1-3
const ( LocationOffset = 0 LocationLength = objectio.LocationLen StartOffsetOffset = LocationOffset + LocationLength StartOffsetLength = 8 EndOffsetOffset = StartOffsetOffset + StartOffsetLength EndOffsetLength = 8 BlockLocationLength = EndOffsetOffset + EndOffsetLength )
func (l BlockLocations) append iterator
Location is a fixed-length unmodifiable byte array. Layout: Location(objectio.Location) | StartOffset(uint64) | EndOffset(uint64)
const ( BlockInsert = iota BlockDelete CNBlockInsert SegmentDelete )
const DefaultCheckpointBlockRows = 10000
const DefaultCheckpointSize = 1024 * 1024 * 1024
const (
LogtailHeartbeatDuration = time.Millisecond * 2
)
const MaxIDX = TNMetaIDX + 1
const MetaMaxIdx = SegmentDelete + 1
const Size90M = 90 * 1024 * 1024
Variables ¶
var ( // for blk meta response BlkMetaSchema *catalog.Schema // latest version BlkMetaSchema_V1 *catalog.Schema // previous version DelSchema *catalog.Schema SegSchema *catalog.Schema TxnNodeSchema *catalog.Schema DBTNSchema *catalog.Schema TblTNSchema *catalog.Schema SegTNSchema *catalog.Schema BlkTNSchema *catalog.Schema MetaSchema_V1 *catalog.Schema MetaSchema *catalog.Schema DBDelSchema *catalog.Schema TblDelSchema *catalog.Schema ColumnDelSchema *catalog.Schema TNMetaSchema *catalog.Schema DBSpecialDeleteSchema *catalog.Schema TBLSpecialDeleteSchema *catalog.Schema )
var ( DBSpecialDeleteAttr = []string{ pkgcatalog.SystemDBAttr_ID, AccountIDDbName, } DBSpecialDeleteTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, 0, 0), } TBLSpecialDeleteAttr = []string{ pkgcatalog.SystemRelAttr_ID, AccountIDDbNameTblName, } TBLSpecialDeleteTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, 0, 0), } SegmentSchemaAttr = []string{ SegmentAttr_ID, SegmentAttr_CreateAt, SegmentAttr_SegNode, } SegmentSchemaTypes = []types.Type{ types.New(types.T_uuid, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_blob, 0, 0), } TxnNodeSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, } TxnNodeSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), } DBTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } DBTNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), } TblTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, SnapshotAttr_BlockMaxRow, SnapshotAttr_SegmentMaxBlock, SnapshotAttr_SchemaExtra, } TblTNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint16, 0, 0), types.New(types.T_varchar, 0, 0), } SegmentTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } SegmentTNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), } BlockTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, pkgcatalog.BlockMeta_MetaLoc, pkgcatalog.BlockMeta_DeltaLoc, } BlockTNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), } MetaSchemaAttr_V1 = []string{ SnapshotAttr_TID, SnapshotMetaAttr_BlockInsertBatchStart, SnapshotMetaAttr_BlockInsertBatchEnd, SnapshotMetaAttr_BlockDeleteBatchStart, SnapshotMetaAttr_BlockDeleteBatchEnd, SnapshotMetaAttr_SegDeleteBatchStart, SnapshotMetaAttr_SegDeleteBatchEnd, } MetaShcemaTypes_V1 = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), } MetaSchemaAttr = []string{ SnapshotAttr_TID, SnapshotMetaAttr_BlockInsertBatchLocation, SnapshotMetaAttr_BlockCNInsertBatchLocation, SnapshotMetaAttr_BlockDeleteBatchLocation, SnapshotMetaAttr_SegDeleteBatchLocation, } MetaShcemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), } DBDelSchemaAttr = []string{ pkgcatalog.SystemDBAttr_ID, } DBDelSchemaTypes = []types.Type{ types.T_uint64.ToType(), } TblDelSchemaAttr = []string{ pkgcatalog.SystemRelAttr_ID, } TblDelSchemaTypes = []types.Type{ types.T_uint64.ToType(), } ColumnDelSchemaAttr = []string{ pkgcatalog.SystemColAttr_UniqName, } ColumnDelSchemaTypes = []types.Type{ types.T_varchar.ToType(), } TNMetaSchemaAttr = []string{ CheckpointMetaAttr_BlockLocation, CheckpointMetaAttr_SchemaType, } TNMetaShcemaTypes = []types.Type{ types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_uint16, 0, 0), } BaseAttr = []string{ catalog.AttrRowID, catalog.AttrCommitTs, } BaseTypes = []types.Type{ types.T_Rowid.ToType(), types.T_TS.ToType(), } )
Functions ¶
func BatchToString ¶
func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
func DataChangeToLogtailBatch ¶ added in v0.8.0
func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch
GetDataWindowForLogtail returns the batch according to the writeSchema. columns are sorted by seqnum and vacancy is filled with zero value
func DebugBatchToString ¶
func GetMetaIdxesByVersion ¶ added in v1.0.0
func GlobalCheckpointDataFactory ¶ added in v0.7.0
func HandleSyncLogTailReq ¶
func HandleSyncLogTailReq( ctx context.Context, ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog, req api.SyncLogTailReq, canRetry bool) (resp api.SyncLogTailResp, closeCB func(), err error)
func LoadBlkColumnsByMeta ¶ added in v0.8.0
func LoadCNSubBlkColumnsByMeta ¶ added in v1.0.0
func LoadCNSubBlkColumnsByMetaWithId ¶ added in v1.0.0
func LoadCheckpointEntries ¶
func MockCallback ¶ added in v0.8.0
func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error
func NewDirtyCollector ¶
func ToStringTemplate ¶
func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
Types ¶
type BaseCollector ¶ added in v0.7.0
type BaseCollector struct { *catalog.LoopProcessor // contains filtered or unexported fields }
func (*BaseCollector) Close ¶ added in v0.7.0
func (collector *BaseCollector) Close()
func (*BaseCollector) OrphanData ¶ added in v0.7.0
func (collector *BaseCollector) OrphanData() *CheckpointData
func (*BaseCollector) VisitBlk ¶ added in v0.7.0
func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
func (*BaseCollector) VisitDB ¶ added in v0.7.0
func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
func (*BaseCollector) VisitSeg ¶ added in v0.7.0
func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
func (*BaseCollector) VisitTable ¶ added in v0.7.0
func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
type BlockLocation ¶ added in v1.0.0
type BlockLocation []byte
func BuildBlockLoaction ¶ added in v1.0.0
func BuildBlockLoaction(id uint16, start, end uint64) BlockLocation
func BuildBlockLoactionWithLocation ¶ added in v1.0.0
func BuildBlockLoactionWithLocation(name objectio.ObjectName, extent objectio.Extent, rows uint32, id uint16, start, end uint64) BlockLocation
func (BlockLocation) Contains ¶ added in v1.0.0
func (l BlockLocation) Contains(i common.ClosedInterval) bool
func (BlockLocation) GetEndOffset ¶ added in v1.0.0
func (l BlockLocation) GetEndOffset() uint64
func (BlockLocation) GetID ¶ added in v1.0.0
func (l BlockLocation) GetID() uint16
func (BlockLocation) GetLocation ¶ added in v1.0.0
func (l BlockLocation) GetLocation() objectio.Location
func (BlockLocation) GetStartOffset ¶ added in v1.0.0
func (l BlockLocation) GetStartOffset() uint64
func (BlockLocation) SetEndOffset ¶ added in v1.0.0
func (l BlockLocation) SetEndOffset(end uint64)
func (BlockLocation) SetID ¶ added in v1.0.0
func (l BlockLocation) SetID(id uint16)
func (BlockLocation) SetLocation ¶ added in v1.0.0
func (l BlockLocation) SetLocation(location objectio.Location)
func (BlockLocation) SetStartOffset ¶ added in v1.0.0
func (l BlockLocation) SetStartOffset(start uint64)
func (BlockLocation) String ¶ added in v1.0.0
func (l BlockLocation) String() string
type BlockLocations ¶ added in v1.0.0
type BlockLocations []byte
func NewEmptyBlockLocations ¶ added in v1.0.0
func NewEmptyBlockLocations() BlockLocations
func (*BlockLocations) Append ¶ added in v1.0.0
func (l *BlockLocations) Append(loc BlockLocation)
func (BlockLocations) MakeIterator ¶ added in v1.0.0
func (l BlockLocations) MakeIterator() *BlockLocationsIterator
func (BlockLocations) String ¶ added in v1.0.0
func (l BlockLocations) String() string
type BlockLocationsIterator ¶ added in v1.0.0
type BlockLocationsIterator struct { *BlockLocations // contains filtered or unexported fields }
func (*BlockLocationsIterator) HasNext ¶ added in v1.0.0
func (i *BlockLocationsIterator) HasNext() bool
func (*BlockLocationsIterator) Next ¶ added in v1.0.0
func (i *BlockLocationsIterator) Next() BlockLocation
type BoundTableOperator ¶
type BoundTableOperator struct {
// contains filtered or unexported fields
}
BoundTableOperator holds a read only reader, knows how to iterate catalog entries.
func NewBoundTableOperator ¶
func (*BoundTableOperator) Run ¶
func (c *BoundTableOperator) Run() error
Run takes a RespBuilder to visit every table/segment/block touched by all txn in the Reader. During the visiting, RespBuiler will fetch information to return logtail entry
type CNCheckpointData ¶ added in v0.8.0
type CNCheckpointData struct {
// contains filtered or unexported fields
}
func NewCNCheckpointData ¶ added in v0.8.0
func NewCNCheckpointData() *CNCheckpointData
func (*CNCheckpointData) GetCloseCB ¶ added in v1.0.0
func (data *CNCheckpointData) GetCloseCB(version uint32, m *mpool.MPool) func()
func (*CNCheckpointData) GetTableDataFromBats ¶ added in v1.0.0
func (*CNCheckpointData) GetTableMeta ¶ added in v0.8.0
func (data *CNCheckpointData) GetTableMeta(tableID uint64, version uint32, loc objectio.Location) (meta *CheckpointMeta)
func (*CNCheckpointData) InitMetaIdx ¶ added in v1.0.0
func (*CNCheckpointData) PrefetchFrom ¶ added in v0.8.0
func (data *CNCheckpointData) PrefetchFrom( ctx context.Context, version uint32, service fileservice.FileService, key objectio.Location, tableID uint64) (err error)
func (*CNCheckpointData) PrefetchMetaFrom ¶ added in v1.0.0
func (data *CNCheckpointData) PrefetchMetaFrom( ctx context.Context, version uint32, location objectio.Location, service fileservice.FileService, tableID uint64) (err error)
func (*CNCheckpointData) PrefetchMetaIdx ¶ added in v1.0.0
func (data *CNCheckpointData) PrefetchMetaIdx( ctx context.Context, version uint32, idxes []uint16, key objectio.Location, service fileservice.FileService, ) (err error)
func (*CNCheckpointData) ReadFromData ¶ added in v1.0.0
func (*CNCheckpointData) ReadFromDataWithKey ¶ added in v1.0.0
func (data *CNCheckpointData) ReadFromDataWithKey( ctx context.Context, location objectio.Location, fs fileservice.FileService, m *mpool.MPool, ) (cnBatch *batch.Batch, err error)
type CatalogLogtailRespBuilder ¶
type CatalogLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
CatalogLogtailRespBuilder knows how to make api-entry from db and table entry. impl catalog.Processor interface, driven by BoundTableOperator
func (*CatalogLogtailRespBuilder) BuildResp ¶
func (b *CatalogLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*CatalogLogtailRespBuilder) Close ¶
func (b *CatalogLogtailRespBuilder) Close()
func (*CatalogLogtailRespBuilder) VisitDB ¶
func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error
VisitDB = catalog.Processor.OnDatabase
func (*CatalogLogtailRespBuilder) VisitTbl ¶
func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error
VisitTbl = catalog.Processor.OnTable
type CheckpointClient ¶
type CheckpointData ¶
type CheckpointData struct {
// contains filtered or unexported fields
}
func LoadCheckpointEntriesFromKey ¶ added in v1.0.0
func LoadCheckpointEntriesFromKey(ctx context.Context, fs fileservice.FileService, location objectio.Location, version uint32) ([]objectio.Location, *CheckpointData, error)
func NewCheckpointData ¶
func NewCheckpointData() *CheckpointData
func (*CheckpointData) ApplyReplayTo ¶
func (data *CheckpointData) ApplyReplayTo( c *catalog.Catalog, dataFactory catalog.DataFactory, ) (err error)
func (*CheckpointData) Close ¶
func (data *CheckpointData) Close()
func (*CheckpointData) CloseWhenLoadFromCache ¶ added in v1.0.0
func (data *CheckpointData) CloseWhenLoadFromCache(version uint32)
func (*CheckpointData) GetBatches ¶ added in v1.0.0
func (data *CheckpointData) GetBatches() []*containers.Batch
func (*CheckpointData) GetBlkBatchs ¶
func (data *CheckpointData) GetBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDBBatchs ¶
func (data *CheckpointData) GetDBBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetSegBatchs ¶
func (data *CheckpointData) GetSegBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetTNBlkBatchs ¶ added in v1.0.0
func (data *CheckpointData) GetTNBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetTblBatchs ¶
func (data *CheckpointData) GetTblBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) PrefetchFrom ¶ added in v0.8.0
func (data *CheckpointData) PrefetchFrom( ctx context.Context, version uint32, service fileservice.FileService, key objectio.Location) (err error)
func (*CheckpointData) PrefetchMeta ¶ added in v1.0.0
func (data *CheckpointData) PrefetchMeta( ctx context.Context, version uint32, service fileservice.FileService, key objectio.Location) (err error)
func (*CheckpointData) PrintData ¶
func (data *CheckpointData) PrintData()
func (*CheckpointData) ReadFrom ¶
func (data *CheckpointData) ReadFrom( ctx context.Context, version uint32, location objectio.Location, reader *blockio.BlockReader, fs fileservice.FileService, m *mpool.MPool, ) (err error)
TODO: There need a global io pool
func (*CheckpointData) ReadTNMetaBatch ¶ added in v1.0.0
func (data *CheckpointData) ReadTNMetaBatch( ctx context.Context, version uint32, location objectio.Location, reader *blockio.BlockReader, ) (err error)
func (*CheckpointData) UpdateBlkMeta ¶
func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
func (*CheckpointData) UpdateSegMeta ¶ added in v0.8.0
func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)
func (*CheckpointData) WriteTo ¶
func (data *CheckpointData) WriteTo( fs fileservice.FileService, blockRows int, checkpointSize int, ) (CNLocation, TNLocation objectio.Location, err error)
type CheckpointMeta ¶
type CheckpointMeta struct {
// contains filtered or unexported fields
}
func NewCheckpointMeta ¶
func NewCheckpointMeta() *CheckpointMeta
func (*CheckpointMeta) DecodeFromString ¶ added in v1.0.0
func (m *CheckpointMeta) DecodeFromString(keys [][]byte) (err error)
func (*CheckpointMeta) String ¶ added in v1.0.0
func (m *CheckpointMeta) String() string
type Collector ¶
type Collector interface { String() string Run() ScanInRange(from, to types.TS) (*DirtyTreeEntry, int) ScanInRangePruned(from, to types.TS) *DirtyTreeEntry IsCommitted(from, to types.TS) bool GetAndRefreshMerged() *DirtyTreeEntry Merge() *DirtyTreeEntry GetMaxLSN(from, to types.TS) uint64 Init(maxts types.TS) }
type DirtyEntryInterceptor ¶
type DirtyTreeEntry ¶
func NewDirtyTreeEntry ¶
func NewDirtyTreeEntry(start, end types.TS, tree *model.Tree) *DirtyTreeEntry
func NewEmptyDirtyTreeEntry ¶
func NewEmptyDirtyTreeEntry() *DirtyTreeEntry
func (*DirtyTreeEntry) GetTimeRange ¶
func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)
func (*DirtyTreeEntry) GetTree ¶
func (entry *DirtyTreeEntry) GetTree() (tree *model.Tree)
func (*DirtyTreeEntry) IsEmpty ¶
func (entry *DirtyTreeEntry) IsEmpty() bool
func (*DirtyTreeEntry) Merge ¶
func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)
func (*DirtyTreeEntry) String ¶
func (entry *DirtyTreeEntry) String() string
type GlobalCollector ¶ added in v0.7.0
type GlobalCollector struct { *BaseCollector // contains filtered or unexported fields }
func NewGlobalCollector ¶ added in v0.7.0
func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector
func (*GlobalCollector) VisitBlk ¶ added in v0.7.0
func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
func (*GlobalCollector) VisitDB ¶ added in v0.7.0
func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
func (*GlobalCollector) VisitSeg ¶ added in v0.7.0
func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error
func (*GlobalCollector) VisitTable ¶ added in v0.7.0
func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
type IncrementalCollector ¶
type IncrementalCollector struct {
*BaseCollector
}
func NewIncrementalCollector ¶
func NewIncrementalCollector(start, end types.TS) *IncrementalCollector
type Logtailer ¶ added in v0.7.0
type Logtailer interface { // RangeLogtail returns logtail for all tables within the range (from, to]. // NOTE: caller should keep time range monotonous, or there would be a checkpoint. RangeLogtail( ctx context.Context, from, to timestamp.Timestamp, ) ([]logtail.TableLogtail, []func(), error) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error) // TableLogtail returns logtail for the specified table. // // NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value. TableLogtail( ctx context.Context, table api.TableID, from, to timestamp.Timestamp, ) (logtail.TableLogtail, func(), error) // Now is a time getter from TxnManager. Users of Logtailer should get a timestamp // from Now and use the timestamp to collect logtail, in that case, all txn prepared // before it are visible. Now() (timestamp.Timestamp, timestamp.Timestamp) }
Logtailer provides logtail for the specified table.
type LogtailerImpl ¶ added in v0.7.0
type LogtailerImpl struct {
// contains filtered or unexported fields
}
func NewLogtailer ¶ added in v0.7.0
func NewLogtailer( ctx context.Context, ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog) *LogtailerImpl
func (*LogtailerImpl) Now ¶ added in v0.8.0
func (l *LogtailerImpl) Now() (timestamp.Timestamp, timestamp.Timestamp)
Now is a time getter from TxnManager. Users of Logtailer should get a timestamp from Now and use the timestamp to collect logtail, in that case, all txn prepared before it are visible.
func (*LogtailerImpl) RangeLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) RangeLogtail( ctx context.Context, from, to timestamp.Timestamp, ) ([]logtail.TableLogtail, []func(), error)
RangeLogtail returns logtail for all tables that are modified within the range (from, to]. Check out all dirty tables in the time window and collect logtails for every table
func (*LogtailerImpl) RegisterCallback ¶ added in v0.8.0
func (l *LogtailerImpl) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)
func (*LogtailerImpl) TableLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) TableLogtail( ctx context.Context, table api.TableID, from, to timestamp.Timestamp, ) (logtail.TableLogtail, func(), error)
TableLogtail returns logtail for the specified table. It boils down to calling `HandleSyncLogTailReq`
type Manager ¶ added in v0.7.0
type Manager struct { txnbase.NoopCommitListener // contains filtered or unexported fields }
Logtail manager holds sorted txn handles. Its main jobs:
- Insert new txn handle - Efficiently iterate over arbitrary range of txn handles on a snapshot - Truncate unneceessary txn handles according to GC timestamp
func NewManager ¶ added in v0.7.0
func (*Manager) GetReader ¶ added in v0.7.0
GetReader get a snapshot of all txn prepared between from and to.
func (*Manager) GetTableOperator ¶ added in v0.7.0
func (*Manager) OnEndPrePrepare ¶ added in v0.7.0
OnEndPrePrepare is a listener for TxnManager. When a txn completes PrePrepare, add it to the logtail manager
func (*Manager) OnEndPrepareWAL ¶ added in v0.8.0
func (*Manager) RegisterCallback ¶ added in v0.8.0
func (*Manager) TryCompactTable ¶ added in v1.0.0
func (mgr *Manager) TryCompactTable()
type Reader ¶ added in v0.7.0
type Reader struct {
// contains filtered or unexported fields
}
Reader is a snapshot of all txn prepared between from and to. Dirty tables/segments/blocks can be queried based on those txn
func (*Reader) GetDirtyByTable ¶ added in v0.7.0
Merge all dirty table/segment/block of **a table** into one tree
func (*Reader) HasCatalogChanges ¶ added in v0.7.0
HasCatalogChanges returns true if any txn in the reader modified the Catalog
func (*Reader) IsCommitted ¶ added in v0.8.0
type RespBuilder ¶
type RespBuilder interface { catalog.Processor BuildResp() (api.SyncLogTailResp, error) Close() }
type TableLogtailRespBuilder ¶
type TableLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
CatalogLogtailRespBuilder knows how to make api-entry from block entry. impl catalog.Processor interface, driven by BoundTableOperator
func NewTableLogtailRespBuilder ¶
func NewTableLogtailRespBuilder(ctx context.Context, ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder
func (*TableLogtailRespBuilder) BuildResp ¶
func (b *TableLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*TableLogtailRespBuilder) Close ¶
func (b *TableLogtailRespBuilder) Close()
func (*TableLogtailRespBuilder) VisitBlk ¶
func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error
VisitBlk = catalog.Processor.OnBlock
func (*TableLogtailRespBuilder) VisitSeg ¶ added in v0.8.0
func (b *TableLogtailRespBuilder) VisitSeg(e *catalog.SegmentEntry) error
type TableMeta ¶ added in v1.0.0
type TableMeta struct { common.ClosedInterval // contains filtered or unexported fields }
func NewTableMeta ¶ added in v1.0.0
func NewTableMeta() *TableMeta
type TableRespKind ¶ added in v0.8.0
type TableRespKind int
const ( TableRespKind_Data TableRespKind = iota TableRespKind_Blk TableRespKind_Seg )
type TxnLogtailRespBuilder ¶ added in v0.8.0
type TxnLogtailRespBuilder struct {
// contains filtered or unexported fields
}
func NewTxnLogtailRespBuilder ¶ added in v0.8.0
func NewTxnLogtailRespBuilder(rt *dbutils.Runtime) *TxnLogtailRespBuilder
func (*TxnLogtailRespBuilder) BuildResp ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) BuildResp()
func (*TxnLogtailRespBuilder) Close ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) Close()
func (*TxnLogtailRespBuilder) CollectLogtail ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func())