Documentation ¶
Index ¶
- Constants
- func GetResultForTest(topnExec *TopNExec) []int64
- func InitTopNExecForTest(topnExec *TopNExec, offset uint64, sortedRowsInDisk *chunk.DataInDiskByChunks)
- func NewDataCursor() *dataCursor
- func SetSmallSpillChunkSizeForTest()
- func TestKillSignalInTopN(t *testing.T, topnExec *TopNExec)
- type SortExec
- func (e *SortExec) Close() error
- func (e *SortExec) GetRowNumInOnePartitionDiskForTest(idx int) int64
- func (e *SortExec) GetRowNumInOnePartitionMemoryForTest(idx int) int64
- func (e *SortExec) GetSortMetaForTest() (keyColumns []int, keyCmpFuncs []chunk.CompareFunc, byItemsDesc []bool)
- func (e *SortExec) GetSortPartitionListLenForTest() int
- func (e *SortExec) GetSpilledRowNumInParallelSortForTest() int64
- func (e *SortExec) InitUnparallelModeForTest()
- func (e *SortExec) IsSpillTriggeredInOnePartitionForTest(idx int) bool
- func (e *SortExec) IsSpillTriggeredInParallelSortForTest() bool
- func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error
- func (e *SortExec) Open(ctx context.Context) error
- type TopNExec
- func (e *TopNExec) Close() error
- func (e *TopNExec) GenerateTopNResultsWhenSpillOnlyOnce() error
- func (e *TopNExec) GetInMemoryThenSpillFlagForTest() bool
- func (e *TopNExec) GetIsSpillTriggeredInStage1ForTest() bool
- func (e *TopNExec) GetIsSpillTriggeredInStage2ForTest() bool
- func (e *TopNExec) IsSpillTriggeredForTest() bool
- func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error
- func (e *TopNExec) Open(ctx context.Context) error
Constants ¶
const SignalCheckpointForSort uint = 20000
SignalCheckpointForSort indicates the number of row comparisons after which a signal check is triggered.
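The idea of checking for a signal only once every fixed number of comparisons can be sketched as follows. This is a minimal illustration, not the package's implementation: `checkedSorter`, `sortWithSignalCheck`, `errKilled`, and the small `signalCheckpoint` value are all hypothetical names chosen for the example, and the kill signal is modeled as a plain atomic flag.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync/atomic"
)

// signalCheckpoint plays the role of SignalCheckpointForSort. The tiny value
// is an assumption so that the effect is visible on small inputs.
const signalCheckpoint uint = 4

// errKilled is a hypothetical error reported when the kill flag is set.
var errKilled = errors.New("query interrupted")

// checkedSorter counts comparisons and polls the kill flag once every
// signalCheckpoint comparisons instead of on every comparison.
type checkedSorter struct {
	rows   []int
	killed *int32
	cmpCnt uint
	checks int
}

func (s *checkedSorter) Len() int      { return len(s.rows) }
func (s *checkedSorter) Swap(i, j int) { s.rows[i], s.rows[j] = s.rows[j], s.rows[i] }

func (s *checkedSorter) Less(i, j int) bool {
	s.cmpCnt++
	if s.cmpCnt%signalCheckpoint == 0 {
		s.checks++
		if atomic.LoadInt32(s.killed) != 0 {
			panic(errKilled) // abort the sort; recovered in sortWithSignalCheck
		}
	}
	return s.rows[i] < s.rows[j]
}

// sortWithSignalCheck sorts rows in place. It returns how many signal checks
// were performed and errKilled if the flag was observed during the sort.
func sortWithSignalCheck(rows []int, killed *int32) (checks int, err error) {
	s := &checkedSorter{rows: rows, killed: killed}
	defer func() {
		checks = s.checks
		if r := recover(); r != nil {
			if e, ok := r.(error); ok && errors.Is(e, errKilled) {
				err = errKilled
				return
			}
			panic(r) // unrelated panic: propagate it
		}
	}()
	sort.Sort(s)
	return s.checks, nil
}

func main() {
	var killed int32
	rows := []int{9, 3, 7, 1, 8, 2, 6, 0, 5, 4}
	checks, err := sortWithSignalCheck(rows, &killed)
	fmt.Println(rows, checks, err)

	atomic.StoreInt32(&killed, 1)
	_, err = sortWithSignalCheck([]int{3, 1, 2, 9, 8, 7, 6, 5}, &killed)
	fmt.Println(err)
}
```

Polling only at the checkpoint keeps the per-comparison overhead low while still bounding how long a sort can run after a kill request.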
Variables ¶
This section is empty.
Functions ¶
func GetResultForTest ¶
GetResultForTest gets result, only for test.
func InitTopNExecForTest ¶
func InitTopNExecForTest(topnExec *TopNExec, offset uint64, sortedRowsInDisk *chunk.DataInDiskByChunks)
InitTopNExecForTest initializes TopN executors, only for test.
func SetSmallSpillChunkSizeForTest ¶
func SetSmallSpillChunkSizeForTest()
SetSmallSpillChunkSizeForTest sets the spill chunk size for test.
func TestKillSignalInTopN ¶
TestKillSignalInTopN is only used in test.
Types ¶
type SortExec ¶
type SortExec struct {
	exec.BaseExecutor

	ByItems    []*plannerutil.ByItems
	ExecSchema *expression.Schema

	// TODO delete this variable in the future and remove the unparallel sort
	IsUnparallel bool

	Unparallel struct {
		Idx int
		// contains filtered or unexported fields
	}

	Parallel struct {
		// contains filtered or unexported fields
	}
	// contains filtered or unexported fields
}
SortExec represents sorting executor.
func (*SortExec) GetRowNumInOnePartitionDiskForTest ¶
GetRowNumInOnePartitionDiskForTest returns number of rows a partition holds in disk, it's only used in test.
func (*SortExec) GetRowNumInOnePartitionMemoryForTest ¶
GetRowNumInOnePartitionMemoryForTest returns number of rows a partition holds in memory, it's only used in test.
func (*SortExec) GetSortMetaForTest ¶
func (e *SortExec) GetSortMetaForTest() (keyColumns []int, keyCmpFuncs []chunk.CompareFunc, byItemsDesc []bool)
GetSortMetaForTest returns some sort meta, it's only used in test.
func (*SortExec) GetSortPartitionListLenForTest ¶
GetSortPartitionListLenForTest returns the number of partitions, it's only used in test.
func (*SortExec) GetSpilledRowNumInParallelSortForTest ¶
GetSpilledRowNumInParallelSortForTest returns the number of rows spilled in parallel sort, it's only used in test.
func (*SortExec) InitUnparallelModeForTest ¶
func (e *SortExec) InitUnparallelModeForTest()
InitUnparallelModeForTest initializes the unparallel mode, it's only used in unit test.
func (*SortExec) IsSpillTriggeredInOnePartitionForTest ¶
IsSpillTriggeredInOnePartitionForTest tells if spill is triggered in a specific partition, it's only used in test.
func (*SortExec) IsSpillTriggeredInParallelSortForTest ¶
IsSpillTriggeredInParallelSortForTest tells if spill is triggered in parallel sort.
func (*SortExec) Next ¶
Next implements the Executor Next interface. In unparallel mode, Sort constructs the result in the following steps:
1. Read as many rows as possible into memory.
2. If the memory quota is triggered, sort these rows in memory and write them to disk as partition 1, then reset the memory quota trigger and return to step 1.
3. If the memory quota is not triggered and the child is consumed, sort these rows in memory as partition N.
4. Merge sort if the count of partitions is larger than 1. If there is only one partition in step 4, it works just like the in-memory sort before.
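The unparallel flow above is a form of external merge sort. The sketch below illustrates it under simplifying assumptions: rows are plain `int` values, the memory quota is modeled as a maximum row count (`quota`), and "disk" partitions are just slices. `externalSort`, `mergeItem`, and `mergeHeap` are hypothetical names, not part of this package.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// mergeItem is the current head row of one partition during the merge.
type mergeItem struct {
	val  int
	part int // index of the partition the row came from
	pos  int // position of the row inside that partition
}

// mergeHeap is a min-heap of partition heads ordered by row value.
type mergeHeap []mergeItem

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].val < h[j].val }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(mergeItem)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// externalSort sorts rows while holding at most quota unsorted rows at once.
// It returns the sorted rows and the number of partitions that were created.
func externalSort(rows []int, quota int) ([]int, int) {
	if quota < 1 {
		quota = 1
	}
	var parts [][]int
	for start := 0; start < len(rows); start += quota {
		end := start + quota
		if end > len(rows) {
			end = len(rows)
		}
		// Steps 1-3: read up to quota rows, sort them, keep them as a partition.
		part := append([]int(nil), rows[start:end]...)
		sort.Ints(part)
		parts = append(parts, part)
	}
	// Step 4: a single partition is already the final result.
	if len(parts) == 1 {
		return parts[0], 1
	}
	// Otherwise merge all partitions by always taking the smallest head row.
	h := &mergeHeap{}
	for i, p := range parts {
		heap.Push(h, mergeItem{val: p[0], part: i, pos: 0})
	}
	result := make([]int, 0, len(rows))
	for h.Len() > 0 {
		it := heap.Pop(h).(mergeItem)
		result = append(result, it.val)
		if next := it.pos + 1; next < len(parts[it.part]) {
			heap.Push(h, mergeItem{val: parts[it.part][next], part: it.part, pos: next})
		}
	}
	return result, len(parts)
}

func main() {
	sorted, n := externalSort([]int{5, 2, 9, 1, 7, 3, 8}, 3)
	fmt.Println(sorted, n)
}
```

With a quota of 3 and 7 input rows, the sketch creates three partitions and merges them; with a quota of at least 7 it degenerates to a plain in-memory sort.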
The parallel sort implementation has 3 main components:
- Chunk Fetcher: The fetcher is responsible for fetching chunks from the child and sending them to a channel.
- Parallel Sort Worker: Each worker receives chunks from the channel and sorts them once the number of rows in these chunks exceeds a limit. The sorted chunks are called sorted rows. A worker may therefore hold several sets of sorted rows; it uses multi-way merge to combine them, so that each worker ends up with exactly one set of sorted rows.
- Result Generator: The generator gets n sets of sorted rows from n workers and uses multi-way merge to sort them. Each time it gets the next row, it sends the row into `resultChannel`, and the goroutine that calls `Next()` fetches the result from `resultChannel`.
                        ┌─────────┐
                        │  Child  │
                        └────▲────┘
                             │
                           Fetch
                             │
                     ┌───────┴───────┐
                     │ Chunk Fetcher │
                     └───────┬───────┘
                             │
                           Push
                             │
                             ▼
        ┌────────────────►Channel◄───────────────────┐
        │                    ▲                       │
        │                    │                       │
      Fetch                Fetch                   Fetch
        │                    │                       │
   ┌────┴───┐            ┌───┴────┐              ┌───┴────┐
   │ Worker │            │ Worker │   ......     │ Worker │
   └────┬───┘            └───┬────┘              └───┬────┘
        │                    │                       │
        │                    │                       │
      Sort                 Sort                    Sort
        │                    │                       │
        │                    │                       │
 ┌──────┴──────┐      ┌──────┴──────┐         ┌──────┴──────┐
 │ Sorted Rows │      │ Sorted Rows │ ......  │ Sorted Rows │
 └──────▲──────┘      └──────▲──────┘         └──────▲──────┘
        │                    │                       │
      Pull                 Pull                    Pull
        │                    │                       │
        └────────────────────┼───────────────────────┘
                             │
                     Multi-way Merge
                             │
                      ┌──────┴──────┐
                      │  Generator  │
                      └──────┬──────┘
                             │
                           Push
                             │
                             ▼
                        resultChannel
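The fetcher, worker, and generator pipeline can be approximated with goroutines and channels as in the sketch below. It is only an illustration under stated assumptions: rows are `int` values, every chunk is sorted as soon as it is received (the real worker waits until a row limit is exceeded), and `parallelSort`, `mergeRuns`, and `collect` are hypothetical helpers that do not exist in this package.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// mergeRuns is a simple multi-way merge: it repeatedly emits the smallest
// head row among all sorted runs until every run is exhausted.
func mergeRuns(runs [][]int, emit func(int)) {
	pos := make([]int, len(runs))
	for {
		best := -1
		for i, r := range runs {
			if pos[i] < len(r) && (best < 0 || r[pos[i]] < runs[best][pos[best]]) {
				best = i
			}
		}
		if best < 0 {
			return
		}
		emit(runs[best][pos[best]])
		pos[best]++
	}
}

// parallelSort wires up the three components and returns the result channel.
func parallelSort(chunks [][]int, workerNum int) <-chan int {
	if workerNum < 1 {
		workerNum = 1
	}
	chunkCh := make(chan []int)
	resultCh := make(chan int)

	// Chunk fetcher: pushes chunks from the "child" into the channel.
	go func() {
		for _, c := range chunks {
			chunkCh <- c
		}
		close(chunkCh)
	}()

	// Workers: sort received chunks, then merge them into one sorted run each.
	workerRuns := make([][]int, workerNum)
	var wg sync.WaitGroup
	for w := 0; w < workerNum; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			var runs [][]int
			for c := range chunkCh {
				run := append([]int(nil), c...)
				sort.Ints(run)
				runs = append(runs, run)
			}
			var merged []int
			mergeRuns(runs, func(v int) { merged = append(merged, v) })
			workerRuns[w] = merged
		}(w)
	}

	// Generator: merges the per-worker runs and pushes rows one by one.
	go func() {
		wg.Wait()
		mergeRuns(workerRuns, func(v int) { resultCh <- v })
		close(resultCh)
	}()
	return resultCh
}

// collect drains the result channel, like a caller that keeps calling Next.
func collect(ch <-chan int) []int {
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

func main() {
	chunks := [][]int{{5, 1}, {9, 3, 7}, {2}, {8, 6, 4}}
	fmt.Println(collect(parallelSort(chunks, 3)))
}
```

Which worker receives which chunk is nondeterministic, but the final order is not, because the generator merges fully sorted per-worker runs.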
type TopNExec ¶
type TopNExec struct {
	SortExec
	Limit *plannercore.PhysicalLimit

	Concurrency int

	// ColumnIdxsUsedByChild keeps column indexes of child executor used for inline projection
	ColumnIdxsUsedByChild []int
	// contains filtered or unexported fields
}
TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT. Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage.
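The heap-based approach can be sketched with `container/heap`. This is a minimal illustration, not the executor's code: rows are `int` values sorted ascending, and `topN`, `maxHeap`, `offset`, and `count` are hypothetical names standing in for the ORDER BY and LIMIT handling.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// maxHeap keeps the largest retained row at the root so that it can be
// replaced cheaply when a smaller row arrives.
type maxHeap []int

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return h[i] > h[j] }
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *maxHeap) Pop() interface{} {
	old := *h
	n := len(old)
	v := old[n-1]
	*h = old[:n-1]
	return v
}

// topN returns rows [offset, offset+count) of the ascending order of rows
// while never holding more than offset+count rows in the heap.
func topN(rows []int, offset, count int) []int {
	if offset < 0 || count <= 0 {
		return nil
	}
	limit := offset + count
	h := &maxHeap{}
	for _, v := range rows {
		if h.Len() < limit {
			heap.Push(h, v)
		} else if v < (*h)[0] {
			// The new row beats the worst retained row: replace the root.
			(*h)[0] = v
			heap.Fix(h, 0)
		}
	}
	kept := append([]int(nil), *h...)
	sort.Ints(kept)
	if offset >= len(kept) {
		return nil
	}
	return kept[offset:]
}

func main() {
	fmt.Println(topN([]int{7, 2, 9, 4, 1, 8, 3}, 1, 3))
}
```

The heap never grows beyond `offset+count` entries, which is where the memory saving over a full sort comes from.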
func (*TopNExec) GenerateTopNResultsWhenSpillOnlyOnce ¶
GenerateTopNResultsWhenSpillOnlyOnce generates the results when spill has been triggered only once. It is exported because it needs to be tested in unit tests.
func (*TopNExec) GetInMemoryThenSpillFlagForTest ¶
GetInMemoryThenSpillFlagForTest shows if results are in memory before they are spilled, only used for test.
func (*TopNExec) GetIsSpillTriggeredInStage1ForTest ¶
GetIsSpillTriggeredInStage1ForTest shows if spill is triggered in stage 1, only used for test.
func (*TopNExec) GetIsSpillTriggeredInStage2ForTest ¶
GetIsSpillTriggeredInStage2ForTest shows if spill is triggered in stage 2, only used for test.
func (*TopNExec) IsSpillTriggeredForTest ¶
IsSpillTriggeredForTest shows if spill is triggered, used for test.
func (*TopNExec) Next ¶
Next implements the Executor Next interface.
The following diagrams show the TopN procedure when spill is triggered.
Spill Stage:
                        ┌─────────┐
                        │  Child  │
                        └────▲────┘
                             │
                           Fetch
                             │
                     ┌───────┴───────┐
                     │ Chunk Fetcher │
                     └───────┬───────┘
                             │
                             │
                             ▼
                        Check Spill──────►Spill Triggered─────────►Spill
                             │                                       │
                             ▼                                       │
                    Spill Not Triggered                              │
                             │                                       │
                             ▼                                       │
                        Push Chunk◄──────────────────────────────────┘
                             │
                             ▼
        ┌────────────────►Channel◄───────────────────┐
        │                    ▲                       │
        │                    │                       │
      Fetch                Fetch                   Fetch
        │                    │                       │
   ┌────┴───┐            ┌───┴────┐              ┌───┴────┐
   │ Worker │            │ Worker │   ......     │ Worker │
   └────┬───┘            └───┬────┘              └───┬────┘
        │                    │                       │
        │                    │                       │
        │                    ▼                       │
        └───────────► Multi-way Merge◄───────────────┘
                             │
                             │
                             ▼
                          Output
Restore Stage:
   ┌────────┐            ┌────────┐              ┌────────┐
   │  Heap  │            │  Heap  │   ......     │  Heap  │
   └────┬───┘            └───┬────┘              └───┬────┘
        │                    │                       │
        │                    │                       │
        │                    ▼                       │
        └───────────► Multi-way Merge◄───────────────┘
                             │
                             │
                             ▼
                          Output