Documentation ¶
Overview ¶
Package exec contains execution tasks to run each of the separate tasks (Source, Project, Where, Having, etc.) of a SQL DAG of tasks. It does so by defining interfaces, base tasks, and a single-machine, channel-oriented DAG runner (Executor). The Executor interface allows downstream projects to implement a multi-node, message-passing-oriented version while reusing most Tasks.
Index ¶
- Constants
- Variables
- func DisableRecover()
- func RegisterSqlDriver()
- type AggFunc
- type AggPartial
- type Aggregator
- type Alter
- type Command
- type Create
- type DeletionScanner
- type DeletionTask
- type Drop
- type ErrChan
- type Executor
- type ExecutorSource
- type GroupBy
- type GroupByFinal
- type JobExecutor
- func (m *JobExecutor) Close() error
- func (m *JobExecutor) DrainChan() MessageChan
- func (m *JobExecutor) NewTask(p plan.Task) Task
- func (m *JobExecutor) Run() error
- func (m *JobExecutor) Setup() error
- func (m *JobExecutor) WalkAlter(p *plan.Alter) (Task, error)
- func (m *JobExecutor) WalkChildren(p plan.Task, root Task) error
- func (m *JobExecutor) WalkCommand(p *plan.Command) (Task, error)
- func (m *JobExecutor) WalkCreate(p *plan.Create) (Task, error)
- func (m *JobExecutor) WalkDelete(p *plan.Delete) (Task, error)
- func (m *JobExecutor) WalkDrop(p *plan.Drop) (Task, error)
- func (m *JobExecutor) WalkGroupBy(p *plan.GroupBy) (Task, error)
- func (m *JobExecutor) WalkHaving(p *plan.Having) (Task, error)
- func (m *JobExecutor) WalkInsert(p *plan.Insert) (Task, error)
- func (m *JobExecutor) WalkJoin(p *plan.JoinMerge) (Task, error)
- func (m *JobExecutor) WalkJoinKey(p *plan.JoinKey) (Task, error)
- func (m *JobExecutor) WalkOrder(p *plan.Order) (Task, error)
- func (m *JobExecutor) WalkPlan(p plan.Task) (Task, error)
- func (m *JobExecutor) WalkPlanAll(p plan.Task) (Task, error)
- func (m *JobExecutor) WalkPlanTask(p plan.Task) (Task, error)
- func (m *JobExecutor) WalkPreparedStatement(p *plan.PreparedStatement) (Task, error)
- func (m *JobExecutor) WalkProjection(p *plan.Projection) (Task, error)
- func (m *JobExecutor) WalkSelect(p *plan.Select) (Task, error)
- func (m *JobExecutor) WalkSource(p *plan.Source) (Task, error)
- func (m *JobExecutor) WalkSourceExec(p *plan.Source) (Task, error)
- func (m *JobExecutor) WalkUpdate(p *plan.Update) (Task, error)
- func (m *JobExecutor) WalkUpsert(p *plan.Upsert) (Task, error)
- func (m *JobExecutor) WalkWhere(p *plan.Where) (Task, error)
- type JobMaker
- type JobRunner
- type JoinKey
- type JoinMerge
- type KeyEvaluator
- type MessageChan
- type MessageHandler
- type Order
- type OrderMessages
- type Projection
- type RequiresContext
- type ResultBuffer
- type ResultExecWriter
- type ResultWriter
- type SigChan
- type Source
- type Task
- type TaskBase
- func (m *TaskBase) Add(task Task) error
- func (m *TaskBase) AddPlan(task plan.Task) error
- func (m *TaskBase) Children() []Task
- func (m *TaskBase) Close() error
- func (m *TaskBase) CloseFinal() error
- func (m *TaskBase) ErrChan() ErrChan
- func (m *TaskBase) MessageIn() MessageChan
- func (m *TaskBase) MessageInSet(ch MessageChan)
- func (m *TaskBase) MessageOut() MessageChan
- func (m *TaskBase) MessageOutSet(ch MessageChan)
- func (m *TaskBase) Quit()
- func (m *TaskBase) Run() error
- func (m *TaskBase) Setup(depth int) error
- func (m *TaskBase) SigChan() SigChan
- type TaskParallel
- type TaskPrinter
- type TaskRunner
- type TaskSequential
- type TaskStepper
- type Upsert
- type Where
Constants ¶
const (
// ItemDefaultChannelSize default channel buffer size for tasks
ItemDefaultChannelSize = 50
)
const (
MaxAllowedPacket = 1024 * 1024
)
const (
MysqlTimeFormat = "2006-01-02 15:04:05.000000000"
)
Variables ¶
var ( // ErrShuttingDown already shutting down error ErrShuttingDown = fmt.Errorf("Received Shutdown Signal") // ErrNotSupported statement not supported ErrNotSupported = fmt.Errorf("QLBridge: Not supported") // ErrNotImplemented exec not implemented for statement ErrNotImplemented = fmt.Errorf("QLBridge: Not implemented") // ErrUnknownCommand unknown command error. ErrUnknownCommand = fmt.Errorf("QLBridge: Unknown Command") // ErrInternalError internal error ErrInternalError = fmt.Errorf("QLBridge: Internal Error") // ErrNoSchemaSelected no schema was selected when performing statement. ErrNoSchemaSelected = fmt.Errorf("No Schema Selected") )
Functions ¶
func DisableRecover ¶
func DisableRecover()
func RegisterSqlDriver ¶
func RegisterSqlDriver()
Types ¶
type AggPartial ¶
AggPartial is a struct representing a partial aggregation that will be reduced by the finalizer. For example, for consistent-hash-based group-bys calculated across multiple nodes, this holds info that needs further calculation; it represents only this hash.
type Aggregator ¶
type Aggregator interface { Do(v value.Value) Result() interface{} Reset() Merge(*AggPartial) }
func NewCount ¶
func NewCount(col *rel.Column) Aggregator
func NewGroupByValue ¶
func NewGroupByValue(col *rel.Column) Aggregator
type Alter ¶
type Alter struct { *TaskBase // contains filtered or unexported fields }
Alter is an executable task for SQL ALTER.
type Command ¶
type Command struct { *TaskBase // contains filtered or unexported fields }
Command is an executable task for SQL SET commands.
func NewCommand ¶
NewCommand creates a new command exec task.
type Create ¶
type Create struct { *TaskBase // contains filtered or unexported fields }
Create is an executable task for SQL Create, Alter, Schema, Source, etc.
type DeletionScanner ¶
type DeletionScanner struct {
*DeletionTask
}
DeletionScanner is a delete scanner used when the source does not have a seek operation.
func (*DeletionScanner) Run ¶
func (m *DeletionScanner) Run() error
type DeletionTask ¶
type DeletionTask struct { *TaskBase // contains filtered or unexported fields }
DeletionTask is a delete task for sources that natively support delete.
func NewDelete ¶
func NewDelete(ctx *plan.Context, p *plan.Delete) *DeletionTask
NewDelete creates a deletion task that writes to the data source.
func (*DeletionTask) Close ¶
func (m *DeletionTask) Close() error
func (*DeletionTask) Run ¶
func (m *DeletionTask) Run() error
type Drop ¶
type Drop struct { *TaskBase // contains filtered or unexported fields }
Drop is an executable task for SQL DROP.
type Executor ¶
type Executor interface { NewTask(p plan.Task) Task WalkPlan(p plan.Task) (Task, error) // DML Statements WalkSelect(p *plan.Select) (Task, error) WalkInsert(p *plan.Insert) (Task, error) WalkUpsert(p *plan.Upsert) (Task, error) WalkUpdate(p *plan.Update) (Task, error) WalkDelete(p *plan.Delete) (Task, error) // DML Child Tasks WalkSource(p *plan.Source) (Task, error) WalkJoin(p *plan.JoinMerge) (Task, error) WalkJoinKey(p *plan.JoinKey) (Task, error) WalkWhere(p *plan.Where) (Task, error) WalkHaving(p *plan.Having) (Task, error) WalkGroupBy(p *plan.GroupBy) (Task, error) WalkOrder(p *plan.Order) (Task, error) WalkProjection(p *plan.Projection) (Task, error) // Other Statements WalkCommand(p *plan.Command) (Task, error) WalkPreparedStatement(p *plan.PreparedStatement) (Task, error) // DDL Tasks WalkCreate(p *plan.Create) (Task, error) WalkDrop(p *plan.Drop) (Task, error) WalkAlter(p *plan.Alter) (Task, error) }
Executor defines the standard Walk() pattern to create an executable task DAG from a plan DAG.
An implementation of WalkPlan() will be able to execute/run a Statement:
- inproc: i.e., in-process. The qlbridge/exec package implements a non-distributed query planner.
- distributed: i.e., run this job across multiple servers. dataux/planner implements a distributed query planner that distributes/runs tasks across multiple nodes.
type ExecutorSource ¶
type ExecutorSource interface { // WalkExecSource given our plan, turn that into a Task. WalkExecSource(p *plan.Source) (Task, error) }
ExecutorSource is for sources that can do their own execution plan for sub-select statements; e.g., MySQL can do its own (select, projection), and Mongo and Elasticsearch can as well. This interface allows select planning to be passed down to the source.
type GroupBy ¶
type GroupBy struct { *TaskBase // contains filtered or unexported fields }
GroupBy is a SQL GROUP BY task that creates a hashable key from each row, composed of key = {each,value,of,column,in,groupby}.
A very naive parallel group-by that holds values in memory. This is a toy implementation that is only useful for small-cardinality group-bys and a small number of rows.
type GroupByFinal ¶
type GroupByFinal struct { *TaskBase // contains filtered or unexported fields }
GroupByFinal is a SQL GROUP BY operator finalizer for partials; i.e., if the group-by is a distributed task, this is the reducer for the sub-tasks.
func NewGroupByFinal ¶
func NewGroupByFinal(ctx *plan.Context, p *plan.GroupBy) *GroupByFinal
NewGroupByFinal creates the group-by-finalizer task.
func (*GroupByFinal) Close ¶
func (m *GroupByFinal) Close() error
Close closes the task and cleans up. It tries to wait for the downstream reducer tasks to complete after flushing messages.
func (*GroupByFinal) Run ¶
func (m *GroupByFinal) Run() error
Run runs the group-by-final task, implementing the standard task interface.
type JobExecutor ¶
type JobExecutor struct { Planner plan.Planner Executor Executor RootTask TaskRunner Ctx *plan.Context // contains filtered or unexported fields }
JobExecutor translates a SQL statement into an execution DAG of tasks using the supplied Planner and Executor. This package implements the default executor and uses the default Planner from plan. This creates a single-node DAG of Tasks.
func BuildSqlJob ¶
func BuildSqlJob(ctx *plan.Context) (*JobExecutor, error)
BuildSqlJob, given a plan context (query statement plus context), creates a JobExecutor, or returns an error if it cannot.
func NewExecutor ¶
func NewExecutor(ctx *plan.Context, planner plan.Planner) *JobExecutor
NewExecutor creates a new Job Executor.
func (*JobExecutor) DrainChan ¶
func (m *JobExecutor) DrainChan() MessageChan
DrainChan returns the drain, which is the last output channel, on the last task.
func (*JobExecutor) NewTask ¶
func (m *JobExecutor) NewTask(p plan.Task) Task
NewTask creates a new task (from the current context).
func (*JobExecutor) WalkAlter ¶
func (m *JobExecutor) WalkAlter(p *plan.Alter) (Task, error)
WalkAlter walks the Alter plan.
func (*JobExecutor) WalkChildren ¶
func (m *JobExecutor) WalkChildren(p plan.Task, root Task) error
WalkChildren walks the DAG of plan tasks, creating execution tasks.
func (*JobExecutor) WalkCommand ¶
func (m *JobExecutor) WalkCommand(p *plan.Command) (Task, error)
WalkCommand walks Commands such as SET.
func (*JobExecutor) WalkCreate ¶
func (m *JobExecutor) WalkCreate(p *plan.Create) (Task, error)
WalkCreate walks the Create plan.
func (*JobExecutor) WalkDelete ¶
func (m *JobExecutor) WalkDelete(p *plan.Delete) (Task, error)
func (*JobExecutor) WalkDrop ¶
func (m *JobExecutor) WalkDrop(p *plan.Drop) (Task, error)
WalkDrop walks the Drop plan.
func (*JobExecutor) WalkGroupBy ¶
func (m *JobExecutor) WalkGroupBy(p *plan.GroupBy) (Task, error)
func (*JobExecutor) WalkHaving ¶
func (m *JobExecutor) WalkHaving(p *plan.Having) (Task, error)
func (*JobExecutor) WalkInsert ¶
func (m *JobExecutor) WalkInsert(p *plan.Insert) (Task, error)
func (*JobExecutor) WalkJoinKey ¶
func (m *JobExecutor) WalkJoinKey(p *plan.JoinKey) (Task, error)
func (*JobExecutor) WalkPlan ¶
func (m *JobExecutor) WalkPlan(p plan.Task) (Task, error)
WalkPlan is the main entry point to take a Plan and convert it into an execution DAG.
func (*JobExecutor) WalkPlanAll ¶
func (m *JobExecutor) WalkPlanAll(p plan.Task) (Task, error)
func (*JobExecutor) WalkPlanTask ¶
func (m *JobExecutor) WalkPlanTask(p plan.Task) (Task, error)
func (*JobExecutor) WalkPreparedStatement ¶
func (m *JobExecutor) WalkPreparedStatement(p *plan.PreparedStatement) (Task, error)
WalkPreparedStatement is not implemented.
func (*JobExecutor) WalkProjection ¶
func (m *JobExecutor) WalkProjection(p *plan.Projection) (Task, error)
func (*JobExecutor) WalkSelect ¶
func (m *JobExecutor) WalkSelect(p *plan.Select) (Task, error)
WalkSelect creates a DAG from a Select plan.
func (*JobExecutor) WalkSource ¶
func (m *JobExecutor) WalkSource(p *plan.Source) (Task, error)
func (*JobExecutor) WalkSourceExec ¶
func (m *JobExecutor) WalkSourceExec(p *plan.Source) (Task, error)
func (*JobExecutor) WalkUpdate ¶
func (m *JobExecutor) WalkUpdate(p *plan.Update) (Task, error)
func (*JobExecutor) WalkUpsert ¶
func (m *JobExecutor) WalkUpsert(p *plan.Upsert) (Task, error)
type JoinKey ¶
type JoinKey struct { *TaskBase // contains filtered or unexported fields }
JoinKey evaluates messages to create a JoinKey-based message, where the
join key (a composite of each value in the join expression) hashes consistently.
func NewJoinKey ¶
NewJoinKey creates a JoinKey task that evaluates the compound JoinKey to allow
for parallelized joins: source1 -> JoinKey -> hash-route \ -- join --> / source2 -> JoinKey -> hash-route
type JoinMerge ¶
type JoinMerge struct { *TaskBase // contains filtered or unexported fields }
JoinMerge scans 2 source tasks for rows, evaluates keys, and uses them for the join.
func NewJoinNaiveMerge ¶
NewJoinNaiveMerge creates a very naive parallel join merge that uses Key() as the value to merge
two different input channels: source1 -> \ -- join --> / source2 ->
Distributed:
source1a -> |-> -- join --> source1b -> key-hash-route |-> -- join --> reduce -> source1n -> |-> -- join --> |-> -- join --> source2a -> |-> -- join --> source2b -> key-hash-route |-> -- join --> source2n -> |-> -- join -->
type MessageHandler ¶
MessageHandler handles/forwards a message for this Task.
func MakeHandler ¶
func MakeHandler(task TaskRunner) MessageHandler
type Order ¶
type Order struct { *TaskBase // contains filtered or unexported fields }
Order is the execution task for an Order (ORDER BY) plan step.
type OrderMessages ¶
type OrderMessages struct {
// contains filtered or unexported fields
}
func NewOrderMessages ¶
func NewOrderMessages(p *plan.Order) *OrderMessages
func (*OrderMessages) Len ¶
func (m *OrderMessages) Len() int
func (*OrderMessages) Less ¶
func (m *OrderMessages) Less(i, j int) bool
func (*OrderMessages) Swap ¶
func (m *OrderMessages) Swap(i, j int)
type Projection ¶
type Projection struct { *TaskBase // contains filtered or unexported fields }
Projection is the projection execution task.
func NewProjection ¶
func NewProjection(ctx *plan.Context, p *plan.Projection) *Projection
In-process projections are used when mapping multiple sources together; additional columns, such as those used in Where, GroupBy, etc., are used even if they will not be used in the final projection.
func NewProjectionFinal ¶
func NewProjectionFinal(ctx *plan.Context, p *plan.Projection) *Projection
NewProjectionFinal creates a final projection, which projects the final select columns for result writing.
func NewProjectionInProcess ¶
func NewProjectionInProcess(ctx *plan.Context, p *plan.Projection) *Projection
NewProjectionInProcess creates an in-process projection. In-process projections are used when mapping multiple sources together;
additional columns, such as those used in Where, GroupBy, etc., are used even if they will not be used in the final projection.
func NewProjectionLimit ¶
func NewProjectionLimit(ctx *plan.Context, p *plan.Projection) *Projection
NewProjectionLimit provides only a counting/limit projection.
func (*Projection) CloseFinal ¶
func (m *Projection) CloseFinal() error
CloseFinal performs additional cleanup after exit.
type RequiresContext ¶
RequiresContext defines a Source which requires context.
type ResultBuffer ¶
type ResultBuffer struct { *TaskBase // contains filtered or unexported fields }
ResultBuffer is used for writing task results.
func NewResultBuffer ¶
func NewResultBuffer(ctx *plan.Context, writeTo *[]schema.Message) *ResultBuffer
NewResultBuffer creates a result buffer that writes temporary task output into results.
type ResultExecWriter ¶
type ResultExecWriter struct { *TaskBase // contains filtered or unexported fields }
ResultExecWriter is used for writing task results.
func NewResultExecWriter ¶
func NewResultExecWriter(ctx *plan.Context) *ResultExecWriter
NewResultExecWriter creates a result writer for an exec task.
func (*ResultExecWriter) Result ¶
func (m *ResultExecWriter) Result() driver.Result
Result returns the result of the exec task.
type ResultWriter ¶
type ResultWriter struct { *TaskBase // contains filtered or unexported fields }
ResultWriter is used for writing task results.
func NewResultRows ¶
func NewResultRows(ctx *plan.Context, cols []string) *ResultWriter
NewResultRows creates a ResultWriter for the given columns.
func NewResultWriter ¶
func NewResultWriter(ctx *plan.Context) *ResultWriter
NewResultWriter creates a ResultWriter for a plan.
func (*ResultWriter) Columns ¶
func (m *ResultWriter) Columns() []string
Columns returns the list of column names.
func (*ResultWriter) Next ¶
func (m *ResultWriter) Next(dest []driver.Value) error
Next is the implementation of the sql/driver Rows Next() interface.
func (*ResultWriter) Run ¶
func (m *ResultWriter) Run() error
Run, for ResultWriter: since we are not paging through messages using this message channel, but instead using Next() as defined by sql/driver, we don't read the input channel; we just watch the stop channels.
type Source ¶
type Source struct { *TaskBase Scanner schema.ConnScanner ExecSource ExecutorSource JoinKey KeyEvaluator // contains filtered or unexported fields }
Source defines a datasource execution task. It scans a data source for rows to feed into the exec DAG of tasks. The source scanner uses iter.Next() messages. The source may optionally allow predicate pushdown, that is, use the SQL select/where to filter rows so it's not a real table scan. This interface is called ExecutorSource.
Examples of Sources:
- table -- FROM table
- channels -- FROM stream
- join -- SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name;
- sub-select -- SELECT * FROM (SELECT 1, 2, 3) AS t1;
func NewSourceScanner ¶
NewSourceScanner creates a scanner to read from a sub-query data source (join, sub-query, static).
type Task ¶
type Task interface { Run() error Close() error CloseFinal() error Children() []Task // children sub-tasks Add(Task) error // Add a child to this dag }
Task: exec Tasks are inherently DAGs of tasks implementing Run(), Close(), etc., to allow them to be executable.
type TaskBase ¶
type TaskBase struct { sync.Mutex Ctx *plan.Context Name string Handler MessageHandler // contains filtered or unexported fields }
TaskBase is the base executable task that implements the Task interface; it is embedded into other channel-based task runners.
func NewTaskBase ¶
func (*TaskBase) CloseFinal ¶
func (*TaskBase) MessageIn ¶
func (m *TaskBase) MessageIn() MessageChan
func (*TaskBase) MessageInSet ¶
func (m *TaskBase) MessageInSet(ch MessageChan)
func (*TaskBase) MessageOut ¶
func (m *TaskBase) MessageOut() MessageChan
func (*TaskBase) MessageOutSet ¶
func (m *TaskBase) MessageOutSet(ch MessageChan)
type TaskParallel ¶
type TaskParallel struct { *TaskBase // contains filtered or unexported fields }
TaskParallel is a parallel set of tasks; it starts each child task and offers up
an output channel that is a merger of each child: --> \ --> - -> --> /
func NewTaskParallel ¶
func NewTaskParallel(ctx *plan.Context) *TaskParallel
func (*TaskParallel) Add ¶
func (m *TaskParallel) Add(task Task) error
func (*TaskParallel) Children ¶
func (m *TaskParallel) Children() []Task
func (*TaskParallel) Close ¶
func (m *TaskParallel) Close() error
func (*TaskParallel) PrintDag ¶
func (m *TaskParallel) PrintDag(depth int)
func (*TaskParallel) Run ¶
func (m *TaskParallel) Run() error
func (*TaskParallel) Setup ¶
func (m *TaskParallel) Setup(depth int) error
type TaskPrinter ¶
type TaskPrinter interface {
PrintDag(depth int)
}
TaskPrinter a debug printer for dag-shape.
type TaskRunner ¶
type TaskRunner interface { Task Setup(depth int) error MessageIn() MessageChan MessageOut() MessageChan MessageInSet(MessageChan) MessageOutSet(MessageChan) ErrChan() ErrChan SigChan() SigChan Quit() }
TaskRunner is an interface for a single task in the DAG of Tasks necessary to execute a Job. It may have child tasks, and it may be parallel, distributed, etc.
type TaskSequential ¶
type TaskSequential struct { *TaskBase // contains filtered or unexported fields }
func NewTaskSequential ¶
func NewTaskSequential(ctx *plan.Context) *TaskSequential
func (*TaskSequential) Add ¶
func (m *TaskSequential) Add(task Task) error
func (*TaskSequential) Children ¶
func (m *TaskSequential) Children() []Task
func (*TaskSequential) Close ¶
func (m *TaskSequential) Close() error
func (*TaskSequential) PrintDag ¶
func (m *TaskSequential) PrintDag(depth int)
func (*TaskSequential) Run ¶
func (m *TaskSequential) Run() (err error)
func (*TaskSequential) Setup ¶
func (m *TaskSequential) Setup(depth int) error
type TaskStepper ¶
type TaskStepper struct {
*TaskBase
}
TaskStepper is not Run; rather, the end user makes an
explicit Next() call.
func NewTaskStepper ¶
func NewTaskStepper(ctx *plan.Context) *TaskStepper
func (*TaskStepper) Run ¶
func (m *TaskStepper) Run() error
type Upsert ¶
type Upsert struct { *TaskBase // contains filtered or unexported fields }
Upsert is a task for insert, update, and upsert.
type Where ¶
type Where struct { *TaskBase // contains filtered or unexported fields }
Where is the execution task for a filter that implements the WHERE clause.
func NewWhere ¶
NewWhere creates a new Where clause task.
Filters and final tasks differ because the final task does final column aliasing.
func NewWhereFilter ¶
NewWhereFilter creates a Where filter task; filters and final tasks differ because the final task does final column aliasing.