Documentation ¶
Index ¶
- Constants
- Variables
- func RegisterSqlDriver()
- type DeletionScanner
- type DeletionTask
- type ErrChan
- type JobBuilder
- func (m *JobBuilder) VisitCommand(stmt *expr.SqlCommand) (expr.Task, error)
- func (m *JobBuilder) VisitDelete(stmt *expr.SqlDelete) (expr.Task, error)
- func (m *JobBuilder) VisitDescribe(stmt *expr.SqlDescribe) (expr.Task, error)
- func (m *JobBuilder) VisitInsert(stmt *expr.SqlInsert) (expr.Task, error)
- func (m *JobBuilder) VisitJoin(from *expr.SqlSource) (expr.Task, error)
- func (m *JobBuilder) VisitPreparedStmt(stmt *expr.PreparedStatement) (expr.Task, error)
- func (m *JobBuilder) VisitSelect(stmt *expr.SqlSelect) (expr.Task, error)
- func (m *JobBuilder) VisitShow(stmt *expr.SqlShow) (expr.Task, error)
- func (m *JobBuilder) VisitSubselect(from *expr.SqlSource) (expr.Task, error)
- func (m *JobBuilder) VisitUpdate(stmt *expr.SqlUpdate) (expr.Task, error)
- func (m *JobBuilder) VisitUpsert(stmt *expr.SqlUpsert) (expr.Task, error)
- type JobRunner
- type JoinKey
- type JoinMerge
- type KeyEvaluator
- type MessageChan
- type MessageHandler
- type Projection
- type ResultBuffer
- type ResultExecWriter
- type ResultWriter
- type SigChan
- type Source
- type SourcePlan
- type SqlJob
- type TaskBase
- func (m *TaskBase) Add(task TaskRunner) error
- func (m *TaskBase) Children() Tasks
- func (m *TaskBase) Close() error
- func (m *TaskBase) ErrChan() ErrChan
- func (m *TaskBase) MessageIn() MessageChan
- func (m *TaskBase) MessageInSet(ch MessageChan)
- func (m *TaskBase) MessageOut() MessageChan
- func (m *TaskBase) MessageOutSet(ch MessageChan)
- func (m *TaskBase) Run(ctx *expr.Context) error
- func (m *TaskBase) Setup(depth int) error
- func (m *TaskBase) SigChan() SigChan
- func (m *TaskBase) Type() string
- type TaskParallel
- type TaskRunner
- type TaskSequential
- type TaskStepper
- type Tasks
- type Upsert
- type Visitor
- type Where
Constants ¶
const (
ItemDefaultChannelSize = 50
)
const (
MysqlTimeFormat = "2006-01-02 15:04:05.000000000"
)
Variables ¶
var (
ShuttingDownError = fmt.Errorf("Received Shutdown Signal")
)
Functions ¶
func RegisterSqlDriver ¶
func RegisterSqlDriver()
Types ¶
type DeletionScanner ¶
type DeletionScanner struct {
*DeletionTask
}
type DeletionTask ¶
type DeletionTask struct { *TaskBase // contains filtered or unexported fields }
Delete task
func NewDelete ¶
func NewDelete(sql *expr.SqlDelete, db datasource.Deletion) *DeletionTask
An inserter to write to data source
func (*DeletionTask) Close ¶
func (m *DeletionTask) Close() error
func (*DeletionTask) Copy ¶
func (m *DeletionTask) Copy() *DeletionTask
type JobBuilder ¶
type JobBuilder struct {
// contains filtered or unexported fields
}
This is a simple, single source Job Executor
we can create smarter ones but this is a basic implementation for
/ running in-process, not distributed
func NewJobBuilder ¶
func NewJobBuilder(schema *datasource.RuntimeSchema, connInfo string) *JobBuilder
JobBuilder
@schema = the config/runtime schema info @connInfo = connection string info for original connection
func (*JobBuilder) VisitCommand ¶
func (m *JobBuilder) VisitCommand(stmt *expr.SqlCommand) (expr.Task, error)
func (*JobBuilder) VisitDelete ¶
func (*JobBuilder) VisitDescribe ¶
func (m *JobBuilder) VisitDescribe(stmt *expr.SqlDescribe) (expr.Task, error)
func (*JobBuilder) VisitInsert ¶
func (*JobBuilder) VisitPreparedStmt ¶
func (m *JobBuilder) VisitPreparedStmt(stmt *expr.PreparedStatement) (expr.Task, error)
func (*JobBuilder) VisitSelect ¶
func (*JobBuilder) VisitSubselect ¶
func (*JobBuilder) VisitUpdate ¶
func (*JobBuilder) VisitUpsert ¶
type JoinKey ¶
type JoinKey struct { *TaskBase // contains filtered or unexported fields }
Evaluate messages to create JoinKey based message, where the
Join Key (composite of each value in join expr) hashes consistently
func NewJoinKey ¶
func NewJoinKey(from *expr.SqlSource, conf *datasource.RuntimeSchema) (*JoinKey, error)
A JoinKey task that evaluates the compound JoinKey to allow
for parallelized join's source1 -> JoinKey -> hash-route \ -- join --> / source2 -> JoinKey -> hash-route
type JoinMerge ¶
type JoinMerge struct { *TaskBase // contains filtered or unexported fields }
Scan a data source for rows, feed into runner for join sources
- join SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name;
func NewJoinNaiveMerge ¶
func NewJoinNaiveMerge(ltask, rtask TaskRunner, lfrom, rfrom *expr.SqlSource, conf *datasource.RuntimeSchema) (*JoinMerge, error)
A very stupid naive parallel join merge, uses Key() as value to merge
two different input channels source1 -> \ -- join --> / source2 ->
type KeyEvaluator ¶
type KeyEvaluator func(msg datasource.Message) driver.Value
type MessageChan ¶
type MessageChan chan datasource.Message
type MessageHandler ¶
type MessageHandler func(ctx *expr.Context, msg datasource.Message) bool
Handle/Forward a message for this Task
TODO: this bool is either wrong, or not-used? error?
func MakeHandler ¶
func MakeHandler(task TaskRunner) MessageHandler
type Projection ¶
type Projection struct { *TaskBase // contains filtered or unexported fields }
func NewProjection ¶
func NewProjection(sqlSelect *expr.SqlSelect) *Projection
type ResultBuffer ¶
type ResultBuffer struct { *TaskBase // contains filtered or unexported fields }
func NewResultBuffer ¶
func NewResultBuffer(writeTo *[]datasource.Message) *ResultBuffer
func (*ResultBuffer) Close ¶
func (m *ResultBuffer) Close() error
func (*ResultBuffer) Copy ¶
func (m *ResultBuffer) Copy() *ResultBuffer
type ResultExecWriter ¶
type ResultExecWriter struct { *TaskBase // contains filtered or unexported fields }
func NewResultExecWriter ¶
func NewResultExecWriter() *ResultExecWriter
func (*ResultExecWriter) Close ¶
func (m *ResultExecWriter) Close() error
func (*ResultExecWriter) Copy ¶
func (m *ResultExecWriter) Copy() *ResultExecWriter
func (*ResultExecWriter) Result ¶
func (m *ResultExecWriter) Result() driver.Result
type ResultWriter ¶
type ResultWriter struct { *TaskBase // contains filtered or unexported fields }
func NewResultRows ¶
func NewResultRows(cols []string) *ResultWriter
func NewResultWriter ¶
func NewResultWriter() *ResultWriter
func (*ResultWriter) Close ¶
func (m *ResultWriter) Close() error
func (*ResultWriter) Columns ¶
func (m *ResultWriter) Columns() []string
func (*ResultWriter) Copy ¶
func (m *ResultWriter) Copy() *ResultWriter
type Source ¶
type Source struct { *TaskBase JoinKey KeyEvaluator // contains filtered or unexported fields }
Scan a data source for rows, feed into runner. The source scanner being
a source is iter.Next() messages instead of sending them on input channel 1) table -- FROM table 2) channels -- FROM stream 3) join -- SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name; 4) sub-select -- SELECT * FROM (SELECT 1, 2, 3) AS t1;
func NewSource ¶
func NewSource(from *expr.SqlSource, source datasource.Scanner) *Source
A scanner to read from data source
func NewSourceJoin ¶
func NewSourceJoin(from *expr.SqlSource, source datasource.Scanner) *Source
A scanner to read from sub-query data source (join, sub-query)
type SourcePlan ¶
func (*SourcePlan) Accept ¶
func (m *SourcePlan) Accept(sub expr.SubVisitor) (expr.Task, error)
func (*SourcePlan) VisitSubselect ¶
type SqlJob ¶
type SqlJob struct { RootTask TaskRunner Stmt expr.SqlStatement Conf *datasource.RuntimeSchema }
SqlJob is dag of tasks for sql execution
func BuildSqlJob ¶
func BuildSqlJob(conf *datasource.RuntimeSchema, connInfo, sqlText string) (*SqlJob, error)
Create Job made up of sub-tasks in DAG that is the
plan for execution of this query/job
func (*SqlJob) DrainChan ¶
func (m *SqlJob) DrainChan() MessageChan
The drain is the last out channel, on last task
type TaskBase ¶
type TaskBase struct { TaskType string Handler MessageHandler // contains filtered or unexported fields }
func NewTaskBase ¶
func (*TaskBase) Add ¶
func (m *TaskBase) Add(task TaskRunner) error
func (*TaskBase) MessageIn ¶
func (m *TaskBase) MessageIn() MessageChan
func (*TaskBase) MessageInSet ¶
func (m *TaskBase) MessageInSet(ch MessageChan)
func (*TaskBase) MessageOut ¶
func (m *TaskBase) MessageOut() MessageChan
func (*TaskBase) MessageOutSet ¶
func (m *TaskBase) MessageOutSet(ch MessageChan)
type TaskParallel ¶
type TaskParallel struct { *TaskBase // contains filtered or unexported fields }
A parallel set of tasks, this starts each child task and offers up
an output channel that is a merger of each child --> \ --> - -> --> /
func NewTaskParallel ¶
func NewTaskParallel(taskType string, input TaskRunner, tasks Tasks) *TaskParallel
func (*TaskParallel) Add ¶
func (m *TaskParallel) Add(task TaskRunner) error
func (*TaskParallel) Children ¶
func (m *TaskParallel) Children() Tasks
func (*TaskParallel) Close ¶
func (m *TaskParallel) Close() error
func (*TaskParallel) Setup ¶
func (m *TaskParallel) Setup(depth int) error
type TaskRunner ¶
type TaskRunner interface { expr.Task Children() Tasks Add(TaskRunner) error Type() string Setup(depth int) error MessageIn() MessageChan MessageOut() MessageChan MessageInSet(MessageChan) MessageOutSet(MessageChan) ErrChan() ErrChan SigChan() SigChan }
TaskRunner is an interface for single dependent task in Dag of
Tasks necessary to execute a Job
- it may have children tasks - it may be parallel, distributed, etc
type TaskSequential ¶
type TaskSequential struct { *TaskBase // contains filtered or unexported fields }
func NewSequential ¶
func NewSequential(taskType string, tasks Tasks) *TaskSequential
func (*TaskSequential) Add ¶
func (m *TaskSequential) Add(task TaskRunner) error
func (*TaskSequential) Children ¶
func (m *TaskSequential) Children() Tasks
func (*TaskSequential) Close ¶
func (m *TaskSequential) Close() error
func (*TaskSequential) Setup ¶
func (m *TaskSequential) Setup(depth int) error
type TaskStepper ¶
type TaskStepper struct {
*TaskBase
}
On Task stepper we don't Run it, rather use a
Next() explicit call from end user
func NewTaskStepper ¶
func NewTaskStepper(taskType string) *TaskStepper
type Tasks ¶
type Tasks []TaskRunner
type Upsert ¶
type Upsert struct { *TaskBase // contains filtered or unexported fields }
Upsert data task
func NewInsertUpsert ¶
func NewInsertUpsert(sql *expr.SqlInsert, db datasource.Upsert) *Upsert
An insert to write to data source
func NewUpdateUpsert ¶
func NewUpdateUpsert(sql *expr.SqlUpdate, db datasource.Upsert) *Upsert
func NewUpsertUpsert ¶
func NewUpsertUpsert(sql *expr.SqlUpsert, db datasource.Upsert) *Upsert
type Visitor ¶
type Visitor interface {
VisitScan(v interface{}) (interface{}, error)
}
exec.Visitor defines standard Sql Visit() pattern to create
a job plan from sql statements
An implementation of Visitor() will be be able to execute/run a Statement
- inproc: ie, in process
- distributed: ie, run this job across multiple servers
type Where ¶
type Where struct { *TaskBase // contains filtered or unexported fields }
A filter to implement where clause
func NewWhereFilter ¶
Where-Filter