core

package
v0.0.0-...-6fe1999 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2020 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const TASK_COMPLETED taskState = 3
View Source
const TASK_FAILED taskState = 4
View Source
const TASK_READY_TO_RUN taskState = 1
View Source
const TASK_RUNNING taskState = 2
View Source
const TASK_SCHEDULED taskState = 0

Variables

View Source
var OperatorsRegistry map[string]reflect.Type = map[string]reflect.Type{}

Functions

func Invoke

func Invoke(any reflect.Value, name string, args ...interface{})

func UnmarhsaledJSONtoTask

func UnmarhsaledJSONtoTask(taskData map[string]interface{}, dag *DAG)

UnmarhsaledJSONtoTask converts output of MarshalJSON() to a task. Make sure to add the task to the passed dag.tasks as well.

Types

type BaseTask

type BaseTask struct {
	// contains filtered or unexported fields
}

BaseTask contains the basic task attributes, and implements the basic methods that can be used. After deriving this struct using composition, you are expected to implement another `NewTask` and the `Execute` method. NOTE: Always use the `NewTask` method to create an instance of this struct.

func (*BaseTask) AddDownstream

func (task *BaseTask) AddDownstream(sibling TaskInterface)

func (*BaseTask) AddUpstream

func (task *BaseTask) AddUpstream(sibling TaskInterface)

func (*BaseTask) GetDag

func (task *BaseTask) GetDag() *DAG

func (*BaseTask) GetDescription

func (task *BaseTask) GetDescription() string

func (*BaseTask) GetDownstream

func (task *BaseTask) GetDownstream() map[string]struct{}

func (*BaseTask) GetKwargs

func (task *BaseTask) GetKwargs() Dict

func (*BaseTask) GetName

func (task *BaseTask) GetName() string

func (*BaseTask) GetRegistryName

func (task *BaseTask) GetRegistryName() string

func (*BaseTask) GetUpstream

func (task *BaseTask) GetUpstream() map[string]struct{}

func (*BaseTask) MarshalJSON

func (task *BaseTask) MarshalJSON() ([]byte, error)

func (*BaseTask) Run

func (task *BaseTask) Run()

func (*BaseTask) SetDag

func (task *BaseTask) SetDag(dag *DAG)

func (*BaseTask) SetDescription

func (task *BaseTask) SetDescription(description string)

func (*BaseTask) SetKwargs

func (task *BaseTask) SetKwargs(kwargs Dict)

func (*BaseTask) SetMetadata

func (task *BaseTask) SetMetadata(name string, description string, dag *DAG, kwargs Dict, taskType string)

SetMetadata is intended only for operators that embed BaseTask.

func (*BaseTask) SetName

func (task *BaseTask) SetName(name string)

func (*BaseTask) SetTaskType

func (task *BaseTask) SetTaskType(taskType string)

func (*BaseTask) String

func (task *BaseTask) String() string

func (*BaseTask) Void

func (task *BaseTask) Void()

Void does nothing. However this method is necessary to fill the voids in our DAGs.

type Command

type Command struct {
	CommandString string
	Flagset       *flag.FlagSet
}

Command is a struct view of a commandline command.

func (*Command) GetCommandString

func (comm *Command) GetCommandString() string

func (*Command) GetFlagSet

func (comm *Command) GetFlagSet() *flag.FlagSet

func (*Command) RunCommand

func (comm *Command) RunCommand() error

type CommandInterface

type CommandInterface interface {
	RunCommand() error
	GetCommandString() string
	GetFlagSet() *flag.FlagSet
}

CommandInterface is the set of methods that a package must implement to expose a command on the command line.

type DAG

type DAG struct {
	Name        string
	Schedule    time.Duration
	DefaultArgs Dict
	Description string
	StartDate   time.Time
	// contains filtered or unexported fields
}

DAG is a basic collection of tasks that we want to run, organised in a way that specifies their dependencies and relationships.

func (*DAG) DetectCycles

func (dag *DAG) DetectCycles() error

func (*DAG) GetTasks

func (dag *DAG) GetTasks() map[string]TaskInterface

func (*DAG) LogInfo

func (dag *DAG) LogInfo()

LogInfo logs DAG info.

func (*DAG) MarshalJSON

func (dag *DAG) MarshalJSON() ([]byte, error)

func (*DAG) SetTasks

func (dag *DAG) SetTasks(tasks map[string]TaskInterface)

func (*DAG) TopologicalSortedTasks

func (dag *DAG) TopologicalSortedTasks() chan TaskInterface

TopologicalSortedTasks: TODO: modify this so that it sends new tasks out on a channel as updates about finished tasks arrive on a channel passed in as a function parameter.

func (*DAG) UnmarshalJSON

func (dag *DAG) UnmarshalJSON(b []byte) error

UnmarshalJSON contains the logic for unmarshaling a DAG that was marshaled by DAG.MarshalJSON. See BaseTask.MarshalJSON as well.

type DagRunType

type DagRunType struct {
	SchTime   int64
	Completed bool
}

func (*DagRunType) MarshalBinary

func (dr *DagRunType) MarshalBinary() ([]byte, error)

func (*DagRunType) UnmarshalBinary

func (dr *DagRunType) UnmarshalBinary(val []byte) error

type Dict

type Dict map[string]interface{}

Dict type, can be used to hold arbitrary data.

type TaskInstance

type TaskInstance struct {
	// One of TASK_SCHEDULED (0), TASK_READY_TO_RUN (1), TASK_RUNNING (2), TASK_COMPLETED (3) or TASK_FAILED (4).
	State    taskState
	Time     time.Time
	TaskName string
	DagName  string
}

TODO: Add Execution Start Time

func (*TaskInstance) MarshalBinary

func (taskIn *TaskInstance) MarshalBinary() ([]byte, error)

func (*TaskInstance) UnmarshalBinary

func (taskIn *TaskInstance) UnmarshalBinary(val []byte) error

type TaskInterface

type TaskInterface interface {
	AddUpstream(TaskInterface)
	AddDownstream(TaskInterface)
	GetName() string
	GetDescription() string
	GetDag() *DAG
	GetKwargs() Dict
	String() string
	GetDownstream() map[string]struct{}
	GetUpstream() map[string]struct{}
	MarshalJSON() ([]byte, error)
	// Void does nothing. Call this method to silence errors on tasks in dags.
	Void()
	Run()
}

func NewTask

func NewTask(name string, description string, dag *DAG, kwargs Dict) (TaskInterface, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL