disttae

package
v1.2.3-hotfix-20240916 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2024 License: Apache-2.0 Imports: 77 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PREFETCH_THRESHOLD  = 256
	PREFETCH_ROUNDS     = 24
	SMALLSCAN_THRESHOLD = 100
	LARGESCAN_THRESHOLD = 1500
)
View Source
const (
	INSERT = iota
	DELETE
	COMPACTION_CN
	UPDATE
	ALTER
	INSERT_TXN // Only for CN workspace consumption, not sent to DN
	DELETE_TXN // Only for CN workspace consumption, not sent to DN

	MERGEOBJECT
)
View Source
const (
	SMALL = iota
	NORMAL
	LARGE
)
View Source
const (
	MO_DATABASE_ID_NAME_IDX       = 1
	MO_DATABASE_ID_ACCOUNT_IDX    = 2
	MO_DATABASE_LIST_ACCOUNT_IDX  = 1
	MO_TABLE_ID_NAME_IDX          = 1
	MO_TABLE_ID_DATABASE_ID_IDX   = 2
	MO_TABLE_ID_ACCOUNT_IDX       = 3
	MO_TABLE_LIST_DATABASE_ID_IDX = 1
	MO_TABLE_LIST_ACCOUNT_IDX     = 2
	MO_PRIMARY_OFF                = 2
	INIT_ROWID_OFFSET             = math.MaxUint32
)
View Source
const (
	WorkspaceThreshold   uint64 = 1 * mpool.MB
	InsertEntryThreshold        = 5000
	GCBatchOfFileCount   int    = 1000
	GCPoolSize           int    = 5
)
View Source
const (
	AllColumns = "*"
)

Variables

View Source
var GcCycle = 10 * time.Second
View Source
var (
	// MinUpdateInterval is the minimal interval to update stats info as it
	// is necessary to update stats every time.
	MinUpdateInterval = time.Second * 15
)

Functions

func CheckTxnIsValid added in v1.2.1

func CheckTxnIsValid(txnOp client.TxnOperator) (err error)

func CompileFilterExpr added in v1.2.0

func CompileFilterExpr(
	expr *plan.Expr,
	proc *process.Process,
	tableDef *plan.TableDef,
	fs fileservice.FileService,
) (
	fastFilterOp FastFilterOp,
	loadOp LoadOp,
	objectFilterOp ObjectFilterOp,
	blockFilterOp BlockFilterOp,
	seekOp SeekFirstBlockOp,
	canCompile bool,
	highSelectivityHint bool,
)

func CompileFilterExprs added in v1.2.0

func CompileFilterExprs(
	exprs []*plan.Expr,
	proc *process.Process,
	tableDef *plan.TableDef,
	fs fileservice.FileService,
) (
	fastFilterOp FastFilterOp,
	loadOp LoadOp,
	objectFilterOp ObjectFilterOp,
	blockFilterOp BlockFilterOp,
	seekOp SeekFirstBlockOp,
	canCompile bool,
	highSelectivityHint bool,
)

func ConstructObjStatsByLoadObjMeta added in v1.1.0

func ConstructObjStatsByLoadObjMeta(
	ctx context.Context, metaLoc objectio.Location,
	fs fileservice.FileService) (stats objectio.ObjectStats, dataMeta objectio.ObjectDataMeta, err error)

func ExecuteBlockFilter added in v1.2.0

func ExecuteBlockFilter(
	ctx context.Context,
	tbl *txnTable,
	txnOffset int,
	snapshotTS timestamp.Timestamp,
	fastFilterOp FastFilterOp,
	loadOp LoadOp,
	objectFilterOp ObjectFilterOp,
	blockFilterOp BlockFilterOp,
	seekOp SeekFirstBlockOp,
	snapshot *logtailreplay.PartitionState,
	uncommittedObjects []objectio.ObjectStats,
	dirtyBlocks *map[types.Blockid]struct{},
	outBlocks *objectio.BlockInfoSlice,
	fs fileservice.FileService,
	proc *process.Process,
	highSelectivityHint bool,
) (err error)

func ForeachBlkInObjStatsList added in v1.1.0

func ForeachBlkInObjStatsList(
	next bool,
	dataMeta objectio.ObjectDataMeta,
	onBlock func(blk objectio.BlockInfo, blkMeta objectio.BlockObject) bool,
	objects ...objectio.ObjectStats,
)

ForeachBlkInObjStatsList receives an object info list, and visits each blk of these object info by OnBlock, until the onBlock returns false or all blks have been enumerated. when onBlock returns a false, the next argument decides whether continue onBlock on the next stats or exit foreach completely.

func ForeachCommittedObjects added in v1.2.0

func ForeachCommittedObjects(
	createObjs map[objectio.ObjectNameShort]struct{},
	delObjs map[objectio.ObjectNameShort]struct{},
	p *logtailreplay.PartitionState,
	onObj func(info logtailreplay.ObjectInfo) error) (err error)

func ForeachSnapshotObjects added in v1.1.0

func ForeachSnapshotObjects(
	ts timestamp.Timestamp,
	onObject func(obj logtailreplay.ObjectInfo, isCommitted bool) error,
	tableSnapshot *logtailreplay.PartitionState,
	uncommitted ...objectio.ObjectStats,
) (err error)

func ForeachVisibleDataObject added in v1.2.0

func ForeachVisibleDataObject(
	state *logtailreplay.PartitionState,
	ts types.TS,
	fn func(obj logtailreplay.ObjectEntry) error,
	executor ConcurrentExecutor,
) (err error)

func LinearSearchOffsetByValFactory added in v1.2.0

func LinearSearchOffsetByValFactory(pk *vector.Vector) func(*vector.Vector) []int32

func ListTnService added in v1.1.0

func ListTnService(appendFn func(service *metadata.TNService))

ListTnService gets all tn service in the cluster

func NewMergeReader added in v0.8.0

func NewMergeReader(readers []engine.Reader) *mergeReader

func TryFastFilterBlocks added in v1.2.0

func TryFastFilterBlocks(
	ctx context.Context,
	tbl *txnTable,
	txnOffset int,
	snapshotTS timestamp.Timestamp,
	tableDef *plan.TableDef,
	exprs []*plan.Expr,
	snapshot *logtailreplay.PartitionState,
	uncommittedObjects []objectio.ObjectStats,
	dirtyBlocks *map[types.Blockid]struct{},
	outBlocks *objectio.BlockInfoSlice,
	fs fileservice.FileService,
	proc *process.Process,
) (ok bool, err error)

func UpdateStats added in v1.0.0

func UpdateStats(
	ctx context.Context, req *updateStatsRequest, executor ConcurrentExecutor,
) error

UpdateStats is the main function to calculate and update the stats for scan node.

Types

type BasePKFilter added in v1.2.1

type BasePKFilter struct {
	// contains filtered or unexported fields
}

func (*BasePKFilter) String added in v1.2.1

func (b *BasePKFilter) String() string

type BlockFilterOp added in v1.2.0

type BlockFilterOp func(int, objectio.BlockObject, objectio.BloomFilter) (bool, bool, error)

type ConcurrentExecutor added in v1.2.2

type ConcurrentExecutor interface {
	// AppendTask append the concurrent task to the exuecutor.
	AppendTask(concurrentTask)
	// Run starts receive task to execute.
	Run(context.Context)
	// GetConcurrency returns the concurrency of this executor.
	GetConcurrency() int
}

ConcurrentExecutor is an interface that runs tasks concurrently.

type DNStore

type DNStore = metadata.TNService

type Engine

type Engine struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func New

func New(
	ctx context.Context,
	mp *mpool.MPool,
	fs fileservice.FileService,
	cli client.TxnClient,
	hakeeper logservice.CNHAKeeperClient,
	keyRouter client2.KeyRouter[pb.StatsInfoKey],
	updateWorkerFactor int,
) *Engine

func (*Engine) AllocateIDByKey added in v0.8.0

func (e *Engine) AllocateIDByKey(ctx context.Context, key string) (uint64, error)

func (*Engine) Create

func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error

func (*Engine) Database

func (e *Engine) Database(ctx context.Context, name string,
	op client.TxnOperator) (engine.Database, error)

func (*Engine) DatabaseByAccountID added in v1.2.0

func (e *Engine) DatabaseByAccountID(
	accountID uint32,
	name string,
	op client.TxnOperator) (engine.Database, error)

func (*Engine) Databases

func (e *Engine) Databases(ctx context.Context, op client.TxnOperator) ([]string, error)

func (*Engine) Delete

func (e *Engine) Delete(ctx context.Context, name string, op client.TxnOperator) (err error)

func (*Engine) GetNameById added in v0.7.0

func (e *Engine) GetNameById(ctx context.Context, op client.TxnOperator, tableId uint64) (dbName string, tblName string, err error)

func (*Engine) GetRelationById added in v0.7.0

func (e *Engine) GetRelationById(ctx context.Context, op client.TxnOperator, tableId uint64) (dbName, tableName string, rel engine.Relation, err error)

func (*Engine) Hints

func (e *Engine) Hints() (h engine.Hints)

func (*Engine) InitLogTailPushModel added in v0.8.0

func (e *Engine) InitLogTailPushModel(ctx context.Context, timestampWaiter client.TimestampWaiter) error

func (*Engine) New

func (e *Engine) New(ctx context.Context, op client.TxnOperator) error

func (*Engine) NewBlockReader

func (e *Engine) NewBlockReader(ctx context.Context, num int, ts timestamp.Timestamp,
	expr *plan.Expr, filter any, ranges []byte, tblDef *plan.TableDef, proc any) ([]engine.Reader, error)

func (*Engine) Nodes

func (e *Engine) Nodes(
	isInternal bool, tenant string, username string, cnLabel map[string]string,
) (engine.Nodes, error)

func (*Engine) PushClient added in v1.2.0

func (e *Engine) PushClient() *PushClient

func (*Engine) Stats added in v1.2.0

func (e *Engine) Stats(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo

func (*Engine) TryToSubscribeTable added in v1.2.0

func (e *Engine) TryToSubscribeTable(ctx context.Context, dbID, tbID uint64) error

TryToSubscribeTable implements the LogtailEngine interface.

func (*Engine) UnsubscribeTable added in v1.2.0

func (e *Engine) UnsubscribeTable(ctx context.Context, dbID, tbID uint64) error

UnsubscribeTable implements the LogtailEngine interface.

func (*Engine) UpdateOfPush added in v0.8.0

func (e *Engine) UpdateOfPush(
	ctx context.Context,
	databaseId,
	tableId uint64, ts timestamp.Timestamp) error

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

Entry represents a delete/insert

type FastFilterOp added in v1.2.0

type FastFilterOp func(objectio.ObjectStats) (bool, error)

type GlobalStats added in v1.2.0

type GlobalStats struct {

	// KeyRouter is the router to decides which node should send to.
	KeyRouter client.KeyRouter[pb.StatsInfoKey]
	// contains filtered or unexported fields
}

func NewGlobalStats added in v1.2.0

func NewGlobalStats(
	ctx context.Context, e *Engine, keyRouter client.KeyRouter[pb.StatsInfoKey], opts ...GlobalStatsOption,
) *GlobalStats

func (*GlobalStats) Get added in v1.2.0

func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo

func (*GlobalStats) RemoveTid added in v1.2.2

func (gs *GlobalStats) RemoveTid(tid uint64)

func (*GlobalStats) ShouldUpdate added in v1.2.1

func (gs *GlobalStats) ShouldUpdate(key pb.StatsInfoKey, entryNum int64) bool

type GlobalStatsConfig added in v1.2.0

type GlobalStatsConfig struct {
	LogtailUpdateStatsThreshold int
}

type GlobalStatsOption added in v1.2.0

type GlobalStatsOption func(s *GlobalStats)

func WithUpdateWorkerFactor added in v1.2.2

func WithUpdateWorkerFactor(f int) GlobalStatsOption

WithUpdateWorkerFactor set the update worker factor.

type IDGenerator

type IDGenerator interface {
	AllocateID(ctx context.Context) (uint64, error)
	// AllocateIDByKey allocate a globally unique ID by key.
	AllocateIDByKey(ctx context.Context, key string) (uint64, error)
}

type InMemPKFilter added in v1.2.1

type InMemPKFilter struct {
	// contains filtered or unexported fields
}

func (*InMemPKFilter) SetFullData added in v1.2.1

func (f *InMemPKFilter) SetFullData(op int, isVec bool, val ...[]byte)

func (*InMemPKFilter) SetNull added in v1.2.1

func (f *InMemPKFilter) SetNull()

func (*InMemPKFilter) String added in v1.2.1

func (f *InMemPKFilter) String() string

type LoadOpFactory added in v1.2.0

type LoadOpFactory func(fileservice.FileService) LoadOp

type ObjectFilterOp added in v1.2.0

type ObjectFilterOp func(objectio.ObjectMeta, objectio.BloomFilter) (bool, error)

type PKFilters added in v1.2.1

type PKFilters struct {
	// contains filtered or unexported fields
}

func ConstructPKFilters added in v1.2.1

func ConstructPKFilters(tableDef *plan.TableDef, dbName string,
	ts timestamp.Timestamp, state *logtailreplay.PartitionState,
	expr *plan.Expr, proc *process.Process,
	packerPool *fileservice.Pool[*types.Packer]) (filters PKFilters)

type PartitionReader

type PartitionReader struct {
	// contains filtered or unexported fields
}

func (*PartitionReader) Close

func (p *PartitionReader) Close() error

func (*PartitionReader) GetOrderBy added in v1.2.0

func (p *PartitionReader) GetOrderBy() []*plan.OrderBySpec

func (*PartitionReader) Read

func (p *PartitionReader) Read(
	_ context.Context,
	colNames []string,
	_ *plan.Expr,
	mp *mpool.MPool,
	pool engine.VectorPool) (result *batch.Batch, err error)

PartitionReader.Read reads memory data which comes from partitionState.rows and txn.writes, and load its tombstones.

func (*PartitionReader) SetFilterZM added in v1.2.0

func (p *PartitionReader) SetFilterZM(objectio.ZoneMap)

func (*PartitionReader) SetOrderBy added in v1.2.0

func (p *PartitionReader) SetOrderBy([]*plan.OrderBySpec)

type Pos added in v0.8.0

type Pos struct {
	// contains filtered or unexported fields
}

type PushClient added in v1.2.0

type PushClient struct {
	// contains filtered or unexported fields
}

PushClient is a structure responsible for all operations related to the log tail push model. It provides the following methods:

	-----------------------------------------------------------------------------------------------------
	 1. checkTxnTimeIsLegal : block the process until we have received enough log tail (T_log >= T_txn)
	 2. TryToSubscribeTable : block the process until we subscribed a table succeed.
	 3. subscribeTable	   : send a table subscribe request to service.
	 4. subSysTables : subscribe mo_databases, mo_tables, mo_columns
	 5. receiveTableLogTailContinuously   : start (1 + consumerNumber) routine to receive log tail from service.

 Watch out for the following points:
	 1. if we want to lock both subscriber and subscribed, we should lock subscriber first.
	-----------------------------------------------------------------------------------------------------

func (*PushClient) GetState added in v1.2.0

func (c *PushClient) GetState() State

func (*PushClient) TryToSubscribeTable added in v1.2.0

func (c *PushClient) TryToSubscribeTable(
	ctx context.Context,
	dbId, tblId uint64) (err error)

TryToSubscribeTable subscribe a table and block until subscribe succeed. It's deprecated, please use toSubscribeTable instead.

func (*PushClient) UnsubscribeTable added in v1.2.0

func (c *PushClient) UnsubscribeTable(ctx context.Context, dbID, tbID uint64) error

UnsubscribeTable implements the LogtailEngine interface.

type SeekFirstBlockOp added in v1.2.0

type SeekFirstBlockOp func(objectio.ObjectDataMeta) int

type State added in v1.2.0

type State struct {
	LatestTS  timestamp.Timestamp
	SubTables map[SubTableID]SubTableStatus
}

type StatsBlkIter added in v1.1.0

type StatsBlkIter struct {
	// contains filtered or unexported fields
}

func NewStatsBlkIter added in v1.1.0

func NewStatsBlkIter(stats *objectio.ObjectStats, meta objectio.ObjectDataMeta) *StatsBlkIter

func (*StatsBlkIter) Entry added in v1.1.0

func (i *StatsBlkIter) Entry() objectio.BlockInfo

func (*StatsBlkIter) Next added in v1.1.0

func (i *StatsBlkIter) Next() bool

type SubTableID added in v1.2.0

type SubTableID struct {
	DatabaseID uint64
	TableID    uint64
}

type SubTableStatus added in v1.2.0

type SubTableStatus struct {
	SubState   SubscribeState
	LatestTime time.Time
}

type SubscribeState added in v1.2.1

type SubscribeState int32
const (
	InvalidSubState SubscribeState = iota
	Subscribing
	SubRspReceived
	Subscribed
	Unsubscribing
	Unsubscribed
)

type Transaction

type Transaction struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Transaction represents a transaction

func (*Transaction) Adjust added in v1.0.0

func (txn *Transaction) Adjust(writeOffset uint64) error

Adjust adjust writes order after the current statement finished.

func (*Transaction) BindTxnOp added in v1.2.0

func (txn *Transaction) BindTxnOp(op client.TxnOperator)

func (*Transaction) CloneSnapshotWS added in v1.2.0

func (txn *Transaction) CloneSnapshotWS() client.Workspace

func (*Transaction) Commit added in v0.8.0

func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error)

func (*Transaction) EndStatement added in v1.0.0

func (txn *Transaction) EndStatement()

func (*Transaction) GetHaveDDL added in v1.2.2

func (txn *Transaction) GetHaveDDL() bool

func (*Transaction) GetSQLCount added in v1.0.0

func (txn *Transaction) GetSQLCount() uint64

func (*Transaction) GetSnapshotWriteOffset added in v1.1.2

func (txn *Transaction) GetSnapshotWriteOffset() int

func (*Transaction) IncrSQLCount added in v1.0.0

func (txn *Transaction) IncrSQLCount()

func (*Transaction) IncrStatementID added in v0.8.0

func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error

func (*Transaction) PutCnBlockDeletes added in v0.8.0

func (txn *Transaction) PutCnBlockDeletes(blockId *types.Blockid, offsets []int64)

func (*Transaction) ReadOnly

func (txn *Transaction) ReadOnly() bool

detecting whether a transaction is a read-only transaction

func (*Transaction) Rollback added in v0.8.0

func (txn *Transaction) Rollback(ctx context.Context) error

func (*Transaction) RollbackLastStatement added in v0.8.0

func (txn *Transaction) RollbackLastStatement(ctx context.Context) error

func (*Transaction) SetHaveDDL added in v1.2.2

func (txn *Transaction) SetHaveDDL(haveDDL bool)

func (*Transaction) StartStatement added in v1.0.0

func (txn *Transaction) StartStatement()

func (*Transaction) UpdateSnapshotWriteOffset added in v1.1.2

func (txn *Transaction) UpdateSnapshotWriteOffset()

func (*Transaction) WriteBatch

func (txn *Transaction) WriteBatch(
	typ int,
	accountId uint32,
	databaseId uint64,
	tableId uint64,
	databaseName string,
	tableName string,
	bat *batch.Batch,
	tnStore DNStore,
	primaryIdx int,
	insertBatchHasRowId bool,
	truncate bool) error

WriteBatch used to write data to the transaction buffer insert/delete/update all use this api insertBatchHasRowId : it denotes the batch has Rowid when the typ is INSERT. if typ is not INSERT, it is always false. truncate : it denotes the batch with typ DELETE on mo_tables is generated when Truncating a table.

func (*Transaction) WriteFile

func (txn *Transaction) WriteFile(
	typ int,
	accountId uint32,
	databaseId,
	tableId uint64,
	databaseName,
	tableName string,
	fileName string,
	bat *batch.Batch,
	tnStore DNStore) error

WriteFile used to add a s3 file information to the transaction buffer insert/delete/update all use this api

func (*Transaction) WriteFileLocked added in v1.0.0

func (txn *Transaction) WriteFileLocked(
	typ int,
	accountId uint32,
	databaseId,
	tableId uint64,
	databaseName,
	tableName string,
	fileName string,
	bat *batch.Batch,
	tnStore DNStore) error

func (*Transaction) WriteOffset added in v1.1.1

func (txn *Transaction) WriteOffset() uint64

writeOffset returns the offset of the first write in the workspace

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL