Documentation ¶
Index ¶
- Constants
- func NewDataCursor() *dataCursor
- func SetSmallSpillChunkSizeForTest()
- type SortExec
- func (e *SortExec) Close() error
- func (e *SortExec) GetRowNumInOnePartitionDiskForTest(idx int) int64
- func (e *SortExec) GetRowNumInOnePartitionMemoryForTest(idx int) int64
- func (e *SortExec) GetSortMetaForTest() (keyColumns []int, keyCmpFuncs []chunk.CompareFunc, byItemsDesc []bool)
- func (e *SortExec) GetSortPartitionListLenForTest() int
- func (e *SortExec) GetSpilledRowNumInParallelSortForTest() int64
- func (e *SortExec) InitInParallelModeForTest()
- func (e *SortExec) IsSpillTriggeredInOnePartitionForTest(idx int) bool
- func (e *SortExec) IsSpillTriggeredInParallelSortForTest() bool
- func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error
- func (e *SortExec) Open(ctx context.Context) error
- type TopNExec
Constants ¶
const SignalCheckpointForSort uint = 20000
SignalCheckpointForSort is the number of row comparisons after which a signal detection is triggered.
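As a rough illustration of the idea, the sketch below counts comparisons during a sort and polls a kill flag only on every 20000th comparison, rather than on each one. The names `checkpointCounter`, `sortWithCheckpoints`, and `isKilled` are hypothetical and are not part of this package. A real executor would presumably abort the sort when the flag is set, while this sketch only records it.

```go
package main

import (
	"fmt"
	"sort"
)

// signalCheckpoint mirrors the value of SignalCheckpointForSort.
const signalCheckpoint uint = 20000

// checkpointCounter (hypothetical) counts comparisons.
type checkpointCounter struct{ n uint }

// tick records one comparison and returns true on every
// signalCheckpoint-th call, which is when a signal check is due.
func (c *checkpointCounter) tick() bool {
	c.n++
	return c.n%signalCheckpoint == 0
}

// sortWithCheckpoints sorts rows in ascending order and calls isKilled only
// when a checkpoint is reached. It returns how often the flag was polled and
// whether it was ever set. The sort itself is not interrupted in this sketch.
func sortWithCheckpoints(rows []int, isKilled func() bool) (polls int, killed bool) {
	var c checkpointCounter
	sort.Slice(rows, func(i, j int) bool {
		if c.tick() {
			polls++
			if isKilled() {
				killed = true
			}
		}
		return rows[i] < rows[j]
	})
	return polls, killed
}

func main() {
	rows := make([]int, 100000)
	for i := range rows {
		rows[i] = len(rows) - i // descending input
	}
	polls, killed := sortWithCheckpoints(rows, func() bool { return false })
	fmt.Println("signal polls:", polls, "killed:", killed)
}
```

Polling at a fixed comparison interval keeps the per-comparison overhead to a counter increment while still bounding how long a long-running sort can ignore a cancellation.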
Variables ¶
This section is empty.
Functions ¶
func SetSmallSpillChunkSizeForTest ¶
func SetSmallSpillChunkSizeForTest()
SetSmallSpillChunkSizeForTest sets a small spill chunk size for tests.
Types ¶
type SortExec ¶
type SortExec struct {
	exec.BaseExecutor

	ByItems    []*plannerutil.ByItems
	ExecSchema *expression.Schema

	IsUnparallel bool

	Unparallel struct {
		Idx int
		// contains filtered or unexported fields
	}

	Parallel struct {
		// contains filtered or unexported fields
	}
	// contains filtered or unexported fields
}
SortExec represents sorting executor.
func (*SortExec) GetRowNumInOnePartitionDiskForTest ¶
GetRowNumInOnePartitionDiskForTest returns the number of rows a partition holds on disk. It is only used in tests.
func (*SortExec) GetRowNumInOnePartitionMemoryForTest ¶
GetRowNumInOnePartitionMemoryForTest returns the number of rows a partition holds in memory. It is only used in tests.
func (*SortExec) GetSortMetaForTest ¶
func (e *SortExec) GetSortMetaForTest() (keyColumns []int, keyCmpFuncs []chunk.CompareFunc, byItemsDesc []bool)
GetSortMetaForTest returns sort metadata: the key columns, their comparison functions, and the descending flags of the ORDER BY items. It is only used in tests.
func (*SortExec) GetSortPartitionListLenForTest ¶
GetSortPartitionListLenForTest returns the number of partitions. It is only used in tests.
func (*SortExec) GetSpilledRowNumInParallelSortForTest ¶
GetSpilledRowNumInParallelSortForTest returns the number of rows spilled in parallel sort. It is only used in tests.
func (*SortExec) InitInParallelModeForTest ¶
func (e *SortExec) InitInParallelModeForTest()
InitInParallelModeForTest is a function for tests. It can be deleted once the system variable is added.
func (*SortExec) IsSpillTriggeredInOnePartitionForTest ¶
IsSpillTriggeredInOnePartitionForTest tells if spill is triggered in a specific partition. It is only used in tests.
func (*SortExec) IsSpillTriggeredInParallelSortForTest ¶
IsSpillTriggeredInParallelSortForTest tells if spill is triggered in parallel sort.
func (*SortExec) Next ¶
Next implements the Executor Next interface. In unparallel mode, Sort constructs the result in the following steps:
1. Read as many rows as possible into memory.
2. If the memory quota is triggered, sort these rows in memory and put them on disk as partition 1, then reset the memory quota trigger and return to step 1.
3. If the memory quota is not triggered and the child is consumed, sort these rows in memory as partition N.
4. Merge sort if the number of partitions is larger than 1. If there is only one partition at step 4, it works just like the previous in-memory sort.
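The unparallel flow above is essentially an external merge sort. The following is a minimal sketch of that idea, not the executor's actual code: rows are plain ints, a fixed row count (`quota`, assumed to be positive) stands in for the memory quota trigger, and "spilled" partitions are kept as slices instead of being written to disk. All names (`partition`, `buildPartitions`, `mergePartitions`) are hypothetical.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// partition stands in for one sorted run. In the real executor a spilled
// partition lives on disk; here it is just a slice.
type partition []int

// buildPartitions buffers rows and, whenever the buffer reaches quota rows,
// sorts it and stores it as a new partition. Leftover rows form the last one.
func buildPartitions(rows []int, quota int) []partition {
	var parts []partition
	buf := make([]int, 0, quota)
	flush := func() {
		sort.Ints(buf)
		parts = append(parts, append(partition(nil), buf...))
		buf = buf[:0]
	}
	for _, r := range rows {
		buf = append(buf, r)
		if len(buf) == quota {
			flush()
		}
	}
	if len(buf) > 0 {
		flush()
	}
	return parts
}

// cursor points at the next unread row of one partition.
type cursor struct{ part, pos int }

// mergeHeap orders cursors by the row they currently point at.
type mergeHeap struct {
	parts []partition
	cur   []cursor
}

func (h *mergeHeap) Len() int { return len(h.cur) }
func (h *mergeHeap) Less(i, j int) bool {
	a, b := h.cur[i], h.cur[j]
	return h.parts[a.part][a.pos] < h.parts[b.part][b.pos]
}
func (h *mergeHeap) Swap(i, j int)      { h.cur[i], h.cur[j] = h.cur[j], h.cur[i] }
func (h *mergeHeap) Push(x interface{}) { h.cur = append(h.cur, x.(cursor)) }
func (h *mergeHeap) Pop() interface{} {
	n := len(h.cur)
	c := h.cur[n-1]
	h.cur = h.cur[:n-1]
	return c
}

// mergePartitions performs a multi-way merge over the sorted partitions.
func mergePartitions(parts []partition) []int {
	h := &mergeHeap{parts: parts}
	for i, p := range parts {
		if len(p) > 0 {
			h.cur = append(h.cur, cursor{part: i})
		}
	}
	heap.Init(h)
	var out []int
	for h.Len() > 0 {
		c := h.cur[0] // cursor with the smallest current row
		out = append(out, parts[c.part][c.pos])
		if c.pos+1 < len(parts[c.part]) {
			h.cur[0].pos++
			heap.Fix(h, 0)
		} else {
			heap.Pop(h) // this partition is exhausted
		}
	}
	return out
}

func main() {
	parts := buildPartitions([]int{5, 3, 8, 1, 9, 2, 7}, 3)
	fmt.Println(len(parts), mergePartitions(parts))
}
```

With a single partition the merge simply replays that partition in order, which matches the note that the one-partition case behaves like a plain in-memory sort.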
The execution flow of the parallel sort implementation has 3 main components:
Chunk Fetcher: The fetcher is responsible for fetching chunks from the child and sending them to the channel.
Parallel Sort Worker: Each worker receives chunks from the channel and sorts them once the number of rows in these chunks exceeds a limit. After sorting, the chunks are called sorted rows. Each worker then holds several sets of sorted rows and uses a multi-way merge to combine them, so that each worker ends up with a single set of sorted rows.
Result Generator: The generator gets n sets of sorted rows from n workers and uses a multi-way merge to sort them. Each time it gets the next row, it sends it into `resultChannel`, and the goroutine that calls `Next()` fetches the result from `resultChannel`.
                        ┌─────────┐
                        │  Child  │
                        └────▲────┘
                             │
                           Fetch
                             │
                     ┌───────┴───────┐
                     │ Chunk Fetcher │
                     └───────┬───────┘
                             │
                           Push
                             │
                             ▼
        ┌────────────────►Channel◄───────────────────┐
        │                    ▲                       │
        │                    │                       │
      Fetch                Fetch                   Fetch
        │                    │                       │
   ┌────┴───┐            ┌───┴────┐              ┌───┴────┐
   │ Worker │            │ Worker │    ......    │ Worker │
   └────┬───┘            └───┬────┘              └───┬────┘
        │                    │                       │
        │                    │                       │
      Sort                 Sort                    Sort
        │                    │                       │
        │                    │                       │
 ┌──────┴──────┐      ┌──────┴──────┐         ┌──────┴──────┐
 │ Sorted Rows │      │ Sorted Rows │ ......  │ Sorted Rows │
 └──────▲──────┘      └──────▲──────┘         └──────▲──────┘
        │                    │                       │
      Pull                 Pull                    Pull
        │                    │                       │
        └────────────────────┼───────────────────────┘
                             │
                      Multi-way Merge
                             │
                      ┌──────┴──────┐
                      │  Generator  │
                      └──────┬──────┘
                             │
                           Push
                             │
                             ▼
                       resultChannel
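The three components map naturally onto goroutines and channels. The sketch below is a simplified model under several assumptions, not the executor's implementation: rows are ints, each worker sorts once after the channel is drained instead of sorting batches and merging them, and the generator finds the next row with a linear scan over the workers rather than a heap-based multi-way merge. The function `parallelSort` and its parameters are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// parallelSort models the fetcher -> workers -> generator pipeline.
func parallelSort(chunks [][]int, workerNum int) []int {
	if workerNum < 1 {
		workerNum = 1
	}

	// Chunk fetcher: pushes every chunk from the "child" into the channel.
	chunkCh := make(chan []int)
	go func() {
		defer close(chunkCh)
		for _, c := range chunks {
			chunkCh <- c
		}
	}()

	// Workers: each one collects the chunks it receives and sorts them,
	// ending up with exactly one sorted run.
	sorted := make([][]int, workerNum)
	var wg sync.WaitGroup
	for w := 0; w < workerNum; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			var rows []int
			for c := range chunkCh {
				rows = append(rows, c...)
			}
			sort.Ints(rows)
			sorted[id] = rows // each worker writes only its own slot
		}(w)
	}
	wg.Wait()

	// Generator: merges the per-worker runs and pushes rows one by one.
	resultCh := make(chan int)
	go func() {
		defer close(resultCh)
		pos := make([]int, workerNum)
		for {
			best := -1
			for i, rows := range sorted {
				if pos[i] >= len(rows) {
					continue // this run is exhausted
				}
				if best == -1 || rows[pos[i]] < sorted[best][pos[best]] {
					best = i
				}
			}
			if best == -1 {
				return // all runs are exhausted
			}
			resultCh <- sorted[best][pos[best]]
			pos[best]++
		}
	}()

	// The caller plays the role of the goroutine that calls Next().
	var out []int
	for v := range resultCh {
		out = append(out, v)
	}
	return out
}

func main() {
	chunks := [][]int{{9, 4}, {7, 1, 8}, {3}, {6, 2, 5}}
	fmt.Println(parallelSort(chunks, 3))
}
```

Which worker receives which chunk is not deterministic, but the merged output is, because the generator always emits the smallest remaining row across all runs.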
type TopNExec ¶
type TopNExec struct {
	SortExec
	Limit *plannercore.PhysicalLimit
	// contains filtered or unexported fields
}
TopNExec implements a Top-N algorithm and is built from a SELECT statement with ORDER BY and LIMIT. Instead of sorting all the rows fetched from the table, it keeps only the Top-N elements in a heap to reduce memory usage.
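A minimal sketch of the heap-based Top-N idea, assuming ascending order over plain ints and ignoring any LIMIT offset: a max-heap of at most n elements keeps the largest retained value at the root, so each new row either replaces the root or is discarded. The names `maxHeap` and `topN` are hypothetical and do not reflect the executor's internals.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// maxHeap keeps the largest retained value at index 0.
type maxHeap []int

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return h[i] > h[j] }
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *maxHeap) Pop() interface{} {
	old := *h
	n := len(old)
	v := old[n-1]
	*h = old[:n-1]
	return v
}

// topN returns the n smallest rows in ascending order, similar in spirit to
// ORDER BY v LIMIT n, while holding at most n rows at any time.
func topN(rows []int, n int) []int {
	if n <= 0 {
		return nil
	}
	h := make(maxHeap, 0, n)
	for _, r := range rows {
		if len(h) < n {
			heap.Push(&h, r)
		} else if r < h[0] {
			// r beats the current worst retained row: replace the root.
			h[0] = r
			heap.Fix(&h, 0)
		}
	}
	out := []int(h)
	sort.Ints(out) // order the retained rows for output
	return out
}

func main() {
	fmt.Println(topN([]int{42, 7, 19, 3, 88, 1, 56}, 3))
}
```

Memory use is bounded by n rather than by the input size, at a cost of O(log n) per row that enters the heap.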