sortexec

package
v1.1.0-beta.0...-0c6681f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const SignalCheckpointForSort uint = 20000

SignalCheckpointForSort indicates the times of row comparation that a signal detection will be triggered.

Variables

This section is empty.

Functions

func GetResultForTest

func GetResultForTest(topnExec *TopNExec) []int64

GetResultForTest gets result, only for test.

func InitTopNExecForTest

func InitTopNExecForTest(topnExec *TopNExec, offset uint64, sortedRowsInDisk *chunk.DataInDiskByChunks)

InitTopNExecForTest initializes TopN executors, only for test.

func NewDataCursor

func NewDataCursor() *dataCursor

NewDataCursor creates a new dataCursor

func SetSmallSpillChunkSizeForTest

func SetSmallSpillChunkSizeForTest()

SetSmallSpillChunkSizeForTest set spill chunk size for test.

func TestKillSignalInTopN

func TestKillSignalInTopN(t *testing.T, topnExec *TopNExec)

TestKillSignalInTopN is for test

Types

type SortExec

type SortExec struct {
	exec.BaseExecutor

	ByItems []*plannerutil.ByItems

	ExecSchema *expression.Schema

	// TODO delete this variable in the future and remove the unparallel sort
	IsUnparallel bool

	Unparallel struct {
		Idx int
		// contains filtered or unexported fields
	}

	Parallel struct {
		// contains filtered or unexported fields
	}
	// contains filtered or unexported fields
}

SortExec represents sorting executor.

func (*SortExec) Close

func (e *SortExec) Close() error

Close implements the Executor Close interface.

func (*SortExec) GetRowNumInOnePartitionDiskForTest

func (e *SortExec) GetRowNumInOnePartitionDiskForTest(idx int) int64

GetRowNumInOnePartitionDiskForTest returns number of rows a partition holds in disk, it's only used in test.

func (*SortExec) GetRowNumInOnePartitionMemoryForTest

func (e *SortExec) GetRowNumInOnePartitionMemoryForTest(idx int) int64

GetRowNumInOnePartitionMemoryForTest returns number of rows a partition holds in memory, it's only used in test.

func (*SortExec) GetSortMetaForTest

func (e *SortExec) GetSortMetaForTest() (keyColumns []int, keyCmpFuncs []chunk.CompareFunc, byItemsDesc []bool)

GetSortMetaForTest returns some sort meta, it's only used in test.

func (*SortExec) GetSortPartitionListLenForTest

func (e *SortExec) GetSortPartitionListLenForTest() int

GetSortPartitionListLenForTest returns the number of partitions, it's only used in test.

func (*SortExec) GetSpilledRowNumInParallelSortForTest

func (e *SortExec) GetSpilledRowNumInParallelSortForTest() int64

GetSpilledRowNumInParallelSortForTest tells if spill is triggered in parallel sort.

func (*SortExec) InitUnparallelModeForTest

func (e *SortExec) InitUnparallelModeForTest()

InitUnparallelModeForTest is for unit test

func (*SortExec) IsSpillTriggeredInOnePartitionForTest

func (e *SortExec) IsSpillTriggeredInOnePartitionForTest(idx int) bool

IsSpillTriggeredInOnePartitionForTest tells if spill is triggered in a specific partition, it's only used in test.

func (*SortExec) IsSpillTriggeredInParallelSortForTest

func (e *SortExec) IsSpillTriggeredInParallelSortForTest() bool

IsSpillTriggeredInParallelSortForTest tells if spill is triggered in parallel sort.

func (*SortExec) Next

func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error

Next implements the Executor Next interface. Sort constructs the result following these step in unparallel mode:

  1. Read as mush as rows into memory.
  2. If memory quota is triggered, sort these rows in memory and put them into disk as partition 1, then reset the memory quota trigger and return to step 1
  3. If memory quota is not triggered and child is consumed, sort these rows in memory as partition N.
  4. Merge sort if the count of partitions is larger than 1. If there is only one partition in step 4, it works just like in-memory sort before.

Here we explain the execution flow of the parallel sort implementation. There are 3 main components:

  1. Chunks Fetcher: Fetcher is responsible for fetching chunks from child and send them to channel.

  2. Parallel Sort Worker: Worker receives chunks from channel it will sort these chunks after the number of rows in these chunks exceeds limit, we call them as sorted rows after chunks are sorted. Then each worker will have several sorted rows, we use multi-way merge to sort them and each worker will have only one sorted rows in the end.

  3. Result Generator: Generator gets n sorted rows from n workers, it will use multi-way merge to sort these rows, once it gets the next row, it will send it into `resultChannel` and the goroutine who calls `Next()` will fetch result from `resultChannel`.

    ┌─────────┐ │ Child │ └────▲────┘ │ Fetch │ ┌───────┴───────┐ │ Chunk Fetcher │ └───────┬───────┘ │ Push │ ▼ ┌────────────────►Channel◄───────────────────┐ │ ▲ │ │ │ │ Fetch Fetch Fetch │ │ │ ┌────┴───┐ ┌───┴────┐ ┌───┴────┐ │ Worker │ │ Worker │ ...... │ Worker │ └────┬───┘ └───┬────┘ └───┬────┘ │ │ │ │ │ │ Sort Sort Sort │ │ │ │ │ │ ┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐ │ Sorted Rows │ │ Sorted Rows │ ...... │ Sorted Rows │ └──────▲──────┘ └──────▲──────┘ └──────▲──────┘ │ │ │ Pull Pull Pull │ │ │ └────────────────────┼───────────────────────┘ │ Multi-way Merge │ ┌──────┴──────┐ │ Generator │ └──────┬──────┘ │ Push │ ▼ resultChannel

func (*SortExec) Open

func (e *SortExec) Open(ctx context.Context) error

Open implements the Executor Open interface.

type TopNExec

type TopNExec struct {
	SortExec
	Limit *plannercore.PhysicalLimit

	Concurrency int

	// ColumnIdxsUsedByChild keep column indexes of child executor used for inline projection
	ColumnIdxsUsedByChild []int
	// contains filtered or unexported fields
}

TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT. Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage.

func (*TopNExec) Close

func (e *TopNExec) Close() error

Close implements the Executor Close interface.

func (*TopNExec) GenerateTopNResultsWhenSpillOnlyOnce

func (e *TopNExec) GenerateTopNResultsWhenSpillOnlyOnce() error

GenerateTopNResultsWhenSpillOnlyOnce generates results with this function when we trigger spill only once. It's a public function as we need to test it in ut.

func (*TopNExec) GetInMemoryThenSpillFlagForTest

func (e *TopNExec) GetInMemoryThenSpillFlagForTest() bool

GetInMemoryThenSpillFlagForTest shows if results are in memory before they are spilled, only used for test

func (*TopNExec) GetIsSpillTriggeredInStage1ForTest

func (e *TopNExec) GetIsSpillTriggeredInStage1ForTest() bool

GetIsSpillTriggeredInStage1ForTest shows if spill is triggered in stage 1, only used for test.

func (*TopNExec) GetIsSpillTriggeredInStage2ForTest

func (e *TopNExec) GetIsSpillTriggeredInStage2ForTest() bool

GetIsSpillTriggeredInStage2ForTest shows if spill is triggered in stage 2, only used for test.

func (*TopNExec) IsSpillTriggeredForTest

func (e *TopNExec) IsSpillTriggeredForTest() bool

IsSpillTriggeredForTest shows if spill is triggered, used for test.

func (*TopNExec) Next

func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error

Next implements the Executor Next interface.

The following picture shows the procedure of topn when spill is triggered.

Spill Stage:

                     ┌─────────┐
                     │  Child  │
                     └────▲────┘
                          │
                        Fetch
                          │
                  ┌───────┴───────┐
                  │ Chunk Fetcher │
                  └───────┬───────┘
                          │
                          │
                          ▼
                     Check Spill──────►Spill Triggered─────────►Spill
                          │                                       │
                          ▼                                       │
                  Spill Not Triggered                             │
                          │                                       │
                          ▼                                       │
                      Push Chunk◄─────────────────────────────────┘
                          │
                          ▼
     ┌────────────────►Channel◄───────────────────┐
     │                    ▲                       │
     │                    │                       │
   Fetch                Fetch                   Fetch
     │                    │                       │
┌────┴───┐            ┌───┴────┐              ┌───┴────┐
│ Worker │            │ Worker │   ......     │ Worker │
└────┬───┘            └───┬────┘              └───┬────┘
     │                    │                       │
     │                    │                       │
     │                    ▼                       │
     └───────────► Multi-way Merge◄───────────────┘
                          │
                          │
                          ▼
                       Output

Restore Stage:

┌────────┐            ┌────────┐              ┌────────┐
│  Heap  │            │  Heap  │   ......     │  Heap  │
└────┬───┘            └───┬────┘              └───┬────┘
     │                    │                       │
     │                    │                       │
     │                    ▼                       │
     └───────────► Multi-way Merge◄───────────────┘
                          │
                          │
                          ▼
                       Output

func (*TopNExec) Open

func (e *TopNExec) Open(ctx context.Context) error

Open implements the Executor Open interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL