Documentation ¶
Index ¶
- Constants
- Variables
- func MatchAddress(m Message, raddr *MessageAddress) bool
- type Analyze
- type AnalyzeInfo
- type ExecStatus
- type Limitation
- type Message
- type MessageAddress
- type MessageBoard
- type MessageReceiver
- type MsgType
- type Process
- func (proc *Process) AllocVectorOfRows(typ types.Type, nele int, nsp *nulls.Nulls) (*vector.Vector, error)
- func (proc *Process) AppendToFixedSizeFromOffset(dst *batch.Batch, src *batch.Batch, offset int) (*batch.Batch, int, error)
- func (proc *Process) CleanValueScanBatchs()
- func (proc *Process) CopyValueScanBatch(src *Process)
- func (proc *Process) CopyVectorPool(src *Process)
- func (proc *Process) Debug(ctx context.Context, msg string, fields ...zap.Field)
- func (proc *Process) Debugf(ctx context.Context, msg string, args ...any)
- func (proc *Process) Error(ctx context.Context, msg string, fields ...zap.Field)
- func (proc *Process) Errorf(ctx context.Context, msg string, args ...any)
- func (proc *Process) Fatal(ctx context.Context, msg string, fields ...zap.Field)
- func (proc *Process) Fatalf(ctx context.Context, msg string, args ...any)
- func (proc *Process) FinalizeRuntimeFilter(m *plan.RuntimeFilterSpec)
- func (proc *Process) FreeVectors()
- func (proc *Process) GetAnalyze(idx, parallelIdx int, parallelMajor bool) Analyze
- func (proc *Process) GetCloneTxnOperator() client.TxnOperator
- func (proc *Process) GetLastInsertID() uint64
- func (proc *Process) GetMPool() *mpool.MPool
- func (proc *Process) GetPrepareBatch() *batch.Batch
- func (proc *Process) GetPrepareExprList() any
- func (proc *Process) GetPrepareParams() *vector.Vector
- func (proc *Process) GetPrepareParamsAt(i int) ([]byte, error)
- func (proc *Process) GetResolveVariableFunc() func(varName string, isSystemVar, isGlobalVar bool) (interface{}, error)
- func (proc *Process) GetSessionInfo() *SessionInfo
- func (proc *Process) GetStmtProfile() *StmtProfile
- func (proc *Process) GetValueScanBatch(key uuid.UUID) *batch.Batch
- func (proc *Process) GetValueScanBatchs() []*batch.Batch
- func (proc *Process) GetVector(typ types.Type) *vector.Vector
- func (proc *Process) Info(ctx context.Context, msg string, fields ...zap.Field)
- func (proc *Process) Infof(ctx context.Context, msg string, args ...any)
- func (proc *Process) InitSeq()
- func (proc *Process) InputBatch() *batch.Batch
- func (proc *Process) MarshalBinary() ([]byte, error)
- func (proc *Process) Mp() *mpool.MPool
- func (proc *Process) NewBatchFromSrc(src *batch.Batch, preAllocSize int) (*batch.Batch, error)
- func (proc *Process) NewMessageReceiver(tags []int32, addr MessageAddress) *MessageReceiver
- func (proc *Process) OperatorOutofMemory(size int64) bool
- func (proc *Process) PutBatch(bat *batch.Batch)
- func (proc *Process) PutVector(vec *vector.Vector)
- func (proc *Process) QueryId() string
- func (proc *Process) ResetContextFromParent(parent context.Context) context.Context
- func (proc *Process) SendMessage(m Message)
- func (proc *Process) SendRuntimeFilter(rt RuntimeFilterMessage, m *plan.RuntimeFilterSpec)
- func (proc *Process) SetCacheForAutoCol(name string)
- func (proc *Process) SetCloneTxnOperator(op client.TxnOperator)
- func (proc *Process) SetInputBatch(bat *batch.Batch)
- func (proc *Process) SetLastInsertID(num uint64)
- func (proc *Process) SetPrepareBatch(bat *batch.Batch)
- func (proc *Process) SetPrepareExprList(exprList any)
- func (proc *Process) SetPrepareParams(prepareParams *vector.Vector)
- func (proc *Process) SetQueryId(id string)
- func (proc *Process) SetResolveVariableFunc(f func(varName string, isSystemVar, isGlobalVar bool) (interface{}, error))
- func (proc *Process) SetStmtProfile(sp *StmtProfile)
- func (proc *Process) SetValueScanBatch(key uuid.UUID, batch *batch.Batch)
- func (proc *Process) SetVectorPoolSize(limit int)
- func (proc *Process) UnmarshalBinary(_ []byte) error
- func (proc *Process) Warn(ctx context.Context, msg string, fields ...zap.Field)
- func (proc *Process) Warnf(ctx context.Context, msg string, args ...any)
- func (proc *Process) WithSpanContext(sc trace.SpanContext)
- type Register
- type RegisterMessage
- type RuntimeFilterMessage
- type SessionInfo
- func (si *SessionInfo) GetCharset() string
- func (si *SessionInfo) GetCollation() string
- func (si *SessionInfo) GetConnectionID() uint64
- func (si *SessionInfo) GetDatabase() string
- func (si *SessionInfo) GetHost() string
- func (si *SessionInfo) GetRole() string
- func (si *SessionInfo) GetUser() string
- func (si *SessionInfo) GetUserHost() string
- func (si *SessionInfo) GetVersion() string
- type StmtProfile
- func (sp *StmtProfile) Clear()
- func (sp *StmtProfile) GetQueryStart() time.Time
- func (sp *StmtProfile) GetQueryType() string
- func (sp *StmtProfile) GetSqlOfStmt() string
- func (sp *StmtProfile) GetSqlSourceType() string
- func (sp *StmtProfile) GetStmtId() uuid.UUID
- func (sp *StmtProfile) GetStmtType() string
- func (sp *StmtProfile) GetTxnId() uuid.UUID
- func (sp *StmtProfile) SetQueryStart(t time.Time)
- func (sp *StmtProfile) SetQueryType(qt string)
- func (sp *StmtProfile) SetSqlOfStmt(sot string)
- func (sp *StmtProfile) SetSqlSourceType(st string)
- func (sp *StmtProfile) SetStmtId(id uuid.UUID)
- func (sp *StmtProfile) SetStmtType(st string)
- func (sp *StmtProfile) SetTxnId(id []byte)
- type TopValueMessage
- type WaitRegister
- type WrapCs
Constants ¶
const ( RuntimeFilter_IN = 0 RuntimeFilter_BITMAP = 1 RuntimeFilter_MIN_MAX = 2 RuntimeFilter_BINARY_FUSE = 3 RuntimeFilter_PASS = 100 RuntimeFilter_DROP = 101 )
const ( ExecStop = iota ExecNext ExecHasMore )
const ALLCN = "ALLCN"
const CURRENTCN = "CURRENTCN"
const DefaultBatchSize = 8192
const (
VectorLimit = 32
)
Variables ¶
var (
NormalEndRegisterMessage = NewRegMsg(nil)
)
Functions ¶
func MatchAddress ¶ added in v1.2.0
func MatchAddress(m Message, raddr *MessageAddress) bool
Types ¶
type Analyze ¶ added in v0.6.0
type Analyze interface { Stop() ChildrenCallStop(time.Time) Start() Alloc(int64) Input(*batch.Batch, bool) Output(*batch.Batch, bool) WaitStop(time.Time) DiskIO(*batch.Batch) S3IOByte(*batch.Batch) S3IOInputCount(int) S3IOOutputCount(int) Network(*batch.Batch) AddScanTime(t time.Time) AddInsertTime(t time.Time) }
Analyze analyzes information for operator
type AnalyzeInfo ¶ added in v0.5.1
type AnalyzeInfo struct { // NodeId, index of query's node list NodeId int32 // InputRows, number of rows accepted by node InputRows int64 // OutputRows, number of rows output by node OutputRows int64 // TimeConsumed, time taken by the node in milliseconds TimeConsumed int64 // WaitTimeConsumed, time taken by the node waiting for channel in milliseconds WaitTimeConsumed int64 // InputSize, data size accepted by node InputSize int64 // OutputSize, data size output by node OutputSize int64 // MemorySize, memory alloc by node MemorySize int64 // DiskIO, data size read from disk DiskIO int64 // S3IOByte, data size read from s3 S3IOByte int64 // S3IOInputCount, count for PUT, COPY, POST and LIST S3IOInputCount int64 // S3IOOutputCount, count for GET, SELECT and other S3IOOutputCount int64 // NetworkIO, message size send between CN node NetworkIO int64 // ScanTime, scan cost time in external scan ScanTime int64 // InsertTime, insert cost time in load flow InsertTime int64 TimeConsumedArrayMajor []int64 TimeConsumedArrayMinor []int64 // contains filtered or unexported fields }
AnalyzeInfo analyze information for query
func (*AnalyzeInfo) AddNewParallel ¶ added in v1.1.0
func (a *AnalyzeInfo) AddNewParallel(major bool) int
func (*AnalyzeInfo) AddSingleParallelTimeConsumed ¶ added in v1.1.0
func (a *AnalyzeInfo) AddSingleParallelTimeConsumed(major bool, parallelIdx int, t int64)
func (*AnalyzeInfo) DeepCopyArray ¶ added in v1.1.0
func (a *AnalyzeInfo) DeepCopyArray(pa *plan.AnalyzeInfo)
func (*AnalyzeInfo) MergeArray ¶ added in v1.1.0
func (a *AnalyzeInfo) MergeArray(pa *plan.AnalyzeInfo)
func (AnalyzeInfo) TypeName ¶ added in v1.2.0
func (a AnalyzeInfo) TypeName() string
type ExecStatus ¶ added in v1.0.0
type ExecStatus int
type Limitation ¶
type Limitation struct { // Size, memory threshold for operator. Size int64 // BatchRows, max rows for batch. BatchRows int64 // BatchSize, max size for batch. BatchSize int64 // PartitionRows, max rows for partition. PartitionRows int64 // ReaderSize, memory threshold for storage's reader ReaderSize int64 // MaxMessageSize max size for read messages from dn MaxMsgSize uint64 }
Limitation specifies the maximum resources that can be used in one query.
type MessageAddress ¶ added in v1.2.0
func AddrBroadCastOnALLCN ¶ added in v1.2.0
func AddrBroadCastOnALLCN() MessageAddress
func AddrBroadCastOnCurrentCN ¶ added in v1.2.0
func AddrBroadCastOnCurrentCN() MessageAddress
type MessageBoard ¶ added in v1.2.0
type MessageBoard struct { Messages []*Message Waiters []chan bool RwMutex *sync.RWMutex // for nonblock message }
func NewMessageBoard ¶ added in v1.2.0
func NewMessageBoard() *MessageBoard
func (*MessageBoard) Reset ¶ added in v1.2.0
func (m *MessageBoard) Reset()
type MessageReceiver ¶ added in v1.2.0
type MessageReceiver struct {
// contains filtered or unexported fields
}
func (*MessageReceiver) Free ¶ added in v1.2.0
func (mr *MessageReceiver) Free()
func (*MessageReceiver) ReceiveMessage ¶ added in v1.2.0
type Process ¶
type Process struct { StmtProfile *StmtProfile // Id, query id. Id string Reg Register Lim Limitation // unix timestamp UnixTime int64 TxnClient client.TxnClient TxnOperator client.TxnOperator AnalInfos []*AnalyzeInfo SessionInfo SessionInfo Ctx context.Context Cancel context.CancelFunc FileService fileservice.FileService LockService lockservice.LockService IncrService incrservice.AutoIncrementService LoadTag bool LastInsertID *uint64 LoadLocalReader *io.PipeReader DispatchNotifyCh chan WrapCs Aicm *defines.AutoIncrCacheManager QueryClient qclient.QueryClient Hakeeper logservice.CNHAKeeperClient UdfService udf.Service WaitPolicy lock.WaitPolicy MessageBoard *MessageBoard CloneTxnOperator client.TxnOperator // contains filtered or unexported fields }
Process contains context used in query execution one or more pipeline will be generated for one query, and one pipeline has one process instance.
func New ¶
func New( ctx context.Context, m *mpool.MPool, txnClient client.TxnClient, txnOperator client.TxnOperator, fileService fileservice.FileService, lockService lockservice.LockService, queryClient qclient.QueryClient, hakeeper logservice.CNHAKeeperClient, udfService udf.Service, aicm *defines.AutoIncrCacheManager) *Process
New creates a new Process. A process stores the execution context.
func NewFromProc ¶ added in v0.5.0
NewFromProc create a new Process based on another process.
func NewWithAnalyze ¶ added in v0.6.0
func (*Process) AllocVectorOfRows ¶ added in v0.6.0
func (*Process) AppendToFixedSizeFromOffset ¶ added in v1.2.0
func (*Process) CleanValueScanBatchs ¶ added in v1.0.0
func (proc *Process) CleanValueScanBatchs()
func (*Process) CopyValueScanBatch ¶ added in v0.8.0
func (*Process) CopyVectorPool ¶ added in v0.8.0
func (*Process) FinalizeRuntimeFilter ¶ added in v1.2.0
func (proc *Process) FinalizeRuntimeFilter(m *plan.RuntimeFilterSpec)
func (*Process) FreeVectors ¶ added in v0.8.0
func (proc *Process) FreeVectors()
func (*Process) GetAnalyze ¶ added in v0.6.0
func (*Process) GetCloneTxnOperator ¶ added in v1.2.1
func (proc *Process) GetCloneTxnOperator() client.TxnOperator
func (*Process) GetLastInsertID ¶ added in v0.7.0
func (*Process) GetPrepareBatch ¶ added in v0.8.0
func (*Process) GetPrepareExprList ¶ added in v0.8.0
func (*Process) GetPrepareParams ¶ added in v0.8.0
func (*Process) GetPrepareParamsAt ¶ added in v0.8.0
func (*Process) GetResolveVariableFunc ¶ added in v0.8.0
func (*Process) GetSessionInfo ¶ added in v0.8.0
func (proc *Process) GetSessionInfo() *SessionInfo
func (*Process) GetStmtProfile ¶ added in v1.0.0
func (proc *Process) GetStmtProfile() *StmtProfile
func (*Process) GetValueScanBatch ¶ added in v0.8.0
func (*Process) GetValueScanBatchs ¶ added in v0.8.0
func (*Process) InputBatch ¶ added in v0.6.0
func (*Process) MarshalBinary ¶ added in v0.6.0
func (*Process) NewBatchFromSrc ¶ added in v1.2.0
func (*Process) NewMessageReceiver ¶ added in v1.2.0
func (proc *Process) NewMessageReceiver(tags []int32, addr MessageAddress) *MessageReceiver
func (*Process) OperatorOutofMemory ¶ added in v0.6.0
func (*Process) ResetContextFromParent ¶ added in v0.8.0
func (*Process) SendMessage ¶ added in v1.2.0
func (*Process) SendRuntimeFilter ¶ added in v1.2.0
func (proc *Process) SendRuntimeFilter(rt RuntimeFilterMessage, m *plan.RuntimeFilterSpec)
func (*Process) SetCacheForAutoCol ¶ added in v0.8.0
func (*Process) SetCloneTxnOperator ¶ added in v1.2.1
func (proc *Process) SetCloneTxnOperator(op client.TxnOperator)
func (*Process) SetInputBatch ¶ added in v0.6.0
func (*Process) SetLastInsertID ¶ added in v0.7.0
func (*Process) SetPrepareBatch ¶ added in v0.8.0
func (*Process) SetPrepareExprList ¶ added in v0.8.0
func (*Process) SetPrepareParams ¶ added in v0.8.0
func (*Process) SetQueryId ¶ added in v0.6.0
func (*Process) SetResolveVariableFunc ¶ added in v0.8.0
func (*Process) SetStmtProfile ¶ added in v1.0.0
func (proc *Process) SetStmtProfile(sp *StmtProfile)
func (*Process) SetValueScanBatch ¶ added in v0.8.0
func (*Process) SetVectorPoolSize ¶ added in v1.0.0
func (*Process) UnmarshalBinary ¶ added in v0.6.0
func (*Process) WithSpanContext ¶ added in v0.7.0
func (proc *Process) WithSpanContext(sc trace.SpanContext)
type Register ¶
type Register struct { // Ss, temporarily stores the row number list in the execution of operators, // and it can be reused in the future execution. Ss [][]int64 // InputBatch, stores the result of the previous operator. InputBatch *batch.Batch // MergeReceivers, receives result of multi previous operators from other pipelines // e.g. merge operator. MergeReceivers []*WaitRegister }
Register used in execution pipeline and shared with all operators of the same pipeline.
type RegisterMessage ¶ added in v1.2.1
RegisterMessage channel data Err == nil means pipeline finish with error Batch == nil means pipeline finish without error Batch != nil means pipeline is running
func NewRegMsg ¶ added in v1.2.1
func NewRegMsg(bat *batch.Batch) *RegisterMessage
type RuntimeFilterMessage ¶ added in v1.2.0
func (RuntimeFilterMessage) Deserialize ¶ added in v1.2.0
func (rt RuntimeFilterMessage) Deserialize([]byte) Message
func (RuntimeFilterMessage) GetMsgTag ¶ added in v1.2.0
func (rt RuntimeFilterMessage) GetMsgTag() int32
func (RuntimeFilterMessage) GetReceiverAddr ¶ added in v1.2.0
func (rt RuntimeFilterMessage) GetReceiverAddr() MessageAddress
func (RuntimeFilterMessage) NeedBlock ¶ added in v1.2.0
func (rt RuntimeFilterMessage) NeedBlock() bool
func (RuntimeFilterMessage) Serialize ¶ added in v1.2.0
func (rt RuntimeFilterMessage) Serialize() []byte
type SessionInfo ¶ added in v0.5.0
type SessionInfo struct { Account string User string Host string Role string ConnectionID uint64 AccountId uint32 RoleId uint32 UserId uint32 LastInsertID uint64 Database string Version string TimeZone *time.Location StorageEngine engine.Engine QueryId []string ResultColTypes []types.Type SeqCurValues map[uint64]string SeqDeleteKeys []uint64 SeqAddValues map[uint64]string SeqLastValue []string SqlHelper sqlHelper Buf *buffer.Buffer SourceInMemScanBatch []*kafka.Message LogLevel zapcore.Level SessionId uuid.UUID }
SessionInfo session information
func (*SessionInfo) GetCharset ¶ added in v0.5.0
func (si *SessionInfo) GetCharset() string
func (*SessionInfo) GetCollation ¶ added in v0.5.0
func (si *SessionInfo) GetCollation() string
func (*SessionInfo) GetConnectionID ¶ added in v0.5.0
func (si *SessionInfo) GetConnectionID() uint64
func (*SessionInfo) GetDatabase ¶ added in v0.5.0
func (si *SessionInfo) GetDatabase() string
func (*SessionInfo) GetHost ¶ added in v0.5.0
func (si *SessionInfo) GetHost() string
func (*SessionInfo) GetRole ¶ added in v0.5.0
func (si *SessionInfo) GetRole() string
func (*SessionInfo) GetUser ¶ added in v0.5.0
func (si *SessionInfo) GetUser() string
func (*SessionInfo) GetUserHost ¶ added in v0.5.0
func (si *SessionInfo) GetUserHost() string
func (*SessionInfo) GetVersion ¶ added in v0.5.0
func (si *SessionInfo) GetVersion() string
type StmtProfile ¶ added in v1.0.0
type StmtProfile struct {
// contains filtered or unexported fields
}
StmtProfile will be clear for every statement
func NewStmtProfile ¶ added in v1.2.1
func NewStmtProfile(txnId, stmtId uuid.UUID) *StmtProfile
func (*StmtProfile) Clear ¶ added in v1.0.0
func (sp *StmtProfile) Clear()
func (*StmtProfile) GetQueryStart ¶ added in v1.0.0
func (sp *StmtProfile) GetQueryStart() time.Time
func (*StmtProfile) GetQueryType ¶ added in v1.0.0
func (sp *StmtProfile) GetQueryType() string
func (*StmtProfile) GetSqlOfStmt ¶ added in v1.0.0
func (sp *StmtProfile) GetSqlOfStmt() string
func (*StmtProfile) GetSqlSourceType ¶ added in v1.0.0
func (sp *StmtProfile) GetSqlSourceType() string
func (*StmtProfile) GetStmtId ¶ added in v1.0.0
func (sp *StmtProfile) GetStmtId() uuid.UUID
func (*StmtProfile) GetStmtType ¶ added in v1.0.0
func (sp *StmtProfile) GetStmtType() string
func (*StmtProfile) GetTxnId ¶ added in v1.0.0
func (sp *StmtProfile) GetTxnId() uuid.UUID
func (*StmtProfile) SetQueryStart ¶ added in v1.0.0
func (sp *StmtProfile) SetQueryStart(t time.Time)
func (*StmtProfile) SetQueryType ¶ added in v1.0.0
func (sp *StmtProfile) SetQueryType(qt string)
func (*StmtProfile) SetSqlOfStmt ¶ added in v1.0.0
func (sp *StmtProfile) SetSqlOfStmt(sot string)
func (*StmtProfile) SetSqlSourceType ¶ added in v1.0.0
func (sp *StmtProfile) SetSqlSourceType(st string)
func (*StmtProfile) SetStmtId ¶ added in v1.0.0
func (sp *StmtProfile) SetStmtId(id uuid.UUID)
func (*StmtProfile) SetStmtType ¶ added in v1.0.0
func (sp *StmtProfile) SetStmtType(st string)
func (*StmtProfile) SetTxnId ¶ added in v1.0.0
func (sp *StmtProfile) SetTxnId(id []byte)
type TopValueMessage ¶ added in v1.2.0
func (TopValueMessage) Deserialize ¶ added in v1.2.0
func (t TopValueMessage) Deserialize([]byte) Message
func (TopValueMessage) GetMsgTag ¶ added in v1.2.0
func (t TopValueMessage) GetMsgTag() int32
func (TopValueMessage) GetReceiverAddr ¶ added in v1.2.0
func (t TopValueMessage) GetReceiverAddr() MessageAddress
func (TopValueMessage) NeedBlock ¶ added in v1.2.0
func (t TopValueMessage) NeedBlock() bool
func (TopValueMessage) Serialize ¶ added in v1.2.0
func (t TopValueMessage) Serialize() []byte
type WaitRegister ¶
type WaitRegister struct { Ctx context.Context Ch chan *RegisterMessage }
WaitRegister channel
func (*WaitRegister) CleanChannel ¶ added in v1.0.0
func (wreg *WaitRegister) CleanChannel(m *mpool.MPool)
func (*WaitRegister) MarshalBinary ¶ added in v0.6.0
func (wreg *WaitRegister) MarshalBinary() ([]byte, error)
func (*WaitRegister) UnmarshalBinary ¶ added in v0.6.0
func (wreg *WaitRegister) UnmarshalBinary(_ []byte) error