Documentation ¶
Index ¶
- Constants
- Variables
- func Invoke(any reflect.Value, name string, args ...interface{})
- func UnmarhsaledJSONtoTask(taskData map[string]interface{}, dag *DAG)
- type BaseTask
- func (task *BaseTask) AddDownstream(sibling TaskInterface)
- func (task *BaseTask) AddUpstream(sibling TaskInterface)
- func (task *BaseTask) GetDag() *DAG
- func (task *BaseTask) GetDescription() string
- func (task *BaseTask) GetDownstream() map[string]struct{}
- func (task *BaseTask) GetKwargs() Dict
- func (task *BaseTask) GetName() string
- func (task *BaseTask) GetRegistryName() string
- func (task *BaseTask) GetUpstream() map[string]struct{}
- func (task *BaseTask) MarshalJSON() ([]byte, error)
- func (task *BaseTask) Run()
- func (task *BaseTask) SetDag(dag *DAG)
- func (task *BaseTask) SetDescription(description string)
- func (task *BaseTask) SetKwargs(kwargs Dict)
- func (task *BaseTask) SetMetadata(name string, description string, dag *DAG, kwargs Dict, taskType string)
- func (task *BaseTask) SetName(name string)
- func (task *BaseTask) SetTaskType(taskType string)
- func (task *BaseTask) String() string
- func (task *BaseTask) Void()
- type Command
- type CommandInterface
- type DAG
- func (dag *DAG) DetectCycles() error
- func (dag *DAG) GetTasks() map[string]TaskInterface
- func (dag *DAG) LogInfo()
- func (dag *DAG) MarshalJSON() ([]byte, error)
- func (dag *DAG) SetTasks(tasks map[string]TaskInterface)
- func (dag *DAG) TopologicalSortedTasks() chan TaskInterface
- func (dag *DAG) UnmarshalJSON(b []byte) error
- type DagRunType
- type Dict
- type TaskInstance
- type TaskInterface
Constants ¶
const TASK_COMPLETED taskState = 3
const TASK_FAILED taskState = 4
const TASK_READY_TO_RUN taskState = 1
const TASK_RUNNING taskState = 2
const TASK_SCHEDULED taskState = 0
Variables ¶
Functions ¶
func UnmarhsaledJSONtoTask ¶
UnmarhsaledJSONtoTask converts output of MarshalJSON() to a task. Make sure to add the task to the passed dag.tasks as well.
Types ¶
type BaseTask ¶
type BaseTask struct {
// contains filtered or unexported fields
}
BaseTask contains the basic task attributes, and implements the basic methods that can be used. After deriving this struct using composition, you are expected to implement another `NewTask` and the `Execute` method. NOTE: Always use the `NewTask` method to create an instance of this struct.
func (*BaseTask) AddDownstream ¶
func (task *BaseTask) AddDownstream(sibling TaskInterface)
func (*BaseTask) AddUpstream ¶
func (task *BaseTask) AddUpstream(sibling TaskInterface)
func (*BaseTask) GetDescription ¶
func (*BaseTask) GetDownstream ¶
func (*BaseTask) GetRegistryName ¶
func (*BaseTask) GetUpstream ¶
func (*BaseTask) MarshalJSON ¶
func (*BaseTask) SetDescription ¶
func (*BaseTask) SetMetadata ¶
func (task *BaseTask) SetMetadata(name string, description string, dag *DAG, kwargs Dict, taskType string)
SetMetadata is intended only for operators that embed BaseTask.
func (*BaseTask) SetTaskType ¶
type Command ¶
Command is a struct view of a commandline command.
func (*Command) GetCommandString ¶
func (*Command) GetFlagSet ¶
func (*Command) RunCommand ¶
type CommandInterface ¶
type CommandInterface interface { RunCommand() error GetCommandString() string GetFlagSet() *flag.FlagSet }
CommandInterface is the set of methods that must be implemented by packages that want to expose a command for the command line.
type DAG ¶
type DAG struct { Name string Schedule time.Duration DefaultArgs Dict Description string StartDate time.Time // contains filtered or unexported fields }
DAG is a basic collection of tasks that we want to run, organised in a way that specifies their dependencies and relationships.
func (*DAG) DetectCycles ¶
func (*DAG) GetTasks ¶
func (dag *DAG) GetTasks() map[string]TaskInterface
func (*DAG) MarshalJSON ¶
func (*DAG) SetTasks ¶
func (dag *DAG) SetTasks(tasks map[string]TaskInterface)
func (*DAG) TopologicalSortedTasks ¶
func (dag *DAG) TopologicalSortedTasks() chan TaskInterface
TopologicalSortedTasks: TODO: modify this so that it sends new tasks out on a channel in response to updates about finished tasks, which it receives on a channel passed as a function parameter.
func (*DAG) UnmarshalJSON ¶
UnmarshalJSON contains the logic for unmarshaling a DAG that was marshaled by DAG.MarshalJSON. See BaseTask.MarshalJSON as well.
type DagRunType ¶
func (*DagRunType) MarshalBinary ¶
func (dr *DagRunType) MarshalBinary() ([]byte, error)
func (*DagRunType) UnmarshalBinary ¶
func (dr *DagRunType) UnmarshalBinary(val []byte) error
type TaskInstance ¶
type TaskInstance struct { // 0 for scheduled, 1 for ready to run, 2 for running, 3 for completed, 4 for failed. State taskState Time time.Time TaskName string DagName string }
TODO: Add Execution Start Time
func (*TaskInstance) MarshalBinary ¶
func (taskIn *TaskInstance) MarshalBinary() ([]byte, error)
func (*TaskInstance) UnmarshalBinary ¶
func (taskIn *TaskInstance) UnmarshalBinary(val []byte) error
type TaskInterface ¶
type TaskInterface interface { AddUpstream(TaskInterface) AddDownstream(TaskInterface) GetName() string GetDescription() string GetDag() *DAG GetKwargs() Dict String() string GetDownstream() map[string]struct{} GetUpstream() map[string]struct{} MarshalJSON() ([]byte, error) // Void does nothing. Call this method to silence errors on tasks in dags. Void() Run() }