logtail

package
v1.2.3-hotfix-20241016 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2024 License: Apache-2.0 Imports: 46 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SnapshotTypeIdx types.Enum = iota
	SnapshotTypeCluster
	SnapshotTypeAccount
)
View Source
const (
	ColSnapshotId uint16 = iota
	ColSName
	ColTS
	ColLevel
	ColAccountName
	ColDatabaseName
	ColTableName
	ColObjId

	MaxColSnapshot
)

mo_snapshot's schema

View Source
const (
	UsageAccID uint8 = iota
	UsageDBID
	UsageTblID
	UsageObjID
	UsageSize

	UsageMAX
)
View Source
const (
	SnapshotAttr_TID                            = catalog.SnapshotAttr_TID
	SnapshotAttr_DBID                           = catalog.SnapshotAttr_DBID
	ObjectAttr_ID                               = catalog.ObjectAttr_ID
	ObjectAttr_CreateAt                         = catalog.ObjectAttr_CreateAt
	ObjectAttr_SegNode                          = catalog.ObjectAttr_SegNode
	SnapshotAttr_BlockMaxRow                    = catalog.SnapshotAttr_BlockMaxRow
	SnapshotAttr_ObjectMaxBlock                 = catalog.SnapshotAttr_ObjectMaxBlock
	ObjectAttr_ObjectStats                      = catalog.ObjectAttr_ObjectStats
	ObjectAttr_State                            = catalog.ObjectAttr_State
	ObjectAttr_Sorted                           = catalog.ObjectAttr_Sorted
	EntryNode_CreateAt                          = catalog.EntryNode_CreateAt
	EntryNode_DeleteAt                          = catalog.EntryNode_DeleteAt
	SnapshotMetaAttr_BlockInsertBatchStart      = "block_insert_batch_start"
	SnapshotMetaAttr_BlockInsertBatchEnd        = "block_insert_batch_end"
	SnapshotMetaAttr_BlockInsertBatchLocation   = "block_insert_batch_location"
	SnapshotMetaAttr_BlockDeleteBatchStart      = "block_delete_batch_start"
	SnapshotMetaAttr_BlockDeleteBatchEnd        = "block_delete_batch_end"
	SnapshotMetaAttr_BlockDeleteBatchLocation   = "block_delete_batch_location"
	SnapshotMetaAttr_BlockCNInsertBatchLocation = "block_cn_insert_batch_location"
	SnapshotMetaAttr_SegDeleteBatchStart        = "seg_delete_batch_start"
	SnapshotMetaAttr_SegDeleteBatchEnd          = "seg_delete_batch_end"
	SnapshotMetaAttr_SegDeleteBatchLocation     = "seg_delete_batch_location"
	CheckpointMetaAttr_BlockLocation            = "checkpoint_meta_block_location"
	CheckpointMetaAttr_SchemaType               = "checkpoint_meta_schema_type"

	CheckpointMetaAttr_StorageUsageInsLocation = "checkpoint_meta_storage_usage_ins_location"
	CheckpointMetaAttr_StorageUsageDelLocation = "checkpoint_meta_storage_usage_del_location"

	AccountIDDbNameTblName = catalog.AccountIDDbNameTblName
	AccountIDDbName        = catalog.AccountIDDbName

	// supporting `show accounts` in checkpoint
	CheckpointMetaAttr_ObjectSize = "checkpoint_meta_object_size"
	CheckpointMetaAttr_ObjectID   = "checkpoint_meta_object_id"

	SnapshotAttr_SchemaExtra = catalog.SnapshotAttr_SchemaExtra
)
View Source
const (
	CheckpointVersion1  uint32 = 1
	CheckpointVersion2  uint32 = 2
	CheckpointVersion3  uint32 = 3
	CheckpointVersion4  uint32 = 4
	CheckpointVersion5  uint32 = 5
	CheckpointVersion6  uint32 = 6
	CheckpointVersion7  uint32 = 7
	CheckpointVersion8  uint32 = 8
	CheckpointVersion9  uint32 = 9
	CheckpointVersion10 uint32 = 10
	CheckpointVersion11 uint32 = 11

	CheckpointCurrentVersion = CheckpointVersion11
)
View Source
const (
	MetaIDX uint16 = iota

	DBInsertIDX
	DBInsertTxnIDX
	DBDeleteIDX
	DBDeleteTxnIDX

	TBLInsertIDX
	TBLInsertTxnIDX
	TBLDeleteIDX
	TBLDeleteTxnIDX
	TBLColInsertIDX
	TBLColDeleteIDX

	SEGInsertIDX
	SEGInsertTxnIDX
	SEGDeleteIDX
	SEGDeleteTxnIDX

	BLKMetaInsertIDX
	BLKMetaInsertTxnIDX
	BLKMetaDeleteIDX
	BLKMetaDeleteTxnIDX

	BLKTNMetaInsertIDX
	BLKTNMetaInsertTxnIDX
	BLKTNMetaDeleteIDX
	BLKTNMetaDeleteTxnIDX

	BLKCNMetaInsertIDX

	TNMetaIDX

	StorageUsageInsIDX

	ObjectInfoIDX
	TNObjectInfoIDX

	StorageUsageDelIDX
)
View Source
const (
	Checkpoint_Meta_TID_IDX                 = 2
	Checkpoint_Meta_Insert_Block_LOC_IDX    = 3
	Checkpoint_Meta_CN_Delete_Block_LOC_IDX = 4
	Checkpoint_Meta_Delete_Block_LOC_IDX    = 5
	Checkpoint_Meta_Object_LOC_IDX          = 6
	Checkpoint_Meta_Usage_Ins_LOC_IDX       = 7
	Checkpoint_Meta_Usage_Del_LOC_IDX       = 8
)
View Source
const (
	Checkpoint_Meta_Insert_Block_Start_IDX = 3
	Checkpoint_Meta_Insert_Block_End_IDX   = 4
	Checkpoint_Meta_Delete_Block_Start_IDX = 5
	Checkpoint_Meta_Delete_Block_End_IDX   = 6
	Checkpoint_Meta_Object_Start_IDX       = 7
	Checkpoint_Meta_Object_End_IDX         = 8
)

for ver1-3

View Source
const (
	LocationOffset      = 0
	LocationLength      = objectio.LocationLen
	StartOffsetOffset   = LocationOffset + LocationLength
	StartOffsetLength   = 8
	EndOffsetOffset     = StartOffsetOffset + StartOffsetLength
	EndOffsetLength     = 8
	BlockLocationLength = EndOffsetOffset + EndOffsetLength
)

func (l BlockLocations) append iterator

Location is a fixed-length unmodifiable byte array. Layout: Location(objectio.Location) | StartOffset(uint64) | EndOffset(uint64)

View Source
const (
	BlockInsert = iota
	BlockDelete
	CNBlockInsert
	ObjectInfo
	StorageUsageIns
	StorageUsageDel
)
View Source
const DefaultCheckpointBlockRows = 10000
View Source
const DefaultCheckpointSize = 512 * 1024 * 1024
View Source
const (
	LogtailHeartbeatDuration = time.Millisecond * 2
)
View Source
const MaxIDX = StorageUsageDelIDX + 1
View Source
const MetaMaxIdx = StorageUsageDel + 1
View Source
const Size90M = 90 * 1024 * 1024
View Source
const StorageUsageMagic uint64 = 0x1A2B3C4D5E6F
View Source
const UsageBatMetaTableId uint64 = StorageUsageMagic

Variables

View Source
var (
	// for blk meta response
	BlkMetaSchema    *catalog.Schema // latest version
	BlkMetaSchema_V1 *catalog.Schema // previous version
	DelSchema        *catalog.Schema
	SegSchema        *catalog.Schema
	TxnNodeSchema    *catalog.Schema
	DBTNSchema       *catalog.Schema
	TblTNSchema      *catalog.Schema
	SegTNSchema      *catalog.Schema
	BlkTNSchema      *catalog.Schema
	MetaSchema_V1    *catalog.Schema
	MetaSchema       *catalog.Schema
	DBDelSchema      *catalog.Schema
	TblDelSchema     *catalog.Schema
	ColumnDelSchema  *catalog.Schema
	TNMetaSchema     *catalog.Schema
	ObjectInfoSchema *catalog.Schema

	DBSpecialDeleteSchema  *catalog.Schema
	TBLSpecialDeleteSchema *catalog.Schema

	StorageUsageSchema *catalog.Schema
)
View Source
var (
	DBSpecialDeleteAttr = []string{
		pkgcatalog.SystemDBAttr_ID,
		AccountIDDbName,
	}
	DBSpecialDeleteTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, 0, 0),
	}
	TBLSpecialDeleteAttr = []string{
		pkgcatalog.SystemRelAttr_ID,
		AccountIDDbNameTblName,
	}
	TBLSpecialDeleteTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, 0, 0),
	}
	ObjectSchemaAttr = []string{
		ObjectAttr_ID,
		ObjectAttr_CreateAt,
		ObjectAttr_SegNode,
	}
	ObjectSchemaTypes = []types.Type{
		types.New(types.T_uuid, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_blob, 0, 0),
	}
	TxnNodeSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
	}
	TxnNodeSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
	}
	DBTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	DBTNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
	}
	TblTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		SnapshotAttr_BlockMaxRow,
		SnapshotAttr_ObjectMaxBlock,
		SnapshotAttr_SchemaExtra,
	}
	TblTNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint16, 0, 0),
		types.New(types.T_varchar, 0, 0),
	}
	ObjectTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	ObjectTNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
	}
	BlockTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		pkgcatalog.BlockMeta_MetaLoc,
		pkgcatalog.BlockMeta_DeltaLoc,
	}
	BlockTNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
	}
	MetaSchemaAttr_V1 = []string{
		SnapshotAttr_TID,
		SnapshotMetaAttr_BlockInsertBatchStart,
		SnapshotMetaAttr_BlockInsertBatchEnd,
		SnapshotMetaAttr_BlockDeleteBatchStart,
		SnapshotMetaAttr_BlockDeleteBatchEnd,
		SnapshotMetaAttr_SegDeleteBatchStart,
		SnapshotMetaAttr_SegDeleteBatchEnd,
	}
	MetaShcemaTypes_V1 = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
	}
	MetaSchemaAttr = []string{
		SnapshotAttr_TID,
		SnapshotMetaAttr_BlockInsertBatchLocation,
		SnapshotMetaAttr_BlockCNInsertBatchLocation,
		SnapshotMetaAttr_BlockDeleteBatchLocation,
		SnapshotMetaAttr_SegDeleteBatchLocation,
		CheckpointMetaAttr_StorageUsageInsLocation,
		CheckpointMetaAttr_StorageUsageDelLocation,
	}

	MetaShcemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
	}
	DBDelSchemaAttr = []string{
		pkgcatalog.SystemDBAttr_ID,
	}
	DBDelSchemaTypes = []types.Type{
		types.T_uint64.ToType(),
	}
	TblDelSchemaAttr = []string{
		pkgcatalog.SystemRelAttr_ID,
	}
	TblDelSchemaTypes = []types.Type{
		types.T_uint64.ToType(),
	}
	ColumnDelSchemaAttr = []string{
		pkgcatalog.SystemColAttr_UniqName,
	}
	ColumnDelSchemaTypes = []types.Type{
		types.T_varchar.ToType(),
	}
	TNMetaSchemaAttr = []string{
		CheckpointMetaAttr_BlockLocation,
		CheckpointMetaAttr_SchemaType,
	}
	TNMetaShcemaTypes = []types.Type{
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_uint16, 0, 0),
	}

	BaseAttr = []string{
		catalog.AttrRowID,
		catalog.AttrCommitTs,
	}
	BaseTypes = []types.Type{
		types.T_Rowid.ToType(),
		types.T_TS.ToType(),
	}
	ObjectInfoAttr = []string{
		ObjectAttr_ObjectStats,
		ObjectAttr_State,
		ObjectAttr_Sorted,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		EntryNode_CreateAt,
		EntryNode_DeleteAt,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
	}
	ObjectInfoTypes = []types.Type{
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_bool, 0, 0),
		types.New(types.T_bool, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
	}

	StorageUsageSchemaAttrs = []string{
		pkgcatalog.SystemColAttr_AccID,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		CheckpointMetaAttr_ObjectID,
		CheckpointMetaAttr_ObjectSize,
	}

	StorageUsageSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uuid, 0, 0),
		types.New(types.T_uint64, 0, 0),
	}
)

Functions

func BackupCheckpointDataFactory added in v1.1.0

func BackupCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)

func BatchToString

func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string

func CloseSnapshotList added in v1.2.0

func CloseSnapshotList(snapshots map[uint32]containers.Vector)

func CorrectUsageWrongPlacement added in v1.1.0

func CorrectUsageWrongPlacement(c *catalog.Catalog) (int, float64, error)

func DataChangeToLogtailBatch added in v0.8.0

func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch

GetDataWindowForLogtail returns the batch according to the writeSchema. columns are sorted by seqnum and vacancy is filled with zero value

func DebugBatchToString

func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string

func EliminateErrorsOnCache added in v1.1.1

func EliminateErrorsOnCache(c *catalog.Catalog, end types.TS) int

func FillUsageBatOfGlobal added in v1.0.1

func FillUsageBatOfGlobal(collector *GlobalCollector)

func FillUsageBatOfIncremental added in v1.0.1

func FillUsageBatOfIncremental(collector *IncrementalCollector)

func GetMetaIdxesByVersion added in v1.0.0

func GetMetaIdxesByVersion(ver uint32) []uint16

func GetStorageUsageHistory added in v1.1.0

func GetStorageUsageHistory(
	ctx context.Context,
	locations []objectio.Location, versions []uint32,
	fs fileservice.FileService, mp *mpool.MPool) ([][]UsageData, [][]UsageData, error)

GetStorageUsageHistory is for debug to show these storage usage changes.

1. load each ckp meta batch.

2. load the specified storage usage ins/del batch using locations storing in meta batch.

func GlobalCheckpointDataFactory added in v0.7.0

func GlobalCheckpointDataFactory(
	end types.TS,
	versionInterval time.Duration,
) func(c *catalog.Catalog) (*CheckpointData, error)

func HandleSyncLogTailReq

func HandleSyncLogTailReq(
	ctx context.Context,
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog,
	req api.SyncLogTailReq,
	canRetry bool) (resp api.SyncLogTailResp, closeCB func(), err error)

func IDXString added in v1.1.0

func IDXString(idx uint16) string

func IncrementalCheckpointDataFactory

func IncrementalCheckpointDataFactory(start, end types.TS, collectUsage bool, skipLoadObjectStats bool) func(c *catalog.Catalog) (*CheckpointData, error)

func LoadBlkColumnsByMeta added in v0.8.0

func LoadBlkColumnsByMeta(
	version uint32,
	cxt context.Context,
	colTypes []types.Type,
	colNames []string,
	id uint16,
	reader *blockio.BlockReader,
	mp *mpool.MPool,
) ([]*containers.Batch, error)

func LoadCNSubBlkColumnsByMeta added in v1.0.0

func LoadCNSubBlkColumnsByMeta(
	version uint32,
	cxt context.Context,
	colTypes []types.Type,
	colNames []string,
	id uint16,
	reader *blockio.BlockReader,
	m *mpool.MPool,
) ([]*batch.Batch, error)

func LoadCNSubBlkColumnsByMetaWithId added in v1.0.0

func LoadCNSubBlkColumnsByMetaWithId(
	cxt context.Context,
	colTypes []types.Type,
	colNames []string,
	dataType uint16,
	id uint16,
	version uint32,
	reader *blockio.BlockReader,
	m *mpool.MPool,
) (bat *batch.Batch, err error)

func LoadCheckpointEntries

func LoadCheckpointEntries(
	ctx context.Context,
	metLoc string,
	tableID uint64,
	tableName string,
	dbID uint64,
	dbName string,
	mp *mpool.MPool,
	fs fileservice.FileService) ([]*api.Entry, []func(), error)

func LoadCheckpointLocations added in v1.2.0

func LoadCheckpointLocations(
	ctx context.Context,
	location objectio.Location,
	version uint32,
	fs fileservice.FileService,
) (map[string]objectio.Location, error)

func MockCallback added in v0.8.0

func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error

func NewDirtyCollector

func NewDirtyCollector(
	sourcer *Manager,
	clock clock.Clock,
	catalog *catalog.Catalog,
	interceptor DirtyEntryInterceptor) *dirtyCollector

func PairAccountVsDB added in v1.1.0

func PairAccountVsDB(c *catalog.Catalog) map[uint64]uint64

func ReWriteCheckpointAndBlockFromKey added in v1.1.0

func ReWriteCheckpointAndBlockFromKey(
	ctx context.Context,
	fs, dstFs fileservice.FileService,
	loc, tnLocation objectio.Location,
	version uint32, ts types.TS,
	softDeletes map[string]bool,
) (objectio.Location, objectio.Location, []string, error)

func ToStringTemplate

func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string

Types

type BaseCollector added in v0.7.0

type BaseCollector struct {
	*catalog.LoopProcessor

	Objects []*catalog.ObjectEntry
	// for storage usage
	Usage struct {
		// db, tbl deletes
		Deletes        []interface{}
		ObjInserts     []*catalog.ObjectEntry
		ObjDeletes     []*catalog.ObjectEntry
		ReservedAccIds map[uint64]struct{}
	}

	UsageMemo *TNUsageMemo
	// contains filtered or unexported fields
}

func (*BaseCollector) Allocator added in v1.1.0

func (collector *BaseCollector) Allocator() *mpool.MPool

func (*BaseCollector) Close added in v0.7.0

func (collector *BaseCollector) Close()

func (*BaseCollector) LoadAndCollectObject added in v1.1.1

func (collector *BaseCollector) LoadAndCollectObject(c *catalog.Catalog, visitObject func(*catalog.ObjectEntry) error) error

func (*BaseCollector) OrphanData added in v0.7.0

func (collector *BaseCollector) OrphanData() *CheckpointData

func (*BaseCollector) VisitDB added in v0.7.0

func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error

func (*BaseCollector) VisitGlobalTombstone added in v1.2.2

func (collector *BaseCollector) VisitGlobalTombstone(entry data.Tombstone) (err error)

func (*BaseCollector) VisitObj added in v1.1.0

func (collector *BaseCollector) VisitObj(entry *catalog.ObjectEntry) (err error)

func (*BaseCollector) VisitObjForBackup added in v1.1.0

func (collector *BaseCollector) VisitObjForBackup(entry *catalog.ObjectEntry) (err error)

func (*BaseCollector) VisitTable added in v0.7.0

func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)

func (*BaseCollector) VisitTombstone added in v1.2.0

func (collector *BaseCollector) VisitTombstone(entry data.Tombstone) (err error)

type BlockLocation added in v1.0.0

type BlockLocation []byte

func BuildBlockLoaction added in v1.0.0

func BuildBlockLoaction(id uint16, start, end uint64) BlockLocation

func BuildBlockLoactionWithLocation added in v1.0.0

func BuildBlockLoactionWithLocation(name objectio.ObjectName, extent objectio.Extent, rows uint32, id uint16, start, end uint64) BlockLocation

func (BlockLocation) Contains added in v1.0.0

func (l BlockLocation) Contains(i common.ClosedInterval) bool

func (BlockLocation) GetEndOffset added in v1.0.0

func (l BlockLocation) GetEndOffset() uint64

func (BlockLocation) GetID added in v1.0.0

func (l BlockLocation) GetID() uint16

func (BlockLocation) GetLocation added in v1.0.0

func (l BlockLocation) GetLocation() objectio.Location

func (BlockLocation) GetStartOffset added in v1.0.0

func (l BlockLocation) GetStartOffset() uint64

func (BlockLocation) SetEndOffset added in v1.0.0

func (l BlockLocation) SetEndOffset(end uint64)

func (BlockLocation) SetID added in v1.0.0

func (l BlockLocation) SetID(id uint16)

func (BlockLocation) SetLocation added in v1.0.0

func (l BlockLocation) SetLocation(location objectio.Location)

func (BlockLocation) SetStartOffset added in v1.0.0

func (l BlockLocation) SetStartOffset(start uint64)

func (BlockLocation) String added in v1.0.0

func (l BlockLocation) String() string

type BlockLocations added in v1.0.0

type BlockLocations []byte

func NewEmptyBlockLocations added in v1.0.0

func NewEmptyBlockLocations() BlockLocations

func (*BlockLocations) Append added in v1.0.0

func (l *BlockLocations) Append(loc BlockLocation)

func (BlockLocations) MakeIterator added in v1.0.0

func (l BlockLocations) MakeIterator() *BlockLocationsIterator

func (BlockLocations) String added in v1.0.0

func (l BlockLocations) String() string

type BlockLocationsIterator added in v1.0.0

type BlockLocationsIterator struct {
	*BlockLocations
	// contains filtered or unexported fields
}

func (*BlockLocationsIterator) HasNext added in v1.0.0

func (i *BlockLocationsIterator) HasNext() bool

func (*BlockLocationsIterator) Next added in v1.0.0

type BlockT added in v0.7.0

type BlockT = *txnBlock

type BoundTableOperator

type BoundTableOperator struct {
	// contains filtered or unexported fields
}

BoundTableOperator holds a read only reader, knows how to iterate catalog entries.

func NewBoundTableOperator

func NewBoundTableOperator(catalog *catalog.Catalog,
	reader *Reader,
	scope Scope,
	dbID, tableID uint64,
	visitor catalog.Processor) *BoundTableOperator

func (*BoundTableOperator) Run

func (c *BoundTableOperator) Run() error

Run takes a RespBuilder to visit every table/Object/block touched by all txn in the Reader. During the visiting, RespBuiler will fetch information to return logtail entry

type CNCheckpointData added in v0.8.0

type CNCheckpointData struct {
	// contains filtered or unexported fields
}

func NewCNCheckpointData added in v0.8.0

func NewCNCheckpointData() *CNCheckpointData

func (*CNCheckpointData) GetCloseCB added in v1.0.0

func (data *CNCheckpointData) GetCloseCB(version uint32, m *mpool.MPool) func()

func (*CNCheckpointData) GetTableDataFromBats added in v1.0.0

func (data *CNCheckpointData) GetTableDataFromBats(tid uint64, bats []*batch.Batch) (ins, del, cnIns, objInfo *api.Batch, err error)

func (*CNCheckpointData) GetTableMeta added in v0.8.0

func (data *CNCheckpointData) GetTableMeta(tableID uint64, version uint32, loc objectio.Location) (meta *CheckpointMeta)

func (*CNCheckpointData) InitMetaIdx added in v1.0.0

func (data *CNCheckpointData) InitMetaIdx(
	ctx context.Context, version uint32, reader *blockio.BlockReader,
	location objectio.Location, m *mpool.MPool,
) error

func (*CNCheckpointData) PrefetchFrom added in v0.8.0

func (data *CNCheckpointData) PrefetchFrom(
	ctx context.Context,
	version uint32,
	service fileservice.FileService,
	key objectio.Location,
	tableID uint64) (err error)

func (*CNCheckpointData) PrefetchMetaFrom added in v1.0.0

func (data *CNCheckpointData) PrefetchMetaFrom(
	ctx context.Context,
	version uint32,
	location objectio.Location,
	service fileservice.FileService,
	tableID uint64) (err error)

func (*CNCheckpointData) PrefetchMetaIdx added in v1.0.0

func (data *CNCheckpointData) PrefetchMetaIdx(
	ctx context.Context,
	version uint32,
	idxes []uint16,
	key objectio.Location,
	service fileservice.FileService,
) (err error)

func (*CNCheckpointData) ReadFromData added in v1.0.0

func (data *CNCheckpointData) ReadFromData(
	ctx context.Context,
	tableID uint64,
	location objectio.Location,
	reader *blockio.BlockReader,
	version uint32,
	m *mpool.MPool,
) (dataBats []*batch.Batch, err error)

type CatalogLogtailRespBuilder

type CatalogLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from db and table entry. impl catalog.Processor interface, driven by BoundTableOperator

func NewCatalogLogtailRespBuilder

func NewCatalogLogtailRespBuilder(ctx context.Context, scope Scope, ckp string, start, end types.TS) *CatalogLogtailRespBuilder

func (*CatalogLogtailRespBuilder) BuildResp

func (*CatalogLogtailRespBuilder) Close

func (b *CatalogLogtailRespBuilder) Close()

func (*CatalogLogtailRespBuilder) VisitDB

func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error

VisitDB = catalog.Processor.OnDatabase

func (*CatalogLogtailRespBuilder) VisitTbl

func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error

VisitTbl = catalog.Processor.OnTable

type CheckpointClient

type CheckpointClient interface {
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
	FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error
}

type CheckpointData

type CheckpointData struct {
	// contains filtered or unexported fields
}

func LoadCheckpointEntriesFromKey added in v1.0.0

func LoadCheckpointEntriesFromKey(
	ctx context.Context,
	fs fileservice.FileService,
	location objectio.Location,
	version uint32,
	softDeletes *map[string]bool,
	baseTS *types.TS,
) ([]*objectio.BackupObject, *CheckpointData, error)

func LoadSpecifiedCkpBatch added in v1.0.1

func LoadSpecifiedCkpBatch(
	ctx context.Context,
	location objectio.Location,
	version uint32,
	batchIdx uint16,
	fs fileservice.FileService,
) (data *CheckpointData, err error)

LoadSpecifiedCkpBatch loads a specified checkpoint data batch

func NewCheckpointData

func NewCheckpointData(mp *mpool.MPool) *CheckpointData

func NewCheckpointDataWithVersion added in v1.1.0

func NewCheckpointDataWithVersion(ver uint32, mp *mpool.MPool) *CheckpointData

for test

func (*CheckpointData) Allocator added in v1.1.0

func (data *CheckpointData) Allocator() *mpool.MPool

func (*CheckpointData) ApplyReplayTo

func (data *CheckpointData) ApplyReplayTo(
	c *catalog.Catalog,
	dataFactory catalog.DataFactory,
) (err error)

func (*CheckpointData) Close

func (data *CheckpointData) Close()

func (*CheckpointData) CloseWhenLoadFromCache added in v1.0.0

func (data *CheckpointData) CloseWhenLoadFromCache(version uint32)

func (*CheckpointData) ExportStats added in v1.1.0

func (data *CheckpointData) ExportStats(prefix string) []zap.Field

func (*CheckpointData) FormatData added in v1.1.0

func (data *CheckpointData) FormatData(mp *mpool.MPool) (err error)

func (*CheckpointData) GetBatches added in v1.0.0

func (data *CheckpointData) GetBatches() []*containers.Batch

func (*CheckpointData) GetBlkBatchs

func (*CheckpointData) GetDBBatchs

func (*CheckpointData) GetObjectBatchs added in v1.1.0

func (data *CheckpointData) GetObjectBatchs() *containers.Batch

func (*CheckpointData) GetTNBlkBatchs added in v1.0.0

func (data *CheckpointData) GetTNBlkBatchs() (
	*containers.Batch,
	*containers.Batch,
	*containers.Batch,
	*containers.Batch)

func (*CheckpointData) GetTNObjectBatchs added in v1.1.0

func (data *CheckpointData) GetTNObjectBatchs() *containers.Batch

func (*CheckpointData) GetTblBatchs

func (*CheckpointData) PrefetchFrom added in v0.8.0

func (data *CheckpointData) PrefetchFrom(
	ctx context.Context,
	version uint32,
	service fileservice.FileService,
	key objectio.Location) (err error)

func (*CheckpointData) PrefetchMeta added in v1.0.0

func (data *CheckpointData) PrefetchMeta(
	ctx context.Context,
	version uint32,
	service fileservice.FileService,
	key objectio.Location) (err error)

func (*CheckpointData) PrintData

func (data *CheckpointData) PrintData()

func (*CheckpointData) ReadFrom

func (data *CheckpointData) ReadFrom(
	ctx context.Context,
	version uint32,
	location objectio.Location,
	reader *blockio.BlockReader,
	fs fileservice.FileService,
) (err error)

TODO: There need a global io pool

func (*CheckpointData) ReadTNMetaBatch added in v1.0.0

func (data *CheckpointData) ReadTNMetaBatch(
	ctx context.Context,
	version uint32,
	location objectio.Location,
	reader *blockio.BlockReader,
) (err error)

func (*CheckpointData) UpdateBlkMeta

func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)

func (*CheckpointData) UpdateBlockDeleteBlkMeta added in v1.1.0

func (data *CheckpointData) UpdateBlockDeleteBlkMeta(tid uint64, insStart, insEnd int32)

func (*CheckpointData) UpdateBlockInsertBlkMeta added in v1.1.0

func (data *CheckpointData) UpdateBlockInsertBlkMeta(tid uint64, insStart, insEnd int32)

func (*CheckpointData) UpdateObjectInsertMeta added in v1.1.1

func (data *CheckpointData) UpdateObjectInsertMeta(tid uint64, delStart, delEnd int32)

func (*CheckpointData) UpdateSegMeta added in v0.8.0

func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)

func (*CheckpointData) WriteTo

func (data *CheckpointData) WriteTo(
	fs fileservice.FileService,
	blockRows int,
	checkpointSize int,
) (CNLocation, TNLocation objectio.Location, checkpointFiles []string, err error)

type CheckpointMeta

type CheckpointMeta struct {
	// contains filtered or unexported fields
}

func NewCheckpointMeta

func NewCheckpointMeta() *CheckpointMeta

func (*CheckpointMeta) DecodeFromString added in v1.0.0

func (m *CheckpointMeta) DecodeFromString(keys [][]byte) (err error)

func (*CheckpointMeta) String added in v1.0.0

func (m *CheckpointMeta) String() string

type Collector

type Collector interface {
	String() string
	Run(lag time.Duration)
	ScanInRange(from, to types.TS) (*DirtyTreeEntry, int)
	ScanInRangePruned(from, to types.TS) *DirtyTreeEntry
	IsCommitted(from, to types.TS) bool
	GetAndRefreshMerged() *DirtyTreeEntry
	Merge() *DirtyTreeEntry
	GetMaxLSN(from, to types.TS) uint64
	Init(maxts types.TS)
}

type DirtyEntryInterceptor

type DirtyEntryInterceptor = catalog.Processor

type DirtyTreeEntry

type DirtyTreeEntry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewDirtyTreeEntry

func NewDirtyTreeEntry(start, end types.TS, tree *model.Tree) *DirtyTreeEntry

func NewEmptyDirtyTreeEntry

func NewEmptyDirtyTreeEntry() *DirtyTreeEntry

func (*DirtyTreeEntry) GetTimeRange

func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)

func (*DirtyTreeEntry) GetTree

func (entry *DirtyTreeEntry) GetTree() (tree *model.Tree)

func (*DirtyTreeEntry) IsEmpty

func (entry *DirtyTreeEntry) IsEmpty() bool

func (*DirtyTreeEntry) Merge

func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)

func (*DirtyTreeEntry) String

func (entry *DirtyTreeEntry) String() string

type GlobalCollector added in v0.7.0

type GlobalCollector struct {
	*BaseCollector
	// contains filtered or unexported fields
}

func NewGlobalCollector added in v0.7.0

func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector

func (*GlobalCollector) VisitDB added in v0.7.0

func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error

func (*GlobalCollector) VisitObj added in v1.1.0

func (collector *GlobalCollector) VisitObj(entry *catalog.ObjectEntry) error

func (*GlobalCollector) VisitTable added in v0.7.0

func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error

func (*GlobalCollector) VisitTombstone added in v1.2.0

func (collector *GlobalCollector) VisitTombstone(entry data.Tombstone) error

type IncrementalCollector

type IncrementalCollector struct {
	*BaseCollector
}

func NewBackupCollector added in v1.1.0

func NewBackupCollector(start, end types.TS) *IncrementalCollector

func NewIncrementalCollector

func NewIncrementalCollector(start, end types.TS, skipLoadObjectStats bool) *IncrementalCollector

type Logtailer added in v0.7.0

type Logtailer interface {
	// RangeLogtail returns logtail for all tables within the range (from, to].
	// NOTE: caller should keep time range monotonous, or there would be a checkpoint.
	RangeLogtail(
		ctx context.Context, from, to timestamp.Timestamp,
	) ([]logtail.TableLogtail, []func(), error)

	RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)

	// TableLogtail returns logtail for the specified table.
	//
	// NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value.
	TableLogtail(
		ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
	) (logtail.TableLogtail, func(), error)

	// Now is a time getter from TxnManager. Users of Logtailer should get a timestamp
	// from Now and use the timestamp to collect logtail, in that case, all txn prepared
	// before it are visible.
	Now() (timestamp.Timestamp, timestamp.Timestamp)
}

Logtailer provides logtail for the specified table.

type LogtailerImpl added in v0.7.0

type LogtailerImpl struct {
	// contains filtered or unexported fields
}

func NewLogtailer added in v0.7.0

func NewLogtailer(
	ctx context.Context,
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog) *LogtailerImpl

func (*LogtailerImpl) Now added in v0.8.0

Now is a time getter from TxnManager. Users of Logtailer should get a timestamp from Now and use the timestamp to collect logtail, in that case, all txn prepared before it are visible.

func (*LogtailerImpl) RangeLogtail added in v0.7.0

func (l *LogtailerImpl) RangeLogtail(
	ctx context.Context, from, to timestamp.Timestamp,
) ([]logtail.TableLogtail, []func(), error)

RangeLogtail returns logtail for all tables that are modified within the range (from, to]. Check out all dirty tables in the time window and collect logtails for every table

func (*LogtailerImpl) RegisterCallback added in v0.8.0

func (l *LogtailerImpl) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)

func (*LogtailerImpl) TableLogtail added in v0.7.0

func (l *LogtailerImpl) TableLogtail(
	ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
) (logtail.TableLogtail, func(), error)

TableLogtail returns logtail for the specified table. It boils down to calling `HandleSyncLogTailReq`

type Manager added in v0.7.0

type Manager struct {
	txnbase.NoopCommitListener
	// contains filtered or unexported fields
}

Logtail manager holds sorted txn handles. Its main jobs:

- Insert new txn handle - Efficiently iterate over arbitrary range of txn handles on a snapshot - Truncate unneceessary txn handles according to GC timestamp

func NewManager added in v0.7.0

func NewManager(rt *dbutils.Runtime, blockSize int, nowClock func() types.TS) *Manager

func (*Manager) GCByTS added in v0.7.0

func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)

func (*Manager) GetReader added in v0.7.0

func (mgr *Manager) GetReader(from, to types.TS) *Reader

GetReader get a snapshot of all txn prepared between from and to.

func (*Manager) GetTableOperator added in v0.7.0

func (mgr *Manager) GetTableOperator(
	from, to types.TS,
	catalog *catalog.Catalog,
	dbID, tableID uint64,
	scope Scope,
	visitor catalog.Processor,
) *BoundTableOperator

func (*Manager) OnEndPrePrepare added in v0.7.0

func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)

OnEndPrePrepare is a listener for TxnManager. When a txn completes PrePrepare, add it to the logtail manager

func (*Manager) OnEndPrepareWAL added in v0.8.0

func (mgr *Manager) OnEndPrepareWAL(txn txnif.AsyncTxn)

func (*Manager) RegisterCallback added in v0.8.0

func (mgr *Manager) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error) error

func (*Manager) Start added in v0.8.0

func (mgr *Manager) Start()

func (*Manager) Stop added in v0.8.0

func (mgr *Manager) Stop()

func (*Manager) TryCompactTable added in v1.0.0

func (mgr *Manager) TryCompactTable()

type ObjectAbstract added in v1.2.3

type ObjectAbstract struct {
	TotalObjCnt  int
	TotalObjSize int
	TotalBlkCnt  int
	TotalRowCnt  int
}

type Reader added in v0.7.0

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a snapshot of all txn prepared between from and to. Dirty tables/objects/blocks can be queried based on those txn

func (*Reader) GetDirty added in v0.7.0

func (r *Reader) GetDirty() (tree *model.Tree, count int)

Merge all dirty table/object/block into one dirty tree

func (*Reader) GetDirtyByTable added in v0.7.0

func (r *Reader) GetDirtyByTable(
	dbID, id uint64,
) (tree *model.TableTree)

Merge all dirty table/object/block of **a table** into one tree

func (*Reader) GetMaxLSN added in v0.7.0

func (r *Reader) GetMaxLSN() (maxLsn uint64)

TODO: optimize

func (*Reader) HasCatalogChanges added in v0.7.0

func (r *Reader) HasCatalogChanges() bool

HasCatalogChanges returns true if any txn in the reader modified the Catalog

func (*Reader) IsCommitted added in v0.8.0

func (r *Reader) IsCommitted() bool

type RespBuilder

type RespBuilder interface {
	catalog.Processor
	BuildResp() (api.SyncLogTailResp, error)
	Close()
}

type RowT added in v0.7.0

type RowT = *txnRow

type Scope

type Scope = int
const (
	// changes for mo_databases
	ScopeDatabases Scope = iota + 1
	// changes for mo_tables
	ScopeTables
	// changes for mo_columns
	ScopeColumns
	// changes for user tables
	ScopeUserTables
)

func DecideTableScope added in v0.7.0

func DecideTableScope(tableID uint64) Scope

type SnapshotMeta added in v1.2.0

type SnapshotMeta struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSnapshotMeta added in v1.2.0

func NewSnapshotMeta() *SnapshotMeta

func (*SnapshotMeta) CopyObjectsLocked added in v1.2.2

func (sm *SnapshotMeta) CopyObjectsLocked() map[uint64]map[objectio.Segmentid]*objectInfo

func (*SnapshotMeta) CopyTablesLocked added in v1.2.2

func (sm *SnapshotMeta) CopyTablesLocked() map[uint32]map[uint64]*TableInfo

func (*SnapshotMeta) GetSnapshot added in v1.2.0

func (*SnapshotMeta) GetSnapshotList added in v1.2.0

func (sm *SnapshotMeta) GetSnapshotList(SnapshotList map[uint32][]types.TS, tid uint64) []types.TS

func (*SnapshotMeta) InitTableInfo added in v1.2.0

func (sm *SnapshotMeta) InitTableInfo(data *CheckpointData)

func (*SnapshotMeta) MergeTableInfo added in v1.2.0

func (sm *SnapshotMeta) MergeTableInfo(SnapshotList map[uint32][]types.TS) error

func (*SnapshotMeta) ReadMeta added in v1.2.0

func (sm *SnapshotMeta) ReadMeta(ctx context.Context, name string, fs fileservice.FileService) error

func (*SnapshotMeta) ReadTableInfo added in v1.2.0

func (sm *SnapshotMeta) ReadTableInfo(ctx context.Context, name string, fs fileservice.FileService) error

func (*SnapshotMeta) Rebuild added in v1.2.0

func (sm *SnapshotMeta) Rebuild(ins *containers.Batch)

func (*SnapshotMeta) RebuildDelta added in v1.2.0

func (sm *SnapshotMeta) RebuildDelta(ins *containers.Batch)

func (*SnapshotMeta) RebuildTableInfo added in v1.2.0

func (sm *SnapshotMeta) RebuildTableInfo(ins *containers.Batch)

func (*SnapshotMeta) RebuildTid added in v1.2.1

func (sm *SnapshotMeta) RebuildTid(ins *containers.Batch)

func (*SnapshotMeta) SaveMeta added in v1.2.0

func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) (uint32, error)

func (*SnapshotMeta) SaveTableInfo added in v1.2.0

func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) (uint32, error)

func (*SnapshotMeta) SetTid added in v1.2.0

func (sm *SnapshotMeta) SetTid(tid uint64)

func (*SnapshotMeta) String added in v1.2.0

func (sm *SnapshotMeta) String() string

func (*SnapshotMeta) TableInfoString added in v1.2.0

func (sm *SnapshotMeta) TableInfoString() string

func (*SnapshotMeta) Update added in v1.2.0

func (sm *SnapshotMeta) Update(data *CheckpointData) *SnapshotMeta

type StorageUsageCache added in v1.1.0

type StorageUsageCache struct {
	// when two requests happens within [lastUpdate, lastUpdate + lazyThreshold],
	// it will reuse the cached result, no new query to TN.
	sync.Mutex
	// contains filtered or unexported fields
}

func NewStorageUsageCache added in v1.1.0

func NewStorageUsageCache(opts ...StorageUsageCacheOption) *StorageUsageCache

func (*StorageUsageCache) CacheLen added in v1.1.0

func (c *StorageUsageCache) CacheLen() int

func (*StorageUsageCache) ClearForUpdate added in v1.1.0

func (c *StorageUsageCache) ClearForUpdate()

func (*StorageUsageCache) Delete added in v1.1.0

func (c *StorageUsageCache) Delete(usage UsageData)

func (*StorageUsageCache) GatherAccountSize added in v1.1.0

func (c *StorageUsageCache) GatherAccountSize(id uint64) (size uint64, exist bool)

func (*StorageUsageCache) GatherAllAccSize added in v1.1.0

func (c *StorageUsageCache) GatherAllAccSize() (usages map[uint64]uint64)

func (*StorageUsageCache) GatherObjectAbstractForAccounts added in v1.2.3

func (c *StorageUsageCache) GatherObjectAbstractForAccounts() (abstract map[uint64]ObjectAbstract)

func (*StorageUsageCache) Get added in v1.1.0

func (c *StorageUsageCache) Get(usage UsageData) (ret UsageData, exist bool)

func (*StorageUsageCache) IsExpired added in v1.1.0

func (c *StorageUsageCache) IsExpired() bool

func (*StorageUsageCache) Iter added in v1.1.0

func (*StorageUsageCache) LessFunc added in v1.1.0

func (c *StorageUsageCache) LessFunc() func(a UsageData, b UsageData) bool

func (*StorageUsageCache) MemUsed added in v1.1.0

func (c *StorageUsageCache) MemUsed() float64

MemUsed returns the memory used in megabytes

func (*StorageUsageCache) SetOrReplace added in v1.1.1

func (c *StorageUsageCache) SetOrReplace(usage UsageData)

func (*StorageUsageCache) String added in v1.1.0

func (c *StorageUsageCache) String() string

type StorageUsageCacheOption added in v1.1.0

type StorageUsageCacheOption = func(c *StorageUsageCache)

func WithLazyThreshold added in v1.1.0

func WithLazyThreshold(lazy int) StorageUsageCacheOption

WithLazyThreshold sets lazyThreshold to lazy seconds

func WithLessFunc added in v1.1.0

func WithLessFunc(less func(a UsageData, b UsageData) bool) StorageUsageCacheOption

type TNUsageMemo added in v1.1.0

type TNUsageMemo struct {
	sync.Mutex

	C *catalog.Catalog
	// contains filtered or unexported fields
}

func NewTNUsageMemo added in v1.1.0

func NewTNUsageMemo(c *catalog.Catalog) *TNUsageMemo

func (*TNUsageMemo) AddReqTrace added in v1.1.0

func (m *TNUsageMemo) AddReqTrace(accountId uint64, tSize uint64, t time.Time, hint string)

func (*TNUsageMemo) CacheLen added in v1.1.0

func (m *TNUsageMemo) CacheLen() int

func (*TNUsageMemo) Clear added in v1.1.0

func (m *TNUsageMemo) Clear()

func (*TNUsageMemo) ClearDroppedAccounts added in v1.1.0

func (m *TNUsageMemo) ClearDroppedAccounts(reserved map[uint64]struct{}) string

func (*TNUsageMemo) ClearNewAccCache added in v1.1.0

func (m *TNUsageMemo) ClearNewAccCache()

func (*TNUsageMemo) Delete added in v1.1.0

func (m *TNUsageMemo) Delete(usage UsageData)

func (*TNUsageMemo) DeltaUpdate added in v1.1.1

func (m *TNUsageMemo) DeltaUpdate(delta UsageData, del bool)

DeltaUpdate does setting or updating with delta size (delta.Size)

func (*TNUsageMemo) EnterProcessing added in v1.1.0

func (m *TNUsageMemo) EnterProcessing()

func (*TNUsageMemo) EstablishFromCKPs added in v1.1.0

func (m *TNUsageMemo) EstablishFromCKPs(c *catalog.Catalog)

EstablishFromCKPs replays usage info which stored in ckps into the tn cache

func (*TNUsageMemo) GatherAccountSize added in v1.1.0

func (m *TNUsageMemo) GatherAccountSize(id uint64) (size uint64, exist bool)

func (*TNUsageMemo) GatherAllAccSize added in v1.1.0

func (m *TNUsageMemo) GatherAllAccSize() (usages map[uint64]uint64)

func (*TNUsageMemo) GatherNewAccountSize added in v1.1.0

func (m *TNUsageMemo) GatherNewAccountSize(id uint64) (size uint64, exist bool)

func (*TNUsageMemo) GatherObjectAbstractForAllAccount added in v1.2.3

func (m *TNUsageMemo) GatherObjectAbstractForAllAccount() map[uint64]ObjectAbstract

func (*TNUsageMemo) GatherSpecialTableSize added in v1.2.1

func (m *TNUsageMemo) GatherSpecialTableSize() (size uint64)

func (*TNUsageMemo) Get added in v1.1.0

func (m *TNUsageMemo) Get(usage UsageData) (old UsageData, exist bool)

func (*TNUsageMemo) GetAllReqTrace added in v1.1.0

func (m *TNUsageMemo) GetAllReqTrace() (accountIds []uint64, timestamps []time.Time, sizes []uint64, hints []string)

func (*TNUsageMemo) GetCache added in v1.1.0

func (m *TNUsageMemo) GetCache() *StorageUsageCache

func (*TNUsageMemo) GetDelayed added in v1.1.0

func (m *TNUsageMemo) GetDelayed() map[uint64]UsageData

func (*TNUsageMemo) GetNewAccCacheLatestUpdate added in v1.1.0

func (m *TNUsageMemo) GetNewAccCacheLatestUpdate() types.TS

func (*TNUsageMemo) HasUpdate added in v1.1.0

func (m *TNUsageMemo) HasUpdate() bool

func (*TNUsageMemo) LeaveProcessing added in v1.1.0

func (m *TNUsageMemo) LeaveProcessing()

func (*TNUsageMemo) MemoryUsed added in v1.1.0

func (m *TNUsageMemo) MemoryUsed() float64

func (*TNUsageMemo) PrepareReplay added in v1.1.0

func (m *TNUsageMemo) PrepareReplay(datas []*CheckpointData, vers []uint32)

func (*TNUsageMemo) Replace added in v1.1.1

func (m *TNUsageMemo) Replace(new UsageData)

Replace replaces the old usage with newUsage

func (*TNUsageMemo) UpdateNewAccCache added in v1.1.0

func (m *TNUsageMemo) UpdateNewAccCache(usage UsageData, del bool)

type TableInfo added in v1.2.0

type TableInfo struct {
	// contains filtered or unexported fields
}

type TableLogtailRespBuilder

type TableLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from block entry. impl catalog.Processor interface, driven by BoundTableOperator

func NewTableLogtailRespBuilder

func NewTableLogtailRespBuilder(ctx context.Context, ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder

func (*TableLogtailRespBuilder) BuildResp

func (*TableLogtailRespBuilder) Close

func (b *TableLogtailRespBuilder) Close()

func (*TableLogtailRespBuilder) VisitObj added in v1.1.0

type TableMeta added in v1.0.0

type TableMeta struct {
	common.ClosedInterval
	// contains filtered or unexported fields
}

func NewTableMeta added in v1.0.0

func NewTableMeta() *TableMeta

func (*TableMeta) String added in v1.0.0

func (m *TableMeta) String() string

type TableRespKind added in v0.8.0

type TableRespKind int
const (
	TableRespKind_Data TableRespKind = iota
	TableRespKind_Blk
	TableRespKind_Obj
)

type TempFKey added in v1.0.0

type TempFKey struct{}

type TempFilter added in v1.0.0

type TempFilter struct {
	sync.RWMutex
	// contains filtered or unexported fields
}
var TempF *TempFilter

func (*TempFilter) Add added in v1.0.0

func (f *TempFilter) Add(id uint64)

func (*TempFilter) Check added in v1.0.0

func (f *TempFilter) Check(id uint64) (skip bool)

type TxnLogtailRespBuilder added in v0.8.0

type TxnLogtailRespBuilder struct {
	// contains filtered or unexported fields
}

func NewTxnLogtailRespBuilder added in v0.8.0

func NewTxnLogtailRespBuilder(rt *dbutils.Runtime) *TxnLogtailRespBuilder

func (*TxnLogtailRespBuilder) BuildResp added in v0.8.0

func (b *TxnLogtailRespBuilder) BuildResp()

func (*TxnLogtailRespBuilder) Close added in v0.8.0

func (b *TxnLogtailRespBuilder) Close()

func (*TxnLogtailRespBuilder) CollectLogtail added in v0.8.0

func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func())

type TxnTable added in v0.7.0

type TxnTable struct {
	*model.AOT[BlockT, RowT]
}

func NewTxnTable added in v0.7.0

func NewTxnTable(blockSize int, nowClock func() types.TS) *TxnTable

func (*TxnTable) AddTxn added in v0.7.0

func (table *TxnTable) AddTxn(txn txnif.AsyncTxn) (err error)

func (*TxnTable) ForeachRowInBetween added in v0.7.0

func (table *TxnTable) ForeachRowInBetween(
	from, to types.TS,
	skipBlkOp func(blk BlockT) bool,
	rowOp func(row RowT) (goNext bool),
) (readRows int)

func (*TxnTable) TruncateByTimeStamp added in v0.7.0

func (table *TxnTable) TruncateByTimeStamp(ts types.TS) (cnt int)

func (*TxnTable) TryCompact added in v1.0.0

func (table *TxnTable) TryCompact(from types.TS, rt *dbutils.Runtime) (to types.TS)

type UsageData added in v1.0.1

type UsageData struct {
	AccId uint64
	DbId  uint64
	TblId uint64
	Size  uint64

	// this will not persist
	// only global ckp will update
	ObjectAbstract
	// contains filtered or unexported fields
}

func MockUsageData added in v1.1.0

func MockUsageData(accCnt, dbCnt, tblCnt int, allocator *atomic.Uint64) (result []UsageData)

MockUsageData generates accCnt * dbCnt * tblCnt UsageDatas. the accIds, dbIds and tblIds are random produced. this func ensure that all ids are different.

func Objects2Usages added in v1.2.3

func Objects2Usages(objs []*catalog.ObjectEntry, isGlobal bool) (usages []UsageData)

func (UsageData) IsZero added in v1.1.0

func (u UsageData) IsZero() bool

func (*UsageData) Merge added in v1.2.3

func (u *UsageData) Merge(other UsageData, delete bool)

func (UsageData) String added in v1.1.0

func (u UsageData) String() string

Directories

Path Synopsis
This package implements client and server for logtail push model.
This package implements client and server for logtail push model.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL