logtail

package
v1.0.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2023 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SnapshotAttr_TID                            = catalog.SnapshotAttr_TID
	SnapshotAttr_DBID                           = catalog.SnapshotAttr_DBID
	SegmentAttr_ID                              = catalog.SegmentAttr_ID
	SegmentAttr_CreateAt                        = catalog.SegmentAttr_CreateAt
	SegmentAttr_SegNode                         = catalog.SegmentAttr_SegNode
	SnapshotAttr_BlockMaxRow                    = catalog.SnapshotAttr_BlockMaxRow
	SnapshotAttr_SegmentMaxBlock                = catalog.SnapshotAttr_SegmentMaxBlock
	SnapshotMetaAttr_BlockInsertBatchStart      = "block_insert_batch_start"
	SnapshotMetaAttr_BlockInsertBatchEnd        = "block_insert_batch_end"
	SnapshotMetaAttr_BlockInsertBatchLocation   = "block_insert_batch_location"
	SnapshotMetaAttr_BlockDeleteBatchStart      = "block_delete_batch_start"
	SnapshotMetaAttr_BlockDeleteBatchEnd        = "block_delete_batch_end"
	SnapshotMetaAttr_BlockDeleteBatchLocation   = "block_delete_batch_location"
	SnapshotMetaAttr_BlockCNInsertBatchLocation = "block_cn_insert_batch_location"
	SnapshotMetaAttr_SegDeleteBatchStart        = "seg_delete_batch_start"
	SnapshotMetaAttr_SegDeleteBatchEnd          = "seg_delete_batch_end"
	SnapshotMetaAttr_SegDeleteBatchLocation     = "seg_delete_batch_location"
	CheckpointMetaAttr_BlockLocation            = "checkpoint_meta_block_location"
	CheckpointMetaAttr_SchemaType               = "checkpoint_meta_schema_type"

	AccountIDDbNameTblName = catalog.AccountIDDbNameTblName
	AccountIDDbName        = catalog.AccountIDDbName

	SnapshotAttr_SchemaExtra = catalog.SnapshotAttr_SchemaExtra
)
View Source
const (
	CheckpointVersion1 uint32 = 1
	CheckpointVersion2 uint32 = 2
	CheckpointVersion3 uint32 = 3
	CheckpointVersion4 uint32 = 4
	CheckpointVersion5 uint32 = 5
	CheckpointVersion6 uint32 = 6
	CheckpointVersion7 uint32 = 7
	CheckpointVersion8 uint32 = 8

	CheckpointCurrentVersion = CheckpointVersion8
)
View Source
const (
	MetaIDX uint16 = iota

	DBInsertIDX
	DBInsertTxnIDX
	DBDeleteIDX
	DBDeleteTxnIDX

	TBLInsertIDX
	TBLInsertTxnIDX
	TBLDeleteIDX
	TBLDeleteTxnIDX
	TBLColInsertIDX
	TBLColDeleteIDX

	SEGInsertIDX
	SEGInsertTxnIDX
	SEGDeleteIDX
	SEGDeleteTxnIDX

	BLKMetaInsertIDX
	BLKMetaInsertTxnIDX
	BLKMetaDeleteIDX
	BLKMetaDeleteTxnIDX

	BLKTNMetaInsertIDX
	BLKTNMetaInsertTxnIDX
	BLKTNMetaDeleteIDX
	BLKTNMetaDeleteTxnIDX

	BLKCNMetaInsertIDX

	TNMetaIDX
)
View Source
const (
	Checkpoint_Meta_TID_IDX                 = 2
	Checkpoint_Meta_Insert_Block_LOC_IDX    = 3
	Checkpoint_Meta_CN_Delete_Block_LOC_IDX = 4
	Checkpoint_Meta_Delete_Block_LOC_IDX    = 5
	Checkpoint_Meta_Segment_LOC_IDX         = 6
)
View Source
const (
	Checkpoint_Meta_Insert_Block_Start_IDX = 3
	Checkpoint_Meta_Insert_Block_End_IDX   = 4
	Checkpoint_Meta_Delete_Block_Start_IDX = 5
	Checkpoint_Meta_Delete_Block_End_IDX   = 6
	Checkpoint_Meta_Segment_Start_IDX      = 7
	Checkpoint_Meta_Segment_End_IDX        = 8
)

for ver1-3

View Source
const (
	LocationOffset      = 0
	LocationLength      = objectio.LocationLen
	StartOffsetOffset   = LocationOffset + LocationLength
	StartOffsetLength   = 8
	EndOffsetOffset     = StartOffsetOffset + StartOffsetLength
	EndOffsetLength     = 8
	BlockLocationLength = EndOffsetOffset + EndOffsetLength
)

func (l BlockLocations) append iterator

Location is a fixed-length unmodifiable byte array. Layout: Location(objectio.Location) | StartOffset(uint64) | EndOffset(uint64)

View Source
const (
	BlockInsert = iota
	BlockDelete
	CNBlockInsert
	SegmentDelete
)
View Source
const DefaultCheckpointBlockRows = 10000
View Source
const DefaultCheckpointSize = 1024 * 1024 * 1024
View Source
const (
	LogtailHeartbeatDuration = time.Millisecond * 2
)
View Source
const MaxIDX = TNMetaIDX + 1
View Source
const MetaMaxIdx = SegmentDelete + 1
View Source
const Size90M = 90 * 1024 * 1024

Variables

View Source
var (
	// for blk meta response
	BlkMetaSchema    *catalog.Schema // latest version
	BlkMetaSchema_V1 *catalog.Schema // previous version
	DelSchema        *catalog.Schema
	SegSchema        *catalog.Schema
	TxnNodeSchema    *catalog.Schema
	DBTNSchema       *catalog.Schema
	TblTNSchema      *catalog.Schema
	SegTNSchema      *catalog.Schema
	BlkTNSchema      *catalog.Schema
	MetaSchema_V1    *catalog.Schema
	MetaSchema       *catalog.Schema
	DBDelSchema      *catalog.Schema
	TblDelSchema     *catalog.Schema
	ColumnDelSchema  *catalog.Schema
	TNMetaSchema     *catalog.Schema

	DBSpecialDeleteSchema  *catalog.Schema
	TBLSpecialDeleteSchema *catalog.Schema
)
View Source
var (
	DBSpecialDeleteAttr = []string{
		pkgcatalog.SystemDBAttr_ID,
		AccountIDDbName,
	}
	DBSpecialDeleteTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, 0, 0),
	}
	TBLSpecialDeleteAttr = []string{
		pkgcatalog.SystemRelAttr_ID,
		AccountIDDbNameTblName,
	}
	TBLSpecialDeleteTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, 0, 0),
	}
	SegmentSchemaAttr = []string{
		SegmentAttr_ID,
		SegmentAttr_CreateAt,
		SegmentAttr_SegNode,
	}
	SegmentSchemaTypes = []types.Type{
		types.New(types.T_uuid, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_blob, 0, 0),
	}
	TxnNodeSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
	}
	TxnNodeSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
	}
	DBTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	DBTNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
	}
	TblTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		SnapshotAttr_BlockMaxRow,
		SnapshotAttr_SegmentMaxBlock,
		SnapshotAttr_SchemaExtra,
	}
	TblTNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint16, 0, 0),
		types.New(types.T_varchar, 0, 0),
	}
	SegmentTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	SegmentTNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
	}
	BlockTNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		pkgcatalog.BlockMeta_MetaLoc,
		pkgcatalog.BlockMeta_DeltaLoc,
	}
	BlockTNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
	}
	MetaSchemaAttr_V1 = []string{
		SnapshotAttr_TID,
		SnapshotMetaAttr_BlockInsertBatchStart,
		SnapshotMetaAttr_BlockInsertBatchEnd,
		SnapshotMetaAttr_BlockDeleteBatchStart,
		SnapshotMetaAttr_BlockDeleteBatchEnd,
		SnapshotMetaAttr_SegDeleteBatchStart,
		SnapshotMetaAttr_SegDeleteBatchEnd,
	}
	MetaShcemaTypes_V1 = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
	}
	MetaSchemaAttr = []string{
		SnapshotAttr_TID,
		SnapshotMetaAttr_BlockInsertBatchLocation,
		SnapshotMetaAttr_BlockCNInsertBatchLocation,
		SnapshotMetaAttr_BlockDeleteBatchLocation,
		SnapshotMetaAttr_SegDeleteBatchLocation,
	}
	MetaShcemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
	}
	DBDelSchemaAttr = []string{
		pkgcatalog.SystemDBAttr_ID,
	}
	DBDelSchemaTypes = []types.Type{
		types.T_uint64.ToType(),
	}
	TblDelSchemaAttr = []string{
		pkgcatalog.SystemRelAttr_ID,
	}
	TblDelSchemaTypes = []types.Type{
		types.T_uint64.ToType(),
	}
	ColumnDelSchemaAttr = []string{
		pkgcatalog.SystemColAttr_UniqName,
	}
	ColumnDelSchemaTypes = []types.Type{
		types.T_varchar.ToType(),
	}
	TNMetaSchemaAttr = []string{
		CheckpointMetaAttr_BlockLocation,
		CheckpointMetaAttr_SchemaType,
	}
	TNMetaShcemaTypes = []types.Type{
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_uint16, 0, 0),
	}

	BaseAttr = []string{
		catalog.AttrRowID,
		catalog.AttrCommitTs,
	}
	BaseTypes = []types.Type{
		types.T_Rowid.ToType(),
		types.T_TS.ToType(),
	}
)

Functions

func BatchToString

func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string

func DataChangeToLogtailBatch added in v0.8.0

func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch

GetDataWindowForLogtail returns the batch according to the writeSchema. columns are sorted by seqnum and vacancy is filled with zero value

func DebugBatchToString

func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string

func GetMetaIdxesByVersion added in v1.0.0

func GetMetaIdxesByVersion(ver uint32) []uint16

func GlobalCheckpointDataFactory added in v0.7.0

func GlobalCheckpointDataFactory(end types.TS, versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error)

func HandleSyncLogTailReq

func HandleSyncLogTailReq(
	ctx context.Context,
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog,
	req api.SyncLogTailReq,
	canRetry bool) (resp api.SyncLogTailResp, closeCB func(), err error)

func IncrementalCheckpointDataFactory

func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)

func LoadBlkColumnsByMeta added in v0.8.0

func LoadBlkColumnsByMeta(
	version uint32,
	cxt context.Context,
	colTypes []types.Type,
	colNames []string,
	id uint16,
	reader *blockio.BlockReader,
) ([]*containers.Batch, error)

func LoadCNSubBlkColumnsByMeta added in v1.0.0

func LoadCNSubBlkColumnsByMeta(
	version uint32,
	cxt context.Context,
	colTypes []types.Type,
	colNames []string,
	id uint16,
	reader *blockio.BlockReader,
	m *mpool.MPool,
) ([]*batch.Batch, error)

func LoadCNSubBlkColumnsByMetaWithId added in v1.0.0

func LoadCNSubBlkColumnsByMetaWithId(
	cxt context.Context,
	colTypes []types.Type,
	colNames []string,
	dataType uint16,
	id uint16,
	version uint32,
	reader *blockio.BlockReader,
	m *mpool.MPool,
) (ioResult *batch.Batch, err error)

func LoadCheckpointEntries

func LoadCheckpointEntries(
	ctx context.Context,
	metLoc string,
	tableID uint64,
	tableName string,
	dbID uint64,
	dbName string,
	mp *mpool.MPool,
	fs fileservice.FileService) ([]*api.Entry, []func(), error)

func MockCallback added in v0.8.0

func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error

func NewDirtyCollector

func NewDirtyCollector(
	sourcer *Manager,
	clock clock.Clock,
	catalog *catalog.Catalog,
	interceptor DirtyEntryInterceptor) *dirtyCollector

func ToStringTemplate

func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string

Types

type BaseCollector added in v0.7.0

type BaseCollector struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

func (*BaseCollector) Close added in v0.7.0

func (collector *BaseCollector) Close()

func (*BaseCollector) OrphanData added in v0.7.0

func (collector *BaseCollector) OrphanData() *CheckpointData

func (*BaseCollector) VisitBlk added in v0.7.0

func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)

func (*BaseCollector) VisitDB added in v0.7.0

func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error

func (*BaseCollector) VisitSeg added in v0.7.0

func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)

func (*BaseCollector) VisitTable added in v0.7.0

func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)

type BlockLocation added in v1.0.0

type BlockLocation []byte

func BuildBlockLoaction added in v1.0.0

func BuildBlockLoaction(id uint16, start, end uint64) BlockLocation

func BuildBlockLoactionWithLocation added in v1.0.0

func BuildBlockLoactionWithLocation(name objectio.ObjectName, extent objectio.Extent, rows uint32, id uint16, start, end uint64) BlockLocation

func (BlockLocation) Contains added in v1.0.0

func (l BlockLocation) Contains(i common.ClosedInterval) bool

func (BlockLocation) GetEndOffset added in v1.0.0

func (l BlockLocation) GetEndOffset() uint64

func (BlockLocation) GetID added in v1.0.0

func (l BlockLocation) GetID() uint16

func (BlockLocation) GetLocation added in v1.0.0

func (l BlockLocation) GetLocation() objectio.Location

func (BlockLocation) GetStartOffset added in v1.0.0

func (l BlockLocation) GetStartOffset() uint64

func (BlockLocation) SetEndOffset added in v1.0.0

func (l BlockLocation) SetEndOffset(end uint64)

func (BlockLocation) SetID added in v1.0.0

func (l BlockLocation) SetID(id uint16)

func (BlockLocation) SetLocation added in v1.0.0

func (l BlockLocation) SetLocation(location objectio.Location)

func (BlockLocation) SetStartOffset added in v1.0.0

func (l BlockLocation) SetStartOffset(start uint64)

func (BlockLocation) String added in v1.0.0

func (l BlockLocation) String() string

type BlockLocations added in v1.0.0

type BlockLocations []byte

func NewEmptyBlockLocations added in v1.0.0

func NewEmptyBlockLocations() BlockLocations

func (*BlockLocations) Append added in v1.0.0

func (l *BlockLocations) Append(loc BlockLocation)

func (BlockLocations) MakeIterator added in v1.0.0

func (l BlockLocations) MakeIterator() *BlockLocationsIterator

func (BlockLocations) String added in v1.0.0

func (l BlockLocations) String() string

type BlockLocationsIterator added in v1.0.0

type BlockLocationsIterator struct {
	*BlockLocations
	// contains filtered or unexported fields
}

func (*BlockLocationsIterator) HasNext added in v1.0.0

func (i *BlockLocationsIterator) HasNext() bool

func (*BlockLocationsIterator) Next added in v1.0.0

type BlockT added in v0.7.0

type BlockT = *txnBlock

type BoundTableOperator

type BoundTableOperator struct {
	// contains filtered or unexported fields
}

BoundTableOperator holds a read only reader, knows how to iterate catalog entries.

func NewBoundTableOperator

func NewBoundTableOperator(catalog *catalog.Catalog,
	reader *Reader,
	scope Scope,
	dbID, tableID uint64,
	visitor catalog.Processor) *BoundTableOperator

func (*BoundTableOperator) Run

func (c *BoundTableOperator) Run() error

Run takes a RespBuilder to visit every table/segment/block touched by all txn in the Reader. During the visiting, RespBuiler will fetch information to return logtail entry

type CNCheckpointData added in v0.8.0

type CNCheckpointData struct {
	// contains filtered or unexported fields
}

func NewCNCheckpointData added in v0.8.0

func NewCNCheckpointData() *CNCheckpointData

func (*CNCheckpointData) GetCloseCB added in v1.0.0

func (data *CNCheckpointData) GetCloseCB(version uint32, m *mpool.MPool) func()

func (*CNCheckpointData) GetTableDataFromBats added in v1.0.0

func (data *CNCheckpointData) GetTableDataFromBats(tid uint64, bats []*batch.Batch) (ins, del, cnIns, segDel *api.Batch, err error)

func (*CNCheckpointData) GetTableMeta added in v0.8.0

func (data *CNCheckpointData) GetTableMeta(tableID uint64, version uint32, loc objectio.Location) (meta *CheckpointMeta)

func (*CNCheckpointData) InitMetaIdx added in v1.0.0

func (data *CNCheckpointData) InitMetaIdx(ctx context.Context, version uint32, reader *blockio.BlockReader, location objectio.Location, m *mpool.MPool) error

func (*CNCheckpointData) PrefetchFrom added in v0.8.0

func (data *CNCheckpointData) PrefetchFrom(
	ctx context.Context,
	version uint32,
	service fileservice.FileService,
	key objectio.Location,
	tableID uint64) (err error)

func (*CNCheckpointData) PrefetchMetaFrom added in v1.0.0

func (data *CNCheckpointData) PrefetchMetaFrom(
	ctx context.Context,
	version uint32,
	location objectio.Location,
	service fileservice.FileService,
	tableID uint64) (err error)

func (*CNCheckpointData) PrefetchMetaIdx added in v1.0.0

func (data *CNCheckpointData) PrefetchMetaIdx(
	ctx context.Context,
	version uint32,
	idxes []uint16,
	key objectio.Location,
	service fileservice.FileService,
) (err error)

func (*CNCheckpointData) ReadFromData added in v1.0.0

func (data *CNCheckpointData) ReadFromData(
	ctx context.Context,
	tableID uint64,
	location objectio.Location,
	reader *blockio.BlockReader,
	version uint32,
	m *mpool.MPool,
) (dataBats []*batch.Batch, err error)

func (*CNCheckpointData) ReadFromDataWithKey added in v1.0.0

func (data *CNCheckpointData) ReadFromDataWithKey(
	ctx context.Context,
	location objectio.Location,
	fs fileservice.FileService,
	m *mpool.MPool,
) (cnBatch *batch.Batch, err error)

type CatalogLogtailRespBuilder

type CatalogLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from db and table entry. impl catalog.Processor interface, driven by BoundTableOperator

func NewCatalogLogtailRespBuilder

func NewCatalogLogtailRespBuilder(ctx context.Context, scope Scope, ckp string, start, end types.TS) *CatalogLogtailRespBuilder

func (*CatalogLogtailRespBuilder) BuildResp

func (*CatalogLogtailRespBuilder) Close

func (b *CatalogLogtailRespBuilder) Close()

func (*CatalogLogtailRespBuilder) VisitDB

func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error

VisitDB = catalog.Processor.OnDatabase

func (*CatalogLogtailRespBuilder) VisitTbl

func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error

VisitTbl = catalog.Processor.OnTable

type CheckpointClient

type CheckpointClient interface {
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
	FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error
}

type CheckpointData

type CheckpointData struct {
	// contains filtered or unexported fields
}

func LoadCheckpointEntriesFromKey added in v1.0.0

func LoadCheckpointEntriesFromKey(ctx context.Context, fs fileservice.FileService, location objectio.Location, version uint32) ([]objectio.Location, *CheckpointData, error)

func NewCheckpointData

func NewCheckpointData() *CheckpointData

func (*CheckpointData) ApplyReplayTo

func (data *CheckpointData) ApplyReplayTo(
	c *catalog.Catalog,
	dataFactory catalog.DataFactory,
) (err error)

func (*CheckpointData) Close

func (data *CheckpointData) Close()

func (*CheckpointData) CloseWhenLoadFromCache added in v1.0.0

func (data *CheckpointData) CloseWhenLoadFromCache(version uint32)

func (*CheckpointData) GetBatches added in v1.0.0

func (data *CheckpointData) GetBatches() []*containers.Batch

func (*CheckpointData) GetBlkBatchs

func (*CheckpointData) GetDBBatchs

func (*CheckpointData) GetSegBatchs

func (*CheckpointData) GetTNBlkBatchs added in v1.0.0

func (data *CheckpointData) GetTNBlkBatchs() (
	*containers.Batch,
	*containers.Batch,
	*containers.Batch,
	*containers.Batch)

func (*CheckpointData) GetTblBatchs

func (*CheckpointData) PrefetchFrom added in v0.8.0

func (data *CheckpointData) PrefetchFrom(
	ctx context.Context,
	version uint32,
	service fileservice.FileService,
	key objectio.Location) (err error)

func (*CheckpointData) PrefetchMeta added in v1.0.0

func (data *CheckpointData) PrefetchMeta(
	ctx context.Context,
	version uint32,
	service fileservice.FileService,
	key objectio.Location) (err error)

func (*CheckpointData) PrintData

func (data *CheckpointData) PrintData()

func (*CheckpointData) ReadFrom

func (data *CheckpointData) ReadFrom(
	ctx context.Context,
	version uint32,
	location objectio.Location,
	reader *blockio.BlockReader,
	fs fileservice.FileService,
	m *mpool.MPool,
) (err error)

TODO: There need a global io pool

func (*CheckpointData) ReadTNMetaBatch added in v1.0.0

func (data *CheckpointData) ReadTNMetaBatch(
	ctx context.Context,
	version uint32,
	location objectio.Location,
	reader *blockio.BlockReader,
) (err error)

func (*CheckpointData) UpdateBlkMeta

func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)

func (*CheckpointData) UpdateSegMeta added in v0.8.0

func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)

func (*CheckpointData) WriteTo

func (data *CheckpointData) WriteTo(
	fs fileservice.FileService,
	blockRows int,
	checkpointSize int,
) (CNLocation, TNLocation objectio.Location, err error)

type CheckpointMeta

type CheckpointMeta struct {
	// contains filtered or unexported fields
}

func NewCheckpointMeta

func NewCheckpointMeta() *CheckpointMeta

func (*CheckpointMeta) DecodeFromString added in v1.0.0

func (m *CheckpointMeta) DecodeFromString(keys [][]byte) (err error)

func (*CheckpointMeta) String added in v1.0.0

func (m *CheckpointMeta) String() string

type Collector

type Collector interface {
	String() string
	Run()
	ScanInRange(from, to types.TS) (*DirtyTreeEntry, int)
	ScanInRangePruned(from, to types.TS) *DirtyTreeEntry
	IsCommitted(from, to types.TS) bool
	GetAndRefreshMerged() *DirtyTreeEntry
	Merge() *DirtyTreeEntry
	GetMaxLSN(from, to types.TS) uint64
	Init(maxts types.TS)
}

type DirtyEntryInterceptor

type DirtyEntryInterceptor = catalog.Processor

type DirtyTreeEntry

type DirtyTreeEntry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewDirtyTreeEntry

func NewDirtyTreeEntry(start, end types.TS, tree *model.Tree) *DirtyTreeEntry

func NewEmptyDirtyTreeEntry

func NewEmptyDirtyTreeEntry() *DirtyTreeEntry

func (*DirtyTreeEntry) GetTimeRange

func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)

func (*DirtyTreeEntry) GetTree

func (entry *DirtyTreeEntry) GetTree() (tree *model.Tree)

func (*DirtyTreeEntry) IsEmpty

func (entry *DirtyTreeEntry) IsEmpty() bool

func (*DirtyTreeEntry) Merge

func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)

func (*DirtyTreeEntry) String

func (entry *DirtyTreeEntry) String() string

type GlobalCollector added in v0.7.0

type GlobalCollector struct {
	*BaseCollector
	// contains filtered or unexported fields
}

func NewGlobalCollector added in v0.7.0

func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector

func (*GlobalCollector) VisitBlk added in v0.7.0

func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error

func (*GlobalCollector) VisitDB added in v0.7.0

func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error

func (*GlobalCollector) VisitSeg added in v0.7.0

func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error

func (*GlobalCollector) VisitTable added in v0.7.0

func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error

type IncrementalCollector

type IncrementalCollector struct {
	*BaseCollector
}

func NewIncrementalCollector

func NewIncrementalCollector(start, end types.TS) *IncrementalCollector

type Logtailer added in v0.7.0

type Logtailer interface {
	// RangeLogtail returns logtail for all tables within the range (from, to].
	// NOTE: caller should keep time range monotonous, or there would be a checkpoint.
	RangeLogtail(
		ctx context.Context, from, to timestamp.Timestamp,
	) ([]logtail.TableLogtail, []func(), error)

	RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)

	// TableLogtail returns logtail for the specified table.
	//
	// NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value.
	TableLogtail(
		ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
	) (logtail.TableLogtail, func(), error)

	// Now is a time getter from TxnManager. Users of Logtailer should get a timestamp
	// from Now and use the timestamp to collect logtail, in that case, all txn prepared
	// before it are visible.
	Now() (timestamp.Timestamp, timestamp.Timestamp)
}

Logtailer provides logtail for the specified table.

type LogtailerImpl added in v0.7.0

type LogtailerImpl struct {
	// contains filtered or unexported fields
}

func NewLogtailer added in v0.7.0

func NewLogtailer(
	ctx context.Context,
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog) *LogtailerImpl

func (*LogtailerImpl) Now added in v0.8.0

Now is a time getter from TxnManager. Users of Logtailer should get a timestamp from Now and use the timestamp to collect logtail, in that case, all txn prepared before it are visible.

func (*LogtailerImpl) RangeLogtail added in v0.7.0

func (l *LogtailerImpl) RangeLogtail(
	ctx context.Context, from, to timestamp.Timestamp,
) ([]logtail.TableLogtail, []func(), error)

RangeLogtail returns logtail for all tables that are modified within the range (from, to]. Check out all dirty tables in the time window and collect logtails for every table

func (*LogtailerImpl) RegisterCallback added in v0.8.0

func (l *LogtailerImpl) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)

func (*LogtailerImpl) TableLogtail added in v0.7.0

func (l *LogtailerImpl) TableLogtail(
	ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
) (logtail.TableLogtail, func(), error)

TableLogtail returns logtail for the specified table. It boils down to calling `HandleSyncLogTailReq`

type Manager added in v0.7.0

type Manager struct {
	txnbase.NoopCommitListener
	// contains filtered or unexported fields
}

Logtail manager holds sorted txn handles. Its main jobs:

- Insert new txn handle - Efficiently iterate over arbitrary range of txn handles on a snapshot - Truncate unneceessary txn handles according to GC timestamp

func NewManager added in v0.7.0

func NewManager(rt *dbutils.Runtime, blockSize int, nowClock func() types.TS) *Manager

func (*Manager) GCByTS added in v0.7.0

func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)

func (*Manager) GetReader added in v0.7.0

func (mgr *Manager) GetReader(from, to types.TS) *Reader

GetReader get a snapshot of all txn prepared between from and to.

func (*Manager) GetTableOperator added in v0.7.0

func (mgr *Manager) GetTableOperator(
	from, to types.TS,
	catalog *catalog.Catalog,
	dbID, tableID uint64,
	scope Scope,
	visitor catalog.Processor,
) *BoundTableOperator

func (*Manager) OnEndPrePrepare added in v0.7.0

func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)

OnEndPrePrepare is a listener for TxnManager. When a txn completes PrePrepare, add it to the logtail manager

func (*Manager) OnEndPrepareWAL added in v0.8.0

func (mgr *Manager) OnEndPrepareWAL(txn txnif.AsyncTxn)

func (*Manager) RegisterCallback added in v0.8.0

func (mgr *Manager) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error) error

func (*Manager) Start added in v0.8.0

func (mgr *Manager) Start()

func (*Manager) Stop added in v0.8.0

func (mgr *Manager) Stop()

func (*Manager) TryCompactTable added in v1.0.0

func (mgr *Manager) TryCompactTable()

type Reader added in v0.7.0

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a snapshot of all txn prepared between from and to. Dirty tables/segments/blocks can be queried based on those txn

func (*Reader) GetDirty added in v0.7.0

func (r *Reader) GetDirty() (tree *model.Tree, count int)

Merge all dirty table/segment/block into one dirty tree

func (*Reader) GetDirtyByTable added in v0.7.0

func (r *Reader) GetDirtyByTable(
	dbID, id uint64,
) (tree *model.TableTree)

Merge all dirty table/segment/block of **a table** into one tree

func (*Reader) GetMaxLSN added in v0.7.0

func (r *Reader) GetMaxLSN() (maxLsn uint64)

TODO: optimize

func (*Reader) HasCatalogChanges added in v0.7.0

func (r *Reader) HasCatalogChanges() bool

HasCatalogChanges returns true if any txn in the reader modified the Catalog

func (*Reader) IsCommitted added in v0.8.0

func (r *Reader) IsCommitted() bool

type RespBuilder

type RespBuilder interface {
	catalog.Processor
	BuildResp() (api.SyncLogTailResp, error)
	Close()
}

type RowT added in v0.7.0

type RowT = *txnRow

type Scope

type Scope = int
const (
	// changes for mo_databases
	ScopeDatabases Scope = iota + 1
	// changes for mo_tables
	ScopeTables
	// changes for mo_columns
	ScopeColumns
	// changes for user tables
	ScopeUserTables
)

func DecideTableScope added in v0.7.0

func DecideTableScope(tableID uint64) Scope

type TableLogtailRespBuilder

type TableLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from block entry. impl catalog.Processor interface, driven by BoundTableOperator

func NewTableLogtailRespBuilder

func NewTableLogtailRespBuilder(ctx context.Context, ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder

func (*TableLogtailRespBuilder) BuildResp

func (*TableLogtailRespBuilder) Close

func (b *TableLogtailRespBuilder) Close()

func (*TableLogtailRespBuilder) VisitBlk

func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error

VisitBlk = catalog.Processor.OnBlock

func (*TableLogtailRespBuilder) VisitSeg added in v0.8.0

type TableMeta added in v1.0.0

type TableMeta struct {
	common.ClosedInterval
	// contains filtered or unexported fields
}

func NewTableMeta added in v1.0.0

func NewTableMeta() *TableMeta

func (*TableMeta) String added in v1.0.0

func (m *TableMeta) String() string

type TableRespKind added in v0.8.0

type TableRespKind int
const (
	TableRespKind_Data TableRespKind = iota
	TableRespKind_Blk
	TableRespKind_Seg
)

type TxnLogtailRespBuilder added in v0.8.0

type TxnLogtailRespBuilder struct {
	// contains filtered or unexported fields
}

func NewTxnLogtailRespBuilder added in v0.8.0

func NewTxnLogtailRespBuilder(rt *dbutils.Runtime) *TxnLogtailRespBuilder

func (*TxnLogtailRespBuilder) BuildResp added in v0.8.0

func (b *TxnLogtailRespBuilder) BuildResp()

func (*TxnLogtailRespBuilder) Close added in v0.8.0

func (b *TxnLogtailRespBuilder) Close()

func (*TxnLogtailRespBuilder) CollectLogtail added in v0.8.0

func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func())

type TxnTable added in v0.7.0

type TxnTable struct {
	*model.AOT[BlockT, RowT]
}

func NewTxnTable added in v0.7.0

func NewTxnTable(blockSize int, nowClock func() types.TS) *TxnTable

func (*TxnTable) AddTxn added in v0.7.0

func (table *TxnTable) AddTxn(txn txnif.AsyncTxn) (err error)

func (*TxnTable) ForeachRowInBetween added in v0.7.0

func (table *TxnTable) ForeachRowInBetween(
	from, to types.TS,
	skipBlkOp func(blk BlockT) bool,
	rowOp func(row RowT) (goNext bool),
) (readRows int)

func (*TxnTable) TruncateByTimeStamp added in v0.7.0

func (table *TxnTable) TruncateByTimeStamp(ts types.TS) (cnt int)

func (*TxnTable) TryCompact added in v1.0.0

func (table *TxnTable) TryCompact(from types.TS, rt *dbutils.Runtime) (to types.TS)

Directories

Path Synopsis
This package implements client and server for logtail push model.
This package implements client and server for logtail push model.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL