Documentation ¶
Index ¶
- Constants
- func ClearDAGState(RootRunner TaskRunner)
- func CreateAndSetTaskParamsFromHash(tr TaskRunner, paramHash string) error
- func CreateTaskRunnerFromParams(tr TaskRunner, params []*TaskParam) error
- func ResetDAGResultChannels(RootRunner TaskRunner)
- func RunTaskRunner(tRunner TaskRunner, wg *sync.WaitGroup, TokenReturn chan struct{})
- func SetParents(tRunner TaskRunner)
- func SetTaskPriorities(rootTask *Task) error
- type Task
- func (ts *Task) AddChildren(children ...TaskRunner) []TaskRunner
- func (ts *Task) End() time.Time
- func (ts *Task) GetHash() string
- func (ts *Task) GetSerializedParams() string
- func (ts *Task) SetEnd(e time.Time)
- func (ts *Task) SetStart(s time.Time)
- func (ts *Task) SetState(newState string) (string, error)
- func (ts *Task) Start() time.Time
- type TaskParam
- type TaskRunner
Constants ¶
const ( WAITING = "waiting" RUNNING = "running" COMPLETE = "complete" )
Variables ¶
This section is empty.
Functions ¶
func CreateAndSetTaskParamsFromHash ¶
func CreateAndSetTaskParamsFromHash(tr TaskRunner, paramHash string) error
CreateAndSetTaskParamsFromHash ....
func CreateTaskRunnerFromParams ¶
func CreateTaskRunnerFromParams(tr TaskRunner, params []*TaskParam) error
CreateTaskRunnerFromParams ... Given Params and a TaskRunner, sets all TaskRunner fields marked as param Note: The struct satisfying the TaskRunner interface MUST be passed to this function as a reference. See these articles for a more thorough explanation: https://stackoverflow.com/questions/6395076/using-reflect-how-do-you-set-the-value-of-a-struct-field http://speakmy.name/2014/09/14/modifying-interfaced-go-struct/
func ResetDAGResultChannels ¶
func ResetDAGResultChannels(RootRunner TaskRunner)
ResetDAGResultChannels ... Need to recreate result channels after each scheduler run, because they are close in RunTaskRunner
func RunTaskRunner ¶
func RunTaskRunner(tRunner TaskRunner, wg *sync.WaitGroup, TokenReturn chan struct{})
RunTaskRunner ... Runs a TaskRunner, sets state and notifies waiting group when run is done
func SetParents ¶
func SetParents(tRunner TaskRunner)
Types ¶
type Task ¶
type Task struct { Name string Children []TaskRunner Parent TaskRunner ResultsChannel chan string WorkerTokens chan struct{} State string Priority int Params []*TaskParam DataProcessed int Logger *log.Logger // contains filtered or unexported fields }
Task ...
func (*Task) AddChildren ¶
func (ts *Task) AddChildren(children ...TaskRunner) []TaskRunner
AddChildren ...
func (*Task) GetHash ¶
GetHash ... Returns a hash representation of the task. The elements that comprise the hash are
Task.Name
Serialized Task Params
All Child Serialized Params
These elements are joined together and hashed, which creates a fairly unique value. This is used primarily to rebuild a TaskRunner from its metadata table or determine whether two task DAGs are equal.
func (*Task) GetSerializedParams ¶
Given a Task, return a serialized string to represent it.
This is useful for storing the state of each Task that is run in the dag, and can
be used to re-create a previously run task for DAG re-runs, backfills, or some other use case. Example: type TestTask struct { *Task N int `task_param:""` X string `task_param:""` Z int } new_test_task := TestTask{ 5, "HI", 0, } serialized_test_task := new_test_task.GetSerializedParams() fmt.Println(serialized_test_task) > N:INT:5_X:STR:HI This function uses a lot of reflection / is kind of tricky. Would be good to document exactly how this works because I constantly forget what these variables mean :P
type TaskParam ¶
func CreateAndSetTaskParams ¶
func CreateAndSetTaskParams(tr TaskRunner) ([]*TaskParam, error)
CreateAndSetTaskParams ... Uses reflection to inspect struct elements for 'task_param' tag and sets tr.Task.Params accordingly
func DeserializeTaskParams ¶
type TaskRunner ¶
type TaskRunner interface { // TODO: have Run() return an error, which could be given to scheduler Run() // requires TaskRuner to have an embedded Task GetTask() *Task }
TaskRunner ...