Documentation ¶
Index ¶
- Variables
- func MakeExpression(expr parser.TypedExpr, indexVarMap []int) distsqlrun.Expression
- func MergePlans(left, right *PhysicalPlan) (mergedPlan PhysicalPlan, leftRouters []ProcessorIdx, ...)
- type DistAggregationInfo
- type LeaseHolderChoosingPolicy
- type PhysicalPlan
- func (p *PhysicalPlan) AddFilter(expr parser.TypedExpr, indexVarMap []int)
- func (p *PhysicalPlan) AddLimit(count int64, offset int64, node roachpb.NodeID) error
- func (p *PhysicalPlan) AddNoGroupingStage(core distsqlrun.ProcessorCoreUnion, post distsqlrun.PostProcessSpec, ...)
- func (p *PhysicalPlan) AddProcessor(proc Processor) ProcessorIdx
- func (p *PhysicalPlan) AddProjection(columns []uint32)
- func (p *PhysicalPlan) AddRendering(exprs []parser.TypedExpr, indexVarMap []int, outTypes []sqlbase.ColumnType)
- func (p *PhysicalPlan) AddSingleGroupStage(nodeID roachpb.NodeID, core distsqlrun.ProcessorCoreUnion, ...)
- func (p *PhysicalPlan) GetLastStagePost() distsqlrun.PostProcessSpec
- func (p *PhysicalPlan) MergeResultStreams(resultRouters []ProcessorIdx, sourceRouterSlot int, ...)
- func (p *PhysicalPlan) PopulateEndpoints(nodeAddresses map[roachpb.NodeID]string)
- func (p *PhysicalPlan) SetLastStagePost(post distsqlrun.PostProcessSpec, outputTypes []sqlbase.ColumnType)
- func (p *PhysicalPlan) SetMergeOrdering(o distsqlrun.Ordering)
- type Processor
- type ProcessorIdx
- type SpanResolver
- type SpanResolverIterator
- type Stream
Constants ¶
This section is empty.
Variables ¶
var DistAggregationTable = map[distsqlrun.AggregatorSpec_Func]DistAggregationInfo{
	distsqlrun.AggregatorSpec_IDENT: {
		LocalStage: distsqlrun.AggregatorSpec_IDENT,
		FinalStage: distsqlrun.AggregatorSpec_IDENT,
	},
	distsqlrun.AggregatorSpec_BOOL_AND: {
		LocalStage: distsqlrun.AggregatorSpec_BOOL_AND,
		FinalStage: distsqlrun.AggregatorSpec_BOOL_AND,
	},
	distsqlrun.AggregatorSpec_BOOL_OR: {
		LocalStage: distsqlrun.AggregatorSpec_BOOL_OR,
		FinalStage: distsqlrun.AggregatorSpec_BOOL_OR,
	},
	distsqlrun.AggregatorSpec_COUNT: {
		LocalStage: distsqlrun.AggregatorSpec_COUNT,
		FinalStage: distsqlrun.AggregatorSpec_SUM_INT,
	},
	distsqlrun.AggregatorSpec_MAX: {
		LocalStage: distsqlrun.AggregatorSpec_MAX,
		FinalStage: distsqlrun.AggregatorSpec_MAX,
	},
	distsqlrun.AggregatorSpec_MIN: {
		LocalStage: distsqlrun.AggregatorSpec_MIN,
		FinalStage: distsqlrun.AggregatorSpec_MIN,
	},
	distsqlrun.AggregatorSpec_SUM: {
		LocalStage: distsqlrun.AggregatorSpec_SUM,
		FinalStage: distsqlrun.AggregatorSpec_SUM,
	},
}
DistAggregationTable is a look-up table of DistAggregationInfo entries. Functions that don't have an entry in the table are not optimized with a local stage.
Functions ¶
func MakeExpression ¶
func MakeExpression(expr parser.TypedExpr, indexVarMap []int) distsqlrun.Expression
MakeExpression creates a distsqlrun.Expression.
The distsqlrun.Expression uses the placeholder syntax (@1, @2, @3, ...) to refer to columns.
The expr uses IndexedVars to refer to columns. The caller can optionally remap these columns by passing an indexVarMap: an IndexedVar with index i becomes column indexVarMap[i].
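The remapping can be illustrated with a toy sketch (`remapVar` is a hypothetical helper, not the package's actual implementation): an IndexedVar with index i ends up referring to column indexVarMap[i], which the placeholder syntax spells as the 1-based @(indexVarMap[i]+1).

```go
package main

import "fmt"

// remapVar illustrates the index remapping performed by MakeExpression:
// an IndexedVar with index i refers, after remapping, to column
// indexVarMap[i]; distsqlrun expressions write that column as the
// 1-based placeholder @(indexVarMap[i]+1). A nil map means no remapping.
func remapVar(i int, indexVarMap []int) string {
	col := i
	if indexVarMap != nil {
		col = indexVarMap[i] // remap the IndexedVar to its output column
	}
	return fmt.Sprintf("@%d", col+1) // placeholders are 1-based
}
```

For example, with indexVarMap = []int{2, 0}, IndexedVar 0 is rendered as @3 and IndexedVar 1 as @1.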
func MergePlans ¶
func MergePlans(
	left, right *PhysicalPlan,
) (mergedPlan PhysicalPlan, leftRouters []ProcessorIdx, rightRouters []ProcessorIdx)
MergePlans merges the processors and streams of two plans into a new plan. The result routers for each side are also returned (they point at processors in the merged plan).
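The index bookkeeping involved can be sketched as follows (a simplified model, not the package's code): right-hand processors are appended after the left-hand ones, so every ProcessorIdx that pointed into the right plan must be shifted by the number of left-hand processors.

```go
package main

// ProcessorIdx identifies a processor by its slice index (mirrors the package).
type ProcessorIdx int

// mergeProcessors sketches the bookkeeping a MergePlans-style function must
// do. Processors are modeled as plain strings for illustration: the merged
// slice is left followed by right, and each right-side router index is
// shifted by len(left) so it still points at the same processor.
func mergeProcessors(
	left, right []string, rightRouters []ProcessorIdx,
) (merged []string, shifted []ProcessorIdx) {
	merged = append(append([]string{}, left...), right...)
	offset := ProcessorIdx(len(left))
	for _, r := range rightRouters {
		shifted = append(shifted, r+offset)
	}
	return merged, shifted
}
```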
Types ¶
type DistAggregationInfo ¶
type DistAggregationInfo struct {
	LocalStage distsqlrun.AggregatorSpec_Func
	FinalStage distsqlrun.AggregatorSpec_Func
}
DistAggregationInfo describes how we can best compute a distributed aggregation. If we have multiple sources of data, we first compute a function on each group ("local stage") and then we aggregate those results ("final stage").
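The COUNT entry in DistAggregationTable is the instructive case: the local stage is COUNT but the final stage is SUM_INT, because summing per-node counts yields the global count. A toy sketch of that two-stage evaluation (illustrative data model, not the package's execution code):

```go
package main

// distributedCount sketches the two-stage plan that DistAggregationTable
// prescribes for COUNT: each node runs the local stage (COUNT over its own
// rows), and a single final stage sums the partial counts (SUM_INT).
// perNodeRows models the rows held by each node.
func distributedCount(perNodeRows [][]int) int {
	// Local stage: each node counts its own rows.
	partials := make([]int, len(perNodeRows))
	for i, rows := range perNodeRows {
		partials[i] = len(rows)
	}
	// Final stage: sum the partial counts on one node.
	total := 0
	for _, c := range partials {
		total += c
	}
	return total
}
```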
type LeaseHolderChoosingPolicy ¶
type LeaseHolderChoosingPolicy byte
LeaseHolderChoosingPolicy enumerates the implementors of leaseHolderOracle.
const (
	// RandomLeaseHolderChoice chooses lease holders randomly.
	RandomLeaseHolderChoice LeaseHolderChoosingPolicy = iota
	// BinPackingLeaseHolderChoice bin-packs the choices.
	BinPackingLeaseHolderChoice
)
type PhysicalPlan ¶
type PhysicalPlan struct {
	// Processors in the plan.
	Processors []Processor

	// Streams accumulates the streams in the plan - both local (intra-node)
	// and remote (inter-node); when we have a final plan, the streams are used
	// to generate processor input and output specs (see PopulateEndpoints).
	Streams []Stream

	// ResultRouters identifies the output routers which output the results of
	// the plan. These are the routers to which we have to connect new streams
	// in order to extend the plan.
	//
	// The processors which have these routers are all part of the same
	// "stage": they have the same "schema" and PostProcessSpec.
	//
	// We assume all processors have a single output, so we only need the
	// processor index.
	ResultRouters []ProcessorIdx

	// ResultTypes is the schema (column types) of the rows produced by the
	// ResultRouters.
	//
	// This is aliased with InputSyncSpec.ColumnTypes, so it must not be
	// modified in-place during planning.
	ResultTypes []sqlbase.ColumnType

	// MergeOrdering is the ordering guarantee for the result streams that must
	// be maintained when the streams eventually merge. The column indexes
	// refer to columns for the rows produced by ResultRouters.
	//
	// Empty when there is a single result router. The reason is that
	// maintaining an ordering sometimes requires adding columns to streams for
	// the sole reason of correctly merging the streams later (see
	// AddProjection); we don't want to pay this cost if we don't have multiple
	// streams to merge.
	MergeOrdering distsqlrun.Ordering
}
PhysicalPlan represents a network of processors and streams along with information about the results output by this network. The results come from unconnected output routers of a subset of processors; all these routers output the same kind of data (same schema).
func (*PhysicalPlan) AddFilter ¶
func (p *PhysicalPlan) AddFilter(expr parser.TypedExpr, indexVarMap []int)
AddFilter adds a filter on the output of a plan. The filter is added either as a post-processing step to the last stage or to a new "no-op" stage, as necessary.
See MakeExpression for a description of indexVarMap.
func (*PhysicalPlan) AddLimit ¶
func (p *PhysicalPlan) AddLimit(count int64, offset int64, node roachpb.NodeID) error
AddLimit adds a limit and/or offset to the results of the current plan. If there are multiple result streams, they are joined into a single processor that is placed on the given node.
For no limit, count should be MaxInt64.
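The semantics AddLimit encodes can be illustrated with a toy sketch over an in-memory slice (hypothetical helper, not the processor's actual implementation): skip offset rows, then emit at most count rows, with math.MaxInt64 meaning "no limit".

```go
package main

import "math"

// applyLimitOffset sketches the row semantics of a limit/offset stage:
// the first `offset` rows are skipped, then at most `count` rows are
// emitted. count == math.MaxInt64 disables the limit.
func applyLimitOffset(rows []int, count, offset int64) []int {
	if offset >= int64(len(rows)) {
		return nil // offset consumed all rows
	}
	rows = rows[offset:]
	if count != math.MaxInt64 && count < int64(len(rows)) {
		rows = rows[:count]
	}
	return rows
}
```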
func (*PhysicalPlan) AddNoGroupingStage ¶
func (p *PhysicalPlan) AddNoGroupingStage(
	core distsqlrun.ProcessorCoreUnion,
	post distsqlrun.PostProcessSpec,
	outputTypes []sqlbase.ColumnType,
	newOrdering distsqlrun.Ordering,
)
AddNoGroupingStage adds a processor for each result router, on the same node as the source of the stream; all processors have the same core. This is for stages that correspond to logical blocks that don't require any grouping (e.g. evaluator, sorting, etc).
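The shape of this operation can be sketched as follows (a simplified model with hypothetical names, not the package's code): for each current result router, one new processor is placed on that router's node, and the new processors become the new result routers.

```go
package main

// proc is a stand-in for a planned processor; only its node matters here.
type proc struct{ node int }

// addNoGroupingStage sketches what a no-grouping stage does to the plan:
// for every current result router, a new processor is appended on the
// same node as its source, and the indices of the new processors become
// the new result routers.
func addNoGroupingStage(procs []proc, resultRouters []int) ([]proc, []int) {
	newProcs := append([]proc{}, procs...)
	var newRouters []int
	for _, r := range resultRouters {
		newProcs = append(newProcs, proc{node: procs[r].node}) // same node as source
		newRouters = append(newRouters, len(newProcs)-1)
	}
	return newProcs, newRouters
}
```

Because every new processor is co-located with its input, the stage adds no network streams.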
func (*PhysicalPlan) AddProcessor ¶
func (p *PhysicalPlan) AddProcessor(proc Processor) ProcessorIdx
AddProcessor adds a processor to a PhysicalPlan and returns the index that can be used to refer to that processor.
func (*PhysicalPlan) AddProjection ¶
func (p *PhysicalPlan) AddProjection(columns []uint32)
AddProjection applies a projection to a plan. The new plan outputs the columns of the old plan as listed in the slice. The Ordering is updated; columns in the ordering are added to the projection as needed.
Note: the columns slice is relinquished to this function, which can modify it or use it directly in specs.
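Per row, the effect of a projection can be sketched like this (toy helper over string columns, not the package's code): output column i is the input's column columns[i], in the listed order.

```go
package main

// project sketches what a projection does to a single row: the output has
// one entry per projected column, taken from the input row in the order
// given by the columns slice (which may reorder or drop columns).
func project(row []string, columns []uint32) []string {
	out := make([]string, len(columns))
	for i, c := range columns {
		out[i] = row[c]
	}
	return out
}
```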
func (*PhysicalPlan) AddRendering ¶
func (p *PhysicalPlan) AddRendering(
	exprs []parser.TypedExpr, indexVarMap []int, outTypes []sqlbase.ColumnType,
)
AddRendering adds a rendering (expression evaluation) to the output of a plan. The rendering is achieved either through an adjustment on the last stage post-process spec, or via a new stage.
The Ordering is updated; columns in the ordering are added to the render expressions as necessary.
See MakeExpression for a description of indexVarMap.
func (*PhysicalPlan) AddSingleGroupStage ¶
func (p *PhysicalPlan) AddSingleGroupStage(
	nodeID roachpb.NodeID,
	core distsqlrun.ProcessorCoreUnion,
	post distsqlrun.PostProcessSpec,
	outputTypes []sqlbase.ColumnType,
)
AddSingleGroupStage adds a "single group" stage (one that cannot be parallelized) which consists of a single processor on the specified node. The previous-stage processors (ResultRouters) are all connected to this processor.
func (*PhysicalPlan) GetLastStagePost ¶
func (p *PhysicalPlan) GetLastStagePost() distsqlrun.PostProcessSpec
GetLastStagePost returns the PostProcessSpec for the processors in the last stage (ResultRouters).
func (*PhysicalPlan) MergeResultStreams ¶
func (p *PhysicalPlan) MergeResultStreams(
	resultRouters []ProcessorIdx,
	sourceRouterSlot int,
	ordering distsqlrun.Ordering,
	destProcessor ProcessorIdx,
	destInput int,
)
MergeResultStreams connects a set of resultRouters to a synchronizer. The synchronizer is configured with the provided ordering.
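The wiring this records can be sketched with the package's Stream shape (a simplified model, not the package's code): one Stream per result router, all pointing at the same input (the synchronizer) of the destination processor.

```go
package main

// ProcessorIdx identifies a processor by its slice index (mirrors the package).
type ProcessorIdx int

// Stream mirrors the package's Stream type: it connects a source router
// slot to one input of a destination processor.
type Stream struct {
	SourceProcessor  ProcessorIdx
	SourceRouterSlot int
	DestProcessor    ProcessorIdx
	DestInput        int
}

// mergeResultStreams sketches what MergeResultStreams records: one Stream
// per result router, all feeding the same input (the synchronizer) of the
// destination processor.
func mergeResultStreams(
	resultRouters []ProcessorIdx, sourceRouterSlot int, dest ProcessorIdx, destInput int,
) []Stream {
	streams := make([]Stream, len(resultRouters))
	for i, r := range resultRouters {
		streams[i] = Stream{
			SourceProcessor:  r,
			SourceRouterSlot: sourceRouterSlot,
			DestProcessor:    dest,
			DestInput:        destInput,
		}
	}
	return streams
}
```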
func (*PhysicalPlan) PopulateEndpoints ¶
func (p *PhysicalPlan) PopulateEndpoints(nodeAddresses map[roachpb.NodeID]string)
PopulateEndpoints processes p.Streams and adds the corresponding StreamEndpointSpecs to the processors' input and output specs. This should be used when the plan is completed and ready to be executed.
The nodeAddresses map contains the address of all the nodes referenced in the plan.
func (*PhysicalPlan) SetLastStagePost ¶
func (p *PhysicalPlan) SetLastStagePost(
	post distsqlrun.PostProcessSpec, outputTypes []sqlbase.ColumnType,
)
SetLastStagePost changes the PostProcess spec of the processors in the last stage (ResultRouters). The caller must update the ordering via SetMergeOrdering.
func (*PhysicalPlan) SetMergeOrdering ¶
func (p *PhysicalPlan) SetMergeOrdering(o distsqlrun.Ordering)
SetMergeOrdering sets p.MergeOrdering.
type Processor ¶
type Processor struct {
	// Node where the processor must be instantiated.
	Node roachpb.NodeID

	// Spec for the processor; note that the StreamEndpointSpecs in the input
	// synchronizers and output routers are not set until the end of the
	// planning process.
	Spec distsqlrun.ProcessorSpec
}
Processor contains the information associated with a processor in a plan.
type ProcessorIdx ¶
type ProcessorIdx int
ProcessorIdx identifies a processor by its index in PhysicalPlan.Processors.
type SpanResolver ¶
type SpanResolver interface {
	// NewSpanResolverIterator creates a new SpanResolverIterator.
	// The txn is only used by the "fake" implementation (used for testing).
	NewSpanResolverIterator(txn *client.Txn) SpanResolverIterator
}
SpanResolver resolves key spans to their respective ranges and lease holders. Used for planning physical execution of distributed SQL queries.
Sample usage for resolving a bunch of spans:
func resolveSpans(
	ctx context.Context, spans ...spanWithDir,
) ([][]kv.ReplicaInfo, error) {
	lr := distsql.NewSpanResolver(
		distSender, gossip, nodeDescriptor,
		distsql.BinPackingLeaseHolderChoice)
	it := lr.NewSpanResolverIterator(nil)
	res := make([][]kv.ReplicaInfo, 0)
	for _, span := range spans {
		repls := make([]kv.ReplicaInfo, 0)
		for it.Seek(ctx, span.Span, span.dir); ; it.Next(ctx) {
			if !it.Valid() {
				return nil, it.Error()
			}
			repl, err := it.ReplicaInfo(ctx)
			if err != nil {
				return nil, err
			}
			repls = append(repls, repl)
			if !it.NeedAnother() {
				break
			}
		}
		res = append(res, repls)
	}
	return res, nil
}
func NewFakeSpanResolver ¶
func NewFakeSpanResolver(nodes []*roachpb.NodeDescriptor) SpanResolver
NewFakeSpanResolver creates a fake span resolver.
func NewSpanResolver ¶
func NewSpanResolver(
	distSender *kv.DistSender,
	gossip *gossip.Gossip,
	nodeDesc roachpb.NodeDescriptor,
	choosingPolicy LeaseHolderChoosingPolicy,
) SpanResolver
NewSpanResolver creates a new spanResolver.
type SpanResolverIterator ¶
type SpanResolverIterator interface {
	// Seek positions the iterator on the start of a span (span.Key or
	// span.EndKey, depending on scanDir). Note that span.EndKey is exclusive,
	// regardless of scanDir.
	//
	// After calling this, ReplicaInfo() will return information about the
	// range containing the start key of the span (or the end key, if the
	// direction is Descending).
	//
	// NeedAnother() will return true until the iterator is positioned on or
	// after the end of the span. Possible errors encountered should be
	// checked for with Valid().
	//
	// Seek can be called repeatedly on the same iterator. To make optimal use
	// of caches, Seek()s should be performed on spans sorted according to the
	// scanDir (if Descending, then the span with the highest keys should be
	// Seek()ed first).
	//
	// scanDir changes the direction in which Next() will advance the iterator.
	Seek(ctx context.Context, span roachpb.Span, scanDir kv.ScanDirection)

	// NeedAnother returns true if the current range is not the last for the
	// span that was last Seek()ed.
	NeedAnother() bool

	// Next advances the iterator to the next range. Possible errors
	// encountered should be checked for with Valid().
	Next(ctx context.Context)

	// Valid returns false if an error was encountered by the last Seek() or
	// Next().
	Valid() bool

	// Error returns any error encountered by the last Seek() or Next().
	Error() error

	// Desc returns the current RangeDescriptor.
	Desc() roachpb.RangeDescriptor

	// ReplicaInfo returns information about the replica that has been picked
	// for the current range.
	// A RangeUnavailableError is returned if there's no information in gossip
	// about any of the replicas.
	ReplicaInfo(ctx context.Context) (kv.ReplicaInfo, error)
}
SpanResolverIterator is used to iterate over the ranges composing a key span.
type Stream ¶
type Stream struct {
	// SourceProcessor index (within the same plan).
	SourceProcessor ProcessorIdx

	// SourceRouterSlot identifies the position of this stream among the
	// streams that originate from the same router. This is important when
	// routing by hash where the order of the streams in the OutputRouterSpec
	// matters.
	SourceRouterSlot int

	// DestProcessor index (within the same plan).
	DestProcessor ProcessorIdx

	// DestInput identifies the input of DestProcessor (some processors have
	// multiple inputs).
	DestInput int
}
Stream connects the output router of one processor to an input synchronizer of another processor.