process

package
v0.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Analyze added in v0.6.0

// Analyze analyzes information for an operator.
//
// NOTE(review): no implementation is visible here. Several method names line
// up with AnalyzeInfo fields (Alloc/MemorySize, DiskIO, S3IOByte, S3IOCount,
// Network/NetworkIO, AddScanTime/ScanTime, AddInsertTime/InsertTime), so each
// call presumably updates the matching counter — confirm against the
// implementation.
type Analyze interface {
	// Stop and Start take no arguments; presumably they bracket the timed
	// section reported as AnalyzeInfo.TimeConsumed — TODO confirm.
	Stop()
	Start()
	Alloc(int64)
	// Input and Output take a batch plus a bool flag; the meaning of the
	// flag is not visible here — TODO confirm.
	Input(*batch.Batch, bool)
	Output(*batch.Batch, bool)
	// WaitStop takes a time.Time; presumably the moment waiting began, used
	// for AnalyzeInfo.WaitTimeConsumed — TODO confirm.
	WaitStop(time.Time)
	DiskIO(*batch.Batch)
	S3IOByte(*batch.Batch)
	S3IOCount(int)
	Network(*batch.Batch)
	AddScanTime(t time.Time)
	AddInsertTime(t time.Time)
}

Analyze analyzes information for an operator.

type AnalyzeInfo added in v0.5.1

// AnalyzeInfo holds analysis information for a query; each value describes
// one node, identified by NodeId.
type AnalyzeInfo struct {
	// NodeId, index into the query's node list.
	NodeId int32
	// InputRows, number of rows accepted by the node.
	InputRows int64
	// OutputRows, number of rows output by the node.
	OutputRows int64
	// TimeConsumed, time taken by the node, in milliseconds.
	TimeConsumed int64
	// WaitTimeConsumed, time the node spent waiting on a channel, in milliseconds.
	WaitTimeConsumed int64
	// InputSize, size of the data accepted by the node.
	InputSize int64
	// OutputSize, size of the data output by the node.
	OutputSize int64
	// MemorySize, memory allocated by the node.
	MemorySize int64
	// DiskIO, size of the data read from disk.
	DiskIO int64
	// S3IOByte, size of the data read from s3.
	S3IOByte int64
	// S3IOCount, query count that read from s3.
	S3IOCount int64
	// NetworkIO, size of the messages sent between CN nodes.
	NetworkIO int64
	// ScanTime, time spent scanning in an external scan.
	ScanTime int64
	// InsertTime, time spent inserting in the load flow.
	InsertTime int64
}

AnalyzeInfo holds analysis information for a query.

func NewAnalyzeInfo added in v0.6.0

func NewAnalyzeInfo(nodeId int32) *AnalyzeInfo

type Limitation

// Limitation specifies the maximum resources that can be used in one query.
type Limitation struct {
	// Size, memory threshold for operator.
	Size int64
	// BatchRows, max rows for batch.
	BatchRows int64
	// BatchSize, max size for batch.
	BatchSize int64
	// PartitionRows, max rows for partition.
	PartitionRows int64
	// ReaderSize, memory threshold for storage's reader.
	ReaderSize int64
	// MaxMsgSize, max size for messages read from dn.
	MaxMsgSize uint64
}

Limitation specifies the maximum resources that can be used in one query.

type Process

// Process contains the context used in query execution. One or more
// pipelines are generated for one query, and each pipeline has one Process
// instance.
type Process struct {
	// Id, query id.
	Id  string
	// Reg, register shared by all operators of the same pipeline (see Register).
	Reg Register
	// Lim, maximum resources that can be used in one query (see Limitation).
	Lim Limitation

	// UnixTime, unix timestamp.
	UnixTime int64

	// TxnClient, transaction client.
	TxnClient client.TxnClient

	// TxnOperator, transaction operator.
	TxnOperator client.TxnOperator

	// AnalInfos, analyze information entries (see AnalyzeInfo).
	// NOTE(review): presumably one entry per query node, looked up by
	// GetAnalyze(idx) — confirm.
	AnalInfos []*AnalyzeInfo

	// SessionInfo, session information.
	SessionInfo SessionInfo

	// Ctx, context of the process.
	Ctx context.Context

	// Cancel, cancel function; presumably cancels Ctx — TODO confirm.
	Cancel context.CancelFunc

	// FileService, file service.
	FileService fileservice.FileService

	// GetClusterDetails, function of type engine.GetClusterDetailsFunc.
	GetClusterDetails engine.GetClusterDetailsFunc

	// LoadTag — NOTE(review): meaning is not visible here; presumably marks
	// a load flow — confirm.
	LoadTag bool

	// LastInsertID, pointer to the last insert id; presumably what
	// GetLastInsertID/SetLastInsertID read and write — TODO confirm.
	LastInsertID *uint64

	// LoadLocalReader, read end of an io.Pipe; presumably the data source
	// for a local load — TODO confirm.
	LoadLocalReader *io.PipeReader

	// DispatchNotifyCh, channel carrying WrapCs values.
	DispatchNotifyCh chan WrapCs
	// contains filtered or unexported fields
}

Process contains the context used in query execution. One or more pipelines will be generated for one query, and each pipeline has one process instance.

func New

func New(
	ctx context.Context,
	m *mpool.MPool,
	txnClient client.TxnClient,
	txnOperator client.TxnOperator,
	fileService fileservice.FileService,
	getClusterDetails engine.GetClusterDetailsFunc,
) *Process

New creates a new Process. A process stores the execution context.

func NewFromProc added in v0.5.0

func NewFromProc(p *Process, ctx context.Context, regNumber int) *Process

NewFromProc creates a new Process based on another process.

func NewWithAnalyze added in v0.6.0

func NewWithAnalyze(p *Process, ctx context.Context, regNumber int, anals []*AnalyzeInfo) *Process

func (*Process) AllocBoolScalarVector added in v0.6.0

func (proc *Process) AllocBoolScalarVector(v bool) *vector.Vector

func (*Process) AllocConstNullVector added in v0.5.1

func (proc *Process) AllocConstNullVector(typ types.Type, cnt int) *vector.Vector

func (*Process) AllocInt64ScalarVector added in v0.6.0

func (proc *Process) AllocInt64ScalarVector(v int64) *vector.Vector

func (*Process) AllocScalarNullVector added in v0.5.0

func (proc *Process) AllocScalarNullVector(typ types.Type) *vector.Vector

func (*Process) AllocScalarVector added in v0.5.0

func (proc *Process) AllocScalarVector(typ types.Type) *vector.Vector

func (*Process) AllocVector added in v0.5.0

func (proc *Process) AllocVector(typ types.Type, size int64) (*vector.Vector, error)

func (*Process) AllocVectorOfRows added in v0.6.0

func (proc *Process) AllocVectorOfRows(typ types.Type, nele int64, nsp *nulls.Nulls) (*vector.Vector, error)

func (*Process) GetAnalyze added in v0.6.0

func (proc *Process) GetAnalyze(idx int) Analyze

func (*Process) GetLastInsertID added in v0.7.0

func (proc *Process) GetLastInsertID() uint64

func (*Process) GetMPool added in v0.6.0

func (proc *Process) GetMPool() *mpool.MPool

func (*Process) InputBatch added in v0.6.0

func (proc *Process) InputBatch() *batch.Batch

func (*Process) MarshalBinary added in v0.6.0

func (proc *Process) MarshalBinary() ([]byte, error)

func (*Process) Mp

func (proc *Process) Mp() *mpool.MPool

func (*Process) OperatorOutofMemory added in v0.6.0

func (proc *Process) OperatorOutofMemory(size int64) bool

func (*Process) QueryId added in v0.6.0

func (proc *Process) QueryId() string

func (*Process) SetInputBatch added in v0.6.0

func (proc *Process) SetInputBatch(bat *batch.Batch)

func (*Process) SetLastInsertID added in v0.7.0

func (proc *Process) SetLastInsertID(num uint64)

func (*Process) SetQueryId added in v0.6.0

func (proc *Process) SetQueryId(id string)

func (*Process) UnmarshalBinary added in v0.6.0

func (proc *Process) UnmarshalBinary(_ []byte) error

func (*Process) WithSpanContext added in v0.7.0

func (proc *Process) WithSpanContext(sc trace.SpanContext)

type Register

// Register is used in the execution pipeline and is shared by all operators
// of the same pipeline.
type Register struct {
	// Ss, temporarily stores the row number lists produced while operators
	// execute; the storage can be reused in later executions.
	Ss [][]int64
	// InputBatch, stores the result of the previous operator.
	InputBatch *batch.Batch
	// MergeReceivers, receive the results of multiple previous operators
	// from other pipelines, e.g. for the merge operator.
	MergeReceivers []*WaitRegister
}

Register is used in the execution pipeline and is shared by all operators of the same pipeline.

type SessionInfo added in v0.5.0

// SessionInfo holds session information.
//
// NOTE(review): GetCharset and GetCollation are declared on *SessionInfo, but
// this struct shows no charset or collation field; presumably they return
// fixed values — confirm. Also note that QueryId is a []string here, whereas
// Process.QueryId() returns a single string, and that LastInsertID is a plain
// uint64 here while Process.LastInsertID is a *uint64.
type SessionInfo struct {
	Account           string
	User              string
	Host              string
	Role              string
	ConnectionID      uint64
	AccountId         uint32
	RoleId            uint32
	UserId            uint32
	LastInsertID      uint64
	Database          string
	Version           string
	TimeZone          *time.Location
	StorageEngine     engine.Engine
	QueryId           []string
	ResultColTypes    []types.Type
	AutoIncrCaches    defines.AutoIncrCaches
	AutoIncrCacheSize uint64
}

SessionInfo holds session information.

func (*SessionInfo) GetCharset added in v0.5.0

func (si *SessionInfo) GetCharset() string

func (*SessionInfo) GetCollation added in v0.5.0

func (si *SessionInfo) GetCollation() string

func (*SessionInfo) GetConnectionID added in v0.5.0

func (si *SessionInfo) GetConnectionID() uint64

func (*SessionInfo) GetDatabase added in v0.5.0

func (si *SessionInfo) GetDatabase() string

func (*SessionInfo) GetHost added in v0.5.0

func (si *SessionInfo) GetHost() string

func (*SessionInfo) GetRole added in v0.5.0

func (si *SessionInfo) GetRole() string

func (*SessionInfo) GetUser added in v0.5.0

func (si *SessionInfo) GetUser() string

func (*SessionInfo) GetUserHost added in v0.5.0

func (si *SessionInfo) GetUserHost() string

func (*SessionInfo) GetVersion added in v0.5.0

func (si *SessionInfo) GetVersion() string

type WaitRegister

// WaitRegister pairs a context with a channel of batches. It is the element
// type of Register.MergeReceivers.
type WaitRegister struct {
	// Ctx, context associated with the channel.
	// NOTE(review): presumably used to stop waiting on Ch — confirm.
	Ctx context.Context
	// Ch, channel carrying *batch.Batch values.
	Ch  chan *batch.Batch
}

WaitRegister pairs a context with a channel of batches.

func (*WaitRegister) MarshalBinary added in v0.6.0

func (wreg *WaitRegister) MarshalBinary() ([]byte, error)

func (*WaitRegister) UnmarshalBinary added in v0.6.0

func (wreg *WaitRegister) UnmarshalBinary(_ []byte) error

type WrapCs added in v0.7.0

// WrapCs bundles a morpc client session with a message id, a uuid and a done
// channel. It is the element type of Process.DispatchNotifyCh.
type WrapCs struct {
	// MsgId, message id.
	MsgId  uint64
	// Uid, uuid; what it identifies is not visible here — TODO confirm.
	Uid    uuid.UUID
	// Cs, the wrapped morpc client session.
	Cs     morpc.ClientSession
	// DoneCh, signal-only channel; presumably closed or sent on when the
	// session is no longer needed — TODO confirm.
	DoneCh chan struct{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL