Documentation ¶
Index ¶
- Constants
- Variables
- func BackupCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)
- func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
- func CorrectUsageWrongPlacement(c *catalog.Catalog) (int, float64, error)
- func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch
- func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string
- func EliminateErrorsOnCache(c *catalog.Catalog, end types.TS) int
- func FillUsageBatOfGlobal(collector *GlobalCollector)
- func FillUsageBatOfIncremental(collector *IncrementalCollector)
- func GetMetaIdxesByVersion(ver uint32) []uint16
- func GetStorageUsageHistory(ctx context.Context, locations []objectio.Location, versions []uint32, ...) ([][]UsageData, [][]UsageData, error)
- func GlobalCheckpointDataFactory(end types.TS, versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error)
- func HandleSyncLogTailReq(ctx context.Context, ckpClient CheckpointClient, mgr *Manager, ...) (resp api.SyncLogTailResp, closeCB func(), err error)
- func IDXString(idx uint16) string
- func IncrementalCheckpointDataFactory(start, end types.TS, collectUsage bool, skipLoadObjectStats bool) func(c *catalog.Catalog) (*CheckpointData, error)
- func LoadBlkColumnsByMeta(version uint32, cxt context.Context, colTypes []types.Type, colNames []string, ...) ([]*containers.Batch, error)
- func LoadCNSubBlkColumnsByMeta(version uint32, cxt context.Context, colTypes []types.Type, colNames []string, ...) ([]*batch.Batch, error)
- func LoadCNSubBlkColumnsByMetaWithId(cxt context.Context, colTypes []types.Type, colNames []string, dataType uint16, ...) (ioResult *batch.Batch, err error)
- func LoadCheckpointEntries(ctx context.Context, metLoc string, tableID uint64, tableName string, ...) ([]*api.Entry, []func(), error)
- func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error
- func NewDirtyCollector(sourcer *Manager, clock clock.Clock, catalog *catalog.Catalog, ...) *dirtyCollector
- func PairAccountVsDB(c *catalog.Catalog) map[uint64]uint64
- func ReWriteCheckpointAndBlockFromKey(ctx context.Context, fs, dstFs fileservice.FileService, ...) (objectio.Location, objectio.Location, []string, error)
- func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
- type BaseCollector
- func (collector *BaseCollector) Allocator() *mpool.MPool
- func (collector *BaseCollector) Close()
- func (collector *BaseCollector) LoadAndCollectObject(c *catalog.Catalog, visitObject func(*catalog.ObjectEntry) error) error
- func (collector *BaseCollector) OrphanData() *CheckpointData
- func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
- func (collector *BaseCollector) VisitBlkForBackup(entry *catalog.BlockEntry) (err error)
- func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *BaseCollector) VisitObj(entry *catalog.ObjectEntry) (err error)
- func (collector *BaseCollector) VisitObjForBackup(entry *catalog.ObjectEntry) (err error)
- func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
- type BlockLocation
- func (l BlockLocation) Contains(i common.ClosedInterval) bool
- func (l BlockLocation) GetEndOffset() uint64
- func (l BlockLocation) GetID() uint16
- func (l BlockLocation) GetLocation() objectio.Location
- func (l BlockLocation) GetStartOffset() uint64
- func (l BlockLocation) SetEndOffset(end uint64)
- func (l BlockLocation) SetID(id uint16)
- func (l BlockLocation) SetLocation(location objectio.Location)
- func (l BlockLocation) SetStartOffset(start uint64)
- func (l BlockLocation) String() string
- type BlockLocations
- type BlockLocationsIterator
- type BlockT
- type BoundTableOperator
- type CNCheckpointData
- func (data *CNCheckpointData) GetCloseCB(version uint32, m *mpool.MPool) func()
- func (data *CNCheckpointData) GetTableDataFromBats(tid uint64, bats []*batch.Batch) (ins, del, cnIns, objInfo *api.Batch, err error)
- func (data *CNCheckpointData) GetTableMeta(tableID uint64, version uint32, loc objectio.Location) (meta *CheckpointMeta)
- func (data *CNCheckpointData) InitMetaIdx(ctx context.Context, version uint32, reader *blockio.BlockReader, ...) error
- func (data *CNCheckpointData) PrefetchFrom(ctx context.Context, version uint32, service fileservice.FileService, ...) (err error)
- func (data *CNCheckpointData) PrefetchMetaFrom(ctx context.Context, version uint32, location objectio.Location, ...) (err error)
- func (data *CNCheckpointData) PrefetchMetaIdx(ctx context.Context, version uint32, idxes []uint16, key objectio.Location, ...) (err error)
- func (data *CNCheckpointData) ReadFromData(ctx context.Context, tableID uint64, location objectio.Location, ...) (dataBats []*batch.Batch, err error)
- type CatalogLogtailRespBuilder
- type CheckpointClient
- type CheckpointData
- func LoadCheckpointEntriesFromKey(ctx context.Context, fs fileservice.FileService, location objectio.Location, ...) ([]objectio.ObjectName, *CheckpointData, error)
- func LoadSpecifiedCkpBatch(ctx context.Context, location objectio.Location, version uint32, ...) (data *CheckpointData, err error)
- func NewCheckpointData(mp *mpool.MPool) *CheckpointData
- func NewCheckpointDataWithVersion(ver uint32, mp *mpool.MPool) *CheckpointData
- func (data *CheckpointData) Allocator() *mpool.MPool
- func (data *CheckpointData) ApplyReplayTo(c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
- func (data *CheckpointData) Close()
- func (data *CheckpointData) CloseWhenLoadFromCache(version uint32)
- func (data *CheckpointData) ExportStats(prefix string) []zap.Field
- func (data *CheckpointData) FormatData(mp *mpool.MPool) (err error)
- func (data *CheckpointData) GetBatches() []*containers.Batch
- func (data *CheckpointData) GetBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDBBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetObjectBatchs() *containers.Batch
- func (data *CheckpointData) GetTNBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetTNObjectBatchs() *containers.Batch
- func (data *CheckpointData) GetTblBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, ...)
- func (data *CheckpointData) PrefetchFrom(ctx context.Context, version uint32, service fileservice.FileService, ...) (err error)
- func (data *CheckpointData) PrefetchMeta(ctx context.Context, version uint32, service fileservice.FileService, ...) (err error)
- func (data *CheckpointData) PrintData()
- func (data *CheckpointData) ReadFrom(ctx context.Context, version uint32, location objectio.Location, ...) (err error)
- func (data *CheckpointData) ReadTNMetaBatch(ctx context.Context, version uint32, location objectio.Location, ...) (err error)
- func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
- func (data *CheckpointData) UpdateBlockDeleteBlkMeta(tid uint64, insStart, insEnd int32)
- func (data *CheckpointData) UpdateBlockInsertBlkMeta(tid uint64, insStart, insEnd int32)
- func (data *CheckpointData) UpdateObjectInsertMeta(tid uint64, delStart, delEnd int32)
- func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)
- func (data *CheckpointData) WriteTo(fs fileservice.FileService, blockRows int, checkpointSize int) (CNLocation, TNLocation objectio.Location, checkpointFiles []string, err error)
- type CheckpointMeta
- type Collector
- type DirtyEntryInterceptor
- type DirtyTreeEntry
- type GlobalCollector
- func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
- func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *GlobalCollector) VisitObj(entry *catalog.ObjectEntry) error
- func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
- type IncrementalCollector
- type Logtailer
- type LogtailerImpl
- func (l *LogtailerImpl) Now() (timestamp.Timestamp, timestamp.Timestamp)
- func (l *LogtailerImpl) RangeLogtail(ctx context.Context, from, to timestamp.Timestamp) ([]logtail.TableLogtail, []func(), error)
- func (l *LogtailerImpl) RegisterCallback(...)
- func (l *LogtailerImpl) TableLogtail(ctx context.Context, table api.TableID, from, to timestamp.Timestamp) (logtail.TableLogtail, func(), error)
- type Manager
- func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)
- func (mgr *Manager) GetReader(from, to types.TS) *Reader
- func (mgr *Manager) GetTableOperator(from, to types.TS, catalog *catalog.Catalog, dbID, tableID uint64, scope Scope, ...) *BoundTableOperator
- func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)
- func (mgr *Manager) OnEndPrepareWAL(txn txnif.AsyncTxn)
- func (mgr *Manager) RegisterCallback(...) error
- func (mgr *Manager) Start()
- func (mgr *Manager) Stop()
- func (mgr *Manager) TryCompactTable()
- type Reader
- type RespBuilder
- type RowT
- type Scope
- type StorageUsageCache
- func (c *StorageUsageCache) CacheLen() int
- func (c *StorageUsageCache) ClearForUpdate()
- func (c *StorageUsageCache) Delete(usage UsageData)
- func (c *StorageUsageCache) GatherAccountSize(id uint64) (size uint64, exist bool)
- func (c *StorageUsageCache) GatherAllAccSize() (usages map[uint64]uint64)
- func (c *StorageUsageCache) Get(usage UsageData) (ret UsageData, exist bool)
- func (c *StorageUsageCache) IsExpired() bool
- func (c *StorageUsageCache) Iter() btree.IterG[UsageData]
- func (c *StorageUsageCache) LessFunc() func(a UsageData, b UsageData) bool
- func (c *StorageUsageCache) MemUsed() float64
- func (c *StorageUsageCache) SetOrReplace(usage UsageData)
- func (c *StorageUsageCache) String() string
- type StorageUsageCacheOption
- type TNUsageMemo
- func (m *TNUsageMemo) AddReqTrace(accountId uint64, tSize uint64, t time.Time, hint string)
- func (m *TNUsageMemo) CacheLen() int
- func (m *TNUsageMemo) Clear()
- func (m *TNUsageMemo) ClearDroppedAccounts(reserved map[uint64]struct{}) string
- func (m *TNUsageMemo) ClearNewAccCache()
- func (m *TNUsageMemo) Delete(usage UsageData)
- func (m *TNUsageMemo) DeltaUpdate(delta UsageData, del bool)
- func (m *TNUsageMemo) EnterProcessing()
- func (m *TNUsageMemo) EstablishFromCKPs(c *catalog.Catalog)
- func (m *TNUsageMemo) GatherAccountSize(id uint64) (size uint64, exist bool)
- func (m *TNUsageMemo) GatherAllAccSize() (usages map[uint64]uint64)
- func (m *TNUsageMemo) GatherNewAccountSize(id uint64) (size uint64, exist bool)
- func (m *TNUsageMemo) Get(usage UsageData) (old UsageData, exist bool)
- func (m *TNUsageMemo) GetAllReqTrace() (accountIds []uint64, timestamps []time.Time, sizes []uint64, hints []string)
- func (m *TNUsageMemo) GetCache() *StorageUsageCache
- func (m *TNUsageMemo) GetDelayed() map[uint64]UsageData
- func (m *TNUsageMemo) GetNewAccCacheLatestUpdate() types.TS
- func (m *TNUsageMemo) HasUpdate() bool
- func (m *TNUsageMemo) LeaveProcessing()
- func (m *TNUsageMemo) MemoryUsed() float64
- func (m *TNUsageMemo) PrepareReplay(datas []*CheckpointData, vers []uint32)
- func (m *TNUsageMemo) Replace(new UsageData)
- func (m *TNUsageMemo) UpdateNewAccCache(usage UsageData, del bool)
- type TableLogtailRespBuilder
- type TableMeta
- type TableRespKind
- type TempFKey
- type TempFilter
- type TxnLogtailRespBuilder
- type TxnTable
- func (table *TxnTable) AddTxn(txn txnif.AsyncTxn) (err error)
- func (table *TxnTable) ForeachRowInBetween(from, to types.TS, skipBlkOp func(blk BlockT) bool, ...) (readRows int)
- func (table *TxnTable) TruncateByTimeStamp(ts types.TS) (cnt int)
- func (table *TxnTable) TryCompact(from types.TS, rt *dbutils.Runtime) (to types.TS)
- type UsageData
Constants ¶
const (
    UsageAccID uint8 = iota
    UsageDBID
    UsageTblID
    UsageObjID
    UsageSize
    UsageMAX
)
const (
    SnapshotAttr_TID = catalog.SnapshotAttr_TID
    SnapshotAttr_DBID = catalog.SnapshotAttr_DBID
    ObjectAttr_ID = catalog.ObjectAttr_ID
    ObjectAttr_CreateAt = catalog.ObjectAttr_CreateAt
    ObjectAttr_SegNode = catalog.ObjectAttr_SegNode
    SnapshotAttr_BlockMaxRow = catalog.SnapshotAttr_BlockMaxRow
    SnapshotAttr_ObjectMaxBlock = catalog.SnapshotAttr_ObjectMaxBlock
    ObjectAttr_ObjectStats = catalog.ObjectAttr_ObjectStats
    ObjectAttr_State = catalog.ObjectAttr_State
    ObjectAttr_Sorted = catalog.ObjectAttr_Sorted
    EntryNode_CreateAt = catalog.EntryNode_CreateAt
    EntryNode_DeleteAt = catalog.EntryNode_DeleteAt

    SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start"
    SnapshotMetaAttr_BlockInsertBatchEnd = "block_insert_batch_end"
    SnapshotMetaAttr_BlockInsertBatchLocation = "block_insert_batch_location"
    SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start"
    SnapshotMetaAttr_BlockDeleteBatchEnd = "block_delete_batch_end"
    SnapshotMetaAttr_BlockDeleteBatchLocation = "block_delete_batch_location"
    SnapshotMetaAttr_BlockCNInsertBatchLocation = "block_cn_insert_batch_location"
    SnapshotMetaAttr_SegDeleteBatchStart = "seg_delete_batch_start"
    SnapshotMetaAttr_SegDeleteBatchEnd = "seg_delete_batch_end"
    SnapshotMetaAttr_SegDeleteBatchLocation = "seg_delete_batch_location"

    CheckpointMetaAttr_BlockLocation = "checkpoint_meta_block_location"
    CheckpointMetaAttr_SchemaType = "checkpoint_meta_schema_type"
    CheckpointMetaAttr_StorageUsageInsLocation = "checkpoint_meta_storage_usage_ins_location"
    CheckpointMetaAttr_StorageUsageDelLocation = "checkpoint_meta_storage_usage_del_location"

    AccountIDDbNameTblName = catalog.AccountIDDbNameTblName
    AccountIDDbName = catalog.AccountIDDbName

    // supporting `show accounts` in checkpoint
    CheckpointMetaAttr_ObjectSize = "checkpoint_meta_object_size"
    CheckpointMetaAttr_ObjectID = "checkpoint_meta_object_id"

    SnapshotAttr_SchemaExtra = catalog.SnapshotAttr_SchemaExtra
)
const (
    CheckpointVersion1 uint32 = 1
    CheckpointVersion2 uint32 = 2
    CheckpointVersion3 uint32 = 3
    CheckpointVersion4 uint32 = 4
    CheckpointVersion5 uint32 = 5
    CheckpointVersion6 uint32 = 6
    CheckpointVersion7 uint32 = 7
    CheckpointVersion8 uint32 = 8
    CheckpointVersion9 uint32 = 9
    CheckpointVersion10 uint32 = 10
    CheckpointVersion11 uint32 = 11

    CheckpointCurrentVersion = CheckpointVersion11
)
const (
    MetaIDX uint16 = iota

    DBInsertIDX
    DBInsertTxnIDX
    DBDeleteIDX
    DBDeleteTxnIDX

    TBLInsertIDX
    TBLInsertTxnIDX
    TBLDeleteIDX
    TBLDeleteTxnIDX
    TBLColInsertIDX
    TBLColDeleteIDX

    SEGInsertIDX
    SEGInsertTxnIDX
    SEGDeleteIDX
    SEGDeleteTxnIDX

    BLKMetaInsertIDX
    BLKMetaInsertTxnIDX
    BLKMetaDeleteIDX
    BLKMetaDeleteTxnIDX

    BLKTNMetaInsertIDX
    BLKTNMetaInsertTxnIDX
    BLKTNMetaDeleteIDX
    BLKTNMetaDeleteTxnIDX

    BLKCNMetaInsertIDX

    TNMetaIDX

    StorageUsageInsIDX
    ObjectInfoIDX
    TNObjectInfoIDX
    StorageUsageDelIDX
)
const (
    Checkpoint_Meta_TID_IDX = 2
    Checkpoint_Meta_Insert_Block_LOC_IDX = 3
    Checkpoint_Meta_CN_Delete_Block_LOC_IDX = 4
    Checkpoint_Meta_Delete_Block_LOC_IDX = 5
    Checkpoint_Meta_Object_LOC_IDX = 6
    Checkpoint_Meta_Usage_Ins_LOC_IDX = 7
    Checkpoint_Meta_Usage_Del_LOC_IDX = 8
)
const (
    Checkpoint_Meta_Insert_Block_Start_IDX = 3
    Checkpoint_Meta_Insert_Block_End_IDX = 4
    Checkpoint_Meta_Delete_Block_Start_IDX = 5
    Checkpoint_Meta_Delete_Block_End_IDX = 6
    Checkpoint_Meta_Object_Start_IDX = 7
    Checkpoint_Meta_Object_End_IDX = 8
)
For checkpoint versions 1-3.
const (
    LocationOffset = 0
    LocationLength = objectio.LocationLen
    StartOffsetOffset = LocationOffset + LocationLength
    StartOffsetLength = 8
    EndOffsetOffset = StartOffsetOffset + StartOffsetLength
    EndOffsetLength = 8
    BlockLocationLength = EndOffsetOffset + EndOffsetLength
)
These constants describe the encoding of a BlockLocation, a fixed-length, unmodifiable byte array with the layout: Location(objectio.Location) | StartOffset(uint64) | EndOffset(uint64). A BlockLocations value is a concatenation of such entries and supports Append and iteration.
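As a quick illustration of this layout, the sketch below builds a BlockLocation through the constructor listed further down, reads its fields back, and iterates a BlockLocations slice. The id and offsets are made-up values, and the import path of this package is assumed.

package main

import (
    "fmt"

    // import path assumed for illustration
    "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)

func main() {
    // Encode block id 3 with offsets [0, 8192) into a BlockLocation.
    loc := logtail.BuildBlockLoaction(3, 0, 8192)
    fmt.Println(loc.GetID(), loc.GetStartOffset(), loc.GetEndOffset())

    // BlockLocations is a flat byte slice of such entries.
    locs := logtail.NewEmptyBlockLocations()
    locs.Append(loc)
    it := locs.MakeIterator()
    for it.HasNext() {
        fmt.Println(it.Next().String())
    }
}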
const (
    BlockInsert = iota
    BlockDelete
    CNBlockInsert
    ObjectInfo
    StorageUsageIns
    StorageUsageDel
)
const DefaultCheckpointBlockRows = 10000
const DefaultCheckpointSize = 512 * 1024 * 1024
const (
LogtailHeartbeatDuration = time.Millisecond * 2
)
const MaxIDX = StorageUsageDelIDX + 1
const MetaMaxIdx = StorageUsageDel + 1
const Size90M = 90 * 1024 * 1024
const StorageUsageMagic uint64 = 0x1A2B3C4D5E6F
const UsageBatMetaTableId uint64 = StorageUsageMagic
Variables ¶
var (
    // for blk meta response
    BlkMetaSchema *catalog.Schema // latest version
    BlkMetaSchema_V1 *catalog.Schema // previous version
    DelSchema *catalog.Schema
    SegSchema *catalog.Schema
    TxnNodeSchema *catalog.Schema
    DBTNSchema *catalog.Schema
    TblTNSchema *catalog.Schema
    SegTNSchema *catalog.Schema
    BlkTNSchema *catalog.Schema
    MetaSchema_V1 *catalog.Schema
    MetaSchema *catalog.Schema
    DBDelSchema *catalog.Schema
    TblDelSchema *catalog.Schema
    ColumnDelSchema *catalog.Schema
    TNMetaSchema *catalog.Schema
    ObjectInfoSchema *catalog.Schema
    DBSpecialDeleteSchema *catalog.Schema
    TBLSpecialDeleteSchema *catalog.Schema
    StorageUsageSchema *catalog.Schema
)
var ( DBSpecialDeleteAttr = []string{ pkgcatalog.SystemDBAttr_ID, AccountIDDbName, } DBSpecialDeleteTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, 0, 0), } TBLSpecialDeleteAttr = []string{ pkgcatalog.SystemRelAttr_ID, AccountIDDbNameTblName, } TBLSpecialDeleteTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, 0, 0), } ObjectSchemaAttr = []string{ ObjectAttr_ID, ObjectAttr_CreateAt, ObjectAttr_SegNode, } ObjectSchemaTypes = []types.Type{ types.New(types.T_uuid, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_blob, 0, 0), } TxnNodeSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, } TxnNodeSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), } DBTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } DBTNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), } TblTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, SnapshotAttr_BlockMaxRow, SnapshotAttr_ObjectMaxBlock, SnapshotAttr_SchemaExtra, } TblTNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint16, 0, 0), types.New(types.T_varchar, 0, 0), } ObjectTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } ObjectTNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), } BlockTNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, pkgcatalog.BlockMeta_MetaLoc, pkgcatalog.BlockMeta_DeltaLoc, } BlockTNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, 
types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), } MetaSchemaAttr_V1 = []string{ SnapshotAttr_TID, SnapshotMetaAttr_BlockInsertBatchStart, SnapshotMetaAttr_BlockInsertBatchEnd, SnapshotMetaAttr_BlockDeleteBatchStart, SnapshotMetaAttr_BlockDeleteBatchEnd, SnapshotMetaAttr_SegDeleteBatchStart, SnapshotMetaAttr_SegDeleteBatchEnd, } MetaShcemaTypes_V1 = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), types.New(types.T_int32, 0, 0), } MetaSchemaAttr = []string{ SnapshotAttr_TID, SnapshotMetaAttr_BlockInsertBatchLocation, SnapshotMetaAttr_BlockCNInsertBatchLocation, SnapshotMetaAttr_BlockDeleteBatchLocation, SnapshotMetaAttr_SegDeleteBatchLocation, CheckpointMetaAttr_StorageUsageInsLocation, CheckpointMetaAttr_StorageUsageDelLocation, } MetaShcemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), } DBDelSchemaAttr = []string{ pkgcatalog.SystemDBAttr_ID, } DBDelSchemaTypes = []types.Type{ types.T_uint64.ToType(), } TblDelSchemaAttr = []string{ pkgcatalog.SystemRelAttr_ID, } TblDelSchemaTypes = []types.Type{ types.T_uint64.ToType(), } ColumnDelSchemaAttr = []string{ pkgcatalog.SystemColAttr_UniqName, } ColumnDelSchemaTypes = []types.Type{ types.T_varchar.ToType(), } TNMetaSchemaAttr = []string{ CheckpointMetaAttr_BlockLocation, CheckpointMetaAttr_SchemaType, } TNMetaShcemaTypes = []types.Type{ types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_uint16, 0, 0), } BaseAttr = []string{ catalog.AttrRowID, catalog.AttrCommitTs, } BaseTypes = []types.Type{ types.T_Rowid.ToType(), types.T_TS.ToType(), } ObjectInfoAttr = []string{ ObjectAttr_ObjectStats, ObjectAttr_State, ObjectAttr_Sorted, SnapshotAttr_DBID, SnapshotAttr_TID, EntryNode_CreateAt, EntryNode_DeleteAt, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, } ObjectInfoTypes = []types.Type{ types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_bool, 0, 0), types.New(types.T_bool, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), } StorageUsageSchemaAttrs = []string{ pkgcatalog.SystemColAttr_AccID, SnapshotAttr_DBID, SnapshotAttr_TID, CheckpointMetaAttr_ObjectID, CheckpointMetaAttr_ObjectSize, } StorageUsageSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uint64, 0, 0), types.New(types.T_uuid, 0, 0), types.New(types.T_uint64, 0, 0), } )
Functions ¶
func BackupCheckpointDataFactory ¶ added in v1.1.0
func BatchToString ¶
func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
func CorrectUsageWrongPlacement ¶ added in v1.1.0
func DataChangeToLogtailBatch ¶ added in v0.8.0
func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch
DataChangeToLogtailBatch returns a batch laid out according to the write schema: columns are sorted by seqnum, and vacancies are filled with zero values.
func DebugBatchToString ¶
func EliminateErrorsOnCache ¶ added in v1.1.1
func FillUsageBatOfGlobal ¶ added in v1.0.1
func FillUsageBatOfGlobal(collector *GlobalCollector)
func FillUsageBatOfIncremental ¶ added in v1.0.1
func FillUsageBatOfIncremental(collector *IncrementalCollector)
func GetMetaIdxesByVersion ¶ added in v1.0.0
func GetStorageUsageHistory ¶ added in v1.1.0
func GetStorageUsageHistory( ctx context.Context, locations []objectio.Location, versions []uint32, fs fileservice.FileService, mp *mpool.MPool) ([][]UsageData, [][]UsageData, error)
GetStorageUsageHistory is a debugging helper that shows how storage usage changed over time. It works in two steps:
1. Load each checkpoint meta batch.
2. Load the specified storage usage ins/del batches using the locations stored in the meta batch.
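A minimal sketch of driving this debug helper, assuming the checkpoint meta locations and their matching versions have already been collected elsewhere, and assuming the usual imports (context, fmt, the objectio, fileservice, and mpool packages, and this logtail package):

// inspectUsage prints, per checkpoint, how many storage usage inserts and
// deletes were recorded. locations[i] must pair with versions[i].
func inspectUsage(
    ctx context.Context,
    locations []objectio.Location, versions []uint32,
    fs fileservice.FileService, mp *mpool.MPool,
) error {
    ins, del, err := logtail.GetStorageUsageHistory(ctx, locations, versions, fs, mp)
    if err != nil {
        return err
    }
    for i := range ins {
        fmt.Printf("ckp %d: %d usage inserts, %d usage deletes\n", i, len(ins[i]), len(del[i]))
    }
    return nil
}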
func GlobalCheckpointDataFactory ¶ added in v0.7.0
func HandleSyncLogTailReq ¶
func HandleSyncLogTailReq( ctx context.Context, ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog, req api.SyncLogTailReq, canRetry bool) (resp api.SyncLogTailResp, closeCB func(), err error)
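A sketch of answering a single sync-logtail request. The api.SyncLogTailReq field names (CnHave, CnWant, Table) are assumptions about the pb/api package and should be checked against your version; the checkpoint client, manager, and catalog are assumed to be wired up already.

func serveOne(
    ctx context.Context,
    ckp logtail.CheckpointClient,
    mgr *logtail.Manager,
    c *catalog.Catalog,
    have, want timestamp.Timestamp,
    tbl api.TableID,
) (api.SyncLogTailResp, func(), error) {
    req := api.SyncLogTailReq{
        CnHave: &have, // assumed field names
        CnWant: &want,
        Table:  &tbl,
    }
    // The returned closeCB (if non-nil) should be invoked after the
    // response has been consumed.
    return logtail.HandleSyncLogTailReq(ctx, ckp, mgr, c, req, true /*canRetry*/)
}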
func LoadBlkColumnsByMeta ¶ added in v0.8.0
func LoadCNSubBlkColumnsByMeta ¶ added in v1.0.0
func LoadCNSubBlkColumnsByMetaWithId ¶ added in v1.0.0
func LoadCheckpointEntries ¶
func MockCallback ¶ added in v0.8.0
func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error
func NewDirtyCollector ¶
func ReWriteCheckpointAndBlockFromKey ¶ added in v1.1.0
func ToStringTemplate ¶
func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
Types ¶
type BaseCollector ¶ added in v0.7.0
type BaseCollector struct {
    *catalog.LoopProcessor

    Objects []*catalog.ObjectEntry

    // for storage usage
    Usage struct {
        // db, tbl deletes
        Deletes        []interface{}
        ObjInserts     []*catalog.ObjectEntry
        ObjDeletes     []*catalog.ObjectEntry
        ReservedAccIds map[uint64]struct{}
    }

    UsageMemo *TNUsageMemo
    // contains filtered or unexported fields
}
func (*BaseCollector) Allocator ¶ added in v1.1.0
func (collector *BaseCollector) Allocator() *mpool.MPool
func (*BaseCollector) Close ¶ added in v0.7.0
func (collector *BaseCollector) Close()
func (*BaseCollector) LoadAndCollectObject ¶ added in v1.1.1
func (collector *BaseCollector) LoadAndCollectObject(c *catalog.Catalog, visitObject func(*catalog.ObjectEntry) error) error
func (*BaseCollector) OrphanData ¶ added in v0.7.0
func (collector *BaseCollector) OrphanData() *CheckpointData
func (*BaseCollector) VisitBlk ¶ added in v0.7.0
func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
func (*BaseCollector) VisitBlkForBackup ¶ added in v1.1.0
func (collector *BaseCollector) VisitBlkForBackup(entry *catalog.BlockEntry) (err error)
func (*BaseCollector) VisitDB ¶ added in v0.7.0
func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
func (*BaseCollector) VisitObj ¶ added in v1.1.0
func (collector *BaseCollector) VisitObj(entry *catalog.ObjectEntry) (err error)
func (*BaseCollector) VisitObjForBackup ¶ added in v1.1.0
func (collector *BaseCollector) VisitObjForBackup(entry *catalog.ObjectEntry) (err error)
func (*BaseCollector) VisitTable ¶ added in v0.7.0
func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
type BlockLocation ¶ added in v1.0.0
type BlockLocation []byte
func BuildBlockLoaction ¶ added in v1.0.0
func BuildBlockLoaction(id uint16, start, end uint64) BlockLocation
func BuildBlockLoactionWithLocation ¶ added in v1.0.0
func BuildBlockLoactionWithLocation(name objectio.ObjectName, extent objectio.Extent, rows uint32, id uint16, start, end uint64) BlockLocation
func (BlockLocation) Contains ¶ added in v1.0.0
func (l BlockLocation) Contains(i common.ClosedInterval) bool
func (BlockLocation) GetEndOffset ¶ added in v1.0.0
func (l BlockLocation) GetEndOffset() uint64
func (BlockLocation) GetID ¶ added in v1.0.0
func (l BlockLocation) GetID() uint16
func (BlockLocation) GetLocation ¶ added in v1.0.0
func (l BlockLocation) GetLocation() objectio.Location
func (BlockLocation) GetStartOffset ¶ added in v1.0.0
func (l BlockLocation) GetStartOffset() uint64
func (BlockLocation) SetEndOffset ¶ added in v1.0.0
func (l BlockLocation) SetEndOffset(end uint64)
func (BlockLocation) SetID ¶ added in v1.0.0
func (l BlockLocation) SetID(id uint16)
func (BlockLocation) SetLocation ¶ added in v1.0.0
func (l BlockLocation) SetLocation(location objectio.Location)
func (BlockLocation) SetStartOffset ¶ added in v1.0.0
func (l BlockLocation) SetStartOffset(start uint64)
func (BlockLocation) String ¶ added in v1.0.0
func (l BlockLocation) String() string
type BlockLocations ¶ added in v1.0.0
type BlockLocations []byte
func NewEmptyBlockLocations ¶ added in v1.0.0
func NewEmptyBlockLocations() BlockLocations
func (*BlockLocations) Append ¶ added in v1.0.0
func (l *BlockLocations) Append(loc BlockLocation)
func (BlockLocations) MakeIterator ¶ added in v1.0.0
func (l BlockLocations) MakeIterator() *BlockLocationsIterator
func (BlockLocations) String ¶ added in v1.0.0
func (l BlockLocations) String() string
type BlockLocationsIterator ¶ added in v1.0.0
type BlockLocationsIterator struct {
    *BlockLocations
    // contains filtered or unexported fields
}
func (*BlockLocationsIterator) HasNext ¶ added in v1.0.0
func (i *BlockLocationsIterator) HasNext() bool
func (*BlockLocationsIterator) Next ¶ added in v1.0.0
func (i *BlockLocationsIterator) Next() BlockLocation
type BoundTableOperator ¶
type BoundTableOperator struct {
// contains filtered or unexported fields
}
BoundTableOperator holds a read-only reader and knows how to iterate catalog entries.
func NewBoundTableOperator ¶
func (*BoundTableOperator) Run ¶
func (c *BoundTableOperator) Run() error
Run uses a RespBuilder to visit every table/object/block touched by any txn in the Reader. During the visit, the RespBuilder gathers the information needed to build the logtail entries.
type CNCheckpointData ¶ added in v0.8.0
type CNCheckpointData struct {
// contains filtered or unexported fields
}
func NewCNCheckpointData ¶ added in v0.8.0
func NewCNCheckpointData() *CNCheckpointData
func (*CNCheckpointData) GetCloseCB ¶ added in v1.0.0
func (data *CNCheckpointData) GetCloseCB(version uint32, m *mpool.MPool) func()
func (*CNCheckpointData) GetTableDataFromBats ¶ added in v1.0.0
func (*CNCheckpointData) GetTableMeta ¶ added in v0.8.0
func (data *CNCheckpointData) GetTableMeta(tableID uint64, version uint32, loc objectio.Location) (meta *CheckpointMeta)
func (*CNCheckpointData) InitMetaIdx ¶ added in v1.0.0
func (*CNCheckpointData) PrefetchFrom ¶ added in v0.8.0
func (data *CNCheckpointData) PrefetchFrom( ctx context.Context, version uint32, service fileservice.FileService, key objectio.Location, tableID uint64) (err error)
func (*CNCheckpointData) PrefetchMetaFrom ¶ added in v1.0.0
func (data *CNCheckpointData) PrefetchMetaFrom( ctx context.Context, version uint32, location objectio.Location, service fileservice.FileService, tableID uint64) (err error)
func (*CNCheckpointData) PrefetchMetaIdx ¶ added in v1.0.0
func (data *CNCheckpointData) PrefetchMetaIdx( ctx context.Context, version uint32, idxes []uint16, key objectio.Location, service fileservice.FileService, ) (err error)
type CatalogLogtailRespBuilder ¶
type CatalogLogtailRespBuilder struct {
    *catalog.LoopProcessor
    // contains filtered or unexported fields
}
CatalogLogtailRespBuilder knows how to build api entries from db and table entries. It implements the catalog.Processor interface and is driven by BoundTableOperator.
func (*CatalogLogtailRespBuilder) BuildResp ¶
func (b *CatalogLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*CatalogLogtailRespBuilder) Close ¶
func (b *CatalogLogtailRespBuilder) Close()
func (*CatalogLogtailRespBuilder) VisitDB ¶
func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error
VisitDB = catalog.Processor.OnDatabase
func (*CatalogLogtailRespBuilder) VisitTbl ¶
func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error
VisitTbl = catalog.Processor.OnTable
type CheckpointClient ¶
type CheckpointData ¶
type CheckpointData struct {
// contains filtered or unexported fields
}
func LoadCheckpointEntriesFromKey ¶ added in v1.0.0
func LoadCheckpointEntriesFromKey( ctx context.Context, fs fileservice.FileService, location objectio.Location, version uint32, softDeletes *map[string]bool, ) ([]objectio.ObjectName, *CheckpointData, error)
func LoadSpecifiedCkpBatch ¶ added in v1.0.1
func LoadSpecifiedCkpBatch( ctx context.Context, location objectio.Location, version uint32, batchIdx uint16, fs fileservice.FileService, ) (data *CheckpointData, err error)
LoadSpecifiedCkpBatch loads a specified checkpoint data batch
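For example, a sketch of loading just the object-info batch of a checkpoint, using the batch index constants defined above; the location comes from your checkpoint metadata, and the caller is responsible for closing the returned data. Imports mirror the earlier sketches.

func loadObjectInfoBatch(
    ctx context.Context,
    loc objectio.Location,
    fs fileservice.FileService,
) (*logtail.CheckpointData, error) {
    // ObjectInfoIDX selects the object-info batch; the version must match
    // the one the checkpoint was written with (here the current version).
    return logtail.LoadSpecifiedCkpBatch(ctx, loc, logtail.CheckpointCurrentVersion, logtail.ObjectInfoIDX, fs)
}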
func NewCheckpointData ¶
func NewCheckpointData(mp *mpool.MPool) *CheckpointData
func NewCheckpointDataWithVersion ¶ added in v1.1.0
func NewCheckpointDataWithVersion(ver uint32, mp *mpool.MPool) *CheckpointData
for test
func (*CheckpointData) Allocator ¶ added in v1.1.0
func (data *CheckpointData) Allocator() *mpool.MPool
func (*CheckpointData) ApplyReplayTo ¶
func (data *CheckpointData) ApplyReplayTo( c *catalog.Catalog, dataFactory catalog.DataFactory, ) (err error)
func (*CheckpointData) Close ¶
func (data *CheckpointData) Close()
func (*CheckpointData) CloseWhenLoadFromCache ¶ added in v1.0.0
func (data *CheckpointData) CloseWhenLoadFromCache(version uint32)
func (*CheckpointData) ExportStats ¶ added in v1.1.0
func (data *CheckpointData) ExportStats(prefix string) []zap.Field
func (*CheckpointData) FormatData ¶ added in v1.1.0
func (data *CheckpointData) FormatData(mp *mpool.MPool) (err error)
func (*CheckpointData) GetBatches ¶ added in v1.0.0
func (data *CheckpointData) GetBatches() []*containers.Batch
func (*CheckpointData) GetBlkBatchs ¶
func (data *CheckpointData) GetBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDBBatchs ¶
func (data *CheckpointData) GetDBBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetObjectBatchs ¶ added in v1.1.0
func (data *CheckpointData) GetObjectBatchs() *containers.Batch
func (*CheckpointData) GetTNBlkBatchs ¶ added in v1.0.0
func (data *CheckpointData) GetTNBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetTNObjectBatchs ¶ added in v1.1.0
func (data *CheckpointData) GetTNObjectBatchs() *containers.Batch
func (*CheckpointData) GetTblBatchs ¶
func (data *CheckpointData) GetTblBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) PrefetchFrom ¶ added in v0.8.0
func (data *CheckpointData) PrefetchFrom( ctx context.Context, version uint32, service fileservice.FileService, key objectio.Location) (err error)
func (*CheckpointData) PrefetchMeta ¶ added in v1.0.0
func (data *CheckpointData) PrefetchMeta( ctx context.Context, version uint32, service fileservice.FileService, key objectio.Location) (err error)
func (*CheckpointData) PrintData ¶
func (data *CheckpointData) PrintData()
func (*CheckpointData) ReadFrom ¶
func (data *CheckpointData) ReadFrom( ctx context.Context, version uint32, location objectio.Location, reader *blockio.BlockReader, fs fileservice.FileService, ) (err error)
TODO: a global IO pool is needed here.
func (*CheckpointData) ReadTNMetaBatch ¶ added in v1.0.0
func (data *CheckpointData) ReadTNMetaBatch( ctx context.Context, version uint32, location objectio.Location, reader *blockio.BlockReader, ) (err error)
func (*CheckpointData) UpdateBlkMeta ¶
func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
func (*CheckpointData) UpdateBlockDeleteBlkMeta ¶ added in v1.1.0
func (data *CheckpointData) UpdateBlockDeleteBlkMeta(tid uint64, insStart, insEnd int32)
func (*CheckpointData) UpdateBlockInsertBlkMeta ¶ added in v1.1.0
func (data *CheckpointData) UpdateBlockInsertBlkMeta(tid uint64, insStart, insEnd int32)
func (*CheckpointData) UpdateObjectInsertMeta ¶ added in v1.1.1
func (data *CheckpointData) UpdateObjectInsertMeta(tid uint64, delStart, delEnd int32)
func (*CheckpointData) UpdateSegMeta ¶ added in v0.8.0
func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)
func (*CheckpointData) WriteTo ¶
func (data *CheckpointData) WriteTo( fs fileservice.FileService, blockRows int, checkpointSize int, ) (CNLocation, TNLocation objectio.Location, checkpointFiles []string, err error)
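A sketch of producing and persisting an incremental checkpoint with the package's default sizing constants; the catalog, file service, and timestamp range are assumed to exist, and error handling is kept minimal.

func writeIncrementalCheckpoint(
    c *catalog.Catalog,
    fs fileservice.FileService,
    start, end types.TS,
) (cnLoc, tnLoc objectio.Location, files []string, err error) {
    factory := logtail.IncrementalCheckpointDataFactory(start, end, true /*collectUsage*/, false /*skipLoadObjectStats*/)
    data, err := factory(c)
    if err != nil {
        return
    }
    defer data.Close()
    // Write the collected batches out and return the CN/TN meta locations.
    return data.WriteTo(fs, logtail.DefaultCheckpointBlockRows, logtail.DefaultCheckpointSize)
}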
type CheckpointMeta ¶
type CheckpointMeta struct {
// contains filtered or unexported fields
}
func NewCheckpointMeta ¶
func NewCheckpointMeta() *CheckpointMeta
func (*CheckpointMeta) DecodeFromString ¶ added in v1.0.0
func (m *CheckpointMeta) DecodeFromString(keys [][]byte) (err error)
func (*CheckpointMeta) String ¶ added in v1.0.0
func (m *CheckpointMeta) String() string
type Collector ¶
type Collector interface {
    String() string
    Run()
    ScanInRange(from, to types.TS) (*DirtyTreeEntry, int)
    ScanInRangePruned(from, to types.TS) *DirtyTreeEntry
    IsCommitted(from, to types.TS) bool
    GetAndRefreshMerged() *DirtyTreeEntry
    Merge() *DirtyTreeEntry
    GetMaxLSN(from, to types.TS) uint64
    Init(maxts types.TS)
}
type DirtyEntryInterceptor ¶
type DirtyTreeEntry ¶
func NewDirtyTreeEntry ¶
func NewDirtyTreeEntry(start, end types.TS, tree *model.Tree) *DirtyTreeEntry
func NewEmptyDirtyTreeEntry ¶
func NewEmptyDirtyTreeEntry() *DirtyTreeEntry
func (*DirtyTreeEntry) GetTimeRange ¶
func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)
func (*DirtyTreeEntry) GetTree ¶
func (entry *DirtyTreeEntry) GetTree() (tree *model.Tree)
func (*DirtyTreeEntry) IsEmpty ¶
func (entry *DirtyTreeEntry) IsEmpty() bool
func (*DirtyTreeEntry) Merge ¶
func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)
func (*DirtyTreeEntry) String ¶
func (entry *DirtyTreeEntry) String() string
type GlobalCollector ¶ added in v0.7.0
type GlobalCollector struct {
    *BaseCollector
    // contains filtered or unexported fields
}
func NewGlobalCollector ¶ added in v0.7.0
func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector
func (*GlobalCollector) VisitBlk ¶ added in v0.7.0
func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
func (*GlobalCollector) VisitDB ¶ added in v0.7.0
func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
func (*GlobalCollector) VisitObj ¶ added in v1.1.0
func (collector *GlobalCollector) VisitObj(entry *catalog.ObjectEntry) error
func (*GlobalCollector) VisitTable ¶ added in v0.7.0
func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
type IncrementalCollector ¶
type IncrementalCollector struct {
*BaseCollector
}
func NewBackupCollector ¶ added in v1.1.0
func NewBackupCollector(start, end types.TS) *IncrementalCollector
func NewIncrementalCollector ¶
func NewIncrementalCollector(start, end types.TS, skipLoadObjectStats bool) *IncrementalCollector
type Logtailer ¶ added in v0.7.0
type Logtailer interface {
    // RangeLogtail returns logtail for all tables within the range (from, to].
    // NOTE: caller should keep time range monotonous, or there would be a checkpoint.
    RangeLogtail(
        ctx context.Context, from, to timestamp.Timestamp,
    ) ([]logtail.TableLogtail, []func(), error)

    RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)

    // TableLogtail returns logtail for the specified table.
    //
    // NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value.
    TableLogtail(
        ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
    ) (logtail.TableLogtail, func(), error)

    // Now is a time getter from TxnManager. Users of Logtailer should get a timestamp
    // from Now and use the timestamp to collect logtail, in that case, all txn prepared
    // before it are visible.
    Now() (timestamp.Timestamp, timestamp.Timestamp)
}
Logtailer provides logtail for the specified table.
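A sketch of the intended call pattern for a Logtailer: take the collection timestamp from Now, then gather logtails for every table modified in (from, to]. Constructing the Logtailer (see NewLogtailer below) and shipping the tails to subscribers are assumed to happen elsewhere.

func collectOnce(ctx context.Context, l logtail.Logtailer, from timestamp.Timestamp) (timestamp.Timestamp, error) {
    // Now returns a timestamp from the TxnManager; all txns prepared before it are visible.
    to, _ := l.Now()

    tails, closeCBs, err := l.RangeLogtail(ctx, from, to)
    defer func() {
        for _, cb := range closeCBs {
            if cb != nil {
                cb()
            }
        }
    }()
    if err != nil {
        return to, err
    }
    for _, tail := range tails {
        _ = tail // hand each logtail.TableLogtail to subscribers here
    }
    return to, nil
}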
type LogtailerImpl ¶ added in v0.7.0
type LogtailerImpl struct {
// contains filtered or unexported fields
}
func NewLogtailer ¶ added in v0.7.0
func NewLogtailer( ctx context.Context, ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog) *LogtailerImpl
func (*LogtailerImpl) Now ¶ added in v0.8.0
func (l *LogtailerImpl) Now() (timestamp.Timestamp, timestamp.Timestamp)
Now is a time getter from TxnManager. Users of Logtailer should get a timestamp from Now and use the timestamp to collect logtail, in that case, all txn prepared before it are visible.
func (*LogtailerImpl) RangeLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) RangeLogtail( ctx context.Context, from, to timestamp.Timestamp, ) ([]logtail.TableLogtail, []func(), error)
RangeLogtail returns logtails for all tables modified within the range (from, to]. It checks out all dirty tables in the time window and collects a logtail for each of them.
func (*LogtailerImpl) RegisterCallback ¶ added in v0.8.0
func (l *LogtailerImpl) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)
func (*LogtailerImpl) TableLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) TableLogtail( ctx context.Context, table api.TableID, from, to timestamp.Timestamp, ) (logtail.TableLogtail, func(), error)
TableLogtail returns logtail for the specified table. It boils down to calling `HandleSyncLogTailReq`
type Manager ¶ added in v0.7.0
type Manager struct {
    txnbase.NoopCommitListener
    // contains filtered or unexported fields
}
Logtail manager holds sorted txn handles. Its main jobs:
- Insert new txn handles
- Efficiently iterate over an arbitrary range of txn handles on a snapshot
- Truncate unnecessary txn handles according to the GC timestamp
func NewManager ¶ added in v0.7.0
func (*Manager) GetReader ¶ added in v0.7.0
GetReader gets a snapshot of all txn prepared between from and to.
func (*Manager) GetTableOperator ¶ added in v0.7.0
func (*Manager) OnEndPrePrepare ¶ added in v0.7.0
OnEndPrePrepare is a listener for TxnManager. When a txn completes PrePrepare, it is added to the logtail manager.
func (*Manager) OnEndPrepareWAL ¶ added in v0.8.0
func (*Manager) RegisterCallback ¶ added in v0.8.0
func (*Manager) TryCompactTable ¶ added in v1.0.0
func (mgr *Manager) TryCompactTable()
type Reader ¶ added in v0.7.0
type Reader struct {
// contains filtered or unexported fields
}
Reader is a snapshot of all txn prepared between from and to. Dirty tables/objects/blocks can be queried based on those txn
func (*Reader) GetDirtyByTable ¶ added in v0.7.0
GetDirtyByTable merges all dirty table/object/block entries of a single table into one tree.
func (*Reader) HasCatalogChanges ¶ added in v0.7.0
HasCatalogChanges returns true if any txn in the reader modified the Catalog
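A small sketch that combines the Manager and Reader APIs above to check whether the catalog changed inside a window. It assumes the manager has been started, that from/to are valid timestamps, and that HasCatalogChanges returns a bool, as its description suggests.

func catalogChangedBetween(mgr *logtail.Manager, from, to types.TS) bool {
    // GetReader snapshots all txns prepared in (from, to].
    reader := mgr.GetReader(from, to)
    // HasCatalogChanges reports whether any of those txns modified the catalog.
    return reader.HasCatalogChanges()
}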
func (*Reader) IsCommitted ¶ added in v0.8.0
type RespBuilder ¶
type RespBuilder interface {
    catalog.Processor
    BuildResp() (api.SyncLogTailResp, error)
    Close()
}
type StorageUsageCache ¶ added in v1.1.0
type StorageUsageCache struct {
    // when two requests happen within [lastUpdate, lastUpdate + lazyThreshold],
    // the cached result is reused and no new query goes to the TN.
    sync.Mutex
    // contains filtered or unexported fields
}
func NewStorageUsageCache ¶ added in v1.1.0
func NewStorageUsageCache(opts ...StorageUsageCacheOption) *StorageUsageCache
func (*StorageUsageCache) CacheLen ¶ added in v1.1.0
func (c *StorageUsageCache) CacheLen() int
func (*StorageUsageCache) ClearForUpdate ¶ added in v1.1.0
func (c *StorageUsageCache) ClearForUpdate()
func (*StorageUsageCache) Delete ¶ added in v1.1.0
func (c *StorageUsageCache) Delete(usage UsageData)
func (*StorageUsageCache) GatherAccountSize ¶ added in v1.1.0
func (c *StorageUsageCache) GatherAccountSize(id uint64) (size uint64, exist bool)
func (*StorageUsageCache) GatherAllAccSize ¶ added in v1.1.0
func (c *StorageUsageCache) GatherAllAccSize() (usages map[uint64]uint64)
func (*StorageUsageCache) Get ¶ added in v1.1.0
func (c *StorageUsageCache) Get(usage UsageData) (ret UsageData, exist bool)
func (*StorageUsageCache) IsExpired ¶ added in v1.1.0
func (c *StorageUsageCache) IsExpired() bool
func (*StorageUsageCache) Iter ¶ added in v1.1.0
func (c *StorageUsageCache) Iter() btree.IterG[UsageData]
func (*StorageUsageCache) LessFunc ¶ added in v1.1.0
func (c *StorageUsageCache) LessFunc() func(a UsageData, b UsageData) bool
func (*StorageUsageCache) MemUsed ¶ added in v1.1.0
func (c *StorageUsageCache) MemUsed() float64
MemUsed returns the memory used in megabytes
func (*StorageUsageCache) SetOrReplace ¶ added in v1.1.1
func (c *StorageUsageCache) SetOrReplace(usage UsageData)
func (*StorageUsageCache) String ¶ added in v1.1.0
func (c *StorageUsageCache) String() string
type StorageUsageCacheOption ¶ added in v1.1.0
type StorageUsageCacheOption = func(c *StorageUsageCache)
func WithLazyThreshold ¶ added in v1.1.0
func WithLazyThreshold(lazy int) StorageUsageCacheOption
WithLazyThreshold sets lazyThreshold to lazy seconds
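A sketch of the cache with a 5-second lazy threshold. The UsageData field names (AccId, DbId, TblId, Size) are assumptions based on the surrounding API, not confirmed by this page.

func demoCache() {
    // Results are reused for 5 seconds before the cache reports itself expired.
    cache := logtail.NewStorageUsageCache(logtail.WithLazyThreshold(5))

    usage := logtail.UsageData{AccId: 1, DbId: 10, TblId: 100, Size: 4096} // assumed field names
    cache.SetOrReplace(usage)

    if size, ok := cache.GatherAccountSize(1); ok {
        fmt.Println("account 1 uses", size, "bytes")
    }
    fmt.Println("expired:", cache.IsExpired(), "entries:", cache.CacheLen())
}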
func WithLessFunc ¶ added in v1.1.0
func WithLessFunc(less func(a UsageData, b UsageData) bool) StorageUsageCacheOption
type TNUsageMemo ¶ added in v1.1.0
func NewTNUsageMemo ¶ added in v1.1.0
func NewTNUsageMemo() *TNUsageMemo
func (*TNUsageMemo) AddReqTrace ¶ added in v1.1.0
func (*TNUsageMemo) CacheLen ¶ added in v1.1.0
func (m *TNUsageMemo) CacheLen() int
func (*TNUsageMemo) Clear ¶ added in v1.1.0
func (m *TNUsageMemo) Clear()
func (*TNUsageMemo) ClearDroppedAccounts ¶ added in v1.1.0
func (m *TNUsageMemo) ClearDroppedAccounts(reserved map[uint64]struct{}) string
func (*TNUsageMemo) ClearNewAccCache ¶ added in v1.1.0
func (m *TNUsageMemo) ClearNewAccCache()
func (*TNUsageMemo) Delete ¶ added in v1.1.0
func (m *TNUsageMemo) Delete(usage UsageData)
func (*TNUsageMemo) DeltaUpdate ¶ added in v1.1.1
func (m *TNUsageMemo) DeltaUpdate(delta UsageData, del bool)
DeltaUpdate inserts or updates a usage entry using the given delta size (delta.Size).
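A sketch of applying a size delta under the memo's processing guard; the UsageData field names are the same assumption as in the cache sketch above.

func applyDelta(memo *logtail.TNUsageMemo, accID, dbID, tblID, deltaSize uint64) (uint64, bool) {
    memo.EnterProcessing()
    defer memo.LeaveProcessing()

    // Add deltaSize bytes to this (account, db, table); pass del=true to subtract instead.
    memo.DeltaUpdate(logtail.UsageData{AccId: accID, DbId: dbID, TblId: tblID, Size: deltaSize}, false)

    // Read back the accumulated size for the whole account.
    return memo.GatherAccountSize(accID)
}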
func (*TNUsageMemo) EnterProcessing ¶ added in v1.1.0
func (m *TNUsageMemo) EnterProcessing()
func (*TNUsageMemo) EstablishFromCKPs ¶ added in v1.1.0
func (m *TNUsageMemo) EstablishFromCKPs(c *catalog.Catalog)
EstablishFromCKPs replays the usage info stored in checkpoints into the TN cache.
func (*TNUsageMemo) GatherAccountSize ¶ added in v1.1.0
func (m *TNUsageMemo) GatherAccountSize(id uint64) (size uint64, exist bool)
func (*TNUsageMemo) GatherAllAccSize ¶ added in v1.1.0
func (m *TNUsageMemo) GatherAllAccSize() (usages map[uint64]uint64)
func (*TNUsageMemo) GatherNewAccountSize ¶ added in v1.1.0
func (m *TNUsageMemo) GatherNewAccountSize(id uint64) (size uint64, exist bool)
func (*TNUsageMemo) Get ¶ added in v1.1.0
func (m *TNUsageMemo) Get(usage UsageData) (old UsageData, exist bool)
func (*TNUsageMemo) GetAllReqTrace ¶ added in v1.1.0
func (*TNUsageMemo) GetCache ¶ added in v1.1.0
func (m *TNUsageMemo) GetCache() *StorageUsageCache
func (*TNUsageMemo) GetDelayed ¶ added in v1.1.0
func (m *TNUsageMemo) GetDelayed() map[uint64]UsageData
func (*TNUsageMemo) GetNewAccCacheLatestUpdate ¶ added in v1.1.0
func (m *TNUsageMemo) GetNewAccCacheLatestUpdate() types.TS
func (*TNUsageMemo) HasUpdate ¶ added in v1.1.0
func (m *TNUsageMemo) HasUpdate() bool
func (*TNUsageMemo) LeaveProcessing ¶ added in v1.1.0
func (m *TNUsageMemo) LeaveProcessing()
func (*TNUsageMemo) MemoryUsed ¶ added in v1.1.0
func (m *TNUsageMemo) MemoryUsed() float64
func (*TNUsageMemo) PrepareReplay ¶ added in v1.1.0
func (m *TNUsageMemo) PrepareReplay(datas []*CheckpointData, vers []uint32)
func (*TNUsageMemo) Replace ¶ added in v1.1.1
func (m *TNUsageMemo) Replace(new UsageData)
Replace replaces the old usage with newUsage
func (*TNUsageMemo) UpdateNewAccCache ¶ added in v1.1.0
func (m *TNUsageMemo) UpdateNewAccCache(usage UsageData, del bool)
type TableLogtailRespBuilder ¶
type TableLogtailRespBuilder struct {
    *catalog.LoopProcessor
    // contains filtered or unexported fields
}
TableLogtailRespBuilder knows how to build api entries from block entries. It implements the catalog.Processor interface and is driven by BoundTableOperator.
func NewTableLogtailRespBuilder ¶
func NewTableLogtailRespBuilder(ctx context.Context, ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder
func (*TableLogtailRespBuilder) BuildResp ¶
func (b *TableLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*TableLogtailRespBuilder) Close ¶
func (b *TableLogtailRespBuilder) Close()
func (*TableLogtailRespBuilder) VisitBlk ¶
func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error
VisitBlk = catalog.Processor.OnBlock
func (*TableLogtailRespBuilder) VisitObj ¶ added in v1.1.0
func (b *TableLogtailRespBuilder) VisitObj(e *catalog.ObjectEntry) error
type TableMeta ¶ added in v1.0.0
type TableMeta struct {
    common.ClosedInterval
    // contains filtered or unexported fields
}
func NewTableMeta ¶ added in v1.0.0
func NewTableMeta() *TableMeta
type TableRespKind ¶ added in v0.8.0
type TableRespKind int
const (
    TableRespKind_Data TableRespKind = iota
    TableRespKind_Blk
    TableRespKind_Obj
)
type TempFilter ¶ added in v1.0.0
var TempF *TempFilter
func (*TempFilter) Add ¶ added in v1.0.0
func (f *TempFilter) Add(id uint64)
func (*TempFilter) Check ¶ added in v1.0.0
func (f *TempFilter) Check(id uint64) (skip bool)
type TxnLogtailRespBuilder ¶ added in v0.8.0
type TxnLogtailRespBuilder struct {
// contains filtered or unexported fields
}
func NewTxnLogtailRespBuilder ¶ added in v0.8.0
func NewTxnLogtailRespBuilder(rt *dbutils.Runtime) *TxnLogtailRespBuilder
func (*TxnLogtailRespBuilder) BuildResp ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) BuildResp()
func (*TxnLogtailRespBuilder) Close ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) Close()
func (*TxnLogtailRespBuilder) CollectLogtail ¶ added in v0.8.0
func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func())
type TxnTable ¶ added in v0.7.0
func NewTxnTable ¶ added in v0.7.0
func (*TxnTable) ForeachRowInBetween ¶ added in v0.7.0
func (*TxnTable) TruncateByTimeStamp ¶ added in v0.7.0
type UsageData ¶ added in v1.0.1
func MockUsageData ¶ added in v1.1.0
MockUsageData generates accCnt * dbCnt * tblCnt UsageData values. The accIds, dbIds, and tblIds are randomly produced, and the function ensures that all ids are distinct.