Documentation ¶
Index ¶
- Constants
- Variables
- func CheckTxnIsValid(txnOp client.TxnOperator) (err error)
- func CompileFilterExpr(expr *plan.Expr, proc *process.Process, tableDef *plan.TableDef, ...) (fastFilterOp FastFilterOp, loadOp LoadOp, objectFilterOp ObjectFilterOp, ...)
- func CompileFilterExprs(exprs []*plan.Expr, proc *process.Process, tableDef *plan.TableDef, ...) (fastFilterOp FastFilterOp, loadOp LoadOp, objectFilterOp ObjectFilterOp, ...)
- func ConstructObjStatsByLoadObjMeta(ctx context.Context, metaLoc objectio.Location, fs fileservice.FileService) (stats objectio.ObjectStats, dataMeta objectio.ObjectDataMeta, err error)
- func ExecuteBlockFilter(ctx context.Context, tbl *txnTable, txnOffset int, ...) (err error)
- func ForeachBlkInObjStatsList(next bool, dataMeta objectio.ObjectDataMeta, ...)
- func ForeachCommittedObjects(createObjs map[objectio.ObjectNameShort]struct{}, ...) (err error)
- func ForeachSnapshotObjects(ts timestamp.Timestamp, ...) (err error)
- func ForeachVisibleDataObject(state *logtailreplay.PartitionState, ts types.TS, ...) (err error)
- func LinearSearchOffsetByValFactory(pk *vector.Vector) func(*vector.Vector) []int32
- func ListTnService(appendFn func(service *metadata.TNService))
- func NewMergeReader(readers []engine.Reader) *mergeReader
- func TryFastFilterBlocks(ctx context.Context, tbl *txnTable, txnOffset int, ...) (ok bool, err error)
- func UpdateStats(ctx context.Context, req *updateStatsRequest, executor ConcurrentExecutor) error
- type BasePKFilter
- type BlockFilterOp
- type ConcurrentExecutor
- type DNStore
- type Engine
- func (e *Engine) AllocateIDByKey(ctx context.Context, key string) (uint64, error)
- func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error
- func (e *Engine) Database(ctx context.Context, name string, op client.TxnOperator) (engine.Database, error)
- func (e *Engine) DatabaseByAccountID(accountID uint32, name string, op client.TxnOperator) (engine.Database, error)
- func (e *Engine) Databases(ctx context.Context, op client.TxnOperator) ([]string, error)
- func (e *Engine) Delete(ctx context.Context, name string, op client.TxnOperator) (err error)
- func (e *Engine) GetNameById(ctx context.Context, op client.TxnOperator, tableId uint64) (dbName string, tblName string, err error)
- func (e *Engine) GetRelationById(ctx context.Context, op client.TxnOperator, tableId uint64) (dbName, tableName string, rel engine.Relation, err error)
- func (e *Engine) Hints() (h engine.Hints)
- func (e *Engine) InitLogTailPushModel(ctx context.Context, timestampWaiter client.TimestampWaiter) error
- func (e *Engine) New(ctx context.Context, op client.TxnOperator) error
- func (e *Engine) NewBlockReader(ctx context.Context, num int, ts timestamp.Timestamp, expr *plan.Expr, ...) ([]engine.Reader, error)
- func (e *Engine) Nodes(isInternal bool, tenant string, username string, cnLabel map[string]string) (engine.Nodes, error)
- func (e *Engine) PushClient() *PushClient
- func (e *Engine) Stats(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo
- func (e *Engine) TryToSubscribeTable(ctx context.Context, dbID, tbID uint64) error
- func (e *Engine) UnsubscribeTable(ctx context.Context, dbID, tbID uint64) error
- func (e *Engine) UpdateOfPush(ctx context.Context, databaseId, tableId uint64, ts timestamp.Timestamp) error
- type Entry
- type FastFilterOp
- type GlobalStats
- type GlobalStatsConfig
- type GlobalStatsOption
- type IDGenerator
- type InMemPKFilter
- type LoadOp
- type LoadOpFactory
- type ObjectFilterOp
- type PKFilters
- type PartitionReader
- func (p *PartitionReader) Close() error
- func (p *PartitionReader) GetOrderBy() []*plan.OrderBySpec
- func (p *PartitionReader) Read(_ context.Context, colNames []string, _ *plan.Expr, mp *mpool.MPool, ...) (result *batch.Batch, err error)
- func (p *PartitionReader) SetFilterZM(objectio.ZoneMap)
- func (p *PartitionReader) SetOrderBy([]*plan.OrderBySpec)
- type Pos
- type PushClient
- type SeekFirstBlockOp
- type State
- type StatsBlkIter
- type SubTableID
- type SubTableStatus
- type SubscribeState
- type Transaction
- func (txn *Transaction) Adjust(writeOffset uint64) error
- func (txn *Transaction) BindTxnOp(op client.TxnOperator)
- func (txn *Transaction) CloneSnapshotWS() client.Workspace
- func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error)
- func (txn *Transaction) EndStatement()
- func (txn *Transaction) GetHaveDDL() bool
- func (txn *Transaction) GetSQLCount() uint64
- func (txn *Transaction) GetSnapshotWriteOffset() int
- func (txn *Transaction) IncrSQLCount()
- func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error
- func (txn *Transaction) PutCnBlockDeletes(blockId *types.Blockid, offsets []int64)
- func (txn *Transaction) ReadOnly() bool
- func (txn *Transaction) Rollback(ctx context.Context) error
- func (txn *Transaction) RollbackLastStatement(ctx context.Context) error
- func (txn *Transaction) SetHaveDDL(haveDDL bool)
- func (txn *Transaction) StartStatement()
- func (txn *Transaction) UpdateSnapshotWriteOffset()
- func (txn *Transaction) WriteBatch(typ int, accountId uint32, databaseId uint64, tableId uint64, ...) error
- func (txn *Transaction) WriteFile(typ int, accountId uint32, databaseId, tableId uint64, ...) error
- func (txn *Transaction) WriteFileLocked(typ int, accountId uint32, databaseId, tableId uint64, ...) error
- func (txn *Transaction) WriteOffset() uint64
Constants ¶
const ( PREFETCH_THRESHOLD = 256 PREFETCH_ROUNDS = 24 SMALLSCAN_THRESHOLD = 100 LARGESCAN_THRESHOLD = 1500 )
const ( INSERT = iota DELETE COMPACTION_CN UPDATE ALTER INSERT_TXN // Only for CN workspace consumption, not sent to DN DELETE_TXN // Only for CN workspace consumption, not sent to DN MERGEOBJECT )
const ( SMALL = iota NORMAL LARGE )
const ( MO_DATABASE_ID_NAME_IDX = 1 MO_DATABASE_ID_ACCOUNT_IDX = 2 MO_DATABASE_LIST_ACCOUNT_IDX = 1 MO_TABLE_ID_NAME_IDX = 1 MO_TABLE_ID_DATABASE_ID_IDX = 2 MO_TABLE_ID_ACCOUNT_IDX = 3 MO_TABLE_LIST_DATABASE_ID_IDX = 1 MO_TABLE_LIST_ACCOUNT_IDX = 2 MO_PRIMARY_OFF = 2 INIT_ROWID_OFFSET = math.MaxUint32 )
const ( WorkspaceThreshold uint64 = 1 * mpool.MB InsertEntryThreshold = 5000 GCBatchOfFileCount int = 1000 GCPoolSize int = 5 )
const (
AllColumns = "*"
)
Variables ¶
var GcCycle = 10 * time.Second
var ( // MinUpdateInterval is the minimal interval to update stats info as it // is necessary to update stats every time. MinUpdateInterval = time.Second * 15 )
Functions ¶
func CheckTxnIsValid ¶ added in v1.2.1
func CheckTxnIsValid(txnOp client.TxnOperator) (err error)
func CompileFilterExpr ¶ added in v1.2.0
func CompileFilterExpr( expr *plan.Expr, proc *process.Process, tableDef *plan.TableDef, fs fileservice.FileService, ) ( fastFilterOp FastFilterOp, loadOp LoadOp, objectFilterOp ObjectFilterOp, blockFilterOp BlockFilterOp, seekOp SeekFirstBlockOp, canCompile bool, highSelectivityHint bool, )
func CompileFilterExprs ¶ added in v1.2.0
func CompileFilterExprs( exprs []*plan.Expr, proc *process.Process, tableDef *plan.TableDef, fs fileservice.FileService, ) ( fastFilterOp FastFilterOp, loadOp LoadOp, objectFilterOp ObjectFilterOp, blockFilterOp BlockFilterOp, seekOp SeekFirstBlockOp, canCompile bool, highSelectivityHint bool, )
func ConstructObjStatsByLoadObjMeta ¶ added in v1.1.0
func ConstructObjStatsByLoadObjMeta( ctx context.Context, metaLoc objectio.Location, fs fileservice.FileService) (stats objectio.ObjectStats, dataMeta objectio.ObjectDataMeta, err error)
func ExecuteBlockFilter ¶ added in v1.2.0
func ExecuteBlockFilter( ctx context.Context, tbl *txnTable, txnOffset int, snapshotTS timestamp.Timestamp, fastFilterOp FastFilterOp, loadOp LoadOp, objectFilterOp ObjectFilterOp, blockFilterOp BlockFilterOp, seekOp SeekFirstBlockOp, snapshot *logtailreplay.PartitionState, uncommittedObjects []objectio.ObjectStats, dirtyBlocks *map[types.Blockid]struct{}, outBlocks *objectio.BlockInfoSlice, fs fileservice.FileService, proc *process.Process, highSelectivityHint bool, ) (err error)
func ForeachBlkInObjStatsList ¶ added in v1.1.0
func ForeachBlkInObjStatsList( next bool, dataMeta objectio.ObjectDataMeta, onBlock func(blk objectio.BlockInfo, blkMeta objectio.BlockObject) bool, objects ...objectio.ObjectStats, )
ForeachBlkInObjStatsList receives an object info list, and visits each blk of these object info by OnBlock, until the onBlock returns false or all blks have been enumerated. when onBlock returns a false, the next argument decides whether continue onBlock on the next stats or exit foreach completely.
func ForeachCommittedObjects ¶ added in v1.2.0
func ForeachCommittedObjects( createObjs map[objectio.ObjectNameShort]struct{}, delObjs map[objectio.ObjectNameShort]struct{}, p *logtailreplay.PartitionState, onObj func(info logtailreplay.ObjectInfo) error) (err error)
func ForeachSnapshotObjects ¶ added in v1.1.0
func ForeachSnapshotObjects( ts timestamp.Timestamp, onObject func(obj logtailreplay.ObjectInfo, isCommitted bool) error, tableSnapshot *logtailreplay.PartitionState, uncommitted ...objectio.ObjectStats, ) (err error)
func ForeachVisibleDataObject ¶ added in v1.2.0
func ForeachVisibleDataObject( state *logtailreplay.PartitionState, ts types.TS, fn func(obj logtailreplay.ObjectEntry) error, executor ConcurrentExecutor, ) (err error)
func LinearSearchOffsetByValFactory ¶ added in v1.2.0
func ListTnService ¶ added in v1.1.0
ListTnService gets all tn service in the cluster
func NewMergeReader ¶ added in v0.8.0
func TryFastFilterBlocks ¶ added in v1.2.0
func TryFastFilterBlocks( ctx context.Context, tbl *txnTable, txnOffset int, snapshotTS timestamp.Timestamp, tableDef *plan.TableDef, exprs []*plan.Expr, snapshot *logtailreplay.PartitionState, uncommittedObjects []objectio.ObjectStats, dirtyBlocks *map[types.Blockid]struct{}, outBlocks *objectio.BlockInfoSlice, fs fileservice.FileService, proc *process.Process, ) (ok bool, err error)
func UpdateStats ¶ added in v1.0.0
func UpdateStats( ctx context.Context, req *updateStatsRequest, executor ConcurrentExecutor, ) error
UpdateStats is the main function to calculate and update the stats for scan node.
Types ¶
type BasePKFilter ¶ added in v1.2.1
type BasePKFilter struct {
// contains filtered or unexported fields
}
func (*BasePKFilter) String ¶ added in v1.2.1
func (b *BasePKFilter) String() string
type BlockFilterOp ¶ added in v1.2.0
type BlockFilterOp func(int, objectio.BlockObject, objectio.BloomFilter) (bool, bool, error)
type ConcurrentExecutor ¶ added in v1.2.2
type ConcurrentExecutor interface { // AppendTask append the concurrent task to the exuecutor. AppendTask(concurrentTask) // Run starts receive task to execute. Run(context.Context) // GetConcurrency returns the concurrency of this executor. GetConcurrency() int }
ConcurrentExecutor is an interface that runs tasks concurrently.
type Engine ¶
func New ¶
func New( ctx context.Context, mp *mpool.MPool, fs fileservice.FileService, cli client.TxnClient, hakeeper logservice.CNHAKeeperClient, keyRouter client2.KeyRouter[pb.StatsInfoKey], updateWorkerFactor int, ) *Engine
func (*Engine) AllocateIDByKey ¶ added in v0.8.0
func (*Engine) DatabaseByAccountID ¶ added in v1.2.0
func (*Engine) GetNameById ¶ added in v0.7.0
func (*Engine) GetRelationById ¶ added in v0.7.0
func (*Engine) InitLogTailPushModel ¶ added in v0.8.0
func (*Engine) NewBlockReader ¶
func (*Engine) PushClient ¶ added in v1.2.0
func (e *Engine) PushClient() *PushClient
func (*Engine) TryToSubscribeTable ¶ added in v1.2.0
TryToSubscribeTable implements the LogtailEngine interface.
func (*Engine) UnsubscribeTable ¶ added in v1.2.0
UnsubscribeTable implements the LogtailEngine interface.
type Entry ¶
type Entry struct {
// contains filtered or unexported fields
}
Entry represents a delete/insert
type FastFilterOp ¶ added in v1.2.0
type FastFilterOp func(objectio.ObjectStats) (bool, error)
type GlobalStats ¶ added in v1.2.0
type GlobalStats struct { // KeyRouter is the router to decides which node should send to. KeyRouter client.KeyRouter[pb.StatsInfoKey] // contains filtered or unexported fields }
func NewGlobalStats ¶ added in v1.2.0
func NewGlobalStats( ctx context.Context, e *Engine, keyRouter client.KeyRouter[pb.StatsInfoKey], opts ...GlobalStatsOption, ) *GlobalStats
func (*GlobalStats) Get ¶ added in v1.2.0
func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo
func (*GlobalStats) RemoveTid ¶ added in v1.2.2
func (gs *GlobalStats) RemoveTid(tid uint64)
func (*GlobalStats) ShouldUpdate ¶ added in v1.2.1
func (gs *GlobalStats) ShouldUpdate(key pb.StatsInfoKey, entryNum int64) bool
type GlobalStatsConfig ¶ added in v1.2.0
type GlobalStatsConfig struct {
LogtailUpdateStatsThreshold int
}
type GlobalStatsOption ¶ added in v1.2.0
type GlobalStatsOption func(s *GlobalStats)
func WithUpdateWorkerFactor ¶ added in v1.2.2
func WithUpdateWorkerFactor(f int) GlobalStatsOption
WithUpdateWorkerFactor set the update worker factor.
type IDGenerator ¶
type InMemPKFilter ¶ added in v1.2.1
type InMemPKFilter struct {
// contains filtered or unexported fields
}
func (*InMemPKFilter) SetFullData ¶ added in v1.2.1
func (f *InMemPKFilter) SetFullData(op int, isVec bool, val ...[]byte)
func (*InMemPKFilter) SetNull ¶ added in v1.2.1
func (f *InMemPKFilter) SetNull()
func (*InMemPKFilter) String ¶ added in v1.2.1
func (f *InMemPKFilter) String() string
type LoadOp ¶ added in v1.2.0
type LoadOp = func( context.Context, objectio.ObjectStats, objectio.ObjectMeta, objectio.BloomFilter, ) (objectio.ObjectMeta, objectio.BloomFilter, error)
type LoadOpFactory ¶ added in v1.2.0
type LoadOpFactory func(fileservice.FileService) LoadOp
type ObjectFilterOp ¶ added in v1.2.0
type ObjectFilterOp func(objectio.ObjectMeta, objectio.BloomFilter) (bool, error)
type PKFilters ¶ added in v1.2.1
type PKFilters struct {
// contains filtered or unexported fields
}
func ConstructPKFilters ¶ added in v1.2.1
type PartitionReader ¶
type PartitionReader struct {
// contains filtered or unexported fields
}
func (*PartitionReader) Close ¶
func (p *PartitionReader) Close() error
func (*PartitionReader) GetOrderBy ¶ added in v1.2.0
func (p *PartitionReader) GetOrderBy() []*plan.OrderBySpec
func (*PartitionReader) Read ¶
func (p *PartitionReader) Read( _ context.Context, colNames []string, _ *plan.Expr, mp *mpool.MPool, pool engine.VectorPool) (result *batch.Batch, err error)
PartitionReader.Read reads memory data which comes from partitionState.rows and txn.writes, and load its tombstones.
func (*PartitionReader) SetFilterZM ¶ added in v1.2.0
func (p *PartitionReader) SetFilterZM(objectio.ZoneMap)
func (*PartitionReader) SetOrderBy ¶ added in v1.2.0
func (p *PartitionReader) SetOrderBy([]*plan.OrderBySpec)
type PushClient ¶ added in v1.2.0
type PushClient struct {
// contains filtered or unexported fields
}
PushClient is a structure responsible for all operations related to the log tail push model. It provides the following methods:
----------------------------------------------------------------------------------------------------- 1. checkTxnTimeIsLegal : block the process until we have received enough log tail (T_log >= T_txn) 2. TryToSubscribeTable : block the process until we subscribed a table succeed. 3. subscribeTable : send a table subscribe request to service. 4. subSysTables : subscribe mo_databases, mo_tables, mo_columns 5. receiveTableLogTailContinuously : start (1 + consumerNumber) routine to receive log tail from service. Watch out for the following points: 1. if we want to lock both subscriber and subscribed, we should lock subscriber first. -----------------------------------------------------------------------------------------------------
func (*PushClient) GetState ¶ added in v1.2.0
func (c *PushClient) GetState() State
func (*PushClient) TryToSubscribeTable ¶ added in v1.2.0
func (c *PushClient) TryToSubscribeTable( ctx context.Context, dbId, tblId uint64) (err error)
TryToSubscribeTable subscribe a table and block until subscribe succeed. It's deprecated, please use toSubscribeTable instead.
func (*PushClient) UnsubscribeTable ¶ added in v1.2.0
func (c *PushClient) UnsubscribeTable(ctx context.Context, dbID, tbID uint64) error
UnsubscribeTable implements the LogtailEngine interface.
type SeekFirstBlockOp ¶ added in v1.2.0
type SeekFirstBlockOp func(objectio.ObjectDataMeta) int
type State ¶ added in v1.2.0
type State struct { LatestTS timestamp.Timestamp SubTables map[SubTableID]SubTableStatus }
type StatsBlkIter ¶ added in v1.1.0
type StatsBlkIter struct {
// contains filtered or unexported fields
}
func NewStatsBlkIter ¶ added in v1.1.0
func NewStatsBlkIter(stats *objectio.ObjectStats, meta objectio.ObjectDataMeta) *StatsBlkIter
func (*StatsBlkIter) Entry ¶ added in v1.1.0
func (i *StatsBlkIter) Entry() objectio.BlockInfo
func (*StatsBlkIter) Next ¶ added in v1.1.0
func (i *StatsBlkIter) Next() bool
type SubTableID ¶ added in v1.2.0
type SubTableStatus ¶ added in v1.2.0
type SubTableStatus struct { SubState SubscribeState LatestTime time.Time }
type SubscribeState ¶ added in v1.2.1
type SubscribeState int32
const ( InvalidSubState SubscribeState = iota Subscribing SubRspReceived Subscribed Unsubscribing Unsubscribed )
type Transaction ¶
Transaction represents a transaction
func (*Transaction) Adjust ¶ added in v1.0.0
func (txn *Transaction) Adjust(writeOffset uint64) error
Adjust adjust writes order after the current statement finished.
func (*Transaction) BindTxnOp ¶ added in v1.2.0
func (txn *Transaction) BindTxnOp(op client.TxnOperator)
func (*Transaction) CloneSnapshotWS ¶ added in v1.2.0
func (txn *Transaction) CloneSnapshotWS() client.Workspace
func (*Transaction) Commit ¶ added in v0.8.0
func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error)
func (*Transaction) EndStatement ¶ added in v1.0.0
func (txn *Transaction) EndStatement()
func (*Transaction) GetHaveDDL ¶ added in v1.2.2
func (txn *Transaction) GetHaveDDL() bool
func (*Transaction) GetSQLCount ¶ added in v1.0.0
func (txn *Transaction) GetSQLCount() uint64
func (*Transaction) GetSnapshotWriteOffset ¶ added in v1.1.2
func (txn *Transaction) GetSnapshotWriteOffset() int
func (*Transaction) IncrSQLCount ¶ added in v1.0.0
func (txn *Transaction) IncrSQLCount()
func (*Transaction) IncrStatementID ¶ added in v0.8.0
func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error
func (*Transaction) PutCnBlockDeletes ¶ added in v0.8.0
func (txn *Transaction) PutCnBlockDeletes(blockId *types.Blockid, offsets []int64)
func (*Transaction) ReadOnly ¶
func (txn *Transaction) ReadOnly() bool
detecting whether a transaction is a read-only transaction
func (*Transaction) Rollback ¶ added in v0.8.0
func (txn *Transaction) Rollback(ctx context.Context) error
func (*Transaction) RollbackLastStatement ¶ added in v0.8.0
func (txn *Transaction) RollbackLastStatement(ctx context.Context) error
func (*Transaction) SetHaveDDL ¶ added in v1.2.2
func (txn *Transaction) SetHaveDDL(haveDDL bool)
func (*Transaction) StartStatement ¶ added in v1.0.0
func (txn *Transaction) StartStatement()
func (*Transaction) UpdateSnapshotWriteOffset ¶ added in v1.1.2
func (txn *Transaction) UpdateSnapshotWriteOffset()
func (*Transaction) WriteBatch ¶
func (txn *Transaction) WriteBatch( typ int, accountId uint32, databaseId uint64, tableId uint64, databaseName string, tableName string, bat *batch.Batch, tnStore DNStore, primaryIdx int, insertBatchHasRowId bool, truncate bool) error
WriteBatch used to write data to the transaction buffer insert/delete/update all use this api insertBatchHasRowId : it denotes the batch has Rowid when the typ is INSERT. if typ is not INSERT, it is always false. truncate : it denotes the batch with typ DELETE on mo_tables is generated when Truncating a table.
func (*Transaction) WriteFile ¶
func (txn *Transaction) WriteFile( typ int, accountId uint32, databaseId, tableId uint64, databaseName, tableName string, fileName string, bat *batch.Batch, tnStore DNStore) error
WriteFile used to add a s3 file information to the transaction buffer insert/delete/update all use this api
func (*Transaction) WriteFileLocked ¶ added in v1.0.0
func (*Transaction) WriteOffset ¶ added in v1.1.1
func (txn *Transaction) WriteOffset() uint64
writeOffset returns the offset of the first write in the workspace