colexec

package
v1.2.3-hotfix-20240916 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2024 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// 'mo_indexes' table
	MO_INDEX_ID          = "id"
	MO_INDEX_TABLE_ID    = "table_id"
	MO_INDEX_DATABASE_ID = "database_id"
	MO_INDEX_NAME        = "name"
	// MO_INDEX_TYPE can have values : unique, primary or multiple.
	// It is better called as MO_INDEX_CATEGORY. But for now, we will keep it as MO_INDEX_TYPE.
	// The INDEX_TYPE in MYSQL has values BTREE, FULLTEXT, HASH, RTREE
	// Ref: https://dev.mysql.com/doc/mysql-infoschema-excerpt/5.7/en/information-schema-statistics-table.html
	MO_INDEX_TYPE                 = "type"
	MO_INDEX_ALGORITHM            = "algo"
	MO_INDEX_ALGORITHM_TABLE_TYPE = "algo_table_type"
	MO_INDEX_ALGORITHM_PARAMS     = "algo_params"
	MO_INDEX_IS_VISIBLE           = "is_visible"
	MO_INDEX_HIDDEN               = "hidden"
	MO_INDEX_COMMENT              = "comment"
	MO_INDEX_OPTIONS              = "options"
	MO_INDEX_COLUMN_NAME          = "column_name"
	MO_INDEX_ORDINAL_POSITION     = "ordinal_position"
	MO_INDEX_TABLE_NAME           = "index_table_name"
	MO_INDEX_PRIKEY               = catalog.CPrimaryKeyColName
)
View Source
const (
	INDEX_TYPE_PRIMARY  = "PRIMARY"
	INDEX_TYPE_UNIQUE   = "UNIQUE"
	INDEX_TYPE_MULTIPLE = "MULTIPLE"
)
View Source
const (
	// WriteS3Threshold when batches'  size of table reaches this, we will
	// trigger write s3
	WriteS3Threshold uint64 = 128 * mpool.MB

	TagS3Size            uint64 = 10 * mpool.MB
	TagS3SizeForMOLogger uint64 = 1 * mpool.MB
)
View Source
const (
	TxnWorkSpaceIdType = 1
	CnBlockIdType      = 2
)
View Source
const (
	ALLOCID_INDEX_KEY = "index_key"
)
View Source
const (
	DefaultBatchSize = 8192
)

Variables

Column type mapping of table 'mo_indexes'

Functions

func BatchDataNotNullCheck added in v0.8.0

func BatchDataNotNullCheck(tmpBat *batch.Batch, tableDef *plan.TableDef, ctx context.Context) error

func EvalExpressionOnce added in v0.8.0

func EvalExpressionOnce(proc *process.Process, planExpr *plan.Expr, batches []*batch.Batch) (*vector.Vector, error)

func EvaluateFilterByZoneMap added in v0.8.0

func EvaluateFilterByZoneMap(
	ctx context.Context,
	proc *process.Process,
	expr *plan.Expr,
	meta objectio.ColumnMetaFetcher,
	columnMap map[int]int,
	zms []objectio.ZoneMap,
	vecs []*vector.Vector) (selected bool)

func FilterRowIdForDel added in v0.8.0

func FilterRowIdForDel(proc *process.Process, bat *batch.Batch,
	idx int, primaryKeyIdx int) (*batch.Batch, error)

func FixProjectionResult added in v0.8.0

func FixProjectionResult(proc *process.Process,
	executors []ExpressionExecutor,
	uafs []func(v, w *vector.Vector) error,
	rbat *batch.Batch, sbat *batch.Batch) (dupSize int, err error)

FixProjectionResult set result vector for rbat. sbat is the source batch.

func GenerateConstListExpressionExecutor added in v1.0.0

func GenerateConstListExpressionExecutor(proc *process.Process, exprs []*plan.Expr) (*vector.Vector, error)

func GetExprZoneMap added in v0.8.0

func GetExprZoneMap(
	ctx context.Context,
	proc *process.Process,
	expr *plan.Expr,
	meta objectio.ColumnMetaFetcher,
	columnMap map[int]int,
	zms []objectio.ZoneMap,
	vecs []*vector.Vector) (v objectio.ZoneMap)

func GetNewRelation added in v0.6.0

func GetNewRelation(eg engine.Engine, dbName, tbleName string, txn client.TxnOperator, ctx context.Context) (engine.Relation, error)

func GroupByPartitionForDelete added in v0.8.0

func GroupByPartitionForDelete(proc *process.Process, bat *batch.Batch, rowIdIdx int, partitionIdx int, partitionNum int, pkIdx int) ([]*batch.Batch, error)

GroupByPartitionForDeleteS3: Group data based on partition and return batch array with the same length as the number of partitions. Data from the same partition is placed in the same batch

func GroupByPartitionForInsert added in v0.8.0

func GroupByPartitionForInsert(proc *process.Process, bat *batch.Batch, attrs []string, pIdx int, partitionNum int) ([]*batch.Batch, error)

GroupByPartitionForInsert: Group data based on partition and return batch array with the same length as the number of partitions. Data from the same partition is placed in the same batch

func InsertIndexMetadata added in v0.8.0

func InsertIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string) error

InsertIndexMetadata :Synchronize the index metadata information of the table to the index metadata table

func InsertOneIndexMetadata added in v0.8.0

func InsertOneIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string, idxdef *plan.IndexDef) error

InsertOneIndexMetadata :Synchronize the single index metadata information into the index metadata table

func NewJoinBatch added in v0.8.0

func NewJoinBatch(bat *batch.Batch, mp *mpool.MPool) (*batch.Batch,
	[]func(*vector.Vector, *vector.Vector, int64, int) error)

func RewriteFilterExprList

func RewriteFilterExprList(list []*plan.Expr) *plan.Expr

RewriteFilterExprList will convert an expression list to be an AndExpr

func Set added in v1.2.0

func Set(s *Server)

func SetJoinBatchValues added in v0.8.0

func SetJoinBatchValues(joinBat, bat *batch.Batch, sel int64, length int,
	cfs []func(*vector.Vector, *vector.Vector, int64, int) error) error

func SplitAndExprs added in v0.6.0

func SplitAndExprs(list []*plan.Expr) []*plan.Expr

Types

type CnSegmentMap added in v0.8.0

type CnSegmentMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type ColumnExpressionExecutor added in v0.8.0

type ColumnExpressionExecutor struct {
	// contains filtered or unexported fields
}

func NewColumnExpressionExecutor added in v1.2.0

func NewColumnExpressionExecutor() *ColumnExpressionExecutor

func (*ColumnExpressionExecutor) Eval added in v0.8.0

func (expr *ColumnExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ColumnExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *ColumnExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ColumnExpressionExecutor) Free added in v0.8.0

func (expr *ColumnExpressionExecutor) Free()

func (*ColumnExpressionExecutor) GetColIndex added in v1.0.0

func (expr *ColumnExpressionExecutor) GetColIndex() int

func (*ColumnExpressionExecutor) GetRelIndex added in v1.0.0

func (expr *ColumnExpressionExecutor) GetRelIndex() int

func (*ColumnExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *ColumnExpressionExecutor) IsColumnExpr() bool

func (ColumnExpressionExecutor) TypeName added in v1.2.0

func (expr ColumnExpressionExecutor) TypeName() string

type ExpressionExecutor added in v0.8.0

type ExpressionExecutor interface {
	// Eval will return the result vector of expression.
	// the result memory is reused, so it should not be modified or saved.
	// If it needs, it should be copied by vector.Dup().
	Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

	// EvalWithoutResultReusing is the same as Eval, but it will not reuse the memory of result vector.
	// so you can save the result vector directly. but should be careful about memory leak.
	// and watch out that maybe the vector is one of the input vectors of batches.
	EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

	// Free should release all memory of executor.
	// it will be called after query has done.
	Free()

	IsColumnExpr() bool
}

ExpressionExecutor generated from plan.Expr, can evaluate the result from vectors directly.

func NewExpressionExecutor added in v0.8.0

func NewExpressionExecutor(proc *process.Process, planExpr *plan.Expr) (ExpressionExecutor, error)

func NewExpressionExecutorsFromPlanExpressions added in v0.8.0

func NewExpressionExecutorsFromPlanExpressions(proc *process.Process, planExprs []*plan.Expr) (executors []ExpressionExecutor, err error)

type FixedVectorExpressionExecutor added in v0.8.0

type FixedVectorExpressionExecutor struct {
	// contains filtered or unexported fields
}

FixedVectorExpressionExecutor the content of its vector is fixed. e.g.

ConstVector [1, 1, 1, 1, 1]
ConstVector [null, null, null]
ListVector  ["1", "2", "3", null, "5"]

func NewFixedVectorExpressionExecutor added in v1.2.0

func NewFixedVectorExpressionExecutor(m *mpool.MPool, fixed bool, resultVector *vector.Vector) *FixedVectorExpressionExecutor

func (*FixedVectorExpressionExecutor) Eval added in v0.8.0

func (expr *FixedVectorExpressionExecutor) Eval(_ *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FixedVectorExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *FixedVectorExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FixedVectorExpressionExecutor) Free added in v0.8.0

func (expr *FixedVectorExpressionExecutor) Free()

func (*FixedVectorExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *FixedVectorExpressionExecutor) IsColumnExpr() bool

func (FixedVectorExpressionExecutor) TypeName added in v1.2.0

func (expr FixedVectorExpressionExecutor) TypeName() string

type FunctionExpressionExecutor added in v0.8.0

type FunctionExpressionExecutor struct {
	// contains filtered or unexported fields
}

func NewFunctionExpressionExecutor added in v1.2.0

func NewFunctionExpressionExecutor() *FunctionExpressionExecutor

func (*FunctionExpressionExecutor) Eval added in v0.8.0

func (expr *FunctionExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FunctionExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *FunctionExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FunctionExpressionExecutor) Free added in v0.8.0

func (expr *FunctionExpressionExecutor) Free()

func (*FunctionExpressionExecutor) Init added in v0.8.0

func (expr *FunctionExpressionExecutor) Init(
	proc *process.Process,
	parameterNum int,
	retType types.Type,
	fn func(
		params []*vector.Vector,
		result vector.FunctionResultWrapper,
		proc *process.Process,
		length int) error,
	freeFn func() error) (err error)

func (*FunctionExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *FunctionExpressionExecutor) IsColumnExpr() bool

func (*FunctionExpressionExecutor) SetParameter added in v0.8.0

func (expr *FunctionExpressionExecutor) SetParameter(index int, executor ExpressionExecutor)

func (FunctionExpressionExecutor) TypeName added in v1.2.0

func (expr FunctionExpressionExecutor) TypeName() string

type Merge added in v0.8.0

type Merge[T any] struct {
	// contains filtered or unexported fields
}

we will sort by primary key or clusterby key, so we just need one vector of every batch.

type MergeInterface added in v0.8.0

type MergeInterface interface {
	// contains filtered or unexported methods
}

type ParamExpressionExecutor added in v0.8.0

type ParamExpressionExecutor struct {
	// contains filtered or unexported fields
}

func NewParamExpressionExecutor added in v1.2.0

func NewParamExpressionExecutor(mp *mpool.MPool, pos int, typ types.Type) *ParamExpressionExecutor

func (*ParamExpressionExecutor) Eval added in v0.8.0

func (expr *ParamExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ParamExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *ParamExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ParamExpressionExecutor) Free added in v0.8.0

func (expr *ParamExpressionExecutor) Free()

func (*ParamExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *ParamExpressionExecutor) IsColumnExpr() bool

func (ParamExpressionExecutor) TypeName added in v1.2.0

func (expr ParamExpressionExecutor) TypeName() string

type ReceiveInfo added in v0.7.0

type ReceiveInfo struct {
	// it's useless
	NodeAddr string
	Uuid     uuid.UUID
}

ReceiveInfo used to spec which node, and which registers you need

type ReceiverOperator added in v0.8.0

type ReceiverOperator struct {
	// contains filtered or unexported fields
}

ReceiverOperator need to receive batch from proc.Reg.MergeReceivers

func (*ReceiverOperator) DisableChosen added in v1.0.0

func (r *ReceiverOperator) DisableChosen(idx int)

func (*ReceiverOperator) FreeAllReg added in v0.8.0

func (r *ReceiverOperator) FreeAllReg()

func (*ReceiverOperator) FreeMergeTypeOperator added in v0.8.0

func (r *ReceiverOperator) FreeMergeTypeOperator(failed bool)

func (*ReceiverOperator) FreeSingleReg added in v0.8.0

func (r *ReceiverOperator) FreeSingleReg(regIdx int)

func (*ReceiverOperator) InitReceiver added in v0.8.0

func (r *ReceiverOperator) InitReceiver(proc *process.Process, isMergeType bool)

isMergeType means the receiver operator receive batch from all regs or single by some order Merge/MergeGroup/MergeLimit ... are Merge-Type while Join/Intersect/Minus ... are not

func (*ReceiverOperator) ReceiveBitmapFromChannel added in v1.2.1

func (r *ReceiverOperator) ReceiveBitmapFromChannel(ch chan *bitmap.Bitmap) *bitmap.Bitmap

func (*ReceiverOperator) ReceiveFromAllRegs added in v0.8.0

func (r *ReceiverOperator) ReceiveFromAllRegs(analyze process.Analyze) *process.RegisterMessage

You MUST Init ReceiverOperator with Merge-Type if you want to use this function

func (*ReceiverOperator) ReceiveFromSingleReg added in v0.8.0

func (r *ReceiverOperator) ReceiveFromSingleReg(regIdx int, analyze process.Analyze) *process.RegisterMessage

func (*ReceiverOperator) RemoveChosen added in v1.0.0

func (r *ReceiverOperator) RemoveChosen(idx int)

type ResultPos added in v0.6.0

type ResultPos struct {
	Rel int32
	Pos int32
}

func NewResultPos added in v0.6.0

func NewResultPos(rel int32, pos int32) ResultPos

type RunningPipelineMapForRemoteNode added in v1.2.1

type RunningPipelineMapForRemoteNode struct {
	sync.Mutex
	// contains filtered or unexported fields
}

RunningPipelineMapForRemoteNode is a map to record which pipeline was built for a remote node. these pipelines will send data to a remote node, we record them for a better control for their lives.

type S3Writer added in v0.8.0

type S3Writer struct {

	// Bats[i] used to store the batches of table
	// Each batch in Bats will be sorted internally, and all batches correspond to only one table
	// when the batches' size is over 64M, we will use merge sort, and then write a segment in s3
	Bats []*batch.Batch
	// contains filtered or unexported fields
}

S3Writer is used to write table data to S3 and package a series of `BlockWriter` write operations Currently there are two scenarios will let cn write s3 directly scenario 1 is insert operator directly go s3, when a one-time insert/load data volume is relatively large will trigger the scenario scenario 2 is txn.workspace exceeds the threshold value, in the txn.dumpBatch function trigger a write s3

func AllocPartitionS3Writer added in v0.8.0

func AllocPartitionS3Writer(proc *process.Process, tableDef *plan.TableDef) ([]*S3Writer, error)

AllocPartitionS3Writer Alloc S3 writers for partitioned table.

func AllocS3Writer added in v0.8.0

func AllocS3Writer(proc *process.Process, tableDef *plan.TableDef) (*S3Writer, error)

func (*S3Writer) Free added in v0.8.0

func (w *S3Writer) Free(proc *process.Process)

func (*S3Writer) GenerateWriter added in v0.8.0

func (w *S3Writer) GenerateWriter(proc *process.Process) (objectio.ObjectName, error)

func (*S3Writer) GetBlockInfoBat added in v1.0.0

func (w *S3Writer) GetBlockInfoBat() *batch.Batch

func (*S3Writer) InitBuffers added in v0.8.0

func (w *S3Writer) InitBuffers(proc *process.Process, bat *batch.Batch)

func (*S3Writer) Output added in v0.8.0

func (w *S3Writer) Output(proc *process.Process, result *vm.CallResult) error

func (*S3Writer) Put added in v0.8.0

func (w *S3Writer) Put(bat *batch.Batch, proc *process.Process) bool

Put batch into w.bats , and make sure that each batch in w.bats

contains options.DefaultBlockMaxRows rows except for the last one.
true: the tableBatches[idx] is over threshold
false: the tableBatches[idx] is less than or equal threshold

func (*S3Writer) ResetBlockInfoBat added in v1.0.0

func (w *S3Writer) ResetBlockInfoBat(proc *process.Process)

func (*S3Writer) SetSchemaVer added in v1.0.0

func (w *S3Writer) SetSchemaVer(ver uint32)

func (*S3Writer) SetSeqnums added in v1.0.0

func (w *S3Writer) SetSeqnums(seqnums []uint16)

func (*S3Writer) SetSortIdx added in v0.8.0

func (w *S3Writer) SetSortIdx(sortIdx int)

func (*S3Writer) SetTableName added in v1.0.0

func (w *S3Writer) SetTableName(name string)

func (*S3Writer) SortAndFlush added in v0.8.0

func (w *S3Writer) SortAndFlush(proc *process.Process) error

func (*S3Writer) WriteBlock added in v0.8.0

func (w *S3Writer) WriteBlock(bat *batch.Batch, dataType ...objectio.DataMetaType) error

func (*S3Writer) WriteEndBlocks added in v0.8.0

func (w *S3Writer) WriteEndBlocks(proc *process.Process) ([]objectio.BlockInfo, []objectio.ObjectStats, error)

WriteEndBlocks writes batches in buffer to fileservice(aka s3 in this feature) and get meta data about block on fileservice and put it into metaLocBat For more information, please refer to the comment about func WriteEnd in Writer interface

func (*S3Writer) WriteS3Batch added in v0.8.0

func (w *S3Writer) WriteS3Batch(proc *process.Process, bat *batch.Batch) error

WriteS3Batch logic: S3Writer caches the batches in memory and when the batches size reaches 10M, we add a tag to indicate we need to write these data into s3, but not immediately. We continue to wait until no more data or the data size reaches 64M, at that time we will trigger write s3.

func (*S3Writer) WriteS3CacheBatch added in v0.8.0

func (w *S3Writer) WriteS3CacheBatch(proc *process.Process) error

type Server added in v0.7.0

type Server struct {
	// contains filtered or unexported fields
}

func Get added in v1.2.0

func Get() *Server

func NewServer added in v0.7.0

func NewServer(client logservice.CNHAKeeperClient) *Server

func (*Server) CancelRunningPipeline added in v1.2.1

func (srv *Server) CancelRunningPipeline(session morpc.ClientSession, id uint64)

func (*Server) DeleteTxnSegmentIds added in v0.8.0

func (srv *Server) DeleteTxnSegmentIds(sids []objectio.Segmentid)

func (*Server) DeleteUuids added in v1.0.0

func (srv *Server) DeleteUuids(uuids []uuid.UUID)

func (*Server) GenerateObject added in v1.0.0

func (srv *Server) GenerateObject() objectio.ObjectName

GenerateObject used to generate a new object name for CN

func (*Server) GetCnSegmentMap added in v0.8.0

func (srv *Server) GetCnSegmentMap() map[string]int32

func (*Server) GetCnSegmentType added in v0.8.0

func (srv *Server) GetCnSegmentType(sid *objectio.Segmentid) int32

func (*Server) GetProcByUuid added in v1.0.0

func (srv *Server) GetProcByUuid(u uuid.UUID, forcedDelete bool) (*process.Process, bool)

GetProcByUuid used the uuid to get a process from the srv. if the process is nil, it means the process has done. if forcedDelete, do an action to avoid another routine to put a new item.

func (*Server) PutCnSegment added in v0.8.0

func (srv *Server) PutCnSegment(sid *objectio.Segmentid, segmentType int32)

func (*Server) PutProcIntoUuidMap added in v1.0.0

func (srv *Server) PutProcIntoUuidMap(u uuid.UUID, p *process.Process) error

func (*Server) RecordRunningPipeline added in v1.2.1

func (srv *Server) RecordRunningPipeline(session morpc.ClientSession, id uint64, proc *process.Process) (queryCancel bool)

func (*Server) RemoveRunningPipeline added in v1.2.1

func (srv *Server) RemoveRunningPipeline(session morpc.ClientSession, id uint64)

type UuidProcMap added in v0.8.0

type UuidProcMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type VarExpressionExecutor added in v0.8.0

type VarExpressionExecutor struct {
	// contains filtered or unexported fields
}

func NewVarExpressionExecutor added in v1.2.0

func NewVarExpressionExecutor() *VarExpressionExecutor

func (*VarExpressionExecutor) Eval added in v0.8.0

func (expr *VarExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*VarExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *VarExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*VarExpressionExecutor) Free added in v0.8.0

func (expr *VarExpressionExecutor) Free()

func (*VarExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *VarExpressionExecutor) IsColumnExpr() bool

func (VarExpressionExecutor) TypeName added in v1.2.0

func (expr VarExpressionExecutor) TypeName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL