process

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ExecStop = iota
	ExecNext
	ExecHasMore
)
View Source
const (
	VectorLimit = 32
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Analyze added in v0.6.0

type Analyze interface {
	Stop()
	ChildrenCallStop(time.Time)
	Start()
	Alloc(int64)
	Input(*batch.Batch, bool)
	Output(*batch.Batch, bool)
	WaitStop(time.Time)
	DiskIO(*batch.Batch)
	S3IOByte(*batch.Batch)
	S3IOInputCount(int)
	S3IOOutputCount(int)
	Network(*batch.Batch)
	AddScanTime(t time.Time)
	AddInsertTime(t time.Time)
}

Analyze analyzes information for operator

type AnalyzeInfo added in v0.5.1

type AnalyzeInfo struct {
	// NodeId, index of query's node list
	NodeId int32
	// InputRows, number of rows accepted by node
	InputRows int64
	// OutputRows, number of rows output by node
	OutputRows int64
	// TimeConsumed, time taken by the node in milliseconds
	TimeConsumed int64
	// WaitTimeConsumed, time taken by the node waiting for channel in milliseconds
	WaitTimeConsumed int64
	// InputSize, data size accepted by node
	InputSize int64
	// OutputSize, data size output by node
	OutputSize int64
	// MemorySize, memory alloc by node
	MemorySize int64
	// DiskIO, data size read from disk
	DiskIO int64
	// S3IOByte, data size read from s3
	S3IOByte int64
	// S3IOInputCount, count for PUT, COPY, POST and LIST
	S3IOInputCount int64
	// S3IOOutputCount, count for GET, SELECT and other
	S3IOOutputCount int64
	// NetworkIO, message size send between CN node
	NetworkIO int64
	// ScanTime, scan cost time in external scan
	ScanTime int64
	// InsertTime, insert cost time in load flow
	InsertTime int64

	TimeConsumedArrayMajor []int64
	TimeConsumedArrayMinor []int64
	// contains filtered or unexported fields
}

AnalyzeInfo analyze information for query

func NewAnalyzeInfo added in v0.6.0

func NewAnalyzeInfo() *AnalyzeInfo

func (*AnalyzeInfo) AddNewParallel added in v1.1.0

func (a *AnalyzeInfo) AddNewParallel(major bool) int

func (*AnalyzeInfo) AddSingleParallelTimeConsumed added in v1.1.0

func (a *AnalyzeInfo) AddSingleParallelTimeConsumed(major bool, parallelIdx int, t int64)

func (*AnalyzeInfo) DeepCopyArray added in v1.1.0

func (a *AnalyzeInfo) DeepCopyArray(pa *plan.AnalyzeInfo)

func (*AnalyzeInfo) MergeArray added in v1.1.0

func (a *AnalyzeInfo) MergeArray(pa *plan.AnalyzeInfo)

type ExecStatus added in v1.0.0

type ExecStatus int

type Limitation

type Limitation struct {
	// Size, memory threshold for operator.
	Size int64
	// BatchRows, max rows for batch.
	BatchRows int64
	// BatchSize, max size for batch.
	BatchSize int64
	// PartitionRows, max rows for partition.
	PartitionRows int64
	// ReaderSize, memory threshold for storage's reader
	ReaderSize int64
	// MaxMessageSize max size for read messages from dn
	MaxMsgSize uint64
}

Limitation specifies the maximum resources that can be used in one query.

type Process

type Process struct {
	StmtProfile *StmtProfile
	// Id, query id.
	Id  string
	Reg Register
	Lim Limitation

	// unix timestamp
	UnixTime int64

	TxnClient client.TxnClient

	TxnOperator client.TxnOperator

	AnalInfos []*AnalyzeInfo

	SessionInfo SessionInfo

	Ctx context.Context

	Cancel context.CancelFunc

	FileService fileservice.FileService
	LockService lockservice.LockService
	IncrService incrservice.AutoIncrementService

	LoadTag bool

	LastInsertID *uint64

	LoadLocalReader *io.PipeReader

	DispatchNotifyCh chan WrapCs

	Aicm *defines.AutoIncrCacheManager

	QueryService queryservice.QueryService

	Hakeeper logservice.CNHAKeeperClient

	UdfService udf.Service
	// contains filtered or unexported fields
}

Process contains context used in query execution one or more pipeline will be generated for one query, and one pipeline has one process instance.

func New

func New(
	ctx context.Context,
	m *mpool.MPool,
	txnClient client.TxnClient,
	txnOperator client.TxnOperator,
	fileService fileservice.FileService,
	lockService lockservice.LockService,
	queryService queryservice.QueryService,
	hakeeper logservice.CNHAKeeperClient,
	udfService udf.Service,
	aicm *defines.AutoIncrCacheManager) *Process

New creates a new Process. A process stores the execution context.

func NewFromProc added in v0.5.0

func NewFromProc(p *Process, ctx context.Context, regNumber int) *Process

NewFromProc create a new Process based on another process.

func NewWithAnalyze added in v0.6.0

func NewWithAnalyze(p *Process, ctx context.Context, regNumber int, anals []*AnalyzeInfo) *Process

func (*Process) AllocVectorOfRows added in v0.6.0

func (proc *Process) AllocVectorOfRows(typ types.Type, nele int, nsp *nulls.Nulls) (*vector.Vector, error)

func (*Process) CleanValueScanBatchs added in v1.0.0

func (proc *Process) CleanValueScanBatchs()

func (*Process) CopyValueScanBatch added in v0.8.0

func (proc *Process) CopyValueScanBatch(src *Process)

func (*Process) CopyVectorPool added in v0.8.0

func (proc *Process) CopyVectorPool(src *Process)

func (*Process) FreeVectors added in v0.8.0

func (proc *Process) FreeVectors()

func (*Process) GetAnalyze added in v0.6.0

func (proc *Process) GetAnalyze(idx, parallelIdx int, parallelMajor bool) Analyze

func (*Process) GetLastInsertID added in v0.7.0

func (proc *Process) GetLastInsertID() uint64

func (*Process) GetMPool added in v0.6.0

func (proc *Process) GetMPool() *mpool.MPool

func (*Process) GetPrepareBatch added in v0.8.0

func (proc *Process) GetPrepareBatch() *batch.Batch

func (*Process) GetPrepareExprList added in v0.8.0

func (proc *Process) GetPrepareExprList() any

func (*Process) GetPrepareParams added in v0.8.0

func (proc *Process) GetPrepareParams() *vector.Vector

func (*Process) GetPrepareParamsAt added in v0.8.0

func (proc *Process) GetPrepareParamsAt(i int) ([]byte, error)

func (*Process) GetResolveVariableFunc added in v0.8.0

func (proc *Process) GetResolveVariableFunc() func(varName string, isSystemVar, isGlobalVar bool) (interface{}, error)

func (*Process) GetSessionInfo added in v0.8.0

func (proc *Process) GetSessionInfo() *SessionInfo

func (*Process) GetStmtProfile added in v1.0.0

func (proc *Process) GetStmtProfile() *StmtProfile

func (*Process) GetValueScanBatch added in v0.8.0

func (proc *Process) GetValueScanBatch(key uuid.UUID) *batch.Batch

func (*Process) GetValueScanBatchs added in v0.8.0

func (proc *Process) GetValueScanBatchs() []*batch.Batch

func (*Process) GetVector added in v0.8.0

func (proc *Process) GetVector(typ types.Type) *vector.Vector

func (*Process) InitSeq added in v0.8.0

func (proc *Process) InitSeq()

func (*Process) InputBatch added in v0.6.0

func (proc *Process) InputBatch() *batch.Batch

func (*Process) MarshalBinary added in v0.6.0

func (proc *Process) MarshalBinary() ([]byte, error)

func (*Process) Mp

func (proc *Process) Mp() *mpool.MPool

func (*Process) OperatorOutofMemory added in v0.6.0

func (proc *Process) OperatorOutofMemory(size int64) bool

func (*Process) PutBatch added in v0.8.0

func (proc *Process) PutBatch(bat *batch.Batch)

func (*Process) PutVector added in v0.8.0

func (proc *Process) PutVector(vec *vector.Vector)

func (*Process) QueryId added in v0.6.0

func (proc *Process) QueryId() string

func (*Process) ResetContextFromParent added in v0.8.0

func (proc *Process) ResetContextFromParent(parent context.Context) context.Context

func (*Process) SetCacheForAutoCol added in v0.8.0

func (proc *Process) SetCacheForAutoCol(name string)

func (*Process) SetInputBatch added in v0.6.0

func (proc *Process) SetInputBatch(bat *batch.Batch)

func (*Process) SetLastInsertID added in v0.7.0

func (proc *Process) SetLastInsertID(num uint64)

func (*Process) SetPrepareBatch added in v0.8.0

func (proc *Process) SetPrepareBatch(bat *batch.Batch)

func (*Process) SetPrepareExprList added in v0.8.0

func (proc *Process) SetPrepareExprList(exprList any)

func (*Process) SetPrepareParams added in v0.8.0

func (proc *Process) SetPrepareParams(prepareParams *vector.Vector)

func (*Process) SetQueryId added in v0.6.0

func (proc *Process) SetQueryId(id string)

func (*Process) SetResolveVariableFunc added in v0.8.0

func (proc *Process) SetResolveVariableFunc(f func(varName string, isSystemVar, isGlobalVar bool) (interface{}, error))

func (*Process) SetStmtProfile added in v1.0.0

func (proc *Process) SetStmtProfile(sp *StmtProfile)

func (*Process) SetValueScanBatch added in v0.8.0

func (proc *Process) SetValueScanBatch(key uuid.UUID, batch *batch.Batch)

func (*Process) SetVectorPoolSize added in v1.0.0

func (proc *Process) SetVectorPoolSize(limit int)

func (*Process) UnmarshalBinary added in v0.6.0

func (proc *Process) UnmarshalBinary(_ []byte) error

func (*Process) WithSpanContext added in v0.7.0

func (proc *Process) WithSpanContext(sc trace.SpanContext)

type Register

type Register struct {
	// Ss, temporarily stores the row number list in the execution of operators,
	// and it can be reused in the future execution.
	Ss [][]int64
	// InputBatch, stores the result of the previous operator.
	InputBatch *batch.Batch
	// MergeReceivers, receives result of multi previous operators from other pipelines
	// e.g. merge operator.
	MergeReceivers []*WaitRegister
}

Register used in execution pipeline and shared with all operators of the same pipeline.

type SessionInfo added in v0.5.0

type SessionInfo struct {
	Account        string
	User           string
	Host           string
	Role           string
	ConnectionID   uint64
	AccountId      uint32
	RoleId         uint32
	UserId         uint32
	LastInsertID   uint64
	Database       string
	Version        string
	TimeZone       *time.Location
	StorageEngine  engine.Engine
	QueryId        []string
	ResultColTypes []types.Type
	SeqCurValues   map[uint64]string
	SeqDeleteKeys  []uint64
	SeqAddValues   map[uint64]string
	SeqLastValue   []string
	SqlHelper      sqlHelper
	Buf            *buffer.Buffer
}

SessionInfo session information

func (*SessionInfo) GetCharset added in v0.5.0

func (si *SessionInfo) GetCharset() string

func (*SessionInfo) GetCollation added in v0.5.0

func (si *SessionInfo) GetCollation() string

func (*SessionInfo) GetConnectionID added in v0.5.0

func (si *SessionInfo) GetConnectionID() uint64

func (*SessionInfo) GetDatabase added in v0.5.0

func (si *SessionInfo) GetDatabase() string

func (*SessionInfo) GetHost added in v0.5.0

func (si *SessionInfo) GetHost() string

func (*SessionInfo) GetRole added in v0.5.0

func (si *SessionInfo) GetRole() string

func (*SessionInfo) GetUser added in v0.5.0

func (si *SessionInfo) GetUser() string

func (*SessionInfo) GetUserHost added in v0.5.0

func (si *SessionInfo) GetUserHost() string

func (*SessionInfo) GetVersion added in v0.5.0

func (si *SessionInfo) GetVersion() string

type StmtProfile added in v1.0.0

type StmtProfile struct {
	// contains filtered or unexported fields
}

StmtProfile will be clear for every statement

func (*StmtProfile) Clear added in v1.0.0

func (sp *StmtProfile) Clear()

func (*StmtProfile) GetQueryStart added in v1.0.0

func (sp *StmtProfile) GetQueryStart() time.Time

func (*StmtProfile) GetQueryType added in v1.0.0

func (sp *StmtProfile) GetQueryType() string

func (*StmtProfile) GetSqlOfStmt added in v1.0.0

func (sp *StmtProfile) GetSqlOfStmt() string

func (*StmtProfile) GetSqlSourceType added in v1.0.0

func (sp *StmtProfile) GetSqlSourceType() string

func (*StmtProfile) GetStmtId added in v1.0.0

func (sp *StmtProfile) GetStmtId() uuid.UUID

func (*StmtProfile) GetStmtType added in v1.0.0

func (sp *StmtProfile) GetStmtType() string

func (*StmtProfile) GetTxnId added in v1.0.0

func (sp *StmtProfile) GetTxnId() uuid.UUID

func (*StmtProfile) SetQueryStart added in v1.0.0

func (sp *StmtProfile) SetQueryStart(t time.Time)

func (*StmtProfile) SetQueryType added in v1.0.0

func (sp *StmtProfile) SetQueryType(qt string)

func (*StmtProfile) SetSqlOfStmt added in v1.0.0

func (sp *StmtProfile) SetSqlOfStmt(sot string)

func (*StmtProfile) SetSqlSourceType added in v1.0.0

func (sp *StmtProfile) SetSqlSourceType(st string)

func (*StmtProfile) SetStmtId added in v1.0.0

func (sp *StmtProfile) SetStmtId(id uuid.UUID)

func (*StmtProfile) SetStmtType added in v1.0.0

func (sp *StmtProfile) SetStmtType(st string)

func (*StmtProfile) SetTxnId added in v1.0.0

func (sp *StmtProfile) SetTxnId(id []byte)

type WaitRegister

type WaitRegister struct {
	Ctx context.Context
	Ch  chan *batch.Batch
}

WaitRegister channel

func (*WaitRegister) CleanChannel added in v1.0.0

func (wreg *WaitRegister) CleanChannel(m *mpool.MPool)

func (*WaitRegister) MarshalBinary added in v0.6.0

func (wreg *WaitRegister) MarshalBinary() ([]byte, error)

func (*WaitRegister) UnmarshalBinary added in v0.6.0

func (wreg *WaitRegister) UnmarshalBinary(_ []byte) error

type WrapCs added in v0.7.0

type WrapCs struct {
	MsgId uint64
	Uid   uuid.UUID
	Cs    morpc.ClientSession
	Err   chan error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL