Documentation ¶
Index ¶
- Constants
- Variables
- func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
- func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool, lvl zapcore.Level) string
- func GlobalCheckpointDataFactory(end types.TS, versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error)
- func HandleSyncLogTailReq(ctx context.Context, ckpClient CheckpointClient, mgr *Manager, ...) (resp api.SyncLogTailResp, err error)
- func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)
- func LoadCheckpointEntries(ctx context.Context, metLoc string, tableID uint64, tableName string, ...) (entries []*api.Entry, err error)
- func NewDirtyCollector(sourcer *Manager, clock clock.Clock, catalog *catalog.Catalog, ...) *dirtyCollector
- func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
- type BaseCollector
- func (collector *BaseCollector) Close()
- func (collector *BaseCollector) OrphanData() *CheckpointData
- func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
- func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
- func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
- type BaseOperator
- type BlockT
- type BoundOperator
- type BoundTableOperator
- type CatalogLogtailRespBuilder
- type CheckpointClient
- type CheckpointData
- func (data *CheckpointData) ApplyReplayTo(c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
- func (data *CheckpointData) Close()
- func (data *CheckpointData) GetBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDBBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDNBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetSegBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetTableData(tid uint64) (ins, del, cnIns *api.Batch, err error)
- func (data *CheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)
- func (data *CheckpointData) GetTblBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, ...)
- func (data *CheckpointData) PrintData()
- func (data *CheckpointData) ReadFrom(reader *blockio.Reader, scheduler tasks.JobScheduler, m *mpool.MPool) (err error)
- func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
- func (data *CheckpointData) WriteTo(writer *blockio.Writer) (blks []objectio.BlockObject, err error)
- type CheckpointMeta
- type Collector
- type DirtyEntryInterceptor
- type DirtyTreeEntry
- type GlobalCollector
- func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
- func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error
- func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
- type IncrementalCollector
- type Logtailer
- type LogtailerImpl
- type Manager
- func (mgr *Manager) GCByTS(ctx context.Context, ts types.TS)
- func (mgr *Manager) GetReader(from, to types.TS) *Reader
- func (mgr *Manager) GetTableOperator(from, to types.TS, catalog *catalog.Catalog, dbID, tableID uint64, scope Scope, ...) *BoundTableOperator
- func (mgr *Manager) OnEndPrePrepare(txn txnif.AsyncTxn)
- type Reader
- type RespBuilder
- type RowT
- type Scope
- type TableLogtailRespBuilder
- type TxnTable
Constants ¶
View Source
const ( SnapshotAttr_SegID = catalog.SnapshotAttr_SegID SnapshotAttr_TID = catalog.SnapshotAttr_TID SnapshotAttr_DBID = catalog.SnapshotAttr_DBID SegmentAttr_ID = catalog.SegmentAttr_ID SegmentAttr_CreateAt = catalog.SegmentAttr_CreateAt SegmentAttr_State = catalog.SegmentAttr_State SegmentAttr_Sorted = catalog.SegmentAttr_Sorted SnapshotAttr_BlockMaxRow = catalog.SnapshotAttr_BlockMaxRow SnapshotAttr_SegmentMaxBlock = catalog.SnapshotAttr_SegmentMaxBlock SnapshotMetaAttr_Tid = "table_id" SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start" SnapshotMetaAttr_BlockInsertBatchEnd = "block_insert_batch_end" SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start" SnapshotMetaAttr_BlockDeleteBatchEnd = "block_delete_batch_end" )
View Source
const ( MetaIDX uint16 = iota DBInsertIDX DBInsertTxnIDX DBDeleteIDX DBDeleteTxnIDX TBLInsertIDX TBLInsertTxnIDX TBLDeleteIDX TBLDeleteTxnIDX TBLColInsertIDX TBLColDeleteIDX SEGInsertIDX SEGInsertTxnIDX SEGDeleteIDX SEGDeleteTxnIDX BLKMetaInsertIDX BLKMetaInsertTxnIDX BLKMetaDeleteIDX BLKMetaDeleteTxnIDX BLKDNMetaInsertIDX BLKDNMetaInsertTxnIDX BLKDNMetaDeleteIDX BLKDNMetaDeleteTxnIDX BLKCNMetaInsertIDX BLKInsertIDX BLKInsertTxnIDX BLKDeleteIDX BLKDeleteTxnIDX )
View Source
const MaxIDX = BLKCNMetaInsertIDX + 1
View Source
const PrintN = 3
View Source
const Size90M = 90 * 1024 * 1024
Variables ¶
View Source
var ( // for blk meta response BlkMetaSchema *catalog.Schema DelSchema *catalog.Schema SegSchema *catalog.Schema TxnNodeSchema *catalog.Schema DBDNSchema *catalog.Schema TblDNSchema *catalog.Schema SegDNSchema *catalog.Schema BlkDNSchema *catalog.Schema MetaSchema *catalog.Schema )
View Source
var ( SegmentSchemaAttr = []string{ SegmentAttr_ID, SegmentAttr_CreateAt, SegmentAttr_State, SegmentAttr_Sorted, } SegmentSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_bool, 0, 0, 0), types.New(types.T_bool, 0, 0, 0), } TxnNodeSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, } TxnNodeSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), } DBDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } DBDNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), } TblDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, SnapshotAttr_BlockMaxRow, SnapshotAttr_SegmentMaxBlock, } TblDNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint16, 0, 0, 0), } SegmentDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } SegmentDNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), } BlockDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, SnapshotAttr_SegID, pkgcatalog.BlockMeta_MetaLoc, pkgcatalog.BlockMeta_DeltaLoc, } BlockDNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0, 0), } MetaSchemaAttr = []string{ SnapshotMetaAttr_Tid, SnapshotMetaAttr_BlockInsertBatchStart, SnapshotMetaAttr_BlockInsertBatchEnd, SnapshotMetaAttr_BlockDeleteBatchStart, SnapshotMetaAttr_BlockDeleteBatchEnd, } MetaShcemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), } BaseAttr = []string{ catalog.AttrRowID, catalog.AttrCommitTs, } BaseTypes = []types.Type{ types.T_Rowid.ToType(), types.T_TS.ToType(), } )
Functions ¶
func BatchToString ¶
func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
func DebugBatchToString ¶
func GlobalCheckpointDataFactory ¶ added in v0.7.0
func HandleSyncLogTailReq ¶
func HandleSyncLogTailReq( ctx context.Context, ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog, req api.SyncLogTailReq, canRetry bool) (resp api.SyncLogTailResp, err error)
func LoadCheckpointEntries ¶
func NewDirtyCollector ¶
func ToStringTemplate ¶
func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
Types ¶
type BaseCollector ¶ added in v0.7.0
type BaseCollector struct { *catalog.LoopProcessor // contains filtered or unexported fields }
func (*BaseCollector) Close ¶ added in v0.7.0
func (collector *BaseCollector) Close()
func (*BaseCollector) OrphanData ¶ added in v0.7.0
func (collector *BaseCollector) OrphanData() *CheckpointData
func (*BaseCollector) VisitBlk ¶ added in v0.7.0
func (collector *BaseCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
func (*BaseCollector) VisitDB ¶ added in v0.7.0
func (collector *BaseCollector) VisitDB(entry *catalog.DBEntry) error
func (*BaseCollector) VisitSeg ¶ added in v0.7.0
func (collector *BaseCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
func (*BaseCollector) VisitTable ¶ added in v0.7.0
func (collector *BaseCollector) VisitTable(entry *catalog.TableEntry) (err error)
type BaseOperator ¶
type BaseOperator struct {
// contains filtered or unexported fields
}
BoundTableOperator holds a read only reader, knows how to iter entry. Drive a entry visitor, which acts as an api resp builder
type BoundOperator ¶
type BoundOperator struct { *BaseOperator // contains filtered or unexported fields }
func NewBoundOperator ¶
func (*BoundOperator) Run ¶
func (op *BoundOperator) Run() (err error)
type BoundTableOperator ¶
type BoundTableOperator struct { *BoundOperator // contains filtered or unexported fields }
func NewBoundTableOperator ¶
func (*BoundTableOperator) Run ¶
func (c *BoundTableOperator) Run() error
type CatalogLogtailRespBuilder ¶
type CatalogLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
CatalogLogtailRespBuilder knows how to make api-entry from catalog entry impl catalog.Processor interface, driven by LogtailCollector
func NewCatalogLogtailRespBuilder ¶
func NewCatalogLogtailRespBuilder(scope Scope, ckp string, start, end types.TS) *CatalogLogtailRespBuilder
func (*CatalogLogtailRespBuilder) BuildResp ¶
func (b *CatalogLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*CatalogLogtailRespBuilder) Close ¶
func (b *CatalogLogtailRespBuilder) Close()
func (*CatalogLogtailRespBuilder) VisitDB ¶
func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error
func (*CatalogLogtailRespBuilder) VisitTbl ¶
func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error
type CheckpointClient ¶
type CheckpointData ¶
type CheckpointData struct {
// contains filtered or unexported fields
}
func NewCheckpointData ¶
func NewCheckpointData() *CheckpointData
func (*CheckpointData) ApplyReplayTo ¶
func (data *CheckpointData) ApplyReplayTo( c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
func (*CheckpointData) Close ¶
func (data *CheckpointData) Close()
func (*CheckpointData) GetBlkBatchs ¶
func (data *CheckpointData) GetBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDBBatchs ¶
func (data *CheckpointData) GetDBBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDNBlkBatchs ¶
func (data *CheckpointData) GetDNBlkBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetSegBatchs ¶
func (data *CheckpointData) GetSegBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetTableData ¶
func (data *CheckpointData) GetTableData(tid uint64) (ins, del, cnIns *api.Batch, err error)
func (*CheckpointData) GetTableMeta ¶
func (data *CheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)
func (*CheckpointData) GetTblBatchs ¶
func (data *CheckpointData) GetTblBatchs() ( *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) PrintData ¶
func (data *CheckpointData) PrintData()
func (*CheckpointData) ReadFrom ¶
func (data *CheckpointData) ReadFrom( reader *blockio.Reader, scheduler tasks.JobScheduler, m *mpool.MPool) (err error)
TODO: There need a global io pool
func (*CheckpointData) UpdateBlkMeta ¶
func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
func (*CheckpointData) WriteTo ¶
func (data *CheckpointData) WriteTo( writer *blockio.Writer) (blks []objectio.BlockObject, err error)
type CheckpointMeta ¶
type CheckpointMeta struct {
// contains filtered or unexported fields
}
func NewCheckpointMeta ¶
func NewCheckpointMeta() *CheckpointMeta
type Collector ¶
type Collector interface { String() string Run() ScanInRange(from, to types.TS) (*DirtyTreeEntry, int) ScanInRangePruned(from, to types.TS) *DirtyTreeEntry GetAndRefreshMerged() *DirtyTreeEntry Merge() *DirtyTreeEntry GetMaxLSN(from, to types.TS) uint64 Init(maxts types.TS) }
type DirtyEntryInterceptor ¶
type DirtyTreeEntry ¶
func NewDirtyTreeEntry ¶
func NewDirtyTreeEntry(start, end types.TS, tree *common.Tree) *DirtyTreeEntry
func NewEmptyDirtyTreeEntry ¶
func NewEmptyDirtyTreeEntry() *DirtyTreeEntry
func (*DirtyTreeEntry) GetTimeRange ¶
func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)
func (*DirtyTreeEntry) GetTree ¶
func (entry *DirtyTreeEntry) GetTree() (tree *common.Tree)
func (*DirtyTreeEntry) IsEmpty ¶
func (entry *DirtyTreeEntry) IsEmpty() bool
func (*DirtyTreeEntry) Merge ¶
func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)
func (*DirtyTreeEntry) String ¶
func (entry *DirtyTreeEntry) String() string
type GlobalCollector ¶ added in v0.7.0
type GlobalCollector struct { *BaseCollector // contains filtered or unexported fields }
func NewGlobalCollector ¶ added in v0.7.0
func NewGlobalCollector(end types.TS, versionInterval time.Duration) *GlobalCollector
func (*GlobalCollector) VisitBlk ¶ added in v0.7.0
func (collector *GlobalCollector) VisitBlk(entry *catalog.BlockEntry) error
func (*GlobalCollector) VisitDB ¶ added in v0.7.0
func (collector *GlobalCollector) VisitDB(entry *catalog.DBEntry) error
func (*GlobalCollector) VisitSeg ¶ added in v0.7.0
func (collector *GlobalCollector) VisitSeg(entry *catalog.SegmentEntry) error
func (*GlobalCollector) VisitTable ¶ added in v0.7.0
func (collector *GlobalCollector) VisitTable(entry *catalog.TableEntry) error
type IncrementalCollector ¶
type IncrementalCollector struct {
*BaseCollector
}
func NewIncrementalCollector ¶
func NewIncrementalCollector(start, end types.TS) *IncrementalCollector
type Logtailer ¶ added in v0.7.0
type Logtailer interface { // RangeLogtail returns logtail for all tables within the range (from, to]. RangeLogtail( ctx context.Context, from, to timestamp.Timestamp, ) ([]logtail.TableLogtail, error) // TableLogtail returns logtail for the specified table. // // NOTE: If table not exist, logtail.TableLogtail shouldn't be a simple zero value. TableLogtail( ctx context.Context, table api.TableID, from, to timestamp.Timestamp, ) (logtail.TableLogtail, error) }
Logtailer provides logtail for the specified table.
type LogtailerImpl ¶ added in v0.7.0
type LogtailerImpl struct {
// contains filtered or unexported fields
}
func NewLogtailer ¶ added in v0.7.0
func NewLogtailer( ckpClient CheckpointClient, mgr *Manager, c *catalog.Catalog) *LogtailerImpl
func (*LogtailerImpl) RangeLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) RangeLogtail( ctx context.Context, from, to timestamp.Timestamp, ) ([]logtail.TableLogtail, error)
RangeLogtail returns logtail for all tables within the range (from, to].
func (*LogtailerImpl) TableLogtail ¶ added in v0.7.0
func (l *LogtailerImpl) TableLogtail( ctx context.Context, table api.TableID, from, to timestamp.Timestamp, ) (logtail.TableLogtail, error)
TableLogtail returns logtail for the specified table.
type Manager ¶ added in v0.7.0
type Manager struct { txnbase.NoopCommitListener // contains filtered or unexported fields }
func (*Manager) GetTableOperator ¶ added in v0.7.0
func (*Manager) OnEndPrePrepare ¶ added in v0.7.0
type Reader ¶ added in v0.7.0
type Reader struct {
// contains filtered or unexported fields
}
func (*Reader) GetDirtyByTable ¶ added in v0.7.0
func (*Reader) HasCatalogChanges ¶ added in v0.7.0
type RespBuilder ¶
type RespBuilder interface { catalog.Processor BuildResp() (api.SyncLogTailResp, error) Close() }
type TableLogtailRespBuilder ¶
type TableLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
func NewTableLogtailRespBuilder ¶
func NewTableLogtailRespBuilder(ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder
func (*TableLogtailRespBuilder) BuildResp ¶
func (b *TableLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*TableLogtailRespBuilder) Close ¶
func (b *TableLogtailRespBuilder) Close()
func (*TableLogtailRespBuilder) VisitBlk ¶
func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error
type TxnTable ¶ added in v0.7.0
func NewTxnTable ¶ added in v0.7.0
func NewTxnTable(blockSize int, clock *types.TsAlloctor) *TxnTable
func (*TxnTable) ForeachRowInBetween ¶ added in v0.7.0
Source Files ¶
Click to show internal directories.
Click to hide internal directories.