logtail

package
v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2023 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SnapshotAttr_TID                       = catalog.SnapshotAttr_TID
	SnapshotAttr_DBID                      = catalog.SnapshotAttr_DBID
	SegmentAttr_ID                         = catalog.SegmentAttr_ID
	SegmentAttr_CreateAt                   = catalog.SegmentAttr_CreateAt
	SegmentAttr_SegNode                    = catalog.SegmentAttr_SegNode
	SnapshotAttr_BlockMaxRow               = catalog.SnapshotAttr_BlockMaxRow
	SnapshotAttr_SegmentMaxBlock           = catalog.SnapshotAttr_SegmentMaxBlock
	SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start"
	SnapshotMetaAttr_BlockInsertBatchEnd   = "block_insert_batch_end"
	SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start"
	SnapshotMetaAttr_BlockDeleteBatchEnd   = "block_delete_batch_end"
	SnapshotMetaAttr_SegDeleteBatchStart   = "seg_delete_batch_start"
	SnapshotMetaAttr_SegDeleteBatchEnd     = "seg_delete_batch_end"

	SnapshotAttr_SchemaExtra = catalog.SnapshotAttr_SchemaExtra
)
View Source
const (
	MetaIDX uint16 = iota

	DBInsertIDX
	DBInsertTxnIDX
	DBDeleteIDX
	DBDeleteTxnIDX

	TBLInsertIDX
	TBLInsertTxnIDX
	TBLDeleteIDX
	TBLDeleteTxnIDX
	TBLColInsertIDX
	TBLColDeleteIDX

	SEGInsertIDX
	SEGInsertTxnIDX
	SEGDeleteIDX
	SEGDeleteTxnIDX

	BLKMetaInsertIDX
	BLKMetaInsertTxnIDX
	BLKMetaDeleteIDX
	BLKMetaDeleteTxnIDX

	BLKDNMetaInsertIDX
	BLKDNMetaInsertTxnIDX
	BLKDNMetaDeleteIDX
	BLKDNMetaDeleteTxnIDX

	BLKCNMetaInsertIDX

	BLKInsertIDX
	BLKInsertTxnIDX
	BLKDeleteIDX
	BLKDeleteTxnIDX
)
View Source
const (
	Checkpoint_Meta_TID_IDX                = 2
	Checkpoint_Meta_Insert_Block_Start_IDX = 3
	Checkpoint_Meta_Insert_Block_End_IDX   = 4
	Checkpoint_Meta_Delete_Block_Start_IDX = 5
	Checkpoint_Meta_Delete_Block_End_IDX   = 6
	Checkpoint_Meta_Segment_Start_IDX      = 7
	Checkpoint_Meta_Segment_End_IDX        = 8
)
View Source
const (
	LogtailHeartbeatDuration = time.Millisecond * 2
)
View Source
const MaxIDX = BLKCNMetaInsertIDX + 1
View Source
const Size90M = 90 * 1024 * 1024

Variables

View Source
var (
	// for blk meta response
	BlkMetaSchema *catalog.Schema
	DelSchema     *catalog.Schema
	SegSchema     *catalog.Schema
	TxnNodeSchema *catalog.Schema
	DBDNSchema    *catalog.Schema
	TblDNSchema   *catalog.Schema
	SegDNSchema   *catalog.Schema
	BlkDNSchema   *catalog.Schema
	MetaSchema    *catalog.Schema
)
View Source
var (
	SegmentSchemaAttr = []string{
		SegmentAttr_ID,
		SegmentAttr_CreateAt,
		SegmentAttr_SegNode,
	}
	SegmentSchemaTypes = []types.Type{
		types.New(types.T_uuid, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_blob, 0, 0),
	}
	TxnNodeSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
	}
	TxnNodeSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
	}
	DBDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	DBDNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
	}
	TblDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		SnapshotAttr_BlockMaxRow,
		SnapshotAttr_SegmentMaxBlock,
		SnapshotAttr_SchemaExtra,
	}
	TblDNSchemaType = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint16, 0, 0),
		types.New(types.T_varchar, 0, 0),
	}
	SegmentDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
	}
	SegmentDNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
	}
	BlockDNSchemaAttr = []string{
		txnbase.SnapshotAttr_LogIndex_LSN,
		txnbase.SnapshotAttr_StartTS,
		txnbase.SnapshotAttr_PrepareTS,
		txnbase.SnapshotAttr_CommitTS,
		txnbase.SnapshotAttr_LogIndex_CSN,
		txnbase.SnapshotAttr_LogIndex_Size,
		SnapshotAttr_DBID,
		SnapshotAttr_TID,
		pkgcatalog.BlockMeta_MetaLoc,
		pkgcatalog.BlockMeta_DeltaLoc,
	}
	BlockDNSchemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_TS, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint32, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
		types.New(types.T_varchar, types.MaxVarcharLen, 0),
	}
	MetaSchemaAttr = []string{
		SnapshotAttr_TID,
		SnapshotMetaAttr_BlockInsertBatchStart,
		SnapshotMetaAttr_BlockInsertBatchEnd,
		SnapshotMetaAttr_BlockDeleteBatchStart,
		SnapshotMetaAttr_BlockDeleteBatchEnd,
		SnapshotMetaAttr_SegDeleteBatchStart,
		SnapshotMetaAttr_SegDeleteBatchEnd,
	}
	MetaShcemaTypes = []types.Type{
		types.New(types.T_uint64, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
		types.New(types.T_int32, 0, 0),
	}
	BaseAttr = []string{
		catalog.AttrRowID,
		catalog.AttrCommitTs,
	}
	BaseTypes = []types.Type{
		types.T_Rowid.ToType(),
		types.T_TS.ToType(),
	}
)

Functions

func BatchToString

func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string

func DataChangeToLogtailBatch added in v0.8.0

func DataChangeToLogtailBatch(src *containers.BatchWithVersion) *containers.Batch

GetDataWindowForLogtail returns the batch according to the writeSchema. columns are sorted by seqnum and vacancy is filled with zero value

func DebugBatchToString

func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string

func GlobalCheckpointDataFactory added in v0.7.0

func GlobalCheckpointDataFactory(end types.TS, versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error)

func HandleSyncLogTailReq

func HandleSyncLogTailReq(
	ctx context.Context,
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog,
	req api.SyncLogTailReq,
	canRetry bool) (resp api.SyncLogTailResp, closeCB func(), err error)

func IncrementalCheckpointDataFactory

func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)

func LoadBlkColumnsByMeta added in v0.8.0

func LoadBlkColumnsByMeta(cxt context.Context, colTypes []types.Type, colNames []string, id uint16, reader *blockio.BlockReader) (*containers.Batch, error)

func LoadCNBlkColumnsByMeta added in v0.8.0

func LoadCNBlkColumnsByMeta(cxt context.Context, colTypes []types.Type, colNames []string, id uint16, reader *blockio.BlockReader, m *mpool.MPool) (*batch.Batch, error)

func LoadCheckpointEntries

func LoadCheckpointEntries(
	ctx context.Context,
	metLoc string,
	tableID uint64,
	tableName string,
	dbID uint64,
	dbName string,
	fs fileservice.FileService) ([]*api.Entry, error)

func MockCallback added in v0.8.0

func MockCallback(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error

func NewDirtyCollector

func NewDirtyCollector(
	sourcer *Manager,
	clock clock.Clock,
	catalog *catalog.Catalog,
	interceptor DirtyEntryInterceptor) *dirtyCollector

func ToStringTemplate

func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string

Types

type BaseCollector added in v0.7.0

type BaseCollector struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

func (*BaseCollector) Close added in v0.7.0

func (collector *BaseCollector) Close()

func (*BaseCollector) OrphanData added in v0.7.0

func (collector *BaseCollector) OrphanData() *CheckpointData

func (*BaseCollector) VisitBlk added in v0.7.0

func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)

func (*BaseCollector) VisitDB added in v0.7.0

func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error

func (*BaseCollector) VisitSeg added in v0.7.0

func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)

func (*BaseCollector) VisitTable added in v0.7.0

func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)

type BlockT added in v0.7.0

type BlockT = *txnBlock

type BoundTableOperator

type BoundTableOperator struct {
	// contains filtered or unexported fields
}

BoundTableOperator holds a read only reader, knows how to iterate catalog entries.

func NewBoundTableOperator

func NewBoundTableOperator(catalog *catalog.Catalog,
	reader *Reader,
	scope Scope,
	dbID, tableID uint64,
	visitor catalog.Processor) *BoundTableOperator

func (*BoundTableOperator) Run

func (c *BoundTableOperator) Run() error

Run takes a RespBuilder to visit every table/segment/block touched by all txn in the Reader. During the visiting, RespBuiler will fetch information to return logtail entry

type CNCheckpointData added in v0.8.0

type CNCheckpointData struct {
	// contains filtered or unexported fields
}

func NewCNCheckpointData added in v0.8.0

func NewCNCheckpointData() *CNCheckpointData

func (*CNCheckpointData) GetTableData added in v0.8.0

func (data *CNCheckpointData) GetTableData(tid uint64) (ins, del, cnIns, segDel *api.Batch, err error)

func (*CNCheckpointData) GetTableMeta added in v0.8.0

func (data *CNCheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)

func (*CNCheckpointData) PrefetchFrom added in v0.8.0

func (data *CNCheckpointData) PrefetchFrom(
	ctx context.Context,
	service fileservice.FileService,
	key objectio.Location) (err error)

func (*CNCheckpointData) ReadFrom added in v0.8.0

func (data *CNCheckpointData) ReadFrom(
	ctx context.Context,
	reader *blockio.BlockReader,
	m *mpool.MPool) (err error)

type CatalogLogtailRespBuilder

type CatalogLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from db and table entry. impl catalog.Processor interface, driven by BoundTableOperator

func NewCatalogLogtailRespBuilder

func NewCatalogLogtailRespBuilder(ctx context.Context, scope Scope, ckp string, start, end types.TS) *CatalogLogtailRespBuilder

func (*CatalogLogtailRespBuilder) BuildResp

func (*CatalogLogtailRespBuilder) Close

func (b *CatalogLogtailRespBuilder) Close()

func (*CatalogLogtailRespBuilder) VisitDB

func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error

VisitDB = catalog.Processor.OnDatabase

func (*CatalogLogtailRespBuilder) VisitTbl

func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error

VisitTbl = catalog.Processor.OnTable

type CheckpointClient

type CheckpointClient interface {
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
	FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error
}

type CheckpointData

type CheckpointData struct {
	// contains filtered or unexported fields
}

func NewCheckpointData

func NewCheckpointData() *CheckpointData

func (*CheckpointData) ApplyReplayTo

func (data *CheckpointData) ApplyReplayTo(
	c *catalog.Catalog,
	dataFactory catalog.DataFactory) (err error)

func (*CheckpointData) Close

func (data *CheckpointData) Close()

func (*CheckpointData) GetBlkBatchs

func (*CheckpointData) GetDBBatchs

func (*CheckpointData) GetDNBlkBatchs

func (data *CheckpointData) GetDNBlkBatchs() (
	*containers.Batch,
	*containers.Batch,
	*containers.Batch,
	*containers.Batch)

func (*CheckpointData) GetSegBatchs

func (*CheckpointData) GetTblBatchs

func (*CheckpointData) PrefetchFrom added in v0.8.0

func (data *CheckpointData) PrefetchFrom(
	ctx context.Context,
	service fileservice.FileService,
	key objectio.Location) (err error)

func (*CheckpointData) PrintData

func (data *CheckpointData) PrintData()

func (*CheckpointData) ReadFrom

func (data *CheckpointData) ReadFrom(
	ctx context.Context,
	reader *blockio.BlockReader,
	m *mpool.MPool) (err error)

TODO: There need a global io pool

func (*CheckpointData) UpdateBlkMeta

func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)

func (*CheckpointData) UpdateSegMeta added in v0.8.0

func (data *CheckpointData) UpdateSegMeta(tid uint64, delStart, delEnd int32)

func (*CheckpointData) WriteTo

func (data *CheckpointData) WriteTo(
	writer *blockio.BlockWriter) (blks []objectio.BlockObject, err error)

type CheckpointMeta

type CheckpointMeta struct {
	// contains filtered or unexported fields
}

func NewCheckpointMeta

func NewCheckpointMeta() *CheckpointMeta

type Collector

type Collector interface {
	String() string
	Run()
	ScanInRange(from, to types.TS) (*DirtyTreeEntry, int)
	ScanInRangePruned(from, to types.TS) *DirtyTreeEntry
	IsCommitted(from, to types.TS) bool
	GetAndRefreshMerged() *DirtyTreeEntry
	Merge() *DirtyTreeEntry
	GetMaxLSN(from, to types.TS) uint64
	Init(maxts types.TS)
}

type DirtyEntryInterceptor

type DirtyEntryInterceptor = catalog.Processor

type DirtyTreeEntry

type DirtyTreeEntry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewDirtyTreeEntry

func NewDirtyTreeEntry(start, end types.TS, tree *model.Tree) *DirtyTreeEntry

func NewEmptyDirtyTreeEntry

func NewEmptyDirtyTreeEntry() *DirtyTreeEntry

func (*DirtyTreeEntry) GetTimeRange

func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)

func (*DirtyTreeEntry) GetTree

func (entry *DirtyTreeEntry) GetTree() (tree *model.Tree)

func (*DirtyTreeEntry) IsEmpty

func (entry *DirtyTreeEntry) IsEmpty() bool

func (*DirtyTreeEntry) Merge

func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)

func (*DirtyTreeEntry) String

func (entry *DirtyTreeEntry) String() string

type GlobalCollector added in v0.7.0

type GlobalCollector struct {
	*BaseCollector
	// contains filtered or unexported fields
}

func NewGlobalCollector added in v0.7.0

func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector

func (*GlobalCollector) VisitBlk added in v0.7.0

func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error

func (*GlobalCollector) VisitDB added in v0.7.0

func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error

func (*GlobalCollector) VisitSeg added in v0.7.0

func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error

func (*GlobalCollector) VisitTable added in v0.7.0

func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error

type IncrementalCollector

type IncrementalCollector struct {
	*BaseCollector
}

func NewIncrementalCollector

func NewIncrementalCollector(start, end types.TS) *IncrementalCollector

type Logtailer added in v0.7.0

type Logtailer interface {
	// RangeLogtail returns logtail for all tables within the range (from, to].
	// NOTE: caller should keep time range monotonous, or there would be a checkpoint.
	RangeLogtail(
		ctx context.Context, from, to timestamp.Timestamp,
	) ([]logtail.TableLogtail, []func(), error)

	RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)

	// TableLogtail returns logtail for the specified table.
	//
	// NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value.
	TableLogtail(
		ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
	) (logtail.TableLogtail, func(), error)

	// Now is a time getter from TxnManager. Users of Logtailer should get a timestamp
	// from Now and use the timestamp to collect logtail, in that case, all txn prepared
	// before it are visible.
	Now() (timestamp.Timestamp, timestamp.Timestamp)
}

Logtailer provides logtail for the specified table.

type LogtailerImpl added in v0.7.0

type LogtailerImpl struct {
	// contains filtered or unexported fields
}

func NewLogtailer added in v0.7.0

func NewLogtailer(
	ctx context.Context,
	ckpClient CheckpointClient,
	mgr *Manager,
	c *catalog.Catalog) *LogtailerImpl

func (*LogtailerImpl) Now added in v0.8.0

Now is a time getter from TxnManager. Users of Logtailer should get a timestamp from Now and use the timestamp to collect logtail, in that case, all txn prepared before it are visible.

func (*LogtailerImpl) RangeLogtail added in v0.7.0

func (l *LogtailerImpl) RangeLogtail(
	ctx context.Context, from, to timestamp.Timestamp,
) ([]logtail.TableLogtail, []func(), error)

RangeLogtail returns logtail for all tables that are modified within the range (from, to]. Check out all dirty tables in the time window and collect logtails for every table

func (*LogtailerImpl) RegisterCallback added in v0.8.0

func (l *LogtailerImpl) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error)

func (*LogtailerImpl) TableLogtail added in v0.7.0

func (l *LogtailerImpl) TableLogtail(
	ctx context.Context, table api.TableID, from, to timestamp.Timestamp,
) (logtail.TableLogtail, func(), error)

TableLogtail returns logtail for the specified table. It boils down to calling `HandleSyncLogTailReq`

type Manager added in v0.7.0

type Manager struct {
	txnbase.NoopCommitListener
	// contains filtered or unexported fields
}

Logtail manager holds sorted txn handles. Its main jobs:

- Insert new txn handle - Efficiently iterate over arbitrary range of txn handles on a snapshot - Truncate unneceessary txn handles according to GC timestamp

func NewManager added in v0.7.0

func NewManager(rt *dbutils.Runtime, blockSize int, nowClock func() types.TS) *Manager

func (*Manager) GCByTS added in v0.7.0

func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)

func (*Manager) GetReader added in v0.7.0

func (mgr *Manager) GetReader(from, to types.TS) *Reader

GetReader get a snapshot of all txn prepared between from and to.

func (*Manager) GetTableOperator added in v0.7.0

func (mgr *Manager) GetTableOperator(
	from, to types.TS,
	catalog *catalog.Catalog,
	dbID, tableID uint64,
	scope Scope,
	visitor catalog.Processor,
) *BoundTableOperator

func (*Manager) OnEndPrePrepare added in v0.7.0

func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)

OnEndPrePrepare is a listener for TxnManager. When a txn completes PrePrepare, add it to the logtail manager

func (*Manager) OnEndPrepareWAL added in v0.8.0

func (mgr *Manager) OnEndPrepareWAL(txn txnif.AsyncTxn)

func (*Manager) RegisterCallback added in v0.8.0

func (mgr *Manager) RegisterCallback(cb func(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error) error

func (*Manager) Start added in v0.8.0

func (mgr *Manager) Start()

func (*Manager) Stop added in v0.8.0

func (mgr *Manager) Stop()

type Reader added in v0.7.0

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a snapshot of all txn prepared between from and to. Dirty tables/segments/blocks can be queried based on those txn

func (*Reader) GetDirty added in v0.7.0

func (r *Reader) GetDirty() (tree *model.Tree, count int)

Merge all dirty table/segment/block into one dirty tree

func (*Reader) GetDirtyByTable added in v0.7.0

func (r *Reader) GetDirtyByTable(
	dbID, id uint64,
) (tree *model.TableTree)

Merge all dirty table/segment/block of **a table** into one tree

func (*Reader) GetMaxLSN added in v0.7.0

func (r *Reader) GetMaxLSN() (maxLsn uint64)

TODO: optimize

func (*Reader) HasCatalogChanges added in v0.7.0

func (r *Reader) HasCatalogChanges() bool

HasCatalogChanges returns true if any txn in the reader modified the Catalog

func (*Reader) IsCommitted added in v0.8.0

func (r *Reader) IsCommitted() bool

type RespBuilder

type RespBuilder interface {
	catalog.Processor
	BuildResp() (api.SyncLogTailResp, error)
	Close()
}

type RowT added in v0.7.0

type RowT = *txnRow

type Scope

type Scope = int
const (
	// changes for mo_databases
	ScopeDatabases Scope = iota + 1
	// changes for mo_tables
	ScopeTables
	// changes for mo_columns
	ScopeColumns
	// changes for user tables
	ScopeUserTables
)

func DecideTableScope added in v0.7.0

func DecideTableScope(tableID uint64) Scope

type TableLogtailRespBuilder

type TableLogtailRespBuilder struct {
	*catalog.LoopProcessor
	// contains filtered or unexported fields
}

CatalogLogtailRespBuilder knows how to make api-entry from block entry. impl catalog.Processor interface, driven by BoundTableOperator

func NewTableLogtailRespBuilder

func NewTableLogtailRespBuilder(ctx context.Context, ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder

func (*TableLogtailRespBuilder) BuildResp

func (*TableLogtailRespBuilder) Close

func (b *TableLogtailRespBuilder) Close()

func (*TableLogtailRespBuilder) VisitBlk

func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error

VisitBlk = catalog.Processor.OnBlock

func (*TableLogtailRespBuilder) VisitSeg added in v0.8.0

type TableRespKind added in v0.8.0

type TableRespKind int
const (
	TableRespKind_Data TableRespKind = iota
	TableRespKind_Blk
	TableRespKind_Seg
)

type TxnLogtailRespBuilder added in v0.8.0

type TxnLogtailRespBuilder struct {
	// contains filtered or unexported fields
}

func NewTxnLogtailRespBuilder added in v0.8.0

func NewTxnLogtailRespBuilder(rt *dbutils.Runtime) *TxnLogtailRespBuilder

func (*TxnLogtailRespBuilder) BuildResp added in v0.8.0

func (b *TxnLogtailRespBuilder) BuildResp()

func (*TxnLogtailRespBuilder) Close added in v0.8.0

func (b *TxnLogtailRespBuilder) Close()

func (*TxnLogtailRespBuilder) CollectLogtail added in v0.8.0

func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func())

type TxnTable added in v0.7.0

type TxnTable struct {
	*model.AOT[BlockT, RowT]
}

func NewTxnTable added in v0.7.0

func NewTxnTable(blockSize int, nowClock func() types.TS) *TxnTable

func (*TxnTable) AddTxn added in v0.7.0

func (table *TxnTable) AddTxn(txn txnif.AsyncTxn) (err error)

func (*TxnTable) ForeachRowInBetween added in v0.7.0

func (table *TxnTable) ForeachRowInBetween(
	from, to types.TS,
	skipBlkOp func(blk BlockT) bool,
	rowOp func(row RowT) (goNext bool),
) (readRows int)

func (*TxnTable) TruncateByTimeStamp added in v0.7.0

func (table *TxnTable) TruncateByTimeStamp(ts types.TS) (cnt int)

Directories

Path Synopsis
This package implements client and server for logtail push model.
This package implements client and server for logtail push model.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL