Documentation
¶
Index ¶
- Constants
- Variables
- func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
- func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
- func HandleSyncLogTailReq(ckpClient CheckpointClient, mgr *LogtailMgr, c *catalog.Catalog, ...) (resp api.SyncLogTailResp, err error)
- func IncrementalCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (*CheckpointData, error)
- func LoadCheckpointEntries(metLoc string, tableID uint64, tableName string, dbID uint64, dbName string, ...) (entries []*api.Entry, err error)
- func NewDirtyCollector(sourcer *LogtailMgr, clock clock.Clock, catalog *catalog.Catalog, ...) *dirtyCollector
- func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
- type BaseOperator
- type BoundOperator
- type BoundTableOperator
- type CatalogLogtailRespBuilder
- type CheckpointClient
- type CheckpointData
- func (data *CheckpointData) ApplyReplayTo(c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
- func (data *CheckpointData) Close()
- func (data *CheckpointData) GetBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDBBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetDNBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetSegBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
- func (data *CheckpointData) GetTableData(tid uint64) (ins, del, cnIns *api.Batch, err error)
- func (data *CheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)
- func (data *CheckpointData) GetTblBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, ...)
- func (data *CheckpointData) PrintData()
- func (data *CheckpointData) ReadFrom(reader *blockio.Reader, m *mpool.MPool) (err error)
- func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
- func (data *CheckpointData) WriteTo(writer *blockio.Writer) (blks []objectio.BlockObject, err error)
- type CheckpointMeta
- type Collector
- type DirtyEntryInterceptor
- type DirtyTreeEntry
- type IncrementalCollector
- func (collector *IncrementalCollector) Close()
- func (collector *IncrementalCollector) OrphanData() *CheckpointData
- func (collector *IncrementalCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
- func (collector *IncrementalCollector) VisitDB(entry *catalog.DBEntry) error
- func (collector *IncrementalCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
- func (collector *IncrementalCollector) VisitTable(entry *catalog.TableEntry) (err error)
- type LogtailMgr
- func (l *LogtailMgr) AddTxn(txn txnif.AsyncTxn)
- func (l *LogtailMgr) DecideScope(tableID uint64) Scope
- func (l *LogtailMgr) GetReader(start, end types.TS) *LogtailReader
- func (l *LogtailMgr) GetTableOperator(start, end types.TS, catalog *catalog.Catalog, dbID, tableID uint64, ...) *BoundTableOperator
- func (l *LogtailMgr) OnEndPrePrepare(txn txnif.AsyncTxn)
- type LogtailReader
- type RespBuilder
- type Scope
- type TableLogtailRespBuilder
Constants ¶
View Source
const ( SnapshotAttr_SegID = catalog.SnapshotAttr_SegID SnapshotAttr_TID = catalog.SnapshotAttr_TID SnapshotAttr_DBID = catalog.SnapshotAttr_DBID SegmentAttr_ID = catalog.SegmentAttr_ID SegmentAttr_CreateAt = catalog.SegmentAttr_CreateAt SegmentAttr_State = catalog.SegmentAttr_State SnapshotAttr_BlockMaxRow = catalog.SnapshotAttr_BlockMaxRow SnapshotAttr_SegmentMaxBlock = catalog.SnapshotAttr_SegmentMaxBlock SnapshotMetaAttr_Tid = "table_id" SnapshotMetaAttr_BlockInsertBatchStart = "block_insert_batch_start" SnapshotMetaAttr_BlockInsertBatchEnd = "block_insert_batch_end" SnapshotMetaAttr_BlockDeleteBatchStart = "block_delete_batch_start" SnapshotMetaAttr_BlockDeleteBatchEnd = "block_delete_batch_end" )
View Source
const PrintN = 3
View Source
const Size90M = 80 * 1024 * 1024
Variables ¶
View Source
var ( // for blk meta response BlkMetaSchema *catalog.Schema DelSchema *catalog.Schema SegSchema *catalog.Schema TxnNodeSchema *catalog.Schema DBDNSchema *catalog.Schema TblDNSchema *catalog.Schema SegDNSchema *catalog.Schema BlkDNSchema *catalog.Schema MetaSchema *catalog.Schema )
View Source
var ( SegmentSchemaAttr = []string{ SegmentAttr_ID, SegmentAttr_CreateAt, SegmentAttr_State, } SegmentSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_bool, 0, 0, 0), } TxnNodeSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, } TxnNodeSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), } DBDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } DBDNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), } TblDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, SnapshotAttr_BlockMaxRow, SnapshotAttr_SegmentMaxBlock, } TblDNSchemaType = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint16, 0, 0, 0), } 
SegmentDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, } SegmentDNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), } BlockDNSchemaAttr = []string{ txnbase.SnapshotAttr_LogIndex_LSN, txnbase.SnapshotAttr_StartTS, txnbase.SnapshotAttr_PrepareTS, txnbase.SnapshotAttr_CommitTS, txnbase.SnapshotAttr_LogIndex_CSN, txnbase.SnapshotAttr_LogIndex_Size, SnapshotAttr_DBID, SnapshotAttr_TID, SnapshotAttr_SegID, pkgcatalog.BlockMeta_MetaLoc, pkgcatalog.BlockMeta_DeltaLoc, } BlockDNSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_TS, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint32, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), types.New(types.T_varchar, 0, 0, 0), types.New(types.T_varchar, 0, 0, 0), } MetaSchemaAttr = []string{ SnapshotMetaAttr_Tid, SnapshotMetaAttr_BlockInsertBatchStart, SnapshotMetaAttr_BlockInsertBatchEnd, SnapshotMetaAttr_BlockDeleteBatchStart, SnapshotMetaAttr_BlockDeleteBatchEnd, } MetaShcemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), types.New(types.T_int32, 0, 0, 0), } BaseAttr = []string{ catalog.AttrRowID, catalog.AttrCommitTs, } BaseTypes = []types.Type{ types.T_Rowid.ToType(), types.T_TS.ToType(), } )
Functions ¶
func BatchToString ¶
func BatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
func DebugBatchToString ¶
func DebugBatchToString(name string, bat *containers.Batch, isSpecialRowID bool) string
func HandleSyncLogTailReq ¶
func HandleSyncLogTailReq( ckpClient CheckpointClient, mgr *LogtailMgr, c *catalog.Catalog, req api.SyncLogTailReq, canRetry bool) (resp api.SyncLogTailResp, err error)
func LoadCheckpointEntries ¶
func LoadCheckpointEntries( metLoc string, tableID uint64, tableName string, dbID uint64, dbName string, fs fileservice.FileService) (entries []*api.Entry, err error)
func NewDirtyCollector ¶
func NewDirtyCollector( sourcer *LogtailMgr, clock clock.Clock, catalog *catalog.Catalog, interceptor DirtyEntryInterceptor) *dirtyCollector
func ToStringTemplate ¶
func ToStringTemplate(vec containers.Vector, printN int, opts ...common.TypePrintOpt) string
Types ¶
type BaseOperator ¶
type BaseOperator struct {
// contains filtered or unexported fields
}
BoundTableOperator holds a read-only reader and knows how to iterate over entries. It drives an entry visitor, which acts as an API response builder.
type BoundOperator ¶
type BoundOperator struct { *BaseOperator // contains filtered or unexported fields }
func NewBoundOperator ¶
func NewBoundOperator(catalog *catalog.Catalog, reader *LogtailReader, visitor catalog.Processor) *BoundOperator
func (*BoundOperator) Run ¶
func (op *BoundOperator) Run() (err error)
type BoundTableOperator ¶
type BoundTableOperator struct { *BoundOperator // contains filtered or unexported fields }
func NewBoundTableOperator ¶
func NewBoundTableOperator(catalog *catalog.Catalog, reader *LogtailReader, scope Scope, dbID, tableID uint64, visitor catalog.Processor) *BoundTableOperator
func (*BoundTableOperator) Run ¶
func (c *BoundTableOperator) Run() error
type CatalogLogtailRespBuilder ¶
type CatalogLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
CatalogLogtailRespBuilder knows how to build API entries from catalog entries. It implements the catalog.Processor interface and is driven by LogtailCollector.
func NewCatalogLogtailRespBuilder ¶
func NewCatalogLogtailRespBuilder(scope Scope, ckp string, start, end types.TS) *CatalogLogtailRespBuilder
func (*CatalogLogtailRespBuilder) BuildResp ¶
func (b *CatalogLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*CatalogLogtailRespBuilder) Close ¶
func (b *CatalogLogtailRespBuilder) Close()
func (*CatalogLogtailRespBuilder) VisitDB ¶
func (b *CatalogLogtailRespBuilder) VisitDB(entry *catalog.DBEntry) error
func (*CatalogLogtailRespBuilder) VisitTbl ¶
func (b *CatalogLogtailRespBuilder) VisitTbl(entry *catalog.TableEntry) error
type CheckpointClient ¶
type CheckpointData ¶
type CheckpointData struct {
// contains filtered or unexported fields
}
func NewCheckpointData ¶
func NewCheckpointData() *CheckpointData
func (*CheckpointData) ApplyReplayTo ¶
func (data *CheckpointData) ApplyReplayTo( c *catalog.Catalog, dataFactory catalog.DataFactory) (err error)
func (*CheckpointData) Close ¶
func (data *CheckpointData) Close()
func (*CheckpointData) GetBlkBatchs ¶
func (data *CheckpointData) GetBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDBBatchs ¶
func (data *CheckpointData) GetDBBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetDNBlkBatchs ¶
func (data *CheckpointData) GetDNBlkBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetSegBatchs ¶
func (data *CheckpointData) GetSegBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) GetTableData ¶
func (data *CheckpointData) GetTableData(tid uint64) (ins, del, cnIns *api.Batch, err error)
func (*CheckpointData) GetTableMeta ¶
func (data *CheckpointData) GetTableMeta(tableID uint64) (meta *CheckpointMeta)
func (*CheckpointData) GetTblBatchs ¶
func (data *CheckpointData) GetTblBatchs() (*containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch, *containers.Batch)
func (*CheckpointData) PrintData ¶
func (data *CheckpointData) PrintData()
func (*CheckpointData) UpdateBlkMeta ¶
func (data *CheckpointData) UpdateBlkMeta(tid uint64, insStart, insEnd, delStart, delEnd int32)
func (*CheckpointData) WriteTo ¶
func (data *CheckpointData) WriteTo( writer *blockio.Writer) (blks []objectio.BlockObject, err error)
type CheckpointMeta ¶
type CheckpointMeta struct {
// contains filtered or unexported fields
}
func NewCheckpointMeta ¶
func NewCheckpointMeta() *CheckpointMeta
type Collector ¶
type Collector interface { String() string Run() ScanInRange(from, to types.TS) (*DirtyTreeEntry, int) ScanInRangePruned(from, to types.TS) *DirtyTreeEntry GetAndRefreshMerged() *DirtyTreeEntry Merge() *DirtyTreeEntry GetMaxLSN(from, to types.TS) uint64 Init(maxts types.TS) }
type DirtyEntryInterceptor ¶
type DirtyTreeEntry ¶
func NewDirtyTreeEntry ¶
func NewDirtyTreeEntry(start, end types.TS, tree *common.Tree) *DirtyTreeEntry
func NewEmptyDirtyTreeEntry ¶
func NewEmptyDirtyTreeEntry() *DirtyTreeEntry
func (*DirtyTreeEntry) GetTimeRange ¶
func (entry *DirtyTreeEntry) GetTimeRange() (from, to types.TS)
func (*DirtyTreeEntry) GetTree ¶
func (entry *DirtyTreeEntry) GetTree() (tree *common.Tree)
func (*DirtyTreeEntry) IsEmpty ¶
func (entry *DirtyTreeEntry) IsEmpty() bool
func (*DirtyTreeEntry) Merge ¶
func (entry *DirtyTreeEntry) Merge(o *DirtyTreeEntry)
func (*DirtyTreeEntry) String ¶
func (entry *DirtyTreeEntry) String() string
type IncrementalCollector ¶
type IncrementalCollector struct { *catalog.LoopProcessor // contains filtered or unexported fields }
func NewIncrementalCollector ¶
func NewIncrementalCollector(start, end types.TS) *IncrementalCollector
func (*IncrementalCollector) Close ¶
func (collector *IncrementalCollector) Close()
func (*IncrementalCollector) OrphanData ¶
func (collector *IncrementalCollector) OrphanData() *CheckpointData
func (*IncrementalCollector) VisitBlk ¶
func (collector *IncrementalCollector) VisitBlk(entry *catalog.BlockEntry) (err error)
func (*IncrementalCollector) VisitDB ¶
func (collector *IncrementalCollector) VisitDB(entry *catalog.DBEntry) error
func (*IncrementalCollector) VisitSeg ¶
func (collector *IncrementalCollector) VisitSeg(entry *catalog.SegmentEntry) (err error)
func (*IncrementalCollector) VisitTable ¶
func (collector *IncrementalCollector) VisitTable(entry *catalog.TableEntry) (err error)
type LogtailMgr ¶
type LogtailMgr struct { txnbase.NoopCommitListener // Lock is used to protect pages. there are three cases to hold lock // 1. activePage is full and moves to pages // 2. prune txn because of checkpoint TODO // 3. copy btree // Not RwLock because a copied btree can be read without holding read lock sync.Mutex // contains filtered or unexported fields }
func NewLogtailMgr ¶
func NewLogtailMgr(pageSize int32, clock clock.Clock) *LogtailMgr
func (*LogtailMgr) AddTxn ¶
func (l *LogtailMgr) AddTxn(txn txnif.AsyncTxn)
Notes: 1. AddTxn happens in a queue, so it is safe to assume there is no concurrent AddTxn for now. 2. The added txn has no prepareTS because AddTxn happens in OnEndPrePrepare, so it is safe to allocate a ts to use as minTs.
func (*LogtailMgr) DecideScope ¶
func (l *LogtailMgr) DecideScope(tableID uint64) Scope
func (*LogtailMgr) GetReader ¶
func (l *LogtailMgr) GetReader(start, end types.TS) *LogtailReader
GetReader returns a read-only reader of the txns present at call time. This is a cheap operation, and the returned reader can be accessed without holding any locks.
func (*LogtailMgr) GetTableOperator ¶
func (l *LogtailMgr) GetTableOperator(start, end types.TS, catalog *catalog.Catalog, dbID, tableID uint64, scope Scope, visitor catalog.Processor) *BoundTableOperator
func (*LogtailMgr) OnEndPrePrepare ¶
func (l *LogtailMgr) OnEndPrePrepare(txn txnif.AsyncTxn)
OnEndPrePrepare lets LogtailMgr act as a commit listener.
type LogtailReader ¶
type LogtailReader struct {
// contains filtered or unexported fields
}
LogtailReader is a read-only view of txns.
func (*LogtailReader) GetDirtyByTable ¶
func (v *LogtailReader) GetDirtyByTable(dbID, id uint64) (tree *common.TableTree)
func (*LogtailReader) GetMaxLSN ¶
func (v *LogtailReader) GetMaxLSN() (maxLsn uint64)
func (*LogtailReader) HasCatalogChanges ¶
func (v *LogtailReader) HasCatalogChanges() bool
type RespBuilder ¶
type RespBuilder interface { catalog.Processor BuildResp() (api.SyncLogTailResp, error) Close() }
type TableLogtailRespBuilder ¶
type TableLogtailRespBuilder struct { *catalog.LoopProcessor // contains filtered or unexported fields }
func NewTableLogtailRespBuilder ¶
func NewTableLogtailRespBuilder(ckp string, start, end types.TS, tbl *catalog.TableEntry) *TableLogtailRespBuilder
func (*TableLogtailRespBuilder) BuildResp ¶
func (b *TableLogtailRespBuilder) BuildResp() (api.SyncLogTailResp, error)
func (*TableLogtailRespBuilder) Close ¶
func (b *TableLogtailRespBuilder) Close()
func (*TableLogtailRespBuilder) VisitBlk ¶
func (b *TableLogtailRespBuilder) VisitBlk(entry *catalog.BlockEntry) error
Click to show internal directories.
Click to hide internal directories.