Documentation ¶
Index ¶
- Constants
- Variables
- func IsHashJoinV2Supported() bool
- func JoinerType(j Joiner) logicalop.JoinType
- func NewConcurrentMapHashTable() *concurrentMapHashTable
- func NewJoinRuntimeStats() *joinRuntimeStats
- type BaseHashTable
- type BuildWorkerV1
- type BuildWorkerV2
- type HashContext
- type HashJoinCtxV1
- type HashJoinCtxV2
- type HashJoinV1Exec
- type HashJoinV2Exec
- type IndexJoinExecutorBuilder
- type IndexJoinLookUpContent
- type IndexLookUpJoin
- type IndexLookUpMergeJoin
- type IndexNestedLoopHashJoin
- type InnerCtx
- type InnerMergeCtx
- type IterCb
- type Joiner
- type MergeJoinExec
- type MergeJoinTable
- type NAAJType
- type NestedLoopApplyExec
- type OuterCtx
- type OuterMergeCtx
- type ProbeSideTupleFetcherV1
- type ProbeSideTupleFetcherV2
- type ProbeV2
- type ProbeWorkerV1
- type ProbeWorkerV2
- type UpsertCb
Constants ¶
const ( // OneInt64 mean the key contains only one Int64 OneInt64 keyMode = iota // FixedSerializedKey mean the key has fixed length FixedSerializedKey // VariableSerializedKey mean the key has variable length VariableSerializedKey )
Variables ¶
var ( // EnableHashJoinV2 enable hash join v2, used for test EnableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionOptimized // DisableHashJoinV2 disable hash join v2, used for test DisableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionLegacy // HashJoinV2Strings is used for test HashJoinV2Strings = []string{DisableHashJoinV2, EnableHashJoinV2} )
var ShardCount = 320
ShardCount controls the number of shard maps within the concurrent map.
Functions ¶
func IsHashJoinV2Supported ¶
func IsHashJoinV2Supported() bool
IsHashJoinV2Supported returns true if hash join v2 is supported in the current environment.
func JoinerType ¶
JoinerType returns the join type of a Joiner.
func NewConcurrentMapHashTable ¶
func NewConcurrentMapHashTable() *concurrentMapHashTable
NewConcurrentMapHashTable creates a concurrentMapHashTable
func NewJoinRuntimeStats ¶
func NewJoinRuntimeStats() *joinRuntimeStats
NewJoinRuntimeStats returns a new joinRuntimeStats
Types ¶
type BaseHashTable ¶
type BaseHashTable interface { Put(hashKey uint64, rowPtr chunk.RowPtr) // e := Get(hashKey) // for ; e != nil; e = e.Next { // rowPtr := e.Ptr // ... // } Get(hashKey uint64) *entry Len() uint64 // GetAndCleanMemoryDelta gets and cleans the memDelta of the BaseHashTable. Memory delta will be cleared after each fetch. // It indicates the memory delta of the BaseHashTable since the last calling GetAndCleanMemoryDelta(). GetAndCleanMemoryDelta() int64 Iter(func(uint64, *entry)) }
BaseHashTable is the interface of the hash table used in hash join
type BuildWorkerV1 ¶
type BuildWorkerV1 struct { HashJoinCtx *HashJoinCtxV1 BuildNAKeyColIdx []int // contains filtered or unexported fields }
BuildWorkerV1 is the build side worker in hash join
func (*BuildWorkerV1) BuildHashTableForList ¶
func (w *BuildWorkerV1) BuildHashTableForList(buildSideResultCh <-chan *chunk.Chunk) error
BuildHashTableForList builds hash table from `list`.
type BuildWorkerV2 ¶
type BuildWorkerV2 struct { HashJoinCtx *HashJoinCtxV2 BuildTypes []*types.FieldType HasNullableKey bool WorkerID uint // contains filtered or unexported fields }
BuildWorkerV2 is the build worker used in hash join v2
func NewJoinBuildWorkerV2 ¶
func NewJoinBuildWorkerV2(ctx *HashJoinCtxV2, workID uint, buildSideExec exec.Executor, buildKeyColIdx []int, buildTypes []*types.FieldType) *BuildWorkerV2
NewJoinBuildWorkerV2 creates a BuildWorkerV2.
type HashContext ¶
type HashContext struct { // AllTypes one-to-one correspondence with KeyColIdx AllTypes []*types.FieldType KeyColIdx []int NaKeyColIdx []int Buf []byte HashVals []hash.Hash64 HasNull []bool // contains filtered or unexported fields }
HashContext keeps the needed hash context of a db table in hash join.
type HashJoinCtxV1 ¶
type HashJoinCtxV1 struct { UseOuterToBuild bool IsOuterJoin bool RowContainer *hashRowContainer ProbeTypes []*types.FieldType BuildTypes []*types.FieldType OuterFilter expression.CNFExprs // contains filtered or unexported fields }
HashJoinCtxV1 is the context used in hash join
type HashJoinCtxV2 ¶
type HashJoinCtxV2 struct { ProbeKeyTypes []*types.FieldType BuildKeyTypes []*types.FieldType RightAsBuildSide bool BuildFilter expression.CNFExprs ProbeFilter expression.CNFExprs OtherCondition expression.CNFExprs LUsed, RUsed []int LUsedInOtherCondition, RUsedInOtherCondition []int // contains filtered or unexported fields }
HashJoinCtxV2 is the hash join ctx used in hash join v2
func (*HashJoinCtxV2) SetupPartitionInfo ¶
func (hCtx *HashJoinCtxV2) SetupPartitionInfo()
SetupPartitionInfo sets up partitionNumber and partitionMaskOffset based on concurrency.
type HashJoinV1Exec ¶
type HashJoinV1Exec struct { exec.BaseExecutor *HashJoinCtxV1 ProbeSideTupleFetcher *ProbeSideTupleFetcherV1 ProbeWorkers []*ProbeWorkerV1 BuildWorker *BuildWorkerV1 Prepared bool // contains filtered or unexported fields }
HashJoinV1Exec implements the hash join algorithm.
func (*HashJoinV1Exec) Close ¶
func (e *HashJoinV1Exec) Close() error
Close implements the Executor Close interface.
func (*HashJoinV1Exec) Next ¶
Next implements the Executor Next interface. hash join constructs the result following these steps: step 1. fetch data from build side child and build a hash table; step 2. fetch data from probe child in a background goroutine and probe the hash table in multiple join workers.
type HashJoinV2Exec ¶
type HashJoinV2Exec struct { exec.BaseExecutor *HashJoinCtxV2 ProbeSideTupleFetcher *ProbeSideTupleFetcherV2 ProbeWorkers []*ProbeWorkerV2 BuildWorkers []*BuildWorkerV2 // contains filtered or unexported fields }
HashJoinV2Exec implements the hash join algorithm.
func (*HashJoinV2Exec) Close ¶
func (e *HashJoinV2Exec) Close() error
Close implements the Executor Close interface.
func (*HashJoinV2Exec) Next ¶
Next implements the Executor Next interface. hash join constructs the result following these steps: step 1. fetch data from build side child and build a hash table; step 2. fetch data from probe child in a background goroutine and probe the hash table in multiple join workers.
type IndexJoinExecutorBuilder ¶
type IndexJoinExecutorBuilder interface { BuildExecutorForIndexJoin(ctx context.Context, lookUpContents []*IndexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error) }
IndexJoinExecutorBuilder is the interface used by index lookup join to build the executor. This interface is added to avoid an import cycle.
type IndexJoinLookUpContent ¶
type IndexJoinLookUpContent struct { Keys []types.Datum Row chunk.Row KeyColIDs []int64 // the original ID in its table, used by dynamic partition pruning // contains filtered or unexported fields }
IndexJoinLookUpContent is the content used in index lookup join
type IndexLookUpJoin ¶
type IndexLookUpJoin struct { exec.BaseExecutor WorkerWg *sync.WaitGroup OuterCtx OuterCtx InnerCtx InnerCtx JoinResult *chunk.Chunk Joiner Joiner IsOuterJoin bool IndexRanges ranger.MutableRanges KeyOff2IdxOff []int // LastColHelper store the information for last col if there's complicated filter like col > x_col and col < x_col + 100. LastColHelper *plannercore.ColWithCmpFuncManager Finished *atomic.Value // contains filtered or unexported fields }
IndexLookUpJoin employs one outer worker and N innerWorkers to execute concurrently. It preserves the order of the outer table and supports batch lookup.
The execution flow is very similar to IndexLookUpReader: 1. outerWorker reads N outer rows, builds a task, and sends it to the result channel and the inner worker channel. 2. The innerWorker receives the task, builds key ranges from the outer rows, fetches the inner rows, and builds an inner-row hash map. 3. The main thread receives the task and waits for the inner worker to finish handling it. 4. The main thread joins each outer row by looking up the inner-row hash map in the task.
func (*IndexLookUpJoin) Close ¶
func (e *IndexLookUpJoin) Close() error
Close implements the Executor interface.
type IndexLookUpMergeJoin ¶
type IndexLookUpMergeJoin struct { exec.BaseExecutor WorkerWg *sync.WaitGroup OuterMergeCtx OuterMergeCtx InnerMergeCtx InnerMergeCtx Joiners []Joiner IsOuterJoin bool IndexRanges ranger.MutableRanges KeyOff2IdxOff []int // LastColHelper store the information for last col if there's complicated filter like col > x_col and col < x_col + 100. LastColHelper *plannercore.ColWithCmpFuncManager // contains filtered or unexported fields }
IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join. It preserves the order of the outer table and supports batch lookup.
The execution flow is very similar to IndexLookUpReader: 1. outerWorker reads N outer rows, builds a task, and sends it to the result channel and the inner worker channel. 2. The innerWorker receives the task, builds key ranges from the outer rows, fetches the inner rows, and then does the merge join. 3. The main thread receives the task and fetches results from the task's channel one by one. 4. Once that channel has been closed, the main thread receives the next task.
func (*IndexLookUpMergeJoin) Close ¶
func (e *IndexLookUpMergeJoin) Close() error
Close implements the Executor interface.
type IndexNestedLoopHashJoin ¶
type IndexNestedLoopHashJoin struct { IndexLookUpJoin // We build individual joiner for each inner worker when using chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. Joiners []Joiner KeepOuterOrder bool // contains filtered or unexported fields }
IndexNestedLoopHashJoin employs one outer worker and N inner workers to execute concurrently. The output order is not guaranteed.
The execution flow is very similar to IndexLookUpReader: 1. The outer worker reads N outer rows, builds a task and sends it to the inner worker channel. 2. The inner worker receives the tasks and does 3 things for every task:
- builds hash table from the outer rows
- builds key ranges from outer rows and fetches inner rows
- probes the hash table and sends the join result to the main thread channel. Note: step 1 and step 2 run concurrently.
3. The main thread receives the join results.
func (*IndexNestedLoopHashJoin) Close ¶
func (e *IndexNestedLoopHashJoin) Close() error
Close implements the IndexNestedLoopHashJoin Executor interface.
type InnerCtx ¶
type InnerCtx struct { ReaderBuilder IndexJoinExecutorBuilder RowTypes []*types.FieldType KeyCols []int KeyColIDs []int64 // the original ID in its table, used by dynamic partition pruning KeyCollators []collate.Collator HashTypes []*types.FieldType HashCols []int HashCollators []collate.Collator ColLens []int HasPrefixCol bool }
InnerCtx is the inner side ctx used in index lookup join
type InnerMergeCtx ¶
type InnerMergeCtx struct { ReaderBuilder IndexJoinExecutorBuilder RowTypes []*types.FieldType JoinKeys []*expression.Column KeyCols []int KeyCollators []collate.Collator CompareFuncs []expression.CompareFunc ColLens []int Desc bool KeyOff2KeyOffOrderByIdx []int }
InnerMergeCtx is the inner side ctx of merge join
type IterCb ¶
type IterCb func(key uint64, e *entry)
IterCb is the iterator callback, called for every key/value pair found in the maps. RLock is held for all calls for a given shard, so the callback sees a consistent view of a shard, but not across shards.
type Joiner ¶
type Joiner interface { // TryToMatchInners tries to join an outer row with a batch of inner rows. When // 'inners.Len != 0' but all the joined rows are filtered, the outer row is // considered unmatched. Otherwise, the outer row is matched and some joined // rows are appended to `chk`. The size of `chk` is limited to MaxChunkSize. // Note that when the outer row is considered unmatched, we need to differentiate // whether the join conditions return null or false, because that matters for // AntiSemiJoin/LeftOuterSemiJoin/AntiLeftOuterSemiJoin, by setting the return // value isNull; for other join types, isNull is always false. // // NOTE: Callers need to call this function multiple times to consume all // the inner rows for an outer row, and decide whether the outer row can be // matched with at lease one inner row. TryToMatchInners(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk, opt ...NAAJType) (matched bool, isNull bool, err error) // TryToMatchOuters tries to join a batch of outer rows with one inner row. // It's used when the join is an outer join and the hash table is built // using the outer side. TryToMatchOuters(outer chunk.Iterator, inner chunk.Row, chk *chunk.Chunk, outerRowStatus []outerRowStatusFlag) (_ []outerRowStatusFlag, err error) // OnMissMatch operates on the unmatched outer row according to the join // type. An outer row can be considered miss matched if: // 1. it can not pass the filter on the outer table side. // 2. there is no inner row with the same join key. // 3. all the joined rows can not pass the filter on the join result. // // On these conditions, the caller calls this function to handle the // unmatched outer rows according to the current join type: // 1. 'SemiJoin': ignores the unmatched outer row. // 2. 'AntiSemiJoin': appends the unmatched outer row to the result buffer. // 3. 'LeftOuterSemiJoin': concats the unmatched outer row with 0 and // appends it to the result buffer. // 4. 
'AntiLeftOuterSemiJoin': concats the unmatched outer row with 1 and // appends it to the result buffer. // 5. 'LeftOuterJoin': concats the unmatched outer row with a row of NULLs // and appends it to the result buffer. // 6. 'RightOuterJoin': concats the unmatched outer row with a row of NULLs // and appends it to the result buffer. // 7. 'InnerJoin': ignores the unmatched outer row. // // Note that, for LeftOuterSemiJoin, AntiSemiJoin and AntiLeftOuterSemiJoin, // we need to know the reason of outer row being treated as unmatched: // whether the join condition returns false, or returns null, because // it decides if this outer row should be outputted, hence we have a `hasNull` // parameter passed to `OnMissMatch`. OnMissMatch(hasNull bool, outer chunk.Row, chk *chunk.Chunk) // Clone deep copies a Joiner. Clone() Joiner // contains filtered or unexported methods }
Joiner is used to generate join results according to the join type. A typical instruction flow is:
hasMatch, HasNull := false, false for innerIter.Current() != innerIter.End() { matched, isNull, err := j.TryToMatchInners(Outer, innerIter, chk) // handle err hasMatch = hasMatch || matched HasNull = HasNull || isNull } if !hasMatch { j.OnMissMatch(HasNull, Outer, chk) }
NOTE: This interface is **not** thread-safe. TODO: unit test for all join types
- no filter, no inline projection
- no filter, inline projection
- no filter, inline projection to empty column
- filter, no inline projection
- filter, inline projection
- filter, inline projection to empty column
type MergeJoinExec ¶
type MergeJoinExec struct { exec.BaseExecutor StmtCtx *stmtctx.StatementContext CompareFuncs []expression.CompareFunc Joiner Joiner IsOuterJoin bool Desc bool InnerTable *MergeJoinTable OuterTable *MergeJoinTable // contains filtered or unexported fields }
MergeJoinExec implements the merge join algorithm. This operator assumes that the iterators on both sides provide the required order on the join condition: 1. For equal-join, one of the join keys from each side matches the given order. 2. For other cases, it's preferred not to use SMJ, and the operator will throw an error.
func (*MergeJoinExec) Close ¶
func (e *MergeJoinExec) Close() error
Close implements the Executor Close interface.
type MergeJoinTable ¶
type MergeJoinTable struct { IsInner bool ChildIndex int JoinKeys []*expression.Column Filters []expression.Expression // contains filtered or unexported fields }
MergeJoinTable is used for merge join
type NAAJType ¶
type NAAJType byte
NAAJType is the join detail type used only by null-aware AntiLeftOuterSemiJoin.
const ( // Unknown for those default value. Unknown NAAJType = 0 // LeftHasNullRightNotNull means lhs is a null key, and rhs is not a null key. LeftHasNullRightNotNull NAAJType = 1 // LeftHasNullRightHasNull means lhs is a null key, and rhs is a null key. LeftHasNullRightHasNull NAAJType = 2 // LeftNotNullRightNotNull means lhs is in not a null key, and rhs is not a null key. LeftNotNullRightNotNull NAAJType = 3 // LeftNotNullRightHasNull means lhs is in not a null key, and rhs is a null key. LeftNotNullRightHasNull NAAJType = 4 )
type NestedLoopApplyExec ¶
type NestedLoopApplyExec struct { exec.BaseExecutor Sctx sessionctx.Context InnerExec exec.Executor OuterExec exec.Executor InnerFilter expression.CNFExprs OuterFilter expression.CNFExprs Joiner Joiner CanUseCache bool OuterSchema []*expression.CorrelatedColumn OuterChunk *chunk.Chunk InnerList *chunk.List InnerChunk *chunk.Chunk Outer bool // contains filtered or unexported fields }
NestedLoopApplyExec is the executor for apply.
func (*NestedLoopApplyExec) Close ¶
func (e *NestedLoopApplyExec) Close() error
Close implements the Executor interface.
type OuterCtx ¶
type OuterCtx struct { RowTypes []*types.FieldType KeyCols []int HashTypes []*types.FieldType HashCols []int Filter expression.CNFExprs }
OuterCtx is the outer ctx used in index lookup join
type OuterMergeCtx ¶
type OuterMergeCtx struct { RowTypes []*types.FieldType JoinKeys []*expression.Column KeyCols []int Filter expression.CNFExprs NeedOuterSort bool CompareFuncs []expression.CompareFunc }
OuterMergeCtx is the outer side ctx of merge join
type ProbeSideTupleFetcherV1 ¶
type ProbeSideTupleFetcherV1 struct { *HashJoinCtxV1 // contains filtered or unexported fields }
ProbeSideTupleFetcherV1 reads tuples from ProbeSideExec and sends them to ProbeWorkers.
type ProbeSideTupleFetcherV2 ¶
type ProbeSideTupleFetcherV2 struct { *HashJoinCtxV2 // contains filtered or unexported fields }
ProbeSideTupleFetcherV2 reads tuples from ProbeSideExec and sends them to ProbeWorkers.
type ProbeV2 ¶
type ProbeV2 interface { // SetChunkForProbe will do some pre-work when start probing a chunk SetChunkForProbe(chunk *chunk.Chunk) error // SetRestoredChunkForProbe will do some pre-work for a chunk resoted from disk SetRestoredChunkForProbe(chunk *chunk.Chunk) error // Probe is to probe current chunk, the result chunk is set in result.chk, and Probe need to make sure result.chk.NumRows() <= result.chk.RequiredRows() Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, result *hashjoinWorkerResult) // IsCurrentChunkProbeDone returns true if current probe chunk is all probed IsCurrentChunkProbeDone() bool // SpillRemainingProbeChunks spills remaining probe chunks SpillRemainingProbeChunks() error // ScanRowTable is called after all the probe chunks are probed. It is used in some special joins, like left outer join with left side to build, after all // the probe side chunks are handled, it needs to scan the row table to return the un-matched rows ScanRowTable(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (result *hashjoinWorkerResult) // IsScanRowTableDone returns true after scan row table is done IsScanRowTableDone() bool // NeedScanRowTable returns true if current join need to scan row table after all the probe side chunks are handled NeedScanRowTable() bool // InitForScanRowTable do some pre-work before ScanRowTable, it must be called before ScanRowTable InitForScanRowTable() // Return probe collsion GetProbeCollision() uint64 // Reset probe collsion ResetProbeCollision() // Reset some probe variables ResetProbe() }
ProbeV2 is the interface used to do probe in hash join v2
type ProbeWorkerV1 ¶
type ProbeWorkerV1 struct { HashJoinCtx *HashJoinCtxV1 ProbeKeyColIdx []int ProbeNAKeyColIdx []int // We build individual joiner for each join worker when use chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. Joiner Joiner // contains filtered or unexported fields }
ProbeWorkerV1 is the probe side worker in hash join
type ProbeWorkerV2 ¶
type ProbeWorkerV2 struct { HashJoinCtx *HashJoinCtxV2 // We build individual joinProbe for each join worker when use chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. JoinProbe ProbeV2 // contains filtered or unexported fields }
ProbeWorkerV2 is the probe worker used in hash join v2
type UpsertCb ¶
type UpsertCb func(exist bool, valueInMap, newValue *entry) *entry
UpsertCb is a callback that returns the new element to be inserted into the map. It is called while the lock is held, so it MUST NOT try to access other keys in the same map; doing so can lead to deadlock because Go's sync.RWMutex is not reentrant.
Source Files ¶
- base_join_probe.go
- concurrent_map.go
- hash_join_base.go
- hash_join_spill.go
- hash_join_spill_helper.go
- hash_join_test_util.go
- hash_join_v1.go
- hash_join_v2.go
- hash_table_v1.go
- hash_table_v2.go
- index_lookup_hash_join.go
- index_lookup_join.go
- index_lookup_merge_join.go
- inner_join_probe.go
- join_row_table.go
- join_table_meta.go
- joiner.go
- left_outer_semi_join_probe.go
- merge_join.go
- outer_join_probe.go
- row_table_builder.go
- tagged_ptr.go