sortexec

v1.1.0-beta.0...-a878e1f
Warning

This package is not in the latest version of its module.

Published: Dec 19, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Constants

const SignalCheckpointForSort uint = 20000

SignalCheckpointForSort is the number of row comparisons after which a signal check is triggered.

Variables

This section is empty.

Functions

func NewDataCursor

func NewDataCursor() *dataCursor

NewDataCursor creates a new dataCursor.

func SetSmallSpillChunkSizeForTest

func SetSmallSpillChunkSizeForTest()

SetSmallSpillChunkSizeForTest sets the spill chunk size to a small value for tests.

Types

type SortExec

type SortExec struct {
	exec.BaseExecutor

	ByItems []*plannerutil.ByItems

	ExecSchema *expression.Schema

	IsUnparallel bool

	Unparallel struct {
		Idx int
		// contains filtered or unexported fields
	}

	Parallel struct {
		// contains filtered or unexported fields
	}
	// contains filtered or unexported fields
}

SortExec represents the sort executor.

func (*SortExec) Close

func (e *SortExec) Close() error

Close implements the Executor Close interface.

func (*SortExec) GetRowNumInOnePartitionDiskForTest

func (e *SortExec) GetRowNumInOnePartitionDiskForTest(idx int) int64

GetRowNumInOnePartitionDiskForTest returns the number of rows that the partition at index idx holds on disk. It is used only in tests.

func (*SortExec) GetRowNumInOnePartitionMemoryForTest

func (e *SortExec) GetRowNumInOnePartitionMemoryForTest(idx int) int64

GetRowNumInOnePartitionMemoryForTest returns the number of rows that the partition at index idx holds in memory. It is used only in tests.

func (*SortExec) GetSortMetaForTest

func (e *SortExec) GetSortMetaForTest() (keyColumns []int, keyCmpFuncs []chunk.CompareFunc, byItemsDesc []bool)

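GetSortMetaForTest returns the sort metadata: the key column indexes, the comparison function for each key column, and whether each ORDER BY item is descending. It is used only in tests.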

func (*SortExec) GetSortPartitionListLenForTest

func (e *SortExec) GetSortPartitionListLenForTest() int

GetSortPartitionListLenForTest returns the number of partitions. It is used only in tests.

func (*SortExec) GetSpilledRowNumInParallelSortForTest

func (e *SortExec) GetSpilledRowNumInParallelSortForTest() int64

GetSpilledRowNumInParallelSortForTest returns the number of rows spilled in parallel sort.

func (*SortExec) InitInParallelModeForTest

func (e *SortExec) InitInParallelModeForTest()

InitInParallelModeForTest initializes the executor in parallel mode for tests. Once the corresponding system variable is added, this function can be deleted.

func (*SortExec) IsSpillTriggeredInOnePartitionForTest

func (e *SortExec) IsSpillTriggeredInOnePartitionForTest(idx int) bool

IsSpillTriggeredInOnePartitionForTest reports whether spilling has been triggered in the partition at index idx. It is used only in tests.

func (*SortExec) IsSpillTriggeredInParallelSortForTest

func (e *SortExec) IsSpillTriggeredInParallelSortForTest() bool

IsSpillTriggeredInParallelSortForTest reports whether spilling has been triggered in parallel sort.

func (*SortExec) Next

func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error

Next implements the Executor Next interface. In unparallel mode, Sort constructs the result in the following steps:

  1. Read as many rows as possible into memory.
  2. If the memory quota is exceeded, sort these rows in memory, spill them to disk as a partition (the first spill becomes partition 1), reset the memory quota trigger, and return to step 1.
  3. If the memory quota is not exceeded and the child is exhausted, sort the remaining rows in memory as the last partition, partition N.
  4. If there is more than one partition, merge-sort them. If there is only one partition, the result is the same as a plain in-memory sort.

Here we explain the execution flow of the parallel sort implementation. There are 3 main components:

  1. Chunk Fetcher: The fetcher fetches chunks from the child and sends them to a channel.

  2. Parallel Sort Worker: Each worker receives chunks from the channel and sorts them once the number of rows in those chunks exceeds a limit; we call the sorted result "sorted rows". Each worker thus accumulates several sets of sorted rows, which it combines with a multi-way merge so that it ends up with a single set of sorted rows.

  3. Result Generator: The generator gets n sets of sorted rows from the n workers and combines them with a multi-way merge. Each time it gets the next row, it sends that row to `resultChannel`, from which the goroutine that calls `Next()` fetches results.

                            ┌─────────┐
                            │  Child  │
                            └────▲────┘
                                 │
                               Fetch
                                 │
                         ┌───────┴───────┐
                         │ Chunk Fetcher │
                         └───────┬───────┘
                                 │
                               Push
                                 │
                                 ▼
            ┌────────────────►Channel◄───────────────────┐
            │                    ▲                       │
            │                    │                       │
          Fetch                Fetch                   Fetch
            │                    │                       │
       ┌────┴───┐            ┌───┴────┐              ┌───┴────┐
       │ Worker │            │ Worker │    ......    │ Worker │
       └────┬───┘            └───┬────┘              └───┬────┘
            │                    │                       │
           Sort                 Sort                    Sort
            │                    │                       │
     ┌──────┴──────┐      ┌──────┴──────┐         ┌──────┴──────┐
     │ Sorted Rows │      │ Sorted Rows │ ......  │ Sorted Rows │
     └──────▲──────┘      └──────▲──────┘         └──────▲──────┘
            │                    │                       │
           Pull                 Pull                    Pull
            │                    │                       │
            └────────────────────┼───────────────────────┘
                                 │
                          Multi-way Merge
                                 │
                          ┌──────┴──────┐
                          │  Generator  │
                          └──────┬──────┘
                                 │
                               Push
                                 │
                                 ▼
                           resultChannel

func (*SortExec) Open

func (e *SortExec) Open(ctx context.Context) error

Open implements the Executor Open interface.

type TopNExec

type TopNExec struct {
	SortExec
	Limit *plannercore.PhysicalLimit
	// contains filtered or unexported fields
}

TopNExec implements a Top-N algorithm. It is built from a SELECT statement with ORDER BY and LIMIT. Instead of sorting all the rows fetched from the table, it keeps only the top N rows in a heap to reduce memory usage.
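A minimal sketch of the heap technique for `ORDER BY ... ASC LIMIT n`, assuming integer rows: a max-heap of at most n rows keeps the largest retained row at the root, so each new row either replaces the root or is discarded. `topN` and `maxHeap` are hypothetical names, and TopNExec's own heap handling may differ in detail.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// maxHeap keeps the largest retained row at the root so it can be evicted.
type maxHeap []int

func (h maxHeap) Len() int           { return len(h) }
func (h maxHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *maxHeap) Pop() any {
	old := *h
	v := old[len(old)-1]
	*h = old[:len(old)-1]
	return v
}

// topN returns the n smallest rows in ascending order while holding at most
// n rows in memory at any time.
func topN(rows []int, n int) []int {
	if n <= 0 {
		return nil
	}
	h := &maxHeap{}
	for _, v := range rows {
		if h.Len() < n {
			heap.Push(h, v)
		} else if v < (*h)[0] {
			(*h)[0] = v // replace the current largest retained row
			heap.Fix(h, 0)
		}
	}
	out := []int(*h)
	sort.Ints(out) // heap order is not sorted order
	return out
}

func main() {
	fmt.Println(topN([]int{42, 7, 19, 3, 88, 25, 1, 64}, 3)) // [1 3 7]
}
```

Memory stays at O(n) rows regardless of input size, and each incoming row costs at most O(log n) heap work instead of participating in a full sort.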

func (*TopNExec) Next

func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error

Next implements the Executor Next interface.

func (*TopNExec) Open

func (e *TopNExec) Open(ctx context.Context) error

Open implements the Executor Open interface.
