colexec

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2024 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// 'mo_indexes' table
	MO_INDEX_ID          = "id"
	MO_INDEX_TABLE_ID    = "table_id"
	MO_INDEX_DATABASE_ID = "database_id"
	MO_INDEX_NAME        = "name"
	// MO_INDEX_TYPE can have values : unique, primary or multiple.
	// It is better called as MO_INDEX_CATEGORY. But for now, we will keep it as MO_INDEX_TYPE.
	// The INDEX_TYPE in MYSQL has values BTREE, FULLTEXT, HASH, RTREE
	// Ref: https://dev.mysql.com/doc/mysql-infoschema-excerpt/5.7/en/information-schema-statistics-table.html
	MO_INDEX_TYPE                 = "type"
	MO_INDEX_ALGORITHM            = "algo"
	MO_INDEX_ALGORITHM_TABLE_TYPE = "algo_table_type"
	MO_INDEX_ALGORITHM_PARAMS     = "algo_params"
	MO_INDEX_IS_VISIBLE           = "is_visible"
	MO_INDEX_HIDDEN               = "hidden"
	MO_INDEX_COMMENT              = "comment"
	MO_INDEX_OPTIONS              = "options"
	MO_INDEX_COLUMN_NAME          = "column_name"
	MO_INDEX_ORDINAL_POSITION     = "ordinal_position"
	MO_INDEX_TABLE_NAME           = "index_table_name"
	MO_INDEX_PRIKEY               = catalog.CPrimaryKeyColName
)
View Source
const (
	INDEX_TYPE_PRIMARY  = "PRIMARY"
	INDEX_TYPE_UNIQUE   = "UNIQUE"
	INDEX_TYPE_MULTIPLE = "MULTIPLE"
)
View Source
const (
	// WriteS3Threshold when batches'  size of table reaches this, we will
	// trigger write s3
	WriteS3Threshold uint64 = 128 * mpool.MB

	TagS3Size            uint64 = 10 * mpool.MB
	TagS3SizeForMOLogger uint64 = 1 * mpool.MB
)
View Source
const (
	TxnWorkSpaceIdType = 1
	CnBlockIdType      = 2
)
View Source
const (
	ALLOCID_INDEX_KEY = "index_key"
)

Variables

Column type mapping of table 'mo_indexes'

Functions

func BatchDataNotNullCheck added in v0.8.0

func BatchDataNotNullCheck(tmpBat *batch.Batch, tableDef *plan.TableDef, ctx context.Context) error

func EvalExpressionOnce added in v0.8.0

func EvalExpressionOnce(proc *process.Process, planExpr *plan.Expr, batches []*batch.Batch) (*vector.Vector, error)

func EvaluateFilterByZoneMap added in v0.8.0

func EvaluateFilterByZoneMap(
	ctx context.Context,
	proc *process.Process,
	expr *plan.Expr,
	meta objectio.ColumnMetaFetcher,
	columnMap map[int]int,
	zms []objectio.ZoneMap,
	vecs []*vector.Vector) (selected bool)

func FilterRowIdForDel added in v0.8.0

func FilterRowIdForDel(proc *process.Process, bat *batch.Batch,
	idx int, primaryKeyIdx int) (*batch.Batch, error)

func FixProjectionResult added in v0.8.0

func FixProjectionResult(proc *process.Process,
	executors []ExpressionExecutor,
	uafs []func(v, w *vector.Vector) error,
	rbat *batch.Batch, sbat *batch.Batch) (dupSize int, err error)

FixProjectionResult sets the result vector for rbat; sbat is the source batch.

func GenerateConstListExpressionExecutor added in v1.0.0

func GenerateConstListExpressionExecutor(proc *process.Process, exprs []*plan.Expr) (*vector.Vector, error)

func GetExprZoneMap added in v0.8.0

func GetExprZoneMap(
	ctx context.Context,
	proc *process.Process,
	expr *plan.Expr,
	meta objectio.ColumnMetaFetcher,
	columnMap map[int]int,
	zms []objectio.ZoneMap,
	vecs []*vector.Vector) (v objectio.ZoneMap)

func GetNewRelation added in v0.6.0

func GetNewRelation(eg engine.Engine, dbName, tbleName string, txn client.TxnOperator, ctx context.Context) (engine.Relation, error)

func GroupByPartitionForDelete added in v0.8.0

func GroupByPartitionForDelete(proc *process.Process, bat *batch.Batch, rowIdIdx int, partitionIdx int, partitionNum int, pkIdx int) ([]*batch.Batch, error)

GroupByPartitionForDelete: Groups data by partition and returns a batch array whose length equals the number of partitions. Data from the same partition is placed in the same batch.

func GroupByPartitionForInsert added in v0.8.0

func GroupByPartitionForInsert(proc *process.Process, bat *batch.Batch, attrs []string, pIdx int, partitionNum int) ([]*batch.Batch, error)

GroupByPartitionForInsert: Groups data by partition and returns a batch array whose length equals the number of partitions. Data from the same partition is placed in the same batch.

func InsertIndexMetadata added in v0.8.0

func InsertIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string) error

InsertIndexMetadata: Synchronizes the index metadata information of the table to the index metadata table.

func InsertOneIndexMetadata added in v0.8.0

func InsertOneIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string, idxdef *plan.IndexDef) error

InsertOneIndexMetadata: Synchronizes the metadata information of a single index into the index metadata table.

func NewJoinBatch added in v0.8.0

func NewJoinBatch(bat *batch.Batch, mp *mpool.MPool) (*batch.Batch,
	[]func(*vector.Vector, *vector.Vector, int64, int) error)

func RewriteFilterExprList

func RewriteFilterExprList(list []*plan.Expr) *plan.Expr

RewriteFilterExprList converts an expression list into a single AndExpr.

func SetJoinBatchValues added in v0.8.0

func SetJoinBatchValues(joinBat, bat *batch.Batch, sel int64, length int,
	cfs []func(*vector.Vector, *vector.Vector, int64, int) error) error

func SortInFilter added in v0.8.0

func SortInFilter(vec *vector.Vector)

func SplitAndExprs added in v0.6.0

func SplitAndExprs(list []*plan.Expr) []*plan.Expr

Types

type CnSegmentMap added in v0.8.0

type CnSegmentMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type ColumnExpressionExecutor added in v0.8.0

type ColumnExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*ColumnExpressionExecutor) Eval added in v0.8.0

func (expr *ColumnExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ColumnExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *ColumnExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ColumnExpressionExecutor) Free added in v0.8.0

func (expr *ColumnExpressionExecutor) Free()

func (*ColumnExpressionExecutor) GetColIndex added in v1.0.0

func (expr *ColumnExpressionExecutor) GetColIndex() int

func (*ColumnExpressionExecutor) GetRelIndex added in v1.0.0

func (expr *ColumnExpressionExecutor) GetRelIndex() int

func (*ColumnExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *ColumnExpressionExecutor) IsColumnExpr() bool

type ExpressionExecutor added in v0.8.0

type ExpressionExecutor interface {
	// Eval will return the result vector of expression.
	// the result memory is reused, so it should not be modified or saved.
	// If it needs, it should be copied by vector.Dup().
	Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

	// EvalWithoutResultReusing is the same as Eval, but it will not reuse the memory of result vector.
	// so you can save the result vector directly. but should be careful about memory leak.
	// and watch out that maybe the vector is one of the input vectors of batches.
	EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

	// Free should release all memory of executor.
	// it will be called after query has done.
	Free()

	IsColumnExpr() bool
}

ExpressionExecutor is generated from a plan.Expr and can evaluate the result from vectors directly.

func NewExpressionExecutor added in v0.8.0

func NewExpressionExecutor(proc *process.Process, planExpr *plan.Expr) (ExpressionExecutor, error)

func NewExpressionExecutorsFromPlanExpressions added in v0.8.0

func NewExpressionExecutorsFromPlanExpressions(proc *process.Process, planExprs []*plan.Expr) (executors []ExpressionExecutor, err error)

type FixedVectorExpressionExecutor added in v0.8.0

type FixedVectorExpressionExecutor struct {
	// contains filtered or unexported fields
}

FixedVectorExpressionExecutor is an executor whose vector content is fixed, e.g.:

ConstVector [1, 1, 1, 1, 1]
ConstVector [null, null, null]
ListVector  ["1", "2", "3", null, "5"]

func (*FixedVectorExpressionExecutor) Eval added in v0.8.0

func (expr *FixedVectorExpressionExecutor) Eval(_ *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FixedVectorExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *FixedVectorExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FixedVectorExpressionExecutor) Free added in v0.8.0

func (expr *FixedVectorExpressionExecutor) Free()

func (*FixedVectorExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *FixedVectorExpressionExecutor) IsColumnExpr() bool

type FunctionExpressionExecutor added in v0.8.0

type FunctionExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*FunctionExpressionExecutor) Eval added in v0.8.0

func (expr *FunctionExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FunctionExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *FunctionExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FunctionExpressionExecutor) Free added in v0.8.0

func (expr *FunctionExpressionExecutor) Free()

func (*FunctionExpressionExecutor) Init added in v0.8.0

func (expr *FunctionExpressionExecutor) Init(
	proc *process.Process,
	parameterNum int,
	retType types.Type,
	fn func(
		params []*vector.Vector,
		result vector.FunctionResultWrapper,
		proc *process.Process,
		length int) error) (err error)

func (*FunctionExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *FunctionExpressionExecutor) IsColumnExpr() bool

func (*FunctionExpressionExecutor) SetParameter added in v0.8.0

func (expr *FunctionExpressionExecutor) SetParameter(index int, executor ExpressionExecutor)

type Merge added in v0.8.0

type Merge[T any] struct {
	// contains filtered or unexported fields
}

We sort by the primary key or the cluster-by key, so only one vector from each batch is needed.

func NewMerge added in v0.8.0

func NewMerge[T any](size int, compLess func([]T, int64, int64) bool, cols [][]T, nulls []*nulls.Nulls) (merge *Merge[T])

func (*Merge[T]) GetNextPos added in v0.8.0

func (merge *Merge[T]) GetNextPos() (batchIndex, rowIndex, size int)

func (*Merge[T]) InitHeap added in v0.8.0

func (merge *Merge[T]) InitHeap()

func (*Merge[T]) Len added in v0.8.0

func (merge *Merge[T]) Len() int

func (*Merge[T]) Less added in v0.8.0

func (merge *Merge[T]) Less(i, j int) bool

type MergeHeap added in v0.8.0

type MergeHeap[T any] struct {
	// contains filtered or unexported fields
}

MergeHeap uses the null-first ordering rule.

func NewMergeHeap added in v0.8.0

func NewMergeHeap[T any](cap_size uint64, cmp func([]T, int64, int64) bool) *MergeHeap[T]

func (*MergeHeap[T]) Pop added in v0.8.0

func (heap *MergeHeap[T]) Pop() (data *MixData[T])

func (*MergeHeap[T]) Push added in v0.8.0

func (heap *MergeHeap[T]) Push(data *MixData[T])

type MergeInterface added in v0.8.0

type MergeInterface interface {
	GetNextPos() (int, int, int)
}

type MixData added in v0.8.0

type MixData[T any] struct {
	// contains filtered or unexported fields
}

type ParamExpressionExecutor added in v0.8.0

type ParamExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*ParamExpressionExecutor) Eval added in v0.8.0

func (expr *ParamExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ParamExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *ParamExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ParamExpressionExecutor) Free added in v0.8.0

func (expr *ParamExpressionExecutor) Free()

func (*ParamExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *ParamExpressionExecutor) IsColumnExpr() bool

type ReceiveInfo added in v0.7.0

type ReceiveInfo struct {
	// it's useless
	NodeAddr string
	Uuid     uuid.UUID
}

ReceiveInfo is used to specify which node and which registers you need.

type ReceiverOperator added in v0.8.0

type ReceiverOperator struct {
	// contains filtered or unexported fields
}

ReceiverOperator needs to receive batches from proc.Reg.MergeReceivers.

func (*ReceiverOperator) DisableChosen added in v1.0.0

func (r *ReceiverOperator) DisableChosen(idx int)

func (*ReceiverOperator) FreeAllReg added in v0.8.0

func (r *ReceiverOperator) FreeAllReg()

func (*ReceiverOperator) FreeMergeTypeOperator added in v0.8.0

func (r *ReceiverOperator) FreeMergeTypeOperator(failed bool)

func (*ReceiverOperator) FreeSingleReg added in v0.8.0

func (r *ReceiverOperator) FreeSingleReg(regIdx int)

func (*ReceiverOperator) InitReceiver added in v0.8.0

func (r *ReceiverOperator) InitReceiver(proc *process.Process, isMergeType bool)

isMergeType indicates whether the receiver operator receives batches from all regs, or from a single reg in some order. Merge/MergeGroup/MergeLimit ... are Merge-Type, while Join/Intersect/Minus ... are not.

func (*ReceiverOperator) ReceiveFromAllRegs added in v0.8.0

func (r *ReceiverOperator) ReceiveFromAllRegs(analyze process.Analyze) (*batch.Batch, bool, error)

You MUST initialize the ReceiverOperator as Merge-Type (see InitReceiver) if you want to use this function.

func (*ReceiverOperator) ReceiveFromSingleReg added in v0.8.0

func (r *ReceiverOperator) ReceiveFromSingleReg(regIdx int, analyze process.Analyze) (*batch.Batch, bool, error)

func (*ReceiverOperator) ReceiveFromSingleRegNonBlock added in v1.0.0

func (r *ReceiverOperator) ReceiveFromSingleRegNonBlock(regIdx int, analyze process.Analyze) (*batch.Batch, bool, error)

func (*ReceiverOperator) RemoveChosen added in v1.0.0

func (r *ReceiverOperator) RemoveChosen(idx int)

type ResultPos added in v0.6.0

type ResultPos struct {
	Rel int32
	Pos int32
}

func NewResultPos added in v0.6.0

func NewResultPos(rel int32, pos int32) ResultPos

type RuntimeFilterChan added in v0.8.0

type RuntimeFilterChan struct {
	Spec *plan.RuntimeFilterSpec
	Chan chan *pipeline.RuntimeFilter
}

type S3Writer added in v0.8.0

type S3Writer struct {

	// Bats[i] used to store the batches of table
	// Each batch in Bats will be sorted internally, and all batches correspond to only one table
	// when the batches' size is over 64M, we will use merge sort, and then write a segment in s3
	Bats []*batch.Batch
	// contains filtered or unexported fields
}

S3Writer is used to write table data to S3 and packages a series of `BlockWriter` write operations. Currently there are two scenarios in which a CN writes to S3 directly. Scenario 1: the insert operator goes directly to S3; this is triggered when a one-time insert/load data volume is relatively large. Scenario 2: txn.workspace exceeds the threshold value, and the txn.dumpBatch function triggers a write to S3.

func AllocPartitionS3Writer added in v0.8.0

func AllocPartitionS3Writer(proc *process.Process, tableDef *plan.TableDef) ([]*S3Writer, error)

AllocPartitionS3Writer allocates S3 writers for a partitioned table.

func AllocS3Writer added in v0.8.0

func AllocS3Writer(proc *process.Process, tableDef *plan.TableDef) (*S3Writer, error)

func (*S3Writer) Free added in v0.8.0

func (w *S3Writer) Free(proc *process.Process)

func (*S3Writer) GenerateWriter added in v0.8.0

func (w *S3Writer) GenerateWriter(proc *process.Process) (objectio.ObjectName, error)

func (*S3Writer) GetBlockInfoBat added in v1.0.0

func (w *S3Writer) GetBlockInfoBat() *batch.Batch

func (*S3Writer) InitBuffers added in v0.8.0

func (w *S3Writer) InitBuffers(proc *process.Process, bat *batch.Batch)

func (*S3Writer) Output added in v0.8.0

func (w *S3Writer) Output(proc *process.Process, result *vm.CallResult) error

func (*S3Writer) Put added in v0.8.0

func (w *S3Writer) Put(bat *batch.Batch, proc *process.Process) bool

Put puts the batch into w.bats, and makes sure that each batch in w.bats contains options.DefaultBlockMaxRows rows, except for the last one. Return value:

true: the tableBatches[idx] is over the threshold
false: the tableBatches[idx] is less than or equal to the threshold

func (*S3Writer) ResetBlockInfoBat added in v1.0.0

func (w *S3Writer) ResetBlockInfoBat(proc *process.Process)

func (*S3Writer) SetSchemaVer added in v1.0.0

func (w *S3Writer) SetSchemaVer(ver uint32)

func (*S3Writer) SetSeqnums added in v1.0.0

func (w *S3Writer) SetSeqnums(seqnums []uint16)

func (*S3Writer) SetSortIdx added in v0.8.0

func (w *S3Writer) SetSortIdx(sortIdx int)

func (*S3Writer) SetTableName added in v1.0.0

func (w *S3Writer) SetTableName(name string)

func (*S3Writer) SortAndFlush added in v0.8.0

func (w *S3Writer) SortAndFlush(proc *process.Process) error

func (*S3Writer) WriteBlock added in v0.8.0

func (w *S3Writer) WriteBlock(bat *batch.Batch, dataType ...objectio.DataMetaType) error

func (*S3Writer) WriteEndBlocks added in v0.8.0

func (w *S3Writer) WriteEndBlocks(proc *process.Process) ([]catalog.BlockInfo, []objectio.ObjectStats, error)

WriteEndBlocks writes the batches in the buffer to the fileservice (aka S3 in this feature), gets the metadata about the blocks on the fileservice, and puts it into metaLocBat. For more information, please refer to the comment about func WriteEnd in the Writer interface.

func (*S3Writer) WriteS3Batch added in v0.8.0

func (w *S3Writer) WriteS3Batch(proc *process.Process, bat *batch.Batch) error

WriteS3Batch logic: S3Writer caches the batches in memory, and when the batches' size reaches 10M (TagS3Size), we add a tag to indicate that this data needs to be written to S3, but not immediately. We continue to wait until there is no more data or the data size reaches WriteS3Threshold (declared as 128M in this package; this comment previously stated 64M), at which point we trigger the write to S3.

func (*S3Writer) WriteS3CacheBatch added in v0.8.0

func (w *S3Writer) WriteS3CacheBatch(proc *process.Process) error

type Server added in v0.7.0

type Server struct {
	sync.Mutex

	CNSegmentId   types.Uuid
	InitSegmentId bool
	// contains filtered or unexported fields
}

Server is used to support cn2s3 directly; for more info, refer to the docs about it.

var Srv *Server

func NewServer added in v0.7.0

func NewServer(client logservice.CNHAKeeperClient) *Server

func (*Server) DeleteTxnSegmentIds added in v0.8.0

func (srv *Server) DeleteTxnSegmentIds(sids []objectio.Segmentid)

func (*Server) DeleteUuids added in v1.0.0

func (srv *Server) DeleteUuids(uuids []uuid.UUID)

func (*Server) GenerateObject added in v1.0.0

func (srv *Server) GenerateObject() objectio.ObjectName

SegmentId is part of Id for cn2s3 directly, for more info, refer to docs about it

func (*Server) GetCnSegmentMap added in v0.8.0

func (srv *Server) GetCnSegmentMap() map[string]int32

func (*Server) GetCnSegmentType added in v0.8.0

func (srv *Server) GetCnSegmentType(sid *objectio.Segmentid) int32

func (*Server) GetProcByUuid added in v1.0.0

func (srv *Server) GetProcByUuid(u uuid.UUID, forcedDelete bool) (*process.Process, bool)

GetProcByUuid uses the uuid to get a process from the srv. If the process is nil, it means the process has finished. If forcedDelete is set, it performs an action that prevents another routine from putting a new item.

func (*Server) PutCnSegment added in v0.8.0

func (srv *Server) PutCnSegment(sid *objectio.Segmentid, segmentType int32)

func (*Server) PutProcIntoUuidMap added in v1.0.0

func (srv *Server) PutProcIntoUuidMap(u uuid.UUID, p *process.Process) error

type UuidProcMap added in v0.8.0

type UuidProcMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type VarExpressionExecutor added in v0.8.0

type VarExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*VarExpressionExecutor) Eval added in v0.8.0

func (expr *VarExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*VarExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *VarExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*VarExpressionExecutor) Free added in v0.8.0

func (expr *VarExpressionExecutor) Free()

func (*VarExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *VarExpressionExecutor) IsColumnExpr() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL