exec

package
v0.0.0-...-c484601 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2015 License: MIT Imports: 15 Imported by: 0

README

Runtime

Execution consists of a DAG of Tasks called a Job

  • ExecMaster each DAG has a single master of that job
  • Planner creates a dag of tasks
  • TaskRunner runs a single node of a set of tasks, communicates between child tasks
  • Datasource supplies data to a task

Coercion

| Go Types | Value types |

| int(8,16,32,64) | IntValue | | float(32,64) | NumberValue | | string | StringValue | | []string | StringsValue | | boolean | BoolValue | | map[string]int | MapStringIntValue |

From | ToInt | ToString | ToBool | ToNumber | MapInt | MapString

| int(8,16,32,64) | y | y | y | y | N | uint(8,16,32,64) | y | y | y | y | N

Documentation

Index

Constants

View Source
const (
	ItemDefaultChannelSize = 50
)
View Source
const (
	MysqlTimeFormat = "2006-01-02 15:04:05.000000000"
)

Variables

View Source
var (
	ShuttingDownError = fmt.Errorf("Received Shutdown Signal")
)

Functions

func RegisterSqlDriver

func RegisterSqlDriver()

Types

type DeletionScanner

type DeletionScanner struct {
	*DeletionTask
}

func (*DeletionScanner) Run

func (m *DeletionScanner) Run(context *expr.Context) error

type DeletionTask

type DeletionTask struct {
	*TaskBase
	// contains filtered or unexported fields
}

Delete task

func NewDelete

func NewDelete(sql *expr.SqlDelete, db datasource.Deletion) *DeletionTask

An inserter to write to data source

func (*DeletionTask) Close

func (m *DeletionTask) Close() error

func (*DeletionTask) Copy

func (m *DeletionTask) Copy() *DeletionTask

func (*DeletionTask) Run

func (m *DeletionTask) Run(context *expr.Context) error

type ErrChan

type ErrChan chan error

type JobBuilder

type JobBuilder struct {
	// contains filtered or unexported fields
}

This is a simple, single source Job Executor

we can create smarter ones but this is a basic implementation for

/ running in-process, not distributed

func NewJobBuilder

func NewJobBuilder(schema *datasource.RuntimeSchema, connInfo string) *JobBuilder

JobBuilder

@schema   = the config/runtime schema info
@connInfo = connection string info for original connection

func (*JobBuilder) VisitCommand

func (m *JobBuilder) VisitCommand(stmt *expr.SqlCommand) (expr.Task, error)

func (*JobBuilder) VisitDelete

func (m *JobBuilder) VisitDelete(stmt *expr.SqlDelete) (expr.Task, error)

func (*JobBuilder) VisitDescribe

func (m *JobBuilder) VisitDescribe(stmt *expr.SqlDescribe) (expr.Task, error)

func (*JobBuilder) VisitInsert

func (m *JobBuilder) VisitInsert(stmt *expr.SqlInsert) (expr.Task, error)

func (*JobBuilder) VisitJoin

func (m *JobBuilder) VisitJoin(from *expr.SqlSource) (expr.Task, error)

func (*JobBuilder) VisitPreparedStmt

func (m *JobBuilder) VisitPreparedStmt(stmt *expr.PreparedStatement) (expr.Task, error)

func (*JobBuilder) VisitSelect

func (m *JobBuilder) VisitSelect(stmt *expr.SqlSelect) (expr.Task, error)

func (*JobBuilder) VisitShow

func (m *JobBuilder) VisitShow(stmt *expr.SqlShow) (expr.Task, error)

func (*JobBuilder) VisitSubselect

func (m *JobBuilder) VisitSubselect(from *expr.SqlSource) (expr.Task, error)

func (*JobBuilder) VisitUpdate

func (m *JobBuilder) VisitUpdate(stmt *expr.SqlUpdate) (expr.Task, error)

func (*JobBuilder) VisitUpsert

func (m *JobBuilder) VisitUpsert(stmt *expr.SqlUpsert) (expr.Task, error)

type JobRunner

type JobRunner interface {
	Setup() error
	Run() error
	Close() error
}

Job Runner is the main RunTime interface for running a SQL Job

type JoinKey

type JoinKey struct {
	*TaskBase
	// contains filtered or unexported fields
}

Evaluate messages to create JoinKey based message, where the

Join Key (composite of each value in join expr) hashes consistently

func NewJoinKey

func NewJoinKey(from *expr.SqlSource, conf *datasource.RuntimeSchema) (*JoinKey, error)

A JoinKey task that evaluates the compound JoinKey to allow

for parallelized join's

 source1   ->  JoinKey  ->  hash-route
                                       \
                                        --  join  -->
                                       /
 source2   ->  JoinKey  ->  hash-route

func (*JoinKey) Close

func (m *JoinKey) Close() error

func (*JoinKey) Copy

func (m *JoinKey) Copy() *JoinKey

func (*JoinKey) Run

func (m *JoinKey) Run(context *expr.Context) error

type JoinMerge

type JoinMerge struct {
	*TaskBase
	// contains filtered or unexported fields
}

Scan a data source for rows, feed into runner for join sources

  1. join SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name;

func NewJoinNaiveMerge

func NewJoinNaiveMerge(ltask, rtask TaskRunner, lfrom, rfrom *expr.SqlSource, conf *datasource.RuntimeSchema) (*JoinMerge, error)

A very stupid naive parallel join merge, uses Key() as value to merge

two different input channels

source1   ->
             \
               --  join  -->
             /
source2   ->

func (*JoinMerge) Close

func (m *JoinMerge) Close() error

func (*JoinMerge) Copy

func (m *JoinMerge) Copy() *JoinMerge

func (*JoinMerge) Run

func (m *JoinMerge) Run(context *expr.Context) error

type KeyEvaluator

type KeyEvaluator func(msg datasource.Message) driver.Value

type MessageChan

type MessageChan chan datasource.Message

type MessageHandler

type MessageHandler func(ctx *expr.Context, msg datasource.Message) bool

Handle/Forward a message for this Task

TODO:  this bool is either wrong, or not-used?   error?

func MakeHandler

func MakeHandler(task TaskRunner) MessageHandler

type Projection

type Projection struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewProjection

func NewProjection(sqlSelect *expr.SqlSelect) *Projection

type ResultBuffer

type ResultBuffer struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultBuffer

func NewResultBuffer(writeTo *[]datasource.Message) *ResultBuffer

func (*ResultBuffer) Close

func (m *ResultBuffer) Close() error

func (*ResultBuffer) Copy

func (m *ResultBuffer) Copy() *ResultBuffer

type ResultExecWriter

type ResultExecWriter struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultExecWriter

func NewResultExecWriter() *ResultExecWriter

func (*ResultExecWriter) Close

func (m *ResultExecWriter) Close() error

func (*ResultExecWriter) Copy

func (*ResultExecWriter) Result

func (m *ResultExecWriter) Result() driver.Result

type ResultWriter

type ResultWriter struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewResultRows

func NewResultRows(cols []string) *ResultWriter

func NewResultWriter

func NewResultWriter() *ResultWriter

func (*ResultWriter) Close

func (m *ResultWriter) Close() error

func (*ResultWriter) Columns

func (m *ResultWriter) Columns() []string

func (*ResultWriter) Copy

func (m *ResultWriter) Copy() *ResultWriter

func (*ResultWriter) Next

func (m *ResultWriter) Next(dest []driver.Value) error

Note, this is implementation of the sql/driver Rows() Next() interface

func (*ResultWriter) Run

func (m *ResultWriter) Run(ctx *expr.Context) error

For ResultWriter, since we are are not paging through messages

using this mesage channel, instead using Next() as defined by sql/driver
we don't read the input channel, just watch stop channels

type SigChan

type SigChan chan bool

type Source

type Source struct {
	*TaskBase

	JoinKey KeyEvaluator
	// contains filtered or unexported fields
}

Scan a data source for rows, feed into runner. The source scanner being

 a source is iter.Next() messages instead of sending them on input channel

1) table      -- FROM table
2) channels   -- FROM stream
3) join       -- SELECT t1.name, t2.salary
                     FROM employee AS t1
                     INNER JOIN info AS t2
                     ON t1.name = t2.name;
4) sub-select -- SELECT * FROM (SELECT 1, 2, 3) AS t1;

func NewSource

func NewSource(from *expr.SqlSource, source datasource.Scanner) *Source

A scanner to read from data source

func NewSourceJoin

func NewSourceJoin(from *expr.SqlSource, source datasource.Scanner) *Source

A scanner to read from sub-query data source (join, sub-query)

func (*Source) Close

func (m *Source) Close() error

func (*Source) Copy

func (m *Source) Copy() *Source

func (*Source) Run

func (m *Source) Run(context *expr.Context) error

type SourcePlan

type SourcePlan struct {
	SqlSource *expr.SqlSource
}

func NewSourcePlan

func NewSourcePlan(sql *expr.SqlSource) *SourcePlan

??? is this used?

func (*SourcePlan) Accept

func (m *SourcePlan) Accept(sub expr.SubVisitor) (expr.Task, error)

func (*SourcePlan) VisitJoin

func (m *SourcePlan) VisitJoin(stmt *expr.SqlSource) (expr.Task, error)

func (*SourcePlan) VisitSubselect

func (m *SourcePlan) VisitSubselect(stmt *expr.SqlSource) (expr.Task, error)

type SqlJob

type SqlJob struct {
	RootTask TaskRunner
	Stmt     expr.SqlStatement
	Conf     *datasource.RuntimeSchema
}

SqlJob is dag of tasks for sql execution

func BuildSqlJob

func BuildSqlJob(conf *datasource.RuntimeSchema, connInfo, sqlText string) (*SqlJob, error)

Create Job made up of sub-tasks in DAG that is the

plan for execution of this query/job

func (*SqlJob) Close

func (m *SqlJob) Close() error

func (*SqlJob) DrainChan

func (m *SqlJob) DrainChan() MessageChan

The drain is the last out channel, on last task

func (*SqlJob) Run

func (m *SqlJob) Run() error

func (*SqlJob) Setup

func (m *SqlJob) Setup() error

type TaskBase

type TaskBase struct {
	TaskType string
	Handler  MessageHandler
	// contains filtered or unexported fields
}

func NewTaskBase

func NewTaskBase(taskType string) *TaskBase

func (*TaskBase) Add

func (m *TaskBase) Add(task TaskRunner) error

func (*TaskBase) Children

func (m *TaskBase) Children() Tasks

func (*TaskBase) Close

func (m *TaskBase) Close() error

func (*TaskBase) ErrChan

func (m *TaskBase) ErrChan() ErrChan

func (*TaskBase) MessageIn

func (m *TaskBase) MessageIn() MessageChan

func (*TaskBase) MessageInSet

func (m *TaskBase) MessageInSet(ch MessageChan)

func (*TaskBase) MessageOut

func (m *TaskBase) MessageOut() MessageChan

func (*TaskBase) MessageOutSet

func (m *TaskBase) MessageOutSet(ch MessageChan)

func (*TaskBase) Run

func (m *TaskBase) Run(ctx *expr.Context) error

func (*TaskBase) Setup

func (m *TaskBase) Setup(depth int) error

func (*TaskBase) SigChan

func (m *TaskBase) SigChan() SigChan

func (*TaskBase) Type

func (m *TaskBase) Type() string

type TaskParallel

type TaskParallel struct {
	*TaskBase
	// contains filtered or unexported fields
}

A parallel set of tasks, this starts each child task and offers up

 an output channel that is a merger of each child

--> \
--> - ->
--> /

func NewTaskParallel

func NewTaskParallel(taskType string, input TaskRunner, tasks Tasks) *TaskParallel

func (*TaskParallel) Add

func (m *TaskParallel) Add(task TaskRunner) error

func (*TaskParallel) Children

func (m *TaskParallel) Children() Tasks

func (*TaskParallel) Close

func (m *TaskParallel) Close() error

func (*TaskParallel) Run

func (m *TaskParallel) Run(ctx *expr.Context) error

func (*TaskParallel) Setup

func (m *TaskParallel) Setup(depth int) error

type TaskRunner

type TaskRunner interface {
	expr.Task
	Children() Tasks
	Add(TaskRunner) error
	Type() string
	Setup(depth int) error
	MessageIn() MessageChan
	MessageOut() MessageChan
	MessageInSet(MessageChan)
	MessageOutSet(MessageChan)
	ErrChan() ErrChan
	SigChan() SigChan
}

TaskRunner is an interface for single dependent task in Dag of

Tasks necessary to execute a Job

- it may have children tasks - it may be parallel, distributed, etc

type TaskSequential

type TaskSequential struct {
	*TaskBase
	// contains filtered or unexported fields
}

func NewSequential

func NewSequential(taskType string, tasks Tasks) *TaskSequential

func (*TaskSequential) Add

func (m *TaskSequential) Add(task TaskRunner) error

func (*TaskSequential) Children

func (m *TaskSequential) Children() Tasks

func (*TaskSequential) Close

func (m *TaskSequential) Close() error

func (*TaskSequential) Run

func (m *TaskSequential) Run(ctx *expr.Context) error

func (*TaskSequential) Setup

func (m *TaskSequential) Setup(depth int) error

type TaskStepper

type TaskStepper struct {
	*TaskBase
}

On Task stepper we don't Run it, rather use a

Next() explicit call from end user

func NewTaskStepper

func NewTaskStepper(taskType string) *TaskStepper

func (*TaskStepper) Run

func (m *TaskStepper) Run(ctx *expr.Context) error

type Tasks

type Tasks []TaskRunner

func (*Tasks) Add

func (m *Tasks) Add(task TaskRunner)

Add a child Task

type Upsert

type Upsert struct {
	*TaskBase
	// contains filtered or unexported fields
}

Upsert data task

func NewInsertUpsert

func NewInsertUpsert(sql *expr.SqlInsert, db datasource.Upsert) *Upsert

An insert to write to data source

func NewUpdateUpsert

func NewUpdateUpsert(sql *expr.SqlUpdate, db datasource.Upsert) *Upsert

func NewUpsertUpsert

func NewUpsertUpsert(sql *expr.SqlUpsert, db datasource.Upsert) *Upsert

func (*Upsert) Close

func (m *Upsert) Close() error

func (*Upsert) Copy

func (m *Upsert) Copy() *Upsert

func (*Upsert) Run

func (m *Upsert) Run(ctx *expr.Context) error

type Visitor

type Visitor interface {
	VisitScan(v interface{}) (interface{}, error)
}

exec.Visitor defines standard Sql Visit() pattern to create

a job plan from sql statements

An implementation of Visitor() will be be able to execute/run a Statement

  • inproc: ie, in process
  • distributed: ie, run this job across multiple servers

type Where

type Where struct {
	*TaskBase
	// contains filtered or unexported fields
}

A filter to implement where clause

func NewWhereFilter

func NewWhereFilter(where expr.Node, stmt *expr.SqlSelect) *Where

Where-Filter

func NewWhereFinal

func NewWhereFinal(where expr.Node, stmt *expr.SqlSelect) *Where

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL