join

package
v1.1.0-beta.0...-0c6681f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2025 License: Apache-2.0 Imports: 58 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// OneInt64 means the key contains only one Int64
	OneInt64 keyMode = iota
	// FixedSerializedKey means the key has fixed length
	FixedSerializedKey
	// VariableSerializedKey means the key has variable length
	VariableSerializedKey
)

Variables

View Source
var (

	// EnableHashJoinV2 enable hash join v2, used for test
	EnableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionOptimized
	// DisableHashJoinV2 disable hash join v2, used for test
	DisableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionLegacy
	// HashJoinV2Strings is used for test
	HashJoinV2Strings = []string{DisableHashJoinV2, EnableHashJoinV2}
)
View Source
var ShardCount = 320

ShardCount controls the number of shard maps within the concurrent map.

Functions

func JoinerType

func JoinerType(j Joiner) logicalop.JoinType

JoinerType returns the join type of a Joiner.

func NewConcurrentMapHashTable

func NewConcurrentMapHashTable() *concurrentMapHashTable

NewConcurrentMapHashTable creates a concurrentMapHashTable

func NewJoinRuntimeStats

func NewJoinRuntimeStats() *joinRuntimeStats

NewJoinRuntimeStats returns a new joinRuntimeStats

Types

type BaseHashTable

type BaseHashTable interface {
	Put(hashKey uint64, rowPtr chunk.RowPtr)
	// e := Get(hashKey)
	// for ; e != nil; e = e.Next {
	//    rowPtr := e.Ptr
	//    ...
	// }
	Get(hashKey uint64) *entry
	Len() uint64
	// GetAndCleanMemoryDelta gets and cleans the memDelta of the BaseHashTable. Memory delta will be cleared after each fetch.
	// It indicates the memory delta of the BaseHashTable since the last calling GetAndCleanMemoryDelta().
	GetAndCleanMemoryDelta() int64
	Iter(func(uint64, *entry))
}

BaseHashTable is the interface of the hash table used in hash join

type BuildWorkerV1

type BuildWorkerV1 struct {
	HashJoinCtx      *HashJoinCtxV1
	BuildNAKeyColIdx []int
	// contains filtered or unexported fields
}

BuildWorkerV1 is the build side worker in hash join

func (*BuildWorkerV1) BuildHashTableForList

func (w *BuildWorkerV1) BuildHashTableForList(buildSideResultCh <-chan *chunk.Chunk) error

BuildHashTableForList builds the hash table from the chunks received on `buildSideResultCh`.

type BuildWorkerV2

type BuildWorkerV2 struct {
	HashJoinCtx    *HashJoinCtxV2
	BuildTypes     []*types.FieldType
	HasNullableKey bool
	WorkerID       uint
	// contains filtered or unexported fields
}

BuildWorkerV2 is the build worker used in hash join v2

func NewJoinBuildWorkerV2

func NewJoinBuildWorkerV2(ctx *HashJoinCtxV2, workID uint, buildSideExec exec.Executor, buildKeyColIdx []int, buildTypes []*types.FieldType) *BuildWorkerV2

NewJoinBuildWorkerV2 creates a BuildWorkerV2.

type HashContext

type HashContext struct {
	// AllTypes one-to-one correspondence with KeyColIdx
	AllTypes    []*types.FieldType
	KeyColIdx   []int
	NaKeyColIdx []int
	Buf         []byte
	HashVals    []hash.Hash64
	HasNull     []bool
	// contains filtered or unexported fields
}

HashContext keeps the needed hash context of a db table in hash join.

func (*HashContext) InitHash

func (hc *HashContext) InitHash(rows int)

InitHash initializes the HashContext.

type HashJoinCtxV1

type HashJoinCtxV1 struct {
	UseOuterToBuild bool
	IsOuterJoin     bool
	RowContainer    *hashRowContainer

	ProbeTypes  []*types.FieldType
	BuildTypes  []*types.FieldType
	OuterFilter expression.CNFExprs
	// contains filtered or unexported fields
}

HashJoinCtxV1 is the context used in hash join

type HashJoinCtxV2

type HashJoinCtxV2 struct {
	ProbeKeyTypes []*types.FieldType
	BuildKeyTypes []*types.FieldType

	RightAsBuildSide bool
	BuildFilter      expression.CNFExprs
	ProbeFilter      expression.CNFExprs
	OtherCondition   expression.CNFExprs

	LUsed, RUsed                                 []int
	LUsedInOtherCondition, RUsedInOtherCondition []int
	// contains filtered or unexported fields
}

HashJoinCtxV2 is the hash join ctx used in hash join v2

func (*HashJoinCtxV2) SetupPartitionInfo

func (hCtx *HashJoinCtxV2) SetupPartitionInfo()

SetupPartitionInfo sets up partitionNumber and partitionMaskOffset based on concurrency.

type HashJoinV1Exec

type HashJoinV1Exec struct {
	exec.BaseExecutor
	*HashJoinCtxV1

	ProbeSideTupleFetcher *ProbeSideTupleFetcherV1
	ProbeWorkers          []*ProbeWorkerV1
	BuildWorker           *BuildWorkerV1

	Prepared bool
	// contains filtered or unexported fields
}

HashJoinV1Exec implements the hash join algorithm.

func (*HashJoinV1Exec) Close

func (e *HashJoinV1Exec) Close() error

Close implements the Executor Close interface.

func (*HashJoinV1Exec) Next

func (e *HashJoinV1Exec) Next(ctx context.Context, req *chunk.Chunk) (err error)

Next implements the Executor Next interface. Hash join constructs the result in two steps: step 1, fetch data from the build side child and build a hash table; step 2, fetch data from the probe child in a background goroutine and probe the hash table in multiple join workers.

func (*HashJoinV1Exec) Open

func (e *HashJoinV1Exec) Open(ctx context.Context) error

Open implements the Executor Open interface.

type HashJoinV2Exec

type HashJoinV2Exec struct {
	exec.BaseExecutor
	*HashJoinCtxV2

	ProbeSideTupleFetcher *ProbeSideTupleFetcherV2
	ProbeWorkers          []*ProbeWorkerV2
	BuildWorkers          []*BuildWorkerV2
	// contains filtered or unexported fields
}

HashJoinV2Exec implements the hash join algorithm.

func (*HashJoinV2Exec) Close

func (e *HashJoinV2Exec) Close() error

Close implements the Executor Close interface.

func (*HashJoinV2Exec) Next

func (e *HashJoinV2Exec) Next(ctx context.Context, req *chunk.Chunk) (err error)

Next implements the Executor Next interface. Hash join constructs the result in two steps: step 1, fetch data from the build side child and build a hash table; step 2, fetch data from the probe child in a background goroutine and probe the hash table in multiple join workers.

func (*HashJoinV2Exec) Open

func (e *HashJoinV2Exec) Open(ctx context.Context) error

Open implements the Executor Open interface.

type IndexJoinExecutorBuilder

type IndexJoinExecutorBuilder interface {
	BuildExecutorForIndexJoin(ctx context.Context, lookUpContents []*IndexJoinLookUpContent,
		indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error)
}

IndexJoinExecutorBuilder is the interface used by index lookup join to build the executor. This interface was added to avoid an import cycle.

type IndexJoinLookUpContent

type IndexJoinLookUpContent struct {
	Keys []types.Datum
	Row  chunk.Row

	KeyColIDs []int64 // the original ID in its table, used by dynamic partition pruning
	// contains filtered or unexported fields
}

IndexJoinLookUpContent is the content used in index lookup join

type IndexLookUpJoin

type IndexLookUpJoin struct {
	exec.BaseExecutor

	WorkerWg *sync.WaitGroup

	OuterCtx OuterCtx
	InnerCtx InnerCtx

	JoinResult *chunk.Chunk

	Joiner      Joiner
	IsOuterJoin bool

	IndexRanges   ranger.MutableRanges
	KeyOff2IdxOff []int

	// LastColHelper store the information for last col if there's complicated filter like col > x_col and col < x_col + 100.
	LastColHelper *plannercore.ColWithCmpFuncManager

	Finished *atomic.Value
	// contains filtered or unexported fields
}

IndexLookUpJoin employs one outer worker and N innerWorkers to execute concurrently. It preserves the order of the outer table and supports batch lookup.

The execution flow is very similar to IndexLookUpReader: 1. The outerWorker reads N outer rows, builds a task, and sends it to the result channel and the inner worker channel. 2. The innerWorker receives the task, builds key ranges from the outer rows, fetches the inner rows, and builds the inner-row hash map. 3. The main thread receives the task and waits for the inner worker to finish handling it. 4. The main thread joins each outer row by looking it up in the task's inner-row hash map.

func (*IndexLookUpJoin) Close

func (e *IndexLookUpJoin) Close() error

Close implements the Executor interface.

func (*IndexLookUpJoin) Next

func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error

Next implements the Executor interface.

func (*IndexLookUpJoin) Open

func (e *IndexLookUpJoin) Open(ctx context.Context) error

Open implements the Executor interface.

type IndexLookUpMergeJoin

type IndexLookUpMergeJoin struct {
	exec.BaseExecutor

	WorkerWg *sync.WaitGroup

	OuterMergeCtx OuterMergeCtx
	InnerMergeCtx InnerMergeCtx

	Joiners []Joiner

	IsOuterJoin bool

	IndexRanges   ranger.MutableRanges
	KeyOff2IdxOff []int

	// LastColHelper store the information for last col if there's complicated filter like col > x_col and col < x_col + 100.
	LastColHelper *plannercore.ColWithCmpFuncManager
	// contains filtered or unexported fields
}

IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join. It preserves the order of the outer table and supports batch lookup.

The execution flow is very similar to IndexLookUpReader: 1. The outerWorker reads N outer rows, builds a task, and sends it to the result channel and the inner worker channel. 2. The innerWorker receives the task, builds key ranges from the outer rows, fetches the inner rows, and then does the merge join. 3. The main thread receives the task and fetches results from the task's channel one by one. 4. Once the channel has been closed, the main thread receives the next task.

func (*IndexLookUpMergeJoin) Close

func (e *IndexLookUpMergeJoin) Close() error

Close implements the Executor interface.

func (*IndexLookUpMergeJoin) Next

func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error

Next implements the Executor interface

func (*IndexLookUpMergeJoin) Open

Open implements the Executor interface

type IndexNestedLoopHashJoin

type IndexNestedLoopHashJoin struct {
	IndexLookUpJoin

	// We build individual joiner for each inner worker when using chunk-based
	// execution, to avoid the concurrency of joiner.chk and joiner.selected.
	Joiners        []Joiner
	KeepOuterOrder bool
	// contains filtered or unexported fields
}

IndexNestedLoopHashJoin employs one outer worker and N inner workers to execute concurrently. The output order is not promised.

The execution flow is very similar to IndexLookUpReader: 1. The outer worker reads N outer rows, builds a task and sends it to the inner worker channel. 2. The inner worker receives the tasks and does 3 things for every task:

  1. builds hash table from the outer rows
  2. builds key ranges from outer rows and fetches inner rows
  3. probes the hash table and sends the join result to the main thread channel. Note: step 1 and step 2 run concurrently.

3. The main thread receives the join results.

func (*IndexNestedLoopHashJoin) Close

func (e *IndexNestedLoopHashJoin) Close() error

Close implements the IndexNestedLoopHashJoin Executor interface.

func (*IndexNestedLoopHashJoin) Next

Next implements the IndexNestedLoopHashJoin Executor interface.

func (*IndexNestedLoopHashJoin) Open

Open implements the IndexNestedLoopHashJoin Executor interface.

type InnerCtx

type InnerCtx struct {
	ReaderBuilder IndexJoinExecutorBuilder
	RowTypes      []*types.FieldType
	KeyCols       []int
	KeyColIDs     []int64 // the original ID in its table, used by dynamic partition pruning
	KeyCollators  []collate.Collator
	HashTypes     []*types.FieldType
	HashCols      []int
	HashCollators []collate.Collator
	ColLens       []int
	HasPrefixCol  bool
}

InnerCtx is the inner side ctx used in index lookup join

type InnerMergeCtx

type InnerMergeCtx struct {
	ReaderBuilder           IndexJoinExecutorBuilder
	RowTypes                []*types.FieldType
	JoinKeys                []*expression.Column
	KeyCols                 []int
	KeyCollators            []collate.Collator
	CompareFuncs            []expression.CompareFunc
	ColLens                 []int
	Desc                    bool
	KeyOff2KeyOffOrderByIdx []int
}

InnerMergeCtx is the inner side ctx of merge join

type IterCb

type IterCb func(key uint64, e *entry)

IterCb is the iterator callback, called for every key/value pair found in the maps. An RLock is held for all calls for a given shard, so the callback sees a consistent view of a shard, but not across shards.

type Joiner

type Joiner interface {
	// TryToMatchInners tries to join an outer row with a batch of inner rows. When
	// 'inners.Len != 0' but all the joined rows are filtered, the outer row is
	// considered unmatched. Otherwise, the outer row is matched and some joined
	// rows are appended to `chk`. The size of `chk` is limited to MaxChunkSize.
	// Note that when the outer row is considered unmatched, we need to differentiate
	// whether the join conditions return null or false, because that matters for
	// AntiSemiJoin/LeftOuterSemiJoin/AntiLeftOuterSemiJoin, by setting the return
	// value isNull; for other join types, isNull is always false.
	//
	// NOTE: Callers need to call this function multiple times to consume all
	// the inner rows for an outer row, and decide whether the outer row can be
	// matched with at least one inner row.
	TryToMatchInners(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk, opt ...NAAJType) (matched bool, isNull bool, err error)

	// TryToMatchOuters tries to join a batch of outer rows with one inner row.
	// It's used when the join is an outer join and the hash table is built
	// using the outer side.
	TryToMatchOuters(outer chunk.Iterator, inner chunk.Row, chk *chunk.Chunk, outerRowStatus []outerRowStatusFlag) (_ []outerRowStatusFlag, err error)

	// OnMissMatch operates on the unmatched outer row according to the join
	// type. An outer row can be considered miss matched if:
	//   1. it can not pass the filter on the outer table side.
	//   2. there is no inner row with the same join key.
	//   3. all the joined rows can not pass the filter on the join result.
	//
	// On these conditions, the caller calls this function to handle the
	// unmatched outer rows according to the current join type:
	//   1. 'SemiJoin': ignores the unmatched outer row.
	//   2. 'AntiSemiJoin': appends the unmatched outer row to the result buffer.
	//   3. 'LeftOuterSemiJoin': concats the unmatched outer row with 0 and
	//      appends it to the result buffer.
	//   4. 'AntiLeftOuterSemiJoin': concats the unmatched outer row with 1 and
	//      appends it to the result buffer.
	//   5. 'LeftOuterJoin': concats the unmatched outer row with a row of NULLs
	//      and appends it to the result buffer.
	//   6. 'RightOuterJoin': concats the unmatched outer row with a row of NULLs
	//      and appends it to the result buffer.
	//   7. 'InnerJoin': ignores the unmatched outer row.
	//
	// Note that, for LeftOuterSemiJoin, AntiSemiJoin and AntiLeftOuterSemiJoin,
	// we need to know the reason of outer row being treated as unmatched:
	// whether the join condition returns false, or returns null, because
	// it decides if this outer row should be outputted, hence we have a `hasNull`
	// parameter passed to `OnMissMatch`.
	OnMissMatch(hasNull bool, outer chunk.Row, chk *chunk.Chunk)

	// Clone deep copies a Joiner.
	Clone() Joiner
	// contains filtered or unexported methods
}

Joiner is used to generate join results according to the join type. A typical instruction flow is:

hasMatch, HasNull := false, false
for innerIter.Current() != innerIter.End() {
    matched, isNull, err := j.TryToMatchInners(Outer, innerIter, chk)
    // handle err
    hasMatch = hasMatch || matched
    HasNull = HasNull || isNull
}
if !hasMatch {
    j.OnMissMatch(HasNull, Outer, chk)
}

NOTE: This interface is **not** thread-safe. TODO: add unit tests for all join types:

  1. no filter, no inline projection
  2. no filter, inline projection
  3. no filter, inline projection to empty column
  4. filter, no inline projection
  5. filter, inline projection
  6. filter, inline projection to empty column

func NewJoiner

func NewJoiner(ctx sessionctx.Context, joinType logicalop.JoinType,
	outerIsRight bool, defaultInner []types.Datum, filter []expression.Expression,
	lhsColTypes, rhsColTypes []*types.FieldType, childrenUsed [][]int, isNA bool) Joiner

NewJoiner creates a Joiner.

type MergeJoinExec

type MergeJoinExec struct {
	exec.BaseExecutor

	StmtCtx      *stmtctx.StatementContext
	CompareFuncs []expression.CompareFunc
	Joiner       Joiner
	IsOuterJoin  bool
	Desc         bool

	InnerTable *MergeJoinTable
	OuterTable *MergeJoinTable
	// contains filtered or unexported fields
}

MergeJoinExec implements the merge join algorithm. This operator assumes that the iterators of both sides provide the required order on the join condition: 1. For an equal-join, one of the join keys from each side matches the order given. 2. For other cases it is preferred not to use SMJ, and the operator will throw an error.

func (*MergeJoinExec) Close

func (e *MergeJoinExec) Close() error

Close implements the Executor Close interface.

func (*MergeJoinExec) Next

func (e *MergeJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error)

Next implements the Executor Next interface. Note the inner group collects all identical keys in a group across multiple chunks, but the outer group just covers the identical keys within a chunk, so identical keys may cover more than one chunk.

func (*MergeJoinExec) Open

func (e *MergeJoinExec) Open(ctx context.Context) error

Open implements the Executor Open interface.

type MergeJoinTable

type MergeJoinTable struct {
	IsInner    bool
	ChildIndex int
	JoinKeys   []*expression.Column
	Filters    []expression.Expression
	// contains filtered or unexported fields
}

MergeJoinTable is used for merge join

type NAAJType

type NAAJType byte

NAAJType is the join detail type used only by null-aware AntiLeftOuterSemiJoin.

const (
	// Unknown is the default value.
	Unknown NAAJType = 0
	// LeftHasNullRightNotNull means lhs is a null key, and rhs is not a null key.
	LeftHasNullRightNotNull NAAJType = 1
	// LeftHasNullRightHasNull means lhs is a null key, and rhs is a null key.
	LeftHasNullRightHasNull NAAJType = 2
	// LeftNotNullRightNotNull means lhs is not a null key, and rhs is not a null key.
	LeftNotNullRightNotNull NAAJType = 3
	// LeftNotNullRightHasNull means lhs is not a null key, and rhs is a null key.
	LeftNotNullRightHasNull NAAJType = 4
)

type NestedLoopApplyExec

type NestedLoopApplyExec struct {
	exec.BaseExecutor

	Sctx sessionctx.Context

	InnerExec   exec.Executor
	OuterExec   exec.Executor
	InnerFilter expression.CNFExprs
	OuterFilter expression.CNFExprs

	Joiner Joiner

	CanUseCache bool

	OuterSchema []*expression.CorrelatedColumn

	OuterChunk *chunk.Chunk

	InnerList  *chunk.List
	InnerChunk *chunk.Chunk

	Outer bool
	// contains filtered or unexported fields
}

NestedLoopApplyExec is the executor for apply.

func (*NestedLoopApplyExec) Close

func (e *NestedLoopApplyExec) Close() error

Close implements the Executor interface.

func (*NestedLoopApplyExec) Next

func (e *NestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err error)

Next implements the Executor interface.

func (*NestedLoopApplyExec) Open

Open implements the Executor interface.

type OuterCtx

type OuterCtx struct {
	RowTypes  []*types.FieldType
	KeyCols   []int
	HashTypes []*types.FieldType
	HashCols  []int
	Filter    expression.CNFExprs
}

OuterCtx is the outer ctx used in index lookup join

type OuterMergeCtx

type OuterMergeCtx struct {
	RowTypes      []*types.FieldType
	JoinKeys      []*expression.Column
	KeyCols       []int
	Filter        expression.CNFExprs
	NeedOuterSort bool
	CompareFuncs  []expression.CompareFunc
}

OuterMergeCtx is the outer side ctx of merge join

type ProbeSideTupleFetcherV1

type ProbeSideTupleFetcherV1 struct {
	*HashJoinCtxV1
	// contains filtered or unexported fields
}

ProbeSideTupleFetcherV1 reads tuples from ProbeSideExec and sends them to ProbeWorkers.

type ProbeSideTupleFetcherV2

type ProbeSideTupleFetcherV2 struct {
	*HashJoinCtxV2
	// contains filtered or unexported fields
}

ProbeSideTupleFetcherV2 reads tuples from ProbeSideExec and sends them to ProbeWorkers.

type ProbeV2

type ProbeV2 interface {
	// SetChunkForProbe does some pre-work before starting to probe a chunk
	SetChunkForProbe(chunk *chunk.Chunk) error
	// SetRestoredChunkForProbe does some pre-work for a chunk restored from disk
	SetRestoredChunkForProbe(chunk *chunk.Chunk) error
	// Probe probes the current chunk. The result chunk is set in result.chk, and Probe needs to make sure result.chk.NumRows() <= result.chk.RequiredRows()
	Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, result *hashjoinWorkerResult)
	// IsCurrentChunkProbeDone returns true if current probe chunk is all probed
	IsCurrentChunkProbeDone() bool
	// SpillRemainingProbeChunks spills remaining probe chunks
	SpillRemainingProbeChunks() error
	// ScanRowTable is called after all the probe chunks are probed. It is used in some special joins, like left outer join with left side to build, after all
	// the probe side chunks are handled, it needs to scan the row table to return the un-matched rows
	ScanRowTable(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (result *hashjoinWorkerResult)
	// IsScanRowTableDone returns true after scan row table is done
	IsScanRowTableDone() bool
	// NeedScanRowTable returns true if current join need to scan row table after all the probe side chunks are handled
	NeedScanRowTable() bool
	// InitForScanRowTable does some pre-work before ScanRowTable, it must be called before ScanRowTable
	InitForScanRowTable()
	// GetProbeCollision returns the probe collision
	GetProbeCollision() uint64
	// ResetProbeCollision resets the probe collision
	ResetProbeCollision()
	// Reset some probe variables
	ResetProbe()
}

ProbeV2 is the interface used to do probe in hash join v2

func NewJoinProbe

func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType, keyIndex []int, joinedColumnTypes, probeKeyTypes []*types.FieldType, rightAsBuildSide bool) ProbeV2

NewJoinProbe creates a join probe used for hash join v2.

type ProbeWorkerV1

type ProbeWorkerV1 struct {
	HashJoinCtx      *HashJoinCtxV1
	ProbeKeyColIdx   []int
	ProbeNAKeyColIdx []int

	// We build individual joiner for each join worker when use chunk-based
	// execution, to avoid the concurrency of joiner.chk and joiner.selected.
	Joiner Joiner
	// contains filtered or unexported fields
}

ProbeWorkerV1 is the probe side worker in hash join

type ProbeWorkerV2

type ProbeWorkerV2 struct {
	HashJoinCtx *HashJoinCtxV2
	// We build individual joinProbe for each join worker when use chunk-based
	// execution, to avoid the concurrency of joiner.chk and joiner.selected.
	JoinProbe ProbeV2
	// contains filtered or unexported fields
}

ProbeWorkerV2 is the probe worker used in hash join v2

type UpsertCb

type UpsertCb func(exist bool, valueInMap, newValue *entry) *entry

UpsertCb is the callback that returns the new element to be inserted into the map. It is called while the lock is held, therefore it MUST NOT try to access other keys in the same map, as that can lead to deadlock since Go's sync.RWMutex is not reentrant.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL