logtail

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SnapshotAttr_SegID                     = catalog.SnapshotAttr_SegID
	SnapshotAttr_TID                       = catalog.SnapshotAttr_TID
	SnapshotAttr_DBID                      = catalog.SnapshotAttr_DBID
	SegmentAttr_ID                         = catalog.SegmentAttr_ID
	SegmentAttr_CreateAt                   = catalog.SegmentAttr_CreateAt
	SegmentAttr_State                      = catalog.SegmentAttr_State
	SegmentAttr_Sorted                     = catalog.SegmentAttr_Sorted
	SnapshotAttr_BlockMaxRow               = catalog.SnapshotAttr_BlockMaxRow
	SnapshotAttr_SegmentMaxBlock           = catalog.SnapshotAttr_SegmentMaxBlock
	SnapshotMetaAttr_Tid                   = "table_id"
	SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start"
	SnapshotMetaAttr_BlockInsertBatchEnd   = "block_insert_batch_end"
	SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start"
	SnapshotMetaAttr_BlockDeleteBatchEnd   = "block_delete_batch_end"
)
View Source
const (
	MetaIDX uint16 = iota

	DBInsertIDX
	DBInsertTxnIDX
	DBDeleteIDX
	DBDeleteTxnIDX

	TBLInsertIDX
	TBLInsertTxnIDX
	TBLDeleteIDX
	TBLDeleteTxnIDX
	TBLColInsertIDX
	TBLColDeleteIDX

	SEGInsertIDX
	SEGInsertTxnIDX
	SEGDeleteIDX
	SEGDeleteTxnIDX

	BLKMetaInsertIDX
	BLKMetaInsertTxnIDX
	BLKMetaDeleteIDX
	BLKMetaDeleteTxnIDX

	BLKDNMetaInsertIDX
	BLKDNMetaInsertTxnIDX
	BLKDNMetaDeleteIDX
	BLKDNMetaDeleteTxnIDX

	BLKCNMetaInsertIDX

	BLKInsertIDX
	BLKInsertTxnIDX
	BLKDeleteIDX
	BLKDeleteTxnIDX
)
View Source
const MaxIDX = BLKCNMetaInsertIDX + 1
View Source
const PrintN = 3
View Source
const Size90M = 90 * 1024 * 1024

Variables

View Source
var (
	// for blk meta response
	BlkMetaSchema *catalog.Schema
	DelSchema     *catalog.Schema
	SegSchema     *catalog.Schema
	TxnNodeSchema *catalog.Schema
	DBDNSchema    *catalog.Schema
	TblDNSchema   *catalog.Schema
	SegDNSchema   *catalog.Schema
	BlkDNSchema   *catalog.Schema
	MetaSchema    *catalog.Schema
)
View Source
var (
	SegmentSchemaAttr = []string{
		SegmentAttr_ID,
		SegmentAttr_CreateAt,
		SegmentAttr_State,
		SegmentAttr_Sorted,
	}
	SegmentSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_bool, 0, 0, 0),
		types.New(types.T_bool, 0, 0, 0),
	}
	TxnNodeSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
	}
	TxnNodeSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
	}
	DBDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	DBDNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
	}
	TblDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		SnapshotAttr_BlockMaxRow,
		SnapshotAttr_SegmentMaxBlock,
	}
	TblDNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint16, 0, 0, 0),
	}
	SegmentDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	SegmentDNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
	}
	BlockDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		SnapshotAttr_SegID,
		pkgcatalog.BlockMeta_MetaLoc,
		pkgcatalog.BlockMeta_DeltaLoc,
	}
	BlockDNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0, 0),
	}
	MetaSchemaAttr = []string{
		SnapshotMetaAttr_Tid,
		SnapshotMetaAttr_BlockInsertBatchStart,
		SnapshotMetaAttr_BlockInsertBatchEnd,
		SnapshotMetaAttr_BlockDeleteBatchStart,
		SnapshotMetaAttr_BlockDeleteBatchEnd,
	}
	MetaShcemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
	}
	BaseAttr = []string{
		catalog.AttrRowID,
		catalog.AttrCommitTs,
	}
	BaseTypes = []types.Type{
		types.T_Rowid.ToType(),
		types.T_TS.ToType(),
	}
)

Functions

func BatchToString

func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string

func DebugBatchToString

func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string

func GlobalCheckpointDataFactory added in v0.7.0

func GlobalCheckpointDataFactory(end types.TS, versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error)

func HandleSyncLogTailReq

func HandleSyncLogTailReq(
	ctx context.Context,
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog,
	req api.SyncLogTailReq,
	canRetry bool) (resp api.SyncLogTailResp, err error)

func IncrementalCheckpointDataFactory

func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)

func LoadCheckpointEntries

func LoadCheckpointEntries(
	ctx context.Context,
	metLoc string,
	tableID uint64,
	tableName string,
	dbID uint64,
	dbName string,
	fs fileservice.FileService) (entries []*api.Entry, err error)

func NewDirtyCollector

func NewDirtyCollector(
	sourcer *Manager,
	clock clock.Clock,
	catalog *catalog.Catalog,
	interceptor DirtyEntryInterceptor) *dirtyCollector

func ToStringTemplate

func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string

Types

type BaseCollector added in v0.7.0

type BaseCollector struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

func (*BaseCollector) Close added in v0.7.0

func (collector *BaseCollector) Close()

func (*BaseCollector) OrphanData added in v0.7.0

func (collector *BaseCollector) OrphanData() *CheckpointData

func (*BaseCollector) VisitBlk added in v0.7.0

func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)

func (*BaseCollector) VisitDB added in v0.7.0

func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error

func (*BaseCollector) VisitSeg added in v0.7.0

func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)

func (*BaseCollector) VisitTable added in v0.7.0

func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)

type BaseOperator

type BaseOperator struct {
	// contains filtered or unexported fields
}

BoundTableOperator holds a read only reader, knows how to iter entry. Drive a entry visitor, which acts as an api resp builder

type BlockT added in v0.7.0

type BlockT = *txnBlock

type BoundOperator

type BoundOperator struct {
	*BaseOperator
	// contains filtered or unexported fields
}

func NewBoundOperator

func NewBoundOperator(catalog *catalog.Catalog,
	reader *Reader,
	visitor catalog.Processor) *BoundOperator

func (*BoundOperator) Run

func (op *BoundOperator) Run() (err error)

type BoundTableOperator

type BoundTableOperator struct {
	*BoundOperator
	// contains filtered or unexported fields
}

func NewBoundTableOperator

func NewBoundTableOperator(catalog *catalog.Catalog,
	reader *Reader,
	scope Scope,
	dbID, tableID uint64,
	visitor catalog.Processor) *BoundTableOperator

func (*BoundTableOperator) Run

func (c *BoundTableOperator) Run() error

type CatalogLogtailRespBuilder

type CatalogLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from catalog entry impl catalog.Processor interface, driven by LogtailCollector

func NewCatalogLogtailRespBuilder

func NewCatalogLogtailRespBuilder(scope Scope, ckp string, start, end types.TS) *CatalogLogtailRespBuilder

func (*CatalogLogtailRespBuilder) BuildResp

func (*CatalogLogtailRespBuilder) Close

func (b *CatalogLogtailRespBuilder) Close()

func (*CatalogLogtailRespBuilder) VisitDB

func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error

func (*CatalogLogtailRespBuilder) VisitTbl

func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error

type CheckpointClient

type CheckpointClient interface {
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
	FlushTable(dbID, tableID uint64, ts types.TS) error
}

type CheckpointData

type CheckpointData struct {
	// contains filtered or unexported fields
}

func NewCheckpointData

func NewCheckpointData() *CheckpointData

func (*CheckpointData) ApplyReplayTo

func (data *CheckpointData) ApplyReplayTo(
	c *catalog.Catalog,
	dataFactory catalog.DataFactory) (err error)

func (*CheckpointData) Close

func (data *CheckpointData) Close()

func (*CheckpointData) GetBlkBatchs

func (*CheckpointData) GetDBBatchs

func (*CheckpointData) GetDNBlkBatchs

func (data *CheckpointData) GetDNBlkBatchs() (
	*containers.Batch,
	*containers.Batch,
	*containers.Batch,
	*containers.Batch)

func (*CheckpointData) GetSegBatchs

func (*CheckpointData) GetTableData

func (data *CheckpointData) GetTableData(tid uint64) (ins, del, cnIns *api.Batch, err error)

func (*CheckpointData) GetTableMeta

func (data *CheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)

func (*CheckpointData) GetTblBatchs

func (*CheckpointData) PrintData

func (data *CheckpointData) PrintData()

func (*CheckpointData) ReadFrom

func (data *CheckpointData) ReadFrom(
	reader *blockio.Reader,
	scheduler tasks.JobScheduler,
	m *mpool.MPool) (err error)

TODO: There need a global io pool

func (*CheckpointData) UpdateBlkMeta

func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)

func (*CheckpointData) WriteTo

func (data *CheckpointData) WriteTo(
	writer *blockio.Writer) (blks []objectio.BlockObject, err error)

type CheckpointMeta

type CheckpointMeta struct {
	// contains filtered or unexported fields
}

func NewCheckpointMeta

func NewCheckpointMeta() *CheckpointMeta

type Collector

type Collector interface {
	String() string
	Run()
	ScanInRange(from, to types.TS) (*DirtyTreeEntry, int)
	ScanInRangePruned(from, to types.TS) *DirtyTreeEntry
	GetAndRefreshMerged() *DirtyTreeEntry
	Merge() *DirtyTreeEntry
	GetMaxLSN(from, to types.TS) uint64
	Init(maxts types.TS)
}

type DirtyEntryInterceptor

type DirtyEntryInterceptor = catalog.Processor

type DirtyTreeEntry

type DirtyTreeEntry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewDirtyTreeEntry

func NewDirtyTreeEntry(start, end types.TS, tree *common.Tree) *DirtyTreeEntry

func NewEmptyDirtyTreeEntry

func NewEmptyDirtyTreeEntry() *DirtyTreeEntry

func (*DirtyTreeEntry) GetTimeRange

func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)

func (*DirtyTreeEntry) GetTree

func (entry *DirtyTreeEntry) GetTree() (tree *common.Tree)

func (*DirtyTreeEntry) IsEmpty

func (entry *DirtyTreeEntry) IsEmpty() bool

func (*DirtyTreeEntry) Merge

func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)

func (*DirtyTreeEntry) String

func (entry *DirtyTreeEntry) String() string

type GlobalCollector added in v0.7.0

type GlobalCollector struct {
	*BaseCollector
	// contains filtered or unexported fields
}

func NewGlobalCollector added in v0.7.0

func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector

func (*GlobalCollector) VisitBlk added in v0.7.0

func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error

func (*GlobalCollector) VisitDB added in v0.7.0

func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error

func (*GlobalCollector) VisitSeg added in v0.7.0

func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error

func (*GlobalCollector) VisitTable added in v0.7.0

func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error

type IncrementalCollector

type IncrementalCollector struct {
	*BaseCollector
}

func NewIncrementalCollector

func NewIncrementalCollector(start, end types.TS) *IncrementalCollector

type Logtailer added in v0.7.0

type Logtailer interface {
	// RangeLogtail returns logtail for all tables within the range (from, to].
	RangeLogtail(
		ctx context.Context, from, to timestamp.Timestamp,
	) ([]logtail.TableLogtail, error)

	// TableLogtail returns logtail for the specified table.
	//
	// NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value.
	TableLogtail(
		ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
	) (logtail.TableLogtail, error)
}

Logtailer provides logtail for the specified table.

type LogtailerImpl added in v0.7.0

type LogtailerImpl struct {
	// contains filtered or unexported fields
}

func NewLogtailer added in v0.7.0

func NewLogtailer(
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog) *LogtailerImpl

func (*LogtailerImpl) RangeLogtail added in v0.7.0

func (l *LogtailerImpl) RangeLogtail(
	ctx context.Context, from, to timestamp.Timestamp,
) ([]logtail.TableLogtail, error)

RangeLogtail returns logtail for all tables within the range (from, to].

func (*LogtailerImpl) TableLogtail added in v0.7.0

func (l *LogtailerImpl) TableLogtail(
	ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
) (logtail.TableLogtail, error)

TableLogtail returns logtail for the specified table.

type Manager added in v0.7.0

type Manager struct {
	txnbase.NoopCommitListener
	// contains filtered or unexported fields
}

func NewManager added in v0.7.0

func NewManager(blockSize int, clock clock.Clock) *Manager

func (*Manager) GCByTS added in v0.7.0

func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)

func (*Manager) GetReader added in v0.7.0

func (mgr *Manager) GetReader(from, to types.TS) *Reader

func (*Manager) GetTableOperator added in v0.7.0

func (mgr *Manager) GetTableOperator(
	from, to types.TS,
	catalog *catalog.Catalog,
	dbID, tableID uint64,
	scope Scope,
	visitor catalog.Processor,
) *BoundTableOperator

func (*Manager) OnEndPrePrepare added in v0.7.0

func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)

type Reader added in v0.7.0

type Reader struct {
	// contains filtered or unexported fields
}

func (*Reader) GetDirty added in v0.7.0

func (r *Reader) GetDirty() (tree *common.Tree, count int)

func (*Reader) GetDirtyByTable added in v0.7.0

func (r *Reader) GetDirtyByTable(
	dbID, id uint64,
) (tree *common.TableTree)

func (*Reader) GetMaxLSN added in v0.7.0

func (r *Reader) GetMaxLSN() (maxLsn uint64)

TODO: optimize

func (*Reader) HasCatalogChanges added in v0.7.0

func (r *Reader) HasCatalogChanges() bool

type RespBuilder

type RespBuilder interface {
	catalog.Processor
	BuildResp() (api.SyncLogTailResp, error)
	Close()
}

type RowT added in v0.7.0

type RowT = *txnRow

type Scope

type Scope = int
const (
	// changes for mo_databases
	ScopeDatabases Scope = iota + 1
	// changes for mo_tables
	ScopeTables
	// changes for mo_columns
	ScopeColumns
	// changes for user tables
	ScopeUserTables
)

func DecideTableScope added in v0.7.0

func DecideTableScope(tableID uint64) Scope

type TableLogtailRespBuilder

type TableLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

func NewTableLogtailRespBuilder

func NewTableLogtailRespBuilder(ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder

func (*TableLogtailRespBuilder) BuildResp

func (*TableLogtailRespBuilder) Close

func (b *TableLogtailRespBuilder) Close()

func (*TableLogtailRespBuilder) VisitBlk

func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error

type TxnTable added in v0.7.0

type TxnTable struct {
	*model.AOT[BlockT, RowT]
}

func NewTxnTable added in v0.7.0

func NewTxnTable(blockSize int, clock *types.TsAlloctor) *TxnTable

func (*TxnTable) AddTxn added in v0.7.0

func (table *TxnTable) AddTxn(txn txnif.AsyncTxn) (err error)

func (*TxnTable) ForeachRowInBetween added in v0.7.0

func (table *TxnTable) ForeachRowInBetween(
	from, to types.TS,
	skipBlkOp func(blk BlockT) bool,
	rowOp func(row RowT) (goNext bool),
) (readRows int)

func (*TxnTable) TruncateByTimeStamp added in v0.7.0

func (table *TxnTable) TruncateByTimeStamp(ts types.TS) (cnt int)

Directories

Path Synopsis
This package implements client and server for logtail push model.
This package implements client and server for logtail push model.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL