Documentation ¶
Index ¶
- Constants
- Variables
- func RegisterSqlDriver()
- func RunJob(conf *datasource.RuntimeConfig, tasks Tasks) error
- func SetupTasks(tasks Tasks) error
- type Context
- type ErrChan
- type JobBuilder
- func (m *JobBuilder) VisitDelete(stmt *expr.SqlDelete) (interface{}, error)
- func (m *JobBuilder) VisitDescribe(stmt *expr.SqlDescribe) (interface{}, error)
- func (m *JobBuilder) VisitInsert(stmt *expr.SqlInsert) (interface{}, error)
- func (m *JobBuilder) VisitJoin(stmt *expr.SqlSource) (interface{}, error)
- func (m *JobBuilder) VisitPreparedStmt(stmt *expr.PreparedStatement) (interface{}, error)
- func (m *JobBuilder) VisitSelect(stmt *expr.SqlSelect) (interface{}, error)
- func (m *JobBuilder) VisitShow(stmt *expr.SqlShow) (interface{}, error)
- func (m *JobBuilder) VisitSubselect(stmt *expr.SqlSource) (interface{}, error)
- func (m *JobBuilder) VisitUpdate(stmt *expr.SqlUpdate) (interface{}, error)
- func (m *JobBuilder) VisitUpsert(stmt *expr.SqlUpsert) (interface{}, error)
- type JobRunner
- type MessageChan
- type MessageHandler
- type Projection
- type ResultBuffer
- type ResultWriter
- type SigChan
- type Source
- type SourceJoin
- type SourcePlan
- type SqlJob
- type TaskBase
- func (m *TaskBase) Children() Tasks
- func (m *TaskBase) Close() error
- func (m *TaskBase) ErrChan() ErrChan
- func (m *TaskBase) MessageIn() MessageChan
- func (m *TaskBase) MessageInSet(ch MessageChan)
- func (m *TaskBase) MessageOut() MessageChan
- func (m *TaskBase) MessageOutSet(ch MessageChan)
- func (m *TaskBase) Run(ctx *Context) error
- func (m *TaskBase) SigChan() SigChan
- func (m *TaskBase) Type() string
- type TaskRunner
- type TaskStepper
- type Tasks
- type Visitor
- type Where
Constants ¶
const (
ItemDefaultChannelSize = 50
)
const (
MysqlTimeFormat = "2006-01-02 15:04:05.000000000"
)
Variables ¶
var (
ShuttingDownError = fmt.Errorf("Received Shutdown Signal")
)
Functions ¶
func RegisterSqlDriver ¶
func RegisterSqlDriver()
func RunJob ¶
func RunJob(conf *datasource.RuntimeConfig, tasks Tasks) error
Run a Sql Job, by running to completion each task
func SetupTasks ¶
Types ¶
type Context ¶
type Context struct { DisableRecover bool // contains filtered or unexported fields }
type JobBuilder ¶
type JobBuilder struct {
// contains filtered or unexported fields
}
This is a simple, single source Job Executor
we can create smarter ones but this is a basic implementation
func NewJobBuilder ¶
func NewJobBuilder(rtConf *datasource.RuntimeConfig, connInfo string) *JobBuilder
JobBuilder
@connInfo = connection string info for original connection
func (*JobBuilder) VisitDelete ¶
func (m *JobBuilder) VisitDelete(stmt *expr.SqlDelete) (interface{}, error)
func (*JobBuilder) VisitDescribe ¶
func (m *JobBuilder) VisitDescribe(stmt *expr.SqlDescribe) (interface{}, error)
func (*JobBuilder) VisitInsert ¶
func (m *JobBuilder) VisitInsert(stmt *expr.SqlInsert) (interface{}, error)
func (*JobBuilder) VisitJoin ¶
func (m *JobBuilder) VisitJoin(stmt *expr.SqlSource) (interface{}, error)
func (*JobBuilder) VisitPreparedStmt ¶
func (m *JobBuilder) VisitPreparedStmt(stmt *expr.PreparedStatement) (interface{}, error)
func (*JobBuilder) VisitSelect ¶
func (m *JobBuilder) VisitSelect(stmt *expr.SqlSelect) (interface{}, error)
func (*JobBuilder) VisitShow ¶
func (m *JobBuilder) VisitShow(stmt *expr.SqlShow) (interface{}, error)
func (*JobBuilder) VisitSubselect ¶
func (m *JobBuilder) VisitSubselect(stmt *expr.SqlSource) (interface{}, error)
func (*JobBuilder) VisitUpdate ¶
func (m *JobBuilder) VisitUpdate(stmt *expr.SqlUpdate) (interface{}, error)
func (*JobBuilder) VisitUpsert ¶
func (m *JobBuilder) VisitUpsert(stmt *expr.SqlUpsert) (interface{}, error)
type MessageChan ¶
type MessageChan chan datasource.Message
type MessageHandler ¶
type MessageHandler func(ctx *Context, msg datasource.Message) bool
func MakeHandler ¶
func MakeHandler(task TaskRunner) MessageHandler
type Projection ¶
type Projection struct { *TaskBase // contains filtered or unexported fields }
func NewProjection ¶
func NewProjection(sqlSelect *expr.SqlSelect) *Projection
type ResultBuffer ¶
type ResultBuffer struct { *TaskBase // contains filtered or unexported fields }
func NewResultBuffer ¶
func NewResultBuffer(writeTo *[]datasource.Message) *ResultBuffer
func (*ResultBuffer) Close ¶
func (m *ResultBuffer) Close() error
func (*ResultBuffer) Copy ¶
func (m *ResultBuffer) Copy() *ResultBuffer
type ResultWriter ¶
type ResultWriter struct { *TaskBase // contains filtered or unexported fields }
func NewResultRows ¶
func NewResultRows(cols []string) *ResultWriter
func NewResultWriter ¶
func NewResultWriter() *ResultWriter
func (*ResultWriter) Close ¶
func (m *ResultWriter) Close() error
func (*ResultWriter) Columns ¶
func (m *ResultWriter) Columns() []string
func (*ResultWriter) Copy ¶
func (m *ResultWriter) Copy() *ResultWriter
func (*ResultWriter) Next ¶
func (m *ResultWriter) Next(dest []driver.Value) error
Note, this is implementation of the sql/driver Rows() Next() interface
func (*ResultWriter) Run ¶
func (m *ResultWriter) Run(ctx *Context) error
For ResultWriter, since we are are not paging through messages
using this mesage channel, instead using Next() as defined by sql/driver we don't read the input channel, just watch stop channels
type Source ¶
type Source struct { *TaskBase // contains filtered or unexported fields }
Scan a data source for rows, feed into runner. The source scanner being
a source is iter.Next() messages instead of sending them on input channel 1) table -- FROM table 2) channels -- FROM stream 3) join -- SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name; 4) sub-select -- SELECT * FROM (SELECT 1, 2, 3) AS t1;
type SourceJoin ¶
type SourceJoin struct { *TaskBase // contains filtered or unexported fields }
Scan a data source for rows, feed into runner for join sources
- join SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name;
func NewSourceJoin ¶
func NewSourceJoin(builder expr.SubVisitor, leftFrom, rightFrom *expr.SqlSource, conf *datasource.RuntimeConfig) (*SourceJoin, error)
A scanner to read from data source
func (*SourceJoin) Close ¶
func (m *SourceJoin) Close() error
func (*SourceJoin) Copy ¶
func (m *SourceJoin) Copy() *Source
func (*SourceJoin) Run ¶
func (m *SourceJoin) Run(context *Context) error
type SourcePlan ¶
func NewSourcePlan ¶
func NewSourcePlan(sql *expr.SqlSource) *SourcePlan
func (*SourcePlan) Accept ¶
func (m *SourcePlan) Accept(sub expr.SubVisitor) (interface{}, error)
func (*SourcePlan) VisitJoin ¶
func (m *SourcePlan) VisitJoin(stmt *expr.SqlSource) (interface{}, error)
func (*SourcePlan) VisitSubselect ¶
func (m *SourcePlan) VisitSubselect(stmt *expr.SqlSource) (interface{}, error)
type SqlJob ¶
type SqlJob struct { Tasks Tasks Stmt expr.SqlStatement Conf *datasource.RuntimeConfig }
SqlJob is dag of tasks for sql execution
func BuildSqlJob ¶
func BuildSqlJob(conf *datasource.RuntimeConfig, connInfo, sqlText string) (*SqlJob, error)
Create Job made up of sub-tasks in DAG that is the
plan for execution of this query/job
func (*SqlJob) DrainChan ¶
func (m *SqlJob) DrainChan() MessageChan
The drain is the last out channel, on last task
type TaskBase ¶
type TaskBase struct { TaskType string Handler MessageHandler // contains filtered or unexported fields }
func NewTaskBase ¶
func (*TaskBase) MessageIn ¶
func (m *TaskBase) MessageIn() MessageChan
func (*TaskBase) MessageInSet ¶
func (m *TaskBase) MessageInSet(ch MessageChan)
func (*TaskBase) MessageOut ¶
func (m *TaskBase) MessageOut() MessageChan
func (*TaskBase) MessageOutSet ¶
func (m *TaskBase) MessageOutSet(ch MessageChan)
type TaskRunner ¶
type TaskRunner interface { Children() Tasks Type() string MessageIn() MessageChan MessageOut() MessageChan MessageInSet(MessageChan) MessageOutSet(MessageChan) ErrChan() ErrChan SigChan() SigChan Run(ctx *Context) error Close() error }
TaskRunner is an interface for single dependent task in Dag of
Tasks necessary to execute a Job
- it may have children tasks - it may be parallel, distributed, etc
type TaskStepper ¶
type TaskStepper struct {
*TaskBase
}
On Task stepper we don't Run it, rather use a
Next() explicit call from end user
func NewTaskStepper ¶
func NewTaskStepper(taskType string) *TaskStepper
func (*TaskStepper) Run ¶
func (m *TaskStepper) Run(ctx *Context) error
type Tasks ¶
type Tasks []TaskRunner
type Visitor ¶
type Visitor interface {
VisitScan(v interface{}) (interface{}, error)
}
exec.Visitor defines standard Sql Visit() pattern to create
a job plan from sql statements
An implementation of Visitor() will be be able to execute/run a Statement
- inproc: ie, in process
- distributed: ie, run this job across multiple servers