Documentation ¶
Index ¶
- Constants
- Variables
- func BatchDataNotNullCheck(tmpBat *batch.Batch, tableDef *plan.TableDef, ctx context.Context) error
- func EvalExpressionOnce(proc *process.Process, planExpr *plan.Expr, batches []*batch.Batch) (*vector.Vector, error)
- func EvaluateFilterByZoneMap(ctx context.Context, proc *process.Process, expr *plan.Expr, ...) (selected bool)
- func FilterRowIdForDel(proc *process.Process, bat *batch.Batch, idx int, primaryKeyIdx int) (*batch.Batch, error)
- func FixProjectionResult(proc *process.Process, executors []ExpressionExecutor, rbat *batch.Batch, ...) (dupSize int, err error)
- func GenerateConstListExpressionExecutor(proc *process.Process, exprs []*plan.Expr) (*vector.Vector, error)
- func GetExprZoneMap(ctx context.Context, proc *process.Process, expr *plan.Expr, ...) (v objectio.ZoneMap)
- func GetNewRelation(eg engine.Engine, dbName, tbleName string, txn client.TxnOperator, ...) (engine.Relation, error)
- func GroupByPartitionForDelete(proc *process.Process, bat *batch.Batch, rowIdIdx int, partitionIdx int, ...) ([]*batch.Batch, error)
- func GroupByPartitionForInsert(proc *process.Process, bat *batch.Batch, attrs []string, pIdx int, ...) ([]*batch.Batch, error)
- func InsertIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, ...) error
- func InsertOneIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, ...) error
- func NewJoinBatch(bat *batch.Batch, mp *mpool.MPool) (*batch.Batch, []func(*vector.Vector, *vector.Vector, int64, int) error)
- func RewriteFilterExprList(list []*plan.Expr) *plan.Expr
- func SetJoinBatchValues(joinBat, bat *batch.Batch, sel int64, length int, ...) error
- func SortInFilter(vec *vector.Vector)
- func SplitAndExprs(list []*plan.Expr) []*plan.Expr
- type CnSegmentMap
- type ColumnExpressionExecutor
- func (expr *ColumnExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *ColumnExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *ColumnExpressionExecutor) Free()
- func (expr *ColumnExpressionExecutor) GetColIndex() int
- func (expr *ColumnExpressionExecutor) GetRelIndex() int
- func (expr *ColumnExpressionExecutor) IsColumnExpr() bool
- type ExpressionExecutor
- type FixedVectorExpressionExecutor
- func (expr *FixedVectorExpressionExecutor) Eval(_ *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *FixedVectorExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *FixedVectorExpressionExecutor) Free()
- func (expr *FixedVectorExpressionExecutor) IsColumnExpr() bool
- type FunctionExpressionExecutor
- func (expr *FunctionExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *FunctionExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *FunctionExpressionExecutor) Free()
- func (expr *FunctionExpressionExecutor) Init(proc *process.Process, parameterNum int, retType types.Type, ...) (err error)
- func (expr *FunctionExpressionExecutor) IsColumnExpr() bool
- func (expr *FunctionExpressionExecutor) SetParameter(index int, executor ExpressionExecutor)
- type Merge
- type MergeHeap
- type MergeInterface
- type MixData
- type ParamExpressionExecutor
- func (expr *ParamExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *ParamExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *ParamExpressionExecutor) Free()
- func (expr *ParamExpressionExecutor) IsColumnExpr() bool
- type ReceiveInfo
- type ReceiverOperator
- func (r *ReceiverOperator) CloseAllReg()
- func (r *ReceiverOperator) FreeAllReg()
- func (r *ReceiverOperator) FreeMergeTypeOperator(failed bool)
- func (r *ReceiverOperator) FreeSingleReg(regIdx int)
- func (r *ReceiverOperator) InitReceiver(proc *process.Process, isMergeType bool)
- func (r *ReceiverOperator) ReceiveFromAllRegs(analyze process.Analyze) (*batch.Batch, bool, error)
- func (r *ReceiverOperator) ReceiveFromSingleReg(regIdx int, analyze process.Analyze) (*batch.Batch, bool, error)
- func (r *ReceiverOperator) ReceiveFromSingleRegNonBlock(regIdx int, analyze process.Analyze) (*batch.Batch, bool, error)
- type ResultPos
- type RuntimeFilterChan
- type S3Writer
- func (w *S3Writer) Free(proc *process.Process)
- func (w *S3Writer) GenerateWriter(proc *process.Process) (objectio.ObjectName, error)
- func (w *S3Writer) GetBlockInfoBat() *batch.Batch
- func (w *S3Writer) InitBuffers(proc *process.Process, bat *batch.Batch)
- func (w *S3Writer) Output(proc *process.Process) error
- func (w *S3Writer) Put(bat *batch.Batch, proc *process.Process) bool
- func (w *S3Writer) ResetBlockInfoBat(proc *process.Process)
- func (w *S3Writer) SetSchemaVer(ver uint32)
- func (w *S3Writer) SetSeqnums(seqnums []uint16)
- func (w *S3Writer) SetSortIdx(sortIdx int)
- func (w *S3Writer) SetTableName(name string)
- func (w *S3Writer) SortAndFlush(proc *process.Process) error
- func (w *S3Writer) WriteBlock(bat *batch.Batch, dataType ...objectio.DataMetaType) error
- func (w *S3Writer) WriteEndBlocks(proc *process.Process) ([]catalog.BlockInfo, error)
- func (w *S3Writer) WriteS3Batch(proc *process.Process, bat *batch.Batch) error
- func (w *S3Writer) WriteS3CacheBatch(proc *process.Process) error
- type Server
- func (srv *Server) DeleteTxnSegmentIds(sids []objectio.Segmentid)
- func (srv *Server) DeleteUuids(uuids []uuid.UUID)
- func (srv *Server) GenerateObject() objectio.ObjectName
- func (srv *Server) GetCnSegmentMap() map[string]int32
- func (srv *Server) GetCnSegmentType(sid *objectio.Segmentid) int32
- func (srv *Server) GetConnector(id uint64) *process.WaitRegister
- func (srv *Server) GetProcByUuid(u uuid.UUID) (*process.Process, bool)
- func (srv *Server) PutCnSegment(sid *objectio.Segmentid, segmentType int32)
- func (srv *Server) PutProcIntoUuidMap(u uuid.UUID, p *process.Process) error
- func (srv *Server) RegistConnector(reg *process.WaitRegister) uint64
- type UuidProcMap
- type VarExpressionExecutor
- func (expr *VarExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *VarExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)
- func (expr *VarExpressionExecutor) Free()
- func (expr *VarExpressionExecutor) IsColumnExpr() bool
Constants ¶
const ( // 'mo_indexes' table MO_INDEX_ID = "id" MO_INDEX_TABLE_ID = "table_id" MO_INDEX_DATABASE_ID = "database_id" MO_INDEX_NAME = "name" MO_INDEX_TYPE = "type" MO_INDEX_IS_VISIBLE = "is_visible" MO_INDEX_HIDDEN = "hidden" MO_INDEX_COMMENT = "comment" MO_INDEX_OPTIONS = "options" MO_INDEX_COLUMN_NAME = "column_name" MO_INDEX_ORDINAL_POSITION = "ordinal_position" MO_INDEX_TABLE_NAME = "index_table_name" MO_INDEX_PRIKEY = catalog.CPrimaryKeyColName )
const ( INDEX_TYPE_PRIMARY = "PRIMARY" INDEX_TYPE_UNIQUE = "UNIQUE" INDEX_TYPE_MULTIPLE = "MULTIPLE" )
const ( // WriteS3Threshold when batches' size of table reaches this, we will // trigger write s3 WriteS3Threshold uint64 = 128 * mpool.MB TagS3Size uint64 = 10 * mpool.MB TagS3SizeForMOLogger uint64 = 1 * mpool.MB )
const ( TxnWorkSpaceIdType = 1 CnBlockIdType = 2 )
const (
ALLOCID_INDEX_KEY = "index_key"
)
Variables ¶
var MO_INDEX_COLTYPE = map[string]types.T{ MO_INDEX_ID: types.T_uint64, MO_INDEX_TABLE_ID: types.T_uint64, MO_INDEX_DATABASE_ID: types.T_uint64, MO_INDEX_NAME: types.T_varchar, MO_INDEX_TYPE: types.T_varchar, MO_INDEX_IS_VISIBLE: types.T_int8, MO_INDEX_HIDDEN: types.T_int8, MO_INDEX_COMMENT: types.T_varchar, MO_INDEX_COLUMN_NAME: types.T_varchar, MO_INDEX_ORDINAL_POSITION: types.T_uint32, MO_INDEX_OPTIONS: types.T_text, MO_INDEX_TABLE_NAME: types.T_varchar, MO_INDEX_PRIKEY: types.T_varchar, }
Column type mapping of table 'mo_indexes'
Functions ¶
func BatchDataNotNullCheck ¶ added in v0.8.0
func EvalExpressionOnce ¶ added in v0.8.0
func EvaluateFilterByZoneMap ¶ added in v0.8.0
func FilterRowIdForDel ¶ added in v0.8.0
func FixProjectionResult ¶ added in v0.8.0
func FixProjectionResult(proc *process.Process, executors []ExpressionExecutor, rbat *batch.Batch, sbat *batch.Batch) (dupSize int, err error)
FixProjectionResult sets the result vectors for rbat. sbat is the source batch.
func GenerateConstListExpressionExecutor ¶ added in v1.0.0
func GetExprZoneMap ¶ added in v0.8.0
func GetNewRelation ¶ added in v0.6.0
func GroupByPartitionForDelete ¶ added in v0.8.0
func GroupByPartitionForDelete(proc *process.Process, bat *batch.Batch, rowIdIdx int, partitionIdx int, partitionNum int, pkIdx int) ([]*batch.Batch, error)
GroupByPartitionForDelete groups data by partition and returns a batch array whose length equals the number of partitions. Data from the same partition is placed in the same batch.
func GroupByPartitionForInsert ¶ added in v0.8.0
func GroupByPartitionForInsert(proc *process.Process, bat *batch.Batch, attrs []string, pIdx int, partitionNum int) ([]*batch.Batch, error)
GroupByPartitionForInsert groups data by partition and returns a batch array whose length equals the number of partitions. Data from the same partition is placed in the same batch.
func InsertIndexMetadata ¶ added in v0.8.0
func InsertIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string) error
InsertIndexMetadata synchronizes the index metadata information of the table to the index metadata table.
func InsertOneIndexMetadata ¶ added in v0.8.0
func InsertOneIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string, idxdef *plan.IndexDef) error
InsertOneIndexMetadata synchronizes the metadata information of a single index into the index metadata table.
func NewJoinBatch ¶ added in v0.8.0
func RewriteFilterExprList ¶
RewriteFilterExprList will convert an expression list to be an AndExpr
func SetJoinBatchValues ¶ added in v0.8.0
func SortInFilter ¶ added in v0.8.0
Types ¶
type CnSegmentMap ¶ added in v0.8.0
type ColumnExpressionExecutor ¶ added in v0.8.0
type ColumnExpressionExecutor struct {
// contains filtered or unexported fields
}
func (*ColumnExpressionExecutor) EvalWithoutResultReusing ¶ added in v0.8.0
func (*ColumnExpressionExecutor) Free ¶ added in v0.8.0
func (expr *ColumnExpressionExecutor) Free()
func (*ColumnExpressionExecutor) GetColIndex ¶ added in v1.0.0
func (expr *ColumnExpressionExecutor) GetColIndex() int
func (*ColumnExpressionExecutor) GetRelIndex ¶ added in v1.0.0
func (expr *ColumnExpressionExecutor) GetRelIndex() int
func (*ColumnExpressionExecutor) IsColumnExpr ¶ added in v0.8.0
func (expr *ColumnExpressionExecutor) IsColumnExpr() bool
type ExpressionExecutor ¶ added in v0.8.0
type ExpressionExecutor interface { // Eval will return the result vector of expression. // the result memory is reused, so it should not be modified or saved. // If it needs, it should be copied by vector.Dup(). Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error) // EvalWithoutResultReusing is the same as Eval, but it will not reuse the memory of result vector. // so you can save the result vector directly. but should be careful about memory leak. // and watch out that maybe the vector is one of the input vectors of batches. EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error) // Free should release all memory of executor. // it will be called after query has done. Free() IsColumnExpr() bool }
ExpressionExecutor generated from plan.Expr, can evaluate the result from vectors directly.
func NewExpressionExecutor ¶ added in v0.8.0
func NewExpressionExecutorsFromPlanExpressions ¶ added in v0.8.0
type FixedVectorExpressionExecutor ¶ added in v0.8.0
type FixedVectorExpressionExecutor struct {
// contains filtered or unexported fields
}
FixedVectorExpressionExecutor is an executor whose vector content is fixed, e.g.
ConstVector [1, 1, 1, 1, 1] ConstVector [null, null, null] ListVector ["1", "2", "3", null, "5"]
func (*FixedVectorExpressionExecutor) EvalWithoutResultReusing ¶ added in v0.8.0
func (*FixedVectorExpressionExecutor) Free ¶ added in v0.8.0
func (expr *FixedVectorExpressionExecutor) Free()
func (*FixedVectorExpressionExecutor) IsColumnExpr ¶ added in v0.8.0
func (expr *FixedVectorExpressionExecutor) IsColumnExpr() bool
type FunctionExpressionExecutor ¶ added in v0.8.0
type FunctionExpressionExecutor struct {
// contains filtered or unexported fields
}
func (*FunctionExpressionExecutor) EvalWithoutResultReusing ¶ added in v0.8.0
func (*FunctionExpressionExecutor) Free ¶ added in v0.8.0
func (expr *FunctionExpressionExecutor) Free()
func (*FunctionExpressionExecutor) IsColumnExpr ¶ added in v0.8.0
func (expr *FunctionExpressionExecutor) IsColumnExpr() bool
func (*FunctionExpressionExecutor) SetParameter ¶ added in v0.8.0
func (expr *FunctionExpressionExecutor) SetParameter(index int, executor ExpressionExecutor)
type Merge ¶ added in v0.8.0
type Merge[T any] struct { // contains filtered or unexported fields }
we will sort by primary key or clusterby key, so we just need one vector of every batch.
func (*Merge[T]) GetNextPos ¶ added in v0.8.0
type MergeHeap ¶ added in v0.8.0
type MergeHeap[T any] struct { // contains filtered or unexported fields }
MergeHeap follows the nulls-first rule.
func NewMergeHeap ¶ added in v0.8.0
type MergeInterface ¶ added in v0.8.0
type MixData ¶ added in v0.8.0
type MixData[T any] struct { // contains filtered or unexported fields }
type ParamExpressionExecutor ¶ added in v0.8.0
type ParamExpressionExecutor struct {
// contains filtered or unexported fields
}
func (*ParamExpressionExecutor) EvalWithoutResultReusing ¶ added in v0.8.0
func (*ParamExpressionExecutor) Free ¶ added in v0.8.0
func (expr *ParamExpressionExecutor) Free()
func (*ParamExpressionExecutor) IsColumnExpr ¶ added in v0.8.0
func (expr *ParamExpressionExecutor) IsColumnExpr() bool
type ReceiveInfo ¶ added in v0.7.0
ReceiveInfo is used to specify which node and which registers you need.
type ReceiverOperator ¶ added in v0.8.0
type ReceiverOperator struct {
// contains filtered or unexported fields
}
ReceiverOperator needs to receive batches from proc.Reg.MergeReceivers.
func (*ReceiverOperator) CloseAllReg ¶
func (r *ReceiverOperator) CloseAllReg()
func (*ReceiverOperator) FreeAllReg ¶ added in v0.8.0
func (r *ReceiverOperator) FreeAllReg()
func (*ReceiverOperator) FreeMergeTypeOperator ¶ added in v0.8.0
func (r *ReceiverOperator) FreeMergeTypeOperator(failed bool)
func (*ReceiverOperator) FreeSingleReg ¶ added in v0.8.0
func (r *ReceiverOperator) FreeSingleReg(regIdx int)
FreeSingleReg cleans up the batches left in the channel.
func (*ReceiverOperator) InitReceiver ¶ added in v0.8.0
func (r *ReceiverOperator) InitReceiver(proc *process.Process, isMergeType bool)
isMergeType indicates whether the receiver operator receives batches from all regs (Merge-Type) or from single regs in some order. Merge/MergeGroup/MergeLimit ... are Merge-Type, while Join/Intersect/Minus ... are not.
func (*ReceiverOperator) ReceiveFromAllRegs ¶ added in v0.8.0
You MUST Init ReceiverOperator with Merge-Type if you want to use this function
func (*ReceiverOperator) ReceiveFromSingleReg ¶ added in v0.8.0
func (*ReceiverOperator) ReceiveFromSingleRegNonBlock ¶ added in v1.0.0
type ResultPos ¶ added in v0.6.0
func NewResultPos ¶ added in v0.6.0
type RuntimeFilterChan ¶ added in v0.8.0
type RuntimeFilterChan struct { Spec *plan.RuntimeFilterSpec Chan chan *pipeline.RuntimeFilter }
type S3Writer ¶ added in v0.8.0
type S3Writer struct { // Bats[i] used to store the batches of table // Each batch in Bats will be sorted internally, and all batches correspond to only one table // when the batches' size is over 64M, we will use merge sort, and then write a segment in s3 Bats []*batch.Batch // contains filtered or unexported fields }
S3Writer is used to write table data to S3 and packages a series of `BlockWriter` write operations. Currently there are two scenarios in which the CN writes to S3 directly. Scenario 1: the insert operator goes to S3 directly, which is triggered when a one-time insert/load data volume is relatively large. Scenario 2: txn.workspace exceeds the threshold value, and the txn.dumpBatch function triggers a write to S3.
func AllocPartitionS3Writer ¶ added in v0.8.0
AllocPartitionS3Writer Alloc S3 writers for partitioned table.
func AllocS3Writer ¶ added in v0.8.0
func (*S3Writer) GenerateWriter ¶ added in v0.8.0
func (*S3Writer) GetBlockInfoBat ¶ added in v1.0.0
func (*S3Writer) InitBuffers ¶ added in v0.8.0
func (*S3Writer) Put ¶ added in v0.8.0
Put puts the batch into w.bats, and makes sure that each batch in w.bats
contains options.DefaultBlockMaxRows rows, except for the last one. It returns true if tableBatches[idx] is over the threshold, and false if tableBatches[idx] is less than or equal to the threshold.
func (*S3Writer) ResetBlockInfoBat ¶ added in v1.0.0
func (*S3Writer) SetSchemaVer ¶ added in v1.0.0
func (*S3Writer) SetSeqnums ¶ added in v1.0.0
func (*S3Writer) SetSortIdx ¶ added in v0.8.0
func (*S3Writer) SetTableName ¶ added in v1.0.0
func (*S3Writer) SortAndFlush ¶ added in v0.8.0
func (*S3Writer) WriteBlock ¶ added in v0.8.0
func (*S3Writer) WriteEndBlocks ¶ added in v0.8.0
WriteEndBlocks writes the batches in the buffer to the fileservice (aka S3 in this feature), gets the metadata about the blocks on the fileservice, and puts it into metaLocBat. For more information, please refer to the comment about func WriteEnd in the Writer interface.
func (*S3Writer) WriteS3Batch ¶ added in v0.8.0
WriteS3Batch logic: S3Writer caches the batches in memory, and when the cached size reaches TagS3Size (10 MB) we add a tag to indicate that this data needs to be written to S3, but not immediately. We continue to wait until there is no more data or the data size reaches the write threshold (WriteS3Threshold, declared as 128 MB; this comment previously said 64M), at which point the write to S3 is triggered.
type Server ¶ added in v0.7.0
type Server struct { sync.Mutex CNSegmentId types.Uuid InitSegmentId bool // contains filtered or unexported fields }
TODO: remove batchCntMap once the dispatch executor uses the stream correctly. Server is used to support cn2s3 directly; for more info, refer to the docs about it.
var Srv *Server
func NewServer ¶ added in v0.7.0
func NewServer(client logservice.CNHAKeeperClient) *Server
func (*Server) DeleteTxnSegmentIds ¶ added in v0.8.0
func (*Server) DeleteUuids ¶ added in v1.0.0
func (*Server) GenerateObject ¶ added in v1.0.0
func (srv *Server) GenerateObject() objectio.ObjectName
SegmentId is part of the Id used for cn2s3 directly; for more info, refer to the docs about it.
func (*Server) GetCnSegmentMap ¶ added in v0.8.0
func (*Server) GetCnSegmentType ¶ added in v0.8.0
func (*Server) GetConnector ¶ added in v0.7.0
func (srv *Server) GetConnector(id uint64) *process.WaitRegister
func (*Server) GetProcByUuid ¶ added in v1.0.0
func (*Server) PutCnSegment ¶ added in v0.8.0
func (*Server) PutProcIntoUuidMap ¶ added in v1.0.0
func (*Server) RegistConnector ¶ added in v0.7.0
func (srv *Server) RegistConnector(reg *process.WaitRegister) uint64
type UuidProcMap ¶ added in v0.8.0
type VarExpressionExecutor ¶ added in v0.8.0
type VarExpressionExecutor struct {
// contains filtered or unexported fields
}
func (*VarExpressionExecutor) EvalWithoutResultReusing ¶ added in v0.8.0
func (*VarExpressionExecutor) Free ¶ added in v0.8.0
func (expr *VarExpressionExecutor) Free()
func (*VarExpressionExecutor) IsColumnExpr ¶ added in v0.8.0
func (expr *VarExpressionExecutor) IsColumnExpr() bool