process

package
v1.2.1-20240709 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2024 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
// Runtime filter codes in the 0..3 range, numbered via iota.
// The numeric values are exported, so entries must not be reordered and new
// ones must not be inserted in the middle of this group.
const (
	RuntimeFilter_IN = iota
	RuntimeFilter_BITMAP
	RuntimeFilter_MIN_MAX
	RuntimeFilter_BINARY_FUSE
)

// Runtime filter codes that sit apart from the 0..3 range: 100 and 101.
const (
	RuntimeFilter_PASS = 100
	RuntimeFilter_DROP = 101
)
View Source
// Execution status codes, written out explicitly as 0, 1 and 2.
// They are untyped integer constants.
const (
	ExecStop    = 0
	ExecNext    = 1
	ExecHasMore = 2
)
View Source
// ALLCN is the untyped string constant "ALLCN".
// NOTE(review): presumably the CnAddr marker behind AddrBroadCastOnALLCN — confirm.
const ALLCN = "ALLCN"
View Source
// CURRENTCN is the untyped string constant "CURRENTCN".
// NOTE(review): presumably the CnAddr marker behind AddrBroadCastOnCurrentCN — confirm.
const CURRENTCN = "CURRENTCN"
View Source
// DefaultBatchSize is the default batch size, 8192.
// NOTE(review): the unit (rows vs. bytes) is not visible here — confirm.
const DefaultBatchSize = 8192
View Source
// VectorLimit is the untyped integer constant 32.
// NOTE(review): presumably the default cap for the vector pool (see
// SetVectorPoolSize) — confirm.
const VectorLimit = 32

Variables

View Source
// NormalEndRegisterMessage is the package-level message produced by
// NewRegMsg(nil), i.e. a register message constructed without a batch.
var NormalEndRegisterMessage = NewRegMsg(nil)

Functions

func MatchAddress added in v1.2.0

func MatchAddress(m Message, raddr *MessageAddress) bool

Types

type Analyze added in v0.6.0

// Analyze is the interface through which analysis information is recorded for
// an operator.
// NOTE(review): the per-method notes below are inferred from the matching
// AnalyzeInfo field names — confirm against the implementation.
type Analyze interface {
	Stop()
	ChildrenCallStop(time.Time)
	Start()
	// Alloc presumably adds to AnalyzeInfo.MemorySize.
	Alloc(int64)
	// Input and Output presumably feed the InputRows/InputSize and
	// OutputRows/OutputSize counters.
	Input(*batch.Batch, bool)
	Output(*batch.Batch, bool)
	WaitStop(time.Time)
	// DiskIO, S3IO* and Network presumably feed the AnalyzeInfo fields of the
	// same names.
	DiskIO(*batch.Batch)
	S3IOByte(*batch.Batch)
	S3IOInputCount(int)
	S3IOOutputCount(int)
	Network(*batch.Batch)
	// AddScanTime and AddInsertTime take a time.Time, not a duration;
	// presumably a start time from which the elapsed time is derived.
	AddScanTime(t time.Time)
	AddInsertTime(t time.Time)
}

Analyze records analysis information for an operator.

type AnalyzeInfo added in v0.5.1

// AnalyzeInfo holds the analysis counters collected for one node of a query.
type AnalyzeInfo struct {
	// NodeId, index into the query's node list
	NodeId int32
	// InputRows, number of rows accepted by the node
	InputRows int64
	// OutputRows, number of rows output by the node
	OutputRows int64
	// TimeConsumed, time taken by the node in milliseconds
	TimeConsumed int64
	// WaitTimeConsumed, time the node spent waiting on a channel, in milliseconds
	WaitTimeConsumed int64
	// InputSize, size of the data accepted by the node
	InputSize int64
	// OutputSize, size of the data output by the node
	OutputSize int64
	// MemorySize, memory allocated by the node
	MemorySize int64
	// DiskIO, size of the data read from disk
	DiskIO int64
	// S3IOByte, size of the data read from S3
	S3IOByte int64
	// S3IOInputCount, count of PUT, COPY, POST and LIST requests
	S3IOInputCount int64
	// S3IOOutputCount, count of GET, SELECT and other requests
	S3IOOutputCount int64
	// NetworkIO, size of the messages sent between CN nodes
	NetworkIO int64
	// ScanTime, time spent scanning in an external scan
	ScanTime int64
	// InsertTime, time spent inserting in the load flow
	InsertTime int64

	// NOTE(review): presumably per-parallel time totals, selected by the
	// `major` flag and indexed by `parallelIdx` of
	// AddSingleParallelTimeConsumed — confirm.
	TimeConsumedArrayMajor []int64
	TimeConsumedArrayMinor []int64
	// contains filtered or unexported fields
}

AnalyzeInfo holds the analysis information collected for a query.

func (*AnalyzeInfo) AddNewParallel added in v1.1.0

func (a *AnalyzeInfo) AddNewParallel(major bool) int

func (*AnalyzeInfo) AddSingleParallelTimeConsumed added in v1.1.0

func (a *AnalyzeInfo) AddSingleParallelTimeConsumed(major bool, parallelIdx int, t int64)

func (*AnalyzeInfo) DeepCopyArray added in v1.1.0

func (a *AnalyzeInfo) DeepCopyArray(pa *plan.AnalyzeInfo)

func (*AnalyzeInfo) MergeArray added in v1.1.0

func (a *AnalyzeInfo) MergeArray(pa *plan.AnalyzeInfo)

func (AnalyzeInfo) TypeName added in v1.2.0

func (a AnalyzeInfo) TypeName() string

type ExecStatus added in v1.0.0

// ExecStatus is an int-based status type.
// NOTE(review): ExecStop, ExecNext and ExecHasMore are declared as untyped
// constants rather than as ExecStatus values; presumably they are its intended
// values — confirm.
type ExecStatus int

type Limitation

// Limitation groups the resource limits that apply to one query.
type Limitation struct {
	// Size, memory threshold for an operator.
	Size int64
	// BatchRows, max rows for a batch.
	BatchRows int64
	// BatchSize, max size for a batch.
	BatchSize int64
	// PartitionRows, max rows for a partition.
	PartitionRows int64
	// ReaderSize, memory threshold for the storage reader.
	ReaderSize int64
	// MaxMsgSize, max size of messages read from dn.
	MaxMsgSize uint64
}

Limitation specifies the maximum resources that can be used in one query.

type Message added in v1.2.0

// Message is the interface implemented by the message types in this package
// (RuntimeFilterMessage and TopValueMessage both provide all five methods).
type Message interface {
	// Serialize returns the message encoded as bytes.
	Serialize() []byte
	// Deserialize takes encoded bytes and returns a Message.
	Deserialize([]byte) Message
	// NeedBlock reports a boolean; presumably whether receivers should block
	// waiting for this message — TODO confirm.
	NeedBlock() bool
	// GetMsgTag returns the message's int32 tag.
	GetMsgTag() int32
	// GetReceiverAddr returns the address the message is destined for.
	GetReceiverAddr() MessageAddress
}

type MessageAddress added in v1.2.0

// MessageAddress identifies a message receiver by CN address, operator ID and
// parallel ID.
type MessageAddress struct {
	// CnAddr is the CN address as a string.
	// NOTE(review): presumably ALLCN / CURRENTCN are special broadcast values
	// for this field — confirm.
	CnAddr     string
	OperatorID int32
	ParallelID int32
}

func AddrBroadCastOnALLCN added in v1.2.0

func AddrBroadCastOnALLCN() MessageAddress

func AddrBroadCastOnCurrentCN added in v1.2.0

func AddrBroadCastOnCurrentCN() MessageAddress

type MessageBoard added in v1.2.0

// MessageBoard holds posted messages, the channels of waiting receivers, and
// the lock guarding non-blocking messages.
type MessageBoard struct {
	// NOTE(review): Message is an interface, so []*Message is a slice of
	// pointers to interface values; []Message is the more usual shape —
	// confirm this is intentional.
	Messages []*Message
	// Waiters are bool channels; presumably one per blocked receiver — TODO confirm.
	Waiters  []chan bool
	RwMutex  *sync.RWMutex // for nonblock message
}

func NewMessageBoard added in v1.2.0

func NewMessageBoard() *MessageBoard

func (*MessageBoard) Reset added in v1.2.0

func (m *MessageBoard) Reset()

type MessageReceiver added in v1.2.0

type MessageReceiver struct {
	// contains filtered or unexported fields
}

func (*MessageReceiver) Free added in v1.2.0

func (mr *MessageReceiver) Free()

func (*MessageReceiver) ReceiveMessage added in v1.2.0

func (mr *MessageReceiver) ReceiveMessage(needBlock bool, ctx context.Context) ([]Message, bool)

type MsgType added in v1.2.0

// MsgType is the int32-based kind of a message.
type MsgType int32

// Message kinds. Values are written out explicitly so that each kind keeps a
// fixed number regardless of declaration order.
const (
	MsgTopValue      = MsgType(0)
	MsgPipelineStart = MsgType(1)
	MsgPipelineStop  = MsgType(2)
	MsgRuntimeFilter = MsgType(3)
	MsgHashMap       = MsgType(4)
)

// MaxMessage is the constant 1024, kept apart from the kinds above.
const MaxMessage = MsgType(1024)

func (MsgType) MessageName added in v1.2.0

func (m MsgType) MessageName() string

type Process

// Process carries the context used while executing a query.
type Process struct {
	// StmtProfile is the statement profile; see GetStmtProfile / SetStmtProfile.
	StmtProfile *StmtProfile
	// Id, query id.
	Id  string
	// Reg is the register used in the execution pipeline.
	Reg Register
	// Lim holds the resource limits for the query.
	Lim Limitation

	// unix timestamp
	UnixTime int64

	TxnClient client.TxnClient

	TxnOperator client.TxnOperator

	// AnalInfos holds analyze information entries.
	// NOTE(review): presumably indexed by the `idx` argument of GetAnalyze — confirm.
	AnalInfos []*AnalyzeInfo

	SessionInfo SessionInfo

	// Ctx and Cancel are stored on the struct.
	// NOTE(review): presumably Cancel cancels Ctx — confirm.
	Ctx context.Context

	Cancel context.CancelFunc

	FileService fileservice.FileService
	LockService lockservice.LockService
	IncrService incrservice.AutoIncrementService

	LoadTag bool

	// LastInsertID is a pointer; see GetLastInsertID / SetLastInsertID.
	LastInsertID *uint64

	LoadLocalReader *io.PipeReader

	// DispatchNotifyCh carries WrapCs values.
	DispatchNotifyCh chan WrapCs

	Aicm *defines.AutoIncrCacheManager

	QueryClient qclient.QueryClient

	Hakeeper logservice.CNHAKeeperClient

	UdfService udf.Service

	WaitPolicy lock.WaitPolicy

	MessageBoard *MessageBoard

	// CloneTxnOperator; see GetCloneTxnOperator / SetCloneTxnOperator.
	CloneTxnOperator client.TxnOperator
	// contains filtered or unexported fields
}

Process contains the context used in query execution. One or more pipelines are generated for one query, and each pipeline has one Process instance.

func New

func New(
	ctx context.Context,
	m *mpool.MPool,
	txnClient client.TxnClient,
	txnOperator client.TxnOperator,
	fileService fileservice.FileService,
	lockService lockservice.LockService,
	queryClient qclient.QueryClient,
	hakeeper logservice.CNHAKeeperClient,
	udfService udf.Service,
	aicm *defines.AutoIncrCacheManager) *Process

New creates a new Process. A process stores the execution context.

func NewFromProc added in v0.5.0

func NewFromProc(p *Process, ctx context.Context, regNumber int) *Process

NewFromProc creates a new Process based on another process.

func NewWithAnalyze added in v0.6.0

func NewWithAnalyze(p *Process, ctx context.Context, regNumber int, anals []*AnalyzeInfo) *Process

func (*Process) AllocVectorOfRows added in v0.6.0

func (proc *Process) AllocVectorOfRows(typ types.Type, nele int, nsp *nulls.Nulls) (*vector.Vector, error)

func (*Process) AppendToFixedSizeFromOffset added in v1.2.0

func (proc *Process) AppendToFixedSizeFromOffset(dst *batch.Batch, src *batch.Batch, offset int) (*batch.Batch, int, error)

func (*Process) CleanValueScanBatchs added in v1.0.0

func (proc *Process) CleanValueScanBatchs()

func (*Process) CopyValueScanBatch added in v0.8.0

func (proc *Process) CopyValueScanBatch(src *Process)

func (*Process) CopyVectorPool added in v0.8.0

func (proc *Process) CopyVectorPool(src *Process)

func (*Process) Debug added in v1.2.1

func (proc *Process) Debug(ctx context.Context, msg string, fields ...zap.Field)

func (*Process) Debugf added in v1.2.1

func (proc *Process) Debugf(ctx context.Context, msg string, args ...any)

func (*Process) Error added in v1.2.1

func (proc *Process) Error(ctx context.Context, msg string, fields ...zap.Field)

func (*Process) Errorf added in v1.2.1

func (proc *Process) Errorf(ctx context.Context, msg string, args ...any)

func (*Process) Fatal added in v1.2.1

func (proc *Process) Fatal(ctx context.Context, msg string, fields ...zap.Field)

func (*Process) Fatalf added in v1.2.1

func (proc *Process) Fatalf(ctx context.Context, msg string, args ...any)

func (*Process) FinalizeRuntimeFilter added in v1.2.0

func (proc *Process) FinalizeRuntimeFilter(m *plan.RuntimeFilterSpec)

func (*Process) FreeVectors added in v0.8.0

func (proc *Process) FreeVectors()

func (*Process) GetAnalyze added in v0.6.0

func (proc *Process) GetAnalyze(idx, parallelIdx int, parallelMajor bool) Analyze

func (*Process) GetCloneTxnOperator added in v1.2.1

func (proc *Process) GetCloneTxnOperator() client.TxnOperator

func (*Process) GetLastInsertID added in v0.7.0

func (proc *Process) GetLastInsertID() uint64

func (*Process) GetMPool added in v0.6.0

func (proc *Process) GetMPool() *mpool.MPool

func (*Process) GetPrepareBatch added in v0.8.0

func (proc *Process) GetPrepareBatch() *batch.Batch

func (*Process) GetPrepareExprList added in v0.8.0

func (proc *Process) GetPrepareExprList() any

func (*Process) GetPrepareParams added in v0.8.0

func (proc *Process) GetPrepareParams() *vector.Vector

func (*Process) GetPrepareParamsAt added in v0.8.0

func (proc *Process) GetPrepareParamsAt(i int) ([]byte, error)

func (*Process) GetResolveVariableFunc added in v0.8.0

func (proc *Process) GetResolveVariableFunc() func(varName string, isSystemVar, isGlobalVar bool) (interface{}, error)

func (*Process) GetSessionInfo added in v0.8.0

func (proc *Process) GetSessionInfo() *SessionInfo

func (*Process) GetStmtProfile added in v1.0.0

func (proc *Process) GetStmtProfile() *StmtProfile

func (*Process) GetValueScanBatch added in v0.8.0

func (proc *Process) GetValueScanBatch(key uuid.UUID) *batch.Batch

func (*Process) GetValueScanBatchs added in v0.8.0

func (proc *Process) GetValueScanBatchs() []*batch.Batch

func (*Process) GetVector added in v0.8.0

func (proc *Process) GetVector(typ types.Type) *vector.Vector

func (*Process) Info added in v1.2.1

func (proc *Process) Info(ctx context.Context, msg string, fields ...zap.Field)

func (*Process) Infof added in v1.2.1

func (proc *Process) Infof(ctx context.Context, msg string, args ...any)

func (*Process) InitSeq added in v0.8.0

func (proc *Process) InitSeq()

func (*Process) InputBatch added in v0.6.0

func (proc *Process) InputBatch() *batch.Batch

func (*Process) MarshalBinary added in v0.6.0

func (proc *Process) MarshalBinary() ([]byte, error)

func (*Process) Mp

func (proc *Process) Mp() *mpool.MPool

func (*Process) NewBatchFromSrc added in v1.2.0

func (proc *Process) NewBatchFromSrc(src *batch.Batch, preAllocSize int) (*batch.Batch, error)

func (*Process) NewMessageReceiver added in v1.2.0

func (proc *Process) NewMessageReceiver(tags []int32, addr MessageAddress) *MessageReceiver

func (*Process) OperatorOutofMemory added in v0.6.0

func (proc *Process) OperatorOutofMemory(size int64) bool

func (*Process) PutBatch added in v0.8.0

func (proc *Process) PutBatch(bat *batch.Batch)

func (*Process) PutVector added in v0.8.0

func (proc *Process) PutVector(vec *vector.Vector)

func (*Process) QueryId added in v0.6.0

func (proc *Process) QueryId() string

func (*Process) ResetContextFromParent added in v0.8.0

func (proc *Process) ResetContextFromParent(parent context.Context) context.Context

func (*Process) SendMessage added in v1.2.0

func (proc *Process) SendMessage(m Message)

func (*Process) SendRuntimeFilter added in v1.2.0

func (proc *Process) SendRuntimeFilter(rt RuntimeFilterMessage, m *plan.RuntimeFilterSpec)

func (*Process) SetCacheForAutoCol added in v0.8.0

func (proc *Process) SetCacheForAutoCol(name string)

func (*Process) SetCloneTxnOperator added in v1.2.1

func (proc *Process) SetCloneTxnOperator(op client.TxnOperator)

func (*Process) SetInputBatch added in v0.6.0

func (proc *Process) SetInputBatch(bat *batch.Batch)

func (*Process) SetLastInsertID added in v0.7.0

func (proc *Process) SetLastInsertID(num uint64)

func (*Process) SetPrepareBatch added in v0.8.0

func (proc *Process) SetPrepareBatch(bat *batch.Batch)

func (*Process) SetPrepareExprList added in v0.8.0

func (proc *Process) SetPrepareExprList(exprList any)

func (*Process) SetPrepareParams added in v0.8.0

func (proc *Process) SetPrepareParams(prepareParams *vector.Vector)

func (*Process) SetQueryId added in v0.6.0

func (proc *Process) SetQueryId(id string)

func (*Process) SetResolveVariableFunc added in v0.8.0

func (proc *Process) SetResolveVariableFunc(f func(varName string, isSystemVar, isGlobalVar bool) (interface{}, error))

func (*Process) SetStmtProfile added in v1.0.0

func (proc *Process) SetStmtProfile(sp *StmtProfile)

func (*Process) SetValueScanBatch added in v0.8.0

func (proc *Process) SetValueScanBatch(key uuid.UUID, batch *batch.Batch)

func (*Process) SetVectorPoolSize added in v1.0.0

func (proc *Process) SetVectorPoolSize(limit int)

func (*Process) UnmarshalBinary added in v0.6.0

func (proc *Process) UnmarshalBinary(_ []byte) error

func (*Process) Warn added in v1.2.1

func (proc *Process) Warn(ctx context.Context, msg string, fields ...zap.Field)

func (*Process) Warnf added in v1.2.1

func (proc *Process) Warnf(ctx context.Context, msg string, args ...any)

func (*Process) WithSpanContext added in v0.7.0

func (proc *Process) WithSpanContext(sc trace.SpanContext)

type Register

// Register holds per-pipeline scratch state: reusable row-number lists, the
// current input batch, and the receivers used by merging operators.
type Register struct {
	// Ss temporarily stores the row-number lists produced while operators
	// execute, so that they can be reused by later executions.
	Ss [][]int64
	// InputBatch stores the result of the previous operator.
	InputBatch *batch.Batch
	// MergeReceivers receives the results of multiple previous operators from
	// other pipelines, e.g. for the merge operator.
	MergeReceivers []*WaitRegister
}

Register is used in the execution pipeline and is shared by all operators of the same pipeline.

type RegisterMessage added in v1.2.1

// RegisterMessage pairs a batch with an error; it is the element type of
// WaitRegister.Ch.
type RegisterMessage struct {
	// Batch is the batch carried by the message. NewRegMsg(nil) is used to
	// build NormalEndRegisterMessage, so nil is a meaningful value here.
	Batch *batch.Batch
	// Err is the error carried by the message, if any.
	Err   error
}

RegisterMessage is the data sent over a register channel: Err != nil means the pipeline finished with an error; Batch == nil means the pipeline finished without error; Batch != nil means the pipeline is still running.

func NewRegMsg added in v1.2.1

func NewRegMsg(bat *batch.Batch) *RegisterMessage

type RuntimeFilterMessage added in v1.2.0

// RuntimeFilterMessage is a message type carrying a runtime filter; it has
// the five methods required by the Message interface.
type RuntimeFilterMessage struct {
	// Tag is an int32 tag; presumably the value returned by GetMsgTag — TODO confirm.
	Tag  int32
	// Typ is an int32 code; presumably one of the RuntimeFilter_* constants — TODO confirm.
	Typ  int32
	Card int32
	// Data is the raw payload; its encoding is not visible here.
	Data []byte
}

func (RuntimeFilterMessage) Deserialize added in v1.2.0

func (rt RuntimeFilterMessage) Deserialize([]byte) Message

func (RuntimeFilterMessage) GetMsgTag added in v1.2.0

func (rt RuntimeFilterMessage) GetMsgTag() int32

func (RuntimeFilterMessage) GetReceiverAddr added in v1.2.0

func (rt RuntimeFilterMessage) GetReceiverAddr() MessageAddress

func (RuntimeFilterMessage) NeedBlock added in v1.2.0

func (rt RuntimeFilterMessage) NeedBlock() bool

func (RuntimeFilterMessage) Serialize added in v1.2.0

func (rt RuntimeFilterMessage) Serialize() []byte

type SessionInfo added in v0.5.0

// SessionInfo holds session information: identity (account, user, role and
// their numeric IDs), connection details, the current database, and
// per-session sequence and result state.
type SessionInfo struct {
	// Identity and connection.
	Account              string
	User                 string
	Host                 string
	Role                 string
	ConnectionID         uint64
	AccountId            uint32
	RoleId               uint32
	UserId               uint32
	LastInsertID         uint64
	Database             string
	Version              string
	TimeZone             *time.Location
	StorageEngine        engine.Engine
	QueryId              []string
	ResultColTypes       []types.Type
	// Sequence state. NOTE(review): the map keys are uint64; presumably
	// sequence/table IDs — confirm.
	SeqCurValues         map[uint64]string
	SeqDeleteKeys        []uint64
	SeqAddValues         map[uint64]string
	SeqLastValue         []string
	SqlHelper            sqlHelper
	Buf                  *buffer.Buffer
	SourceInMemScanBatch []*kafka.Message
	LogLevel             zapcore.Level
	SessionId            uuid.UUID
}

SessionInfo holds session information.

func (*SessionInfo) GetCharset added in v0.5.0

func (si *SessionInfo) GetCharset() string

func (*SessionInfo) GetCollation added in v0.5.0

func (si *SessionInfo) GetCollation() string

func (*SessionInfo) GetConnectionID added in v0.5.0

func (si *SessionInfo) GetConnectionID() uint64

func (*SessionInfo) GetDatabase added in v0.5.0

func (si *SessionInfo) GetDatabase() string

func (*SessionInfo) GetHost added in v0.5.0

func (si *SessionInfo) GetHost() string

func (*SessionInfo) GetRole added in v0.5.0

func (si *SessionInfo) GetRole() string

func (*SessionInfo) GetUser added in v0.5.0

func (si *SessionInfo) GetUser() string

func (*SessionInfo) GetUserHost added in v0.5.0

func (si *SessionInfo) GetUserHost() string

func (*SessionInfo) GetVersion added in v0.5.0

func (si *SessionInfo) GetVersion() string

type StmtProfile added in v1.0.0

type StmtProfile struct {
	// contains filtered or unexported fields
}

StmtProfile will be cleared for every statement.

func NewStmtProfile added in v1.2.1

func NewStmtProfile(txnId, stmtId uuid.UUID) *StmtProfile

func (*StmtProfile) Clear added in v1.0.0

func (sp *StmtProfile) Clear()

func (*StmtProfile) GetQueryStart added in v1.0.0

func (sp *StmtProfile) GetQueryStart() time.Time

func (*StmtProfile) GetQueryType added in v1.0.0

func (sp *StmtProfile) GetQueryType() string

func (*StmtProfile) GetSqlOfStmt added in v1.0.0

func (sp *StmtProfile) GetSqlOfStmt() string

func (*StmtProfile) GetSqlSourceType added in v1.0.0

func (sp *StmtProfile) GetSqlSourceType() string

func (*StmtProfile) GetStmtId added in v1.0.0

func (sp *StmtProfile) GetStmtId() uuid.UUID

func (*StmtProfile) GetStmtType added in v1.0.0

func (sp *StmtProfile) GetStmtType() string

func (*StmtProfile) GetTxnId added in v1.0.0

func (sp *StmtProfile) GetTxnId() uuid.UUID

func (*StmtProfile) SetQueryStart added in v1.0.0

func (sp *StmtProfile) SetQueryStart(t time.Time)

func (*StmtProfile) SetQueryType added in v1.0.0

func (sp *StmtProfile) SetQueryType(qt string)

func (*StmtProfile) SetSqlOfStmt added in v1.0.0

func (sp *StmtProfile) SetSqlOfStmt(sot string)

func (*StmtProfile) SetSqlSourceType added in v1.0.0

func (sp *StmtProfile) SetSqlSourceType(st string)

func (*StmtProfile) SetStmtId added in v1.0.0

func (sp *StmtProfile) SetStmtId(id uuid.UUID)

func (*StmtProfile) SetStmtType added in v1.0.0

func (sp *StmtProfile) SetStmtType(st string)

func (*StmtProfile) SetTxnId added in v1.0.0

func (sp *StmtProfile) SetTxnId(id []byte)

type TopValueMessage added in v1.2.0

// TopValueMessage is a message type carrying a zone map; it has the five
// methods required by the Message interface.
type TopValueMessage struct {
	// TopValueZM is the zone map payload.
	TopValueZM objectio.ZoneMap
	// Tag is an int32 tag; presumably the value returned by GetMsgTag — TODO confirm.
	Tag        int32
}

func (TopValueMessage) Deserialize added in v1.2.0

func (t TopValueMessage) Deserialize([]byte) Message

func (TopValueMessage) GetMsgTag added in v1.2.0

func (t TopValueMessage) GetMsgTag() int32

func (TopValueMessage) GetReceiverAddr added in v1.2.0

func (t TopValueMessage) GetReceiverAddr() MessageAddress

func (TopValueMessage) NeedBlock added in v1.2.0

func (t TopValueMessage) NeedBlock() bool

func (TopValueMessage) Serialize added in v1.2.0

func (t TopValueMessage) Serialize() []byte

type WaitRegister

// WaitRegister pairs a context with a channel of *RegisterMessage.
type WaitRegister struct {
	// Ctx is the context stored alongside the channel.
	// NOTE(review): presumably used to stop waiting on Ch when cancelled — confirm.
	Ctx context.Context
	// Ch carries *RegisterMessage values.
	Ch  chan *RegisterMessage
}

WaitRegister wraps a channel of *RegisterMessage together with a context.

func (*WaitRegister) CleanChannel added in v1.0.0

func (wreg *WaitRegister) CleanChannel(m *mpool.MPool)

func (*WaitRegister) MarshalBinary added in v0.6.0

func (wreg *WaitRegister) MarshalBinary() ([]byte, error)

func (*WaitRegister) UnmarshalBinary added in v0.6.0

func (wreg *WaitRegister) UnmarshalBinary(_ []byte) error

type WrapCs added in v0.7.0

// WrapCs bundles a morpc client session with a message ID, a UUID and an
// error channel; it is the element type of Process.DispatchNotifyCh.
type WrapCs struct {
	MsgId uint64
	Uid   uuid.UUID
	// Cs is the wrapped client session.
	Cs    morpc.ClientSession
	// Err is a channel of errors; who sends on it is not visible here.
	Err   chan error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL