Documentation ¶
Index ¶
- func RegisterTaskdef(name string, f NewTaskFunc)
- type DatastoreTaskable
- type NewTaskFunc
- type Progress
- type Task
- func (t Task) DatastoreType() string
- func (t *Task) Delete(store datastore.Datastore) error
- func (task *Task) Do(store datastore.Datastore, tc chan *Task) error
- func (task *Task) Enqueue(store datastore.Datastore, amqpurl string) error
- func (t Task) GetId() string
- func (t Task) Key() datastore.Key
- func (t *Task) NewSQLModel(key datastore.Key) sql_datastore.Model
- func (t *Task) PubSubChannelName() string
- func (t *Task) QueueMsg() (amqp.Publishing, error)
- func (t *Task) Read(store datastore.Datastore) error
- func (t *Task) SQLParams(cmd sql_datastore.Cmd) []interface{}
- func (t *Task) SQLQuery(cmd sql_datastore.Cmd) string
- func (t *Task) Save(store datastore.Datastore) (err error)
- func (t *Task) StatusString() string
- func (t *Task) UnmarshalSQL(row sqlutil.Scannable) error
- type TaskRequests
- type Taskable
- type TasksEnqueueParams
- type TasksGetParams
- type TasksListParams
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterTaskdef ¶
func RegisterTaskdef(name string, f NewTaskFunc)
RegisterTaskdef registers a task type, must be called before a task can be used.
Types ¶
type DatastoreTaskable ¶
type DatastoreTaskable interface { Taskable SetDatastore(ds datastore.Datastore) }
SqlDbTaskable is a task that has a method for assigning a datastore to the task. If your task needs access to a datastore, implement DatastoreTaskable, task-orchestrators will detect this method and call it to set the datastore before calling Taskable.Do
type NewTaskFunc ¶
type NewTaskFunc func() Taskable
NewTaskFunc is a function that creates new task instances task-orchestrators use NewTaskFunc to create new Tasks, and then attempt to json.Unmarshal params into the task definition
type Progress ¶
type Progress struct { Percent float32 `json:"percent"` // percent complete between 0.0 & 1.0 Step int `json:"step"` // current Step Steps int `json:"steps"` // number of Steps in the task Status string `json:"status"` // status string that describes what is currently happening Done bool `json:"done"` // complete flag Dest string `json:"dest"` // place for sending users, could be a url, could be a relative path Error error `json:"error,omitempty"` // error message }
Progress represents the current state of a task tasks will be given a Progress channel to send updates
type Task ¶
type Task struct { // uuid identifier for task Id string `json:"id"` // created date rounded to secounds Created time.Time `json:"created"` // updated date rounded to secounds Updated time.Time `json:"updated"` // human-readable title for the task, meant to be descriptive & varied Title string `json:"title"` // id of user that submitted this task UserId string `json:"userId"` // Type of task to be executed Type string `json:"type"` // parameters supplied to the task, should be json bytes Params map[string]interface{} `json:"params"` // Status Message Status string `json:"status,omitempty"` // Error Message Error string `json:"error,omitempty"` // timstamp for when request was added to the tasks queue // nil if request hasn't been sent to the queue Enqueued *time.Time `json:"enqueued,omitempty"` // timestamp for when the task was removed from the queue // and started, nil if the request hasn't been started Started *time.Time `json:"started,omitempty"` // timestamp for when request succeeded // nil if task hasn't succeeded Succeeded *time.Time `json:"succeeded,omitempty"` // timestamp for when request failed // nil if task hasn't failed Failed *time.Time `json:"failed,omitempty"` // progress of this task's completion // progress may not be stored, but instead kept ephemerally Progress *Progress `json:"progress,omitempty"` }
Task represents the storable state of a task. Note this is not the "task" itself (the function that will be called to do the actual work associated with a task) but the state associated with performing a task. Task holds the type of work to be done, parameters to configure the work to be done, and the status of that work. different types of "work" are done by implementing the Taskable interface specified in taskdef.go lots of the methods on Task overlap with Taskable, this is on purpose, as Task wraps phases of task completion to track the state of a task
func TaskFromDelivery ¶
TaskFromDelivery reads a task from store based on an amqp.Delivery message
func (Task) DatastoreType ¶
DatastoreType is to fulfill the sql_datastore.Model interface It distinguishes "Task" as a storable type. "Task" is not (yet) intended for use outside of Datatogether servers.
func (*Task) Enqueue ¶
Enqueue adds a task to the queue located at ampqurl, writing creates/updates for the task to the given store
func (Task) Key ¶
func (t Task) Key() datastore.Key
Key is to fulfill the sql_datastore.Model interface
func (*Task) NewSQLModel ¶
func (t *Task) NewSQLModel(key datastore.Key) sql_datastore.Model
func (*Task) PubSubChannelName ¶
func (*Task) QueueMsg ¶
func (t *Task) QueueMsg() (amqp.Publishing, error)
QueueMsg formats the task as an amqp.Publishing message for adding to a queue
func (*Task) SQLParams ¶
func (t *Task) SQLParams(cmd sql_datastore.Cmd) []interface{}
func (*Task) StatusString ¶
StatusString returns a string representation of the status of a task based on the state of it's date stamps
type TaskRequests ¶
type TaskRequests struct { // url to amqp server for enqueuing tasks, only required // to fullfill requests, not submit them AmqpUrl string // Store to read / write tasks to only required // to fulfill requests, not submit them Store datastore.Datastore }
TaskRequests encapsulates all types of requests that can be made in relation to tasks, to be made available for RPC calls. TODO - should this internal state be moved into the package level via package-level setter funcs?
func (TaskRequests) Enqueue ¶
func (r TaskRequests) Enqueue(params *TasksEnqueueParams, task *Task) (err error)
Add a task to the queue for completion
func (TaskRequests) Get ¶
func (t TaskRequests) Get(args *TasksGetParams, res *Task) (err error)
func (TaskRequests) List ¶
func (t TaskRequests) List(args *TasksListParams, res *[]*Task) (err error)
type Taskable ¶
type Taskable interface { // are these task params valid? return error if not // this func will be called before adding the task to // the queue, and won't be added on failure. Valid() error // Do the task, returning incremental progress updates // it's expected that the func will send either // p.Done == true or p.Error != nil at least once // to signal that the task is either done or errored Do(updates chan Progress) }
Taskable anything that fits on a task queue, it is a type of "work" that can be performed. Lots of things
func NewTaskable ¶
NewTaskable generates a new Taskable instance from the registered types
type TasksEnqueueParams ¶
type TasksEnqueueParams struct { // Title of the task // Requesters should generate their own task title for now // tasks currently have no way of generating a sensible default title Title string // Type of task to perform Type string // User that initiated the request UserId string // Parameters to feed to the task Params map[string]interface{} }
TasksEnqueueParams are for enqueing a task.
type TasksGetParams ¶
type TasksGetParams struct {
Id string
}
Get a single Task, currently only lookup by ID is supported