logtail

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2022 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SnapshotAttr_SegID                     = catalog.SnapshotAttr_SegID
	SnapshotAttr_TID                       = catalog.SnapshotAttr_TID
	SnapshotAttr_DBID                      = catalog.SnapshotAttr_DBID
	SegmentAttr_ID                         = catalog.SegmentAttr_ID
	SegmentAttr_CreateAt                   = catalog.SegmentAttr_CreateAt
	SegmentAttr_State                      = catalog.SegmentAttr_State
	SnapshotAttr_BlockMaxRow               = catalog.SnapshotAttr_BlockMaxRow
	SnapshotAttr_SegmentMaxBlock           = catalog.SnapshotAttr_SegmentMaxBlock
	SnapshotMetaAttr_Tid                   = "table_id"
	SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start"
	SnapshotMetaAttr_BlockInsertBatchEnd   = "block_insert_batch_end"
	SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start"
	SnapshotMetaAttr_BlockDeleteBatchEnd   = "block_delete_batch_end"
)
View Source
const PrintN = 3
View Source
const Size90M = 80 * 1024 * 1024

Variables

View Source
var (
	// for blk meta response
	BlkMetaSchema *catalog.Schema
	DelSchema     *catalog.Schema
	SegSchema     *catalog.Schema
	TxnNodeSchema *catalog.Schema
	DBDNSchema    *catalog.Schema
	TblDNSchema   *catalog.Schema
	SegDNSchema   *catalog.Schema
	BlkDNSchema   *catalog.Schema
	MetaSchema    *catalog.Schema
)
View Source
var (
	SegmentSchemaAttr = []string{
		SegmentAttr_ID,
		SegmentAttr_CreateAt,
		SegmentAttr_State,
	}
	SegmentSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_bool, 0, 0, 0),
	}
	TxnNodeSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
	}
	TxnNodeSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
	}
	DBDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	DBDNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
	}
	TblDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		SnapshotAttr_BlockMaxRow,
		SnapshotAttr_SegmentMaxBlock,
	}
	TblDNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint16, 0, 0, 0),
	}
	SegmentDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	SegmentDNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
	}
	BlockDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		SnapshotAttr_SegID,
		pkgcatalog.BlockMeta_MetaLoc,
		pkgcatalog.BlockMeta_DeltaLoc,
	}
	BlockDNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_TS, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint32, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_varchar, 0, 0, 0),
		types.New(types.T_varchar, 0, 0, 0),
	}
	MetaSchemaAttr = []string{
		SnapshotMetaAttr_Tid,
		SnapshotMetaAttr_BlockInsertBatchStart,
		SnapshotMetaAttr_BlockInsertBatchEnd,
		SnapshotMetaAttr_BlockDeleteBatchStart,
		SnapshotMetaAttr_BlockDeleteBatchEnd,
	}
	MetaShcemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
		types.New(types.T_int32, 0, 0, 0),
	}
	BaseAttr = []string{
		catalog.AttrRowID,
		catalog.AttrCommitTs,
	}
	BaseTypes = []types.Type{
		types.T_Rowid.ToType(),
		types.T_TS.ToType(),
	}
)

Functions

func BatchToString

func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string

func DebugBatchToString

func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string

func HandleSyncLogTailReq

func HandleSyncLogTailReq(
	ckpClient CheckpointClient,
	mgr *LogtailMgr,
	c *catalog.Catalog,
	req api.SyncLogTailReq,
	canRetry bool) (resp api.SyncLogTailResp, err error)

func IncrementalCheckpointDataFactory

func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)

func LoadCheckpointEntries

func LoadCheckpointEntries(
	metLoc string,
	tableID uint64,
	tableName string,
	dbID uint64,
	dbName string,
	fs fileservice.FileService) (entries []*api.Entry, err error)

func NewDirtyCollector

func NewDirtyCollector(
	sourcer *LogtailMgr,
	clock clock.Clock,
	catalog *catalog.Catalog,
	interceptor DirtyEntryInterceptor) *dirtyCollector

func ToStringTemplate

func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string

Types

type BaseOperator

type BaseOperator struct {
	// contains filtered or unexported fields
}

BoundTableOperator holds a read only reader, knows how to iter entry. Drive a entry visitor, which acts as an api resp builder

type BoundOperator

type BoundOperator struct {
	*BaseOperator
	// contains filtered or unexported fields
}

func NewBoundOperator

func NewBoundOperator(catalog *catalog.Catalog,
	reader *LogtailReader,
	visitor catalog.Processor) *BoundOperator

func (*BoundOperator) Run

func (op *BoundOperator) Run() (err error)

type BoundTableOperator

type BoundTableOperator struct {
	*BoundOperator
	// contains filtered or unexported fields
}

func NewBoundTableOperator

func NewBoundTableOperator(catalog *catalog.Catalog,
	reader *LogtailReader,
	scope Scope,
	dbID, tableID uint64,
	visitor catalog.Processor) *BoundTableOperator

func (*BoundTableOperator) Run

func (c *BoundTableOperator) Run() error

type CatalogLogtailRespBuilder

type CatalogLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from catalog entry impl catalog.Processor interface, driven by LogtailCollector

func NewCatalogLogtailRespBuilder

func NewCatalogLogtailRespBuilder(scope Scope, ckp string, start, end types.TS) *CatalogLogtailRespBuilder

func (*CatalogLogtailRespBuilder) BuildResp

func (*CatalogLogtailRespBuilder) Close

func (b *CatalogLogtailRespBuilder) Close()

func (*CatalogLogtailRespBuilder) VisitDB

func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error

func (*CatalogLogtailRespBuilder) VisitTbl

func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error

type CheckpointClient

type CheckpointClient interface {
	CollectCheckpointsInRange(start, end types.TS) (location string, checkpointed types.TS)
	FlushTable(dbID, tableID uint64, ts types.TS) error
}

type CheckpointData

type CheckpointData struct {
	// contains filtered or unexported fields
}

func NewCheckpointData

func NewCheckpointData() *CheckpointData

func (*CheckpointData) ApplyReplayTo

func (data *CheckpointData) ApplyReplayTo(
	c *catalog.Catalog,
	dataFactory catalog.DataFactory) (err error)

func (*CheckpointData) Close

func (data *CheckpointData) Close()

func (*CheckpointData) GetBlkBatchs

func (*CheckpointData) GetDBBatchs

func (*CheckpointData) GetDNBlkBatchs

func (*CheckpointData) GetSegBatchs

func (*CheckpointData) GetTableData

func (data *CheckpointData) GetTableData(tid uint64) (ins, del, cnIns *api.Batch, err error)

func (*CheckpointData) GetTableMeta

func (data *CheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)

func (*CheckpointData) GetTblBatchs

func (*CheckpointData) PrintData

func (data *CheckpointData) PrintData()

func (*CheckpointData) ReadFrom

func (data *CheckpointData) ReadFrom(
	reader *blockio.Reader,
	m *mpool.MPool) (err error)

func (*CheckpointData) UpdateBlkMeta

func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)

func (*CheckpointData) WriteTo

func (data *CheckpointData) WriteTo(
	writer *blockio.Writer) (blks []objectio.BlockObject, err error)

type CheckpointMeta

type CheckpointMeta struct {
	// contains filtered or unexported fields
}

func NewCheckpointMeta

func NewCheckpointMeta() *CheckpointMeta

type Collector

type Collector interface {
	String() string
	Run()
	ScanInRange(from, to types.TS) (*DirtyTreeEntry, int)
	ScanInRangePruned(from, to types.TS) *DirtyTreeEntry
	GetAndRefreshMerged() *DirtyTreeEntry
	Merge() *DirtyTreeEntry
	GetMaxLSN(from, to types.TS) uint64
	Init(maxts types.TS)
}

type DirtyEntryInterceptor

type DirtyEntryInterceptor = catalog.Processor

type DirtyTreeEntry

type DirtyTreeEntry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewDirtyTreeEntry

func NewDirtyTreeEntry(start, end types.TS, tree *common.Tree) *DirtyTreeEntry

func NewEmptyDirtyTreeEntry

func NewEmptyDirtyTreeEntry() *DirtyTreeEntry

func (*DirtyTreeEntry) GetTimeRange

func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)

func (*DirtyTreeEntry) GetTree

func (entry *DirtyTreeEntry) GetTree() (tree *common.Tree)

func (*DirtyTreeEntry) IsEmpty

func (entry *DirtyTreeEntry) IsEmpty() bool

func (*DirtyTreeEntry) Merge

func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)

func (*DirtyTreeEntry) String

func (entry *DirtyTreeEntry) String() string

type IncrementalCollector

type IncrementalCollector struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

func NewIncrementalCollector

func NewIncrementalCollector(start, end types.TS) *IncrementalCollector

func (*IncrementalCollector) Close

func (collector *IncrementalCollector) Close()

func (*IncrementalCollector) OrphanData

func (collector *IncrementalCollector) OrphanData() *CheckpointData

func (*IncrementalCollector) VisitBlk

func (collector *IncrementalCollector) VisitBlk(entry *catalog.BlockEntry) (err error)

func (*IncrementalCollector) VisitDB

func (collector *IncrementalCollector) VisitDB(entry *catalog.DBEntry) error

func (*IncrementalCollector) VisitSeg

func (collector *IncrementalCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)

func (*IncrementalCollector) VisitTable

func (collector *IncrementalCollector) VisitTable(entry *catalog.TableEntry) (err error)

type LogtailMgr

type LogtailMgr struct {
	txnbase.NoopCommitListener

	// Lock is used to protect pages. there are three cases to hold lock
	// 1. activePage is full and moves to pages
	// 2. prune txn because of checkpoint TODO
	// 3. copy btree
	// Not RwLock because a copied btree can be read without holding read lock
	sync.Mutex
	// contains filtered or unexported fields
}

func NewLogtailMgr

func NewLogtailMgr(pageSize int32, clock clock.Clock) *LogtailMgr

func (*LogtailMgr) AddTxn

func (l *LogtailMgr) AddTxn(txn txnif.AsyncTxn)

Notes: 1. AddTxn happens in a queue, it is safe to assume there is no concurrent AddTxn now. 2. the added txn has no prepareTS because it happens in OnEndPrePrepare, so it is safe to alloc ts to be minTs

func (*LogtailMgr) DecideScope

func (l *LogtailMgr) DecideScope(tableID uint64) Scope

func (*LogtailMgr) GetReader

func (l *LogtailMgr) GetReader(start, end types.TS) *LogtailReader

GetReader returns a read only reader of txns at call time. this is cheap operation, the returned reader can be accessed without any locks

func (*LogtailMgr) GetTableOperator

func (l *LogtailMgr) GetTableOperator(start, end types.TS,
	catalog *catalog.Catalog,
	dbID, tableID uint64,
	scope Scope,
	visitor catalog.Processor) *BoundTableOperator

func (*LogtailMgr) OnEndPrePrepare

func (l *LogtailMgr) OnEndPrePrepare(txn txnif.AsyncTxn)

LogtailMgr as a commit listener

type LogtailReader

type LogtailReader struct {
	// contains filtered or unexported fields
}

a read only view of txns

func (*LogtailReader) GetDirty

func (v *LogtailReader) GetDirty() (tree *common.Tree, count int)

func (*LogtailReader) GetDirtyByTable

func (v *LogtailReader) GetDirtyByTable(dbID, id uint64) (tree *common.TableTree)

func (*LogtailReader) GetMaxLSN

func (v *LogtailReader) GetMaxLSN() (maxLsn uint64)

func (*LogtailReader) HasCatalogChanges

func (v *LogtailReader) HasCatalogChanges() bool

type RespBuilder

type RespBuilder interface {
	catalog.Processor
	BuildResp() (api.SyncLogTailResp, error)
	Close()
}

type Scope

type Scope = int
const (
	// changes for mo_databases
	ScopeDatabases Scope = iota + 1
	// changes for mo_tables
	ScopeTables
	// changes for mo_columns
	ScopeColumns
	// changes for user tables
	ScopeUserTables
)

type TableLogtailRespBuilder

type TableLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

func NewTableLogtailRespBuilder

func NewTableLogtailRespBuilder(ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder

func (*TableLogtailRespBuilder) BuildResp

func (*TableLogtailRespBuilder) Close

func (b *TableLogtailRespBuilder) Close()

func (*TableLogtailRespBuilder) VisitBlk

func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL