Documentation ¶
Index ¶
- func BuildRestApi(e *echo.Echo, prefix string, controller *TaskController, ...)
- func ServeRestApi(wg *sync.WaitGroup, controller *TaskController, ...) (*http.Server, *echo.Echo)
- type ChildTask
- type HttpPostClient
- type MemoryTaskStorage
- func (storage *MemoryTaskStorage) AllTaskGroups() (taskGroups []*TaskGroup, err error)
- func (storage *MemoryTaskStorage) AllTasksInGroup(taskGroupId string) (tasks []*Task, err error)
- func (storage *MemoryTaskStorage) DeleteTask(taskId string) (err error)
- func (storage *MemoryTaskStorage) DeleteTaskGroup(taskGroupId string) (err error)
- func (storage *MemoryTaskStorage) FindTask(taskId string) (task *Task, err error)
- func (storage *MemoryTaskStorage) FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error)
- func (storage *MemoryTaskStorage) GetTaskChildren(taskId string) (tasks []*Task, err error)
- func (storage *MemoryTaskStorage) GetTaskParents(taskId string) (tasks []*Task, err error)
- func (storage *MemoryTaskStorage) GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error)
- func (storage *MemoryTaskStorage) GetTasksWithKey(key string) (tasks []*Task, err error)
- func (storage *MemoryTaskStorage) SaveTask(task *Task, create bool) (err error)
- func (storage *MemoryTaskStorage) SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error)
- func (storage *MemoryTaskStorage) TryLockTask(taskId string) (unlocker func() error, err error)
- type RedisTaskStorage
- func (storage *RedisTaskStorage) AllTaskGroups() (taskGroups []*TaskGroup, err error)
- func (storage *RedisTaskStorage) AllTasksInGroup(taskGroupId string) (tasks []*Task, err error)
- func (storage *RedisTaskStorage) AllTasksInList(path string) (tasks []*Task, err error)
- func (storage *RedisTaskStorage) DeleteTask(taskId string) (err error)
- func (storage *RedisTaskStorage) DeleteTaskGroup(taskGroupId string) (err error)
- func (storage *RedisTaskStorage) FindTask(taskId string) (task *Task, err error)
- func (storage *RedisTaskStorage) FindTaskAtPath(path string) (task *Task, err error)
- func (storage *RedisTaskStorage) FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error)
- func (storage *RedisTaskStorage) FindTaskGroupAtPath(path string) (taskGroup *TaskGroup, err error)
- func (storage *RedisTaskStorage) GetExpiration() time.Duration
- func (storage *RedisTaskStorage) GetLockExpiration() time.Duration
- func (storage *RedisTaskStorage) GetTaskChildren(taskId string) (tasks []*Task, err error)
- func (storage *RedisTaskStorage) GetTaskParents(taskId string) (tasks []*Task, err error)
- func (storage *RedisTaskStorage) GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error)
- func (storage *RedisTaskStorage) GetTasksWithKey(key string) (tasks []*Task, err error)
- func (storage *RedisTaskStorage) SaveTask(task *Task, create bool) (err error)
- func (storage *RedisTaskStorage) SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error)
- func (storage *RedisTaskStorage) TaskGroupKey(taskGroupId string) string
- func (storage *RedisTaskStorage) TaskGroupsPrefix() string
- func (storage *RedisTaskStorage) TaskKey(taskId string) string
- func (storage *RedisTaskStorage) TaskMutexKey(taskId string) string
- func (storage *RedisTaskStorage) TryLockTask(taskId string) (unlocker func() error, err error)
- type Task
- type TaskClient
- type TaskController
- func (controller *TaskController) CreateTask(task *Task) (err error)
- func (controller *TaskController) CreateTaskGroup(taskGroup *TaskGroup) (err error)
- func (controller *TaskController) DeleteTask(id string) (err error)
- func (controller *TaskController) DeleteTaskGroup(id string) (err error)
- func (controller *TaskController) EmitTaskFeedEvent(event string, task *Task)
- func (controller *TaskController) EmitTaskGroupFeedEvent(event string, taskGroup *TaskGroup)
- func (controller *TaskController) Evaluate(task *Task)
- func (controller *TaskController) Execute(taskToExecute *Task)
- func (controller *TaskController) GetTask(id string) (task *Task, err error)
- func (controller *TaskController) GetTaskGroup(id string) (taskGroup *TaskGroup, err error)
- func (controller *TaskController) GetTaskGroupProgress(id string) (completedPercent float64, err error)
- func (controller *TaskController) GetTaskGroups(page int, pageSize int, search string) (taskGroups []*TaskGroup, total int, err error)
- func (controller *TaskController) GetTasksInGroup(taskGroupId string, page int, pageSize int, search string, skipCompleted bool) (tasks []*Task, total int, err error)
- func (controller *TaskController) HandleExecuteError(task *Task, message string)
- func (controller *TaskController) PauseOrResumeTaskGroup(id string, isPaused bool) (err error)
- func (controller *TaskController) ResetTask(task *Task, remainingAttempts int)
- func (controller *TaskController) ResetTaskById(id string, remainingAttempts int) (task *Task, err error)
- func (controller *TaskController) ResetTaskGroup(id string, remainingAttempts int) (err error)
- func (controller *TaskController) RetryTaskById(id string, remainingAttempts int) (task *Task, err error)
- func (controller *TaskController) RetryTaskGroup(id string, remainingAttempts int) (err error)
- func (controller *TaskController) Shutdown() (err error)
- func (controller *TaskController) Startup() (err error)
- func (controller *TaskController) TriggerTaskEvaluate(id string) (err error)
- func (controller *TaskController) UpdateTask(id string, update map[string]interface{}) (updatedTask *Task, err error)
- func (controller *TaskController) UpdateTaskGroup(id string, update map[string]interface{}) (taskGroup *TaskGroup, err error)
- type TaskFeedEvent
- type TaskGroup
- type TaskGroupFeedEvent
- type TaskGroupWatcher
- type TaskStorage
- type ThrottlePopQuery
- type ThrottlePushQuery
- type Throttler
- type WorkerPayload
- type WorkerPayloadParentResult
- type WorkerResponse
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildRestApi ¶
func BuildRestApi(e *echo.Echo, prefix string, controller *TaskController, authMiddleware echo.MiddlewareFunc, loginFunc func(c echo.Context) error, inShutdown *bool, watchers map[string]TaskGroupWatcher)
func ServeRestApi ¶
func ServeRestApi(wg *sync.WaitGroup, controller *TaskController, authMiddleware echo.MiddlewareFunc, loginFunc func(c echo.Context) error) (*http.Server, *echo.Echo)
ServeRestApi starts the REST API server. wg: A waitgroup that the server can use to signal when it is done. controller: The root task controller to use to manage all tasks and task groups. authMiddleware: The echo middleware function that will be used to authenticate API calls. loginFunc: The function that will be used to handle login requests.
Types ¶
type ChildTask ¶
type ChildTask struct { Id string `json:"id"` Name string `json:"name"` Worker string `json:"worker"` Workgroup string `json:"workgroup"` Key string `json:"key"` RemainingAttempts int `json:"remainingAttempts"` IsPaused bool `json:"isPaused"` RunAfter time.Time `json:"runAfter"` ErrorDelayInSeconds int `json:"errorDelayInSeconds"` Input interface{} `json:"input"` ParentIds []string `json:"parentIds"` }
type HttpPostClient ¶
HttpPostClient delivers tasks to workers via http post.
func NewHttpPostClient ¶
func NewHttpPostClient() *HttpPostClient
NewHttpPostClient creates a new HttpPostClient.
func (*HttpPostClient) Post ¶
func (client *HttpPostClient) Post(task *Task, parents []*Task) (response WorkerResponse, err error)
Post delivers a task to a worker.
type MemoryTaskStorage ¶
type MemoryTaskStorage struct {
// contains filtered or unexported fields
}
MemoryTaskStorage is a task storage that only stores state in memory.
func NewMemoryTaskStorage ¶
func NewMemoryTaskStorage() *MemoryTaskStorage
NewMemoryTaskStorage creates a new MemoryTaskStorage.
func (*MemoryTaskStorage) AllTaskGroups ¶
func (storage *MemoryTaskStorage) AllTaskGroups() (taskGroups []*TaskGroup, err error)
All TaskGroups returns all task groups.
func (*MemoryTaskStorage) AllTasksInGroup ¶
func (storage *MemoryTaskStorage) AllTasksInGroup(taskGroupId string) (tasks []*Task, err error)
All AllTasksInGroup returns all tasks within a group.
func (*MemoryTaskStorage) DeleteTask ¶
func (storage *MemoryTaskStorage) DeleteTask(taskId string) (err error)
Delete task deletes a task by task id.
func (*MemoryTaskStorage) DeleteTaskGroup ¶
func (storage *MemoryTaskStorage) DeleteTaskGroup(taskGroupId string) (err error)
DeleteTaskGroup deletes a task group by task group id.
func (*MemoryTaskStorage) FindTask ¶
func (storage *MemoryTaskStorage) FindTask(taskId string) (task *Task, err error)
FindTask finds a task by task group id and task id.
func (*MemoryTaskStorage) FindTaskGroup ¶
func (storage *MemoryTaskStorage) FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error)
FindTaskGroup finds a task group by task group id.
func (*MemoryTaskStorage) GetTaskChildren ¶
func (storage *MemoryTaskStorage) GetTaskChildren(taskId string) (tasks []*Task, err error)
GetTaskChildren returns the children of a task.
func (*MemoryTaskStorage) GetTaskParents ¶
func (storage *MemoryTaskStorage) GetTaskParents(taskId string) (tasks []*Task, err error)
GetTaskParents returns the parents of a task.
func (*MemoryTaskStorage) GetTasksInWorkgroup ¶
func (storage *MemoryTaskStorage) GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error)
func (*MemoryTaskStorage) GetTasksWithKey ¶
func (storage *MemoryTaskStorage) GetTasksWithKey(key string) (tasks []*Task, err error)
func (*MemoryTaskStorage) SaveTask ¶
func (storage *MemoryTaskStorage) SaveTask(task *Task, create bool) (err error)
SaveTask saves a task.
func (*MemoryTaskStorage) SaveTaskGroup ¶
func (storage *MemoryTaskStorage) SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error)
SaveTaskGroup doesn't do anything for memory storage.
func (*MemoryTaskStorage) TryLockTask ¶
func (storage *MemoryTaskStorage) TryLockTask(taskId string) (unlocker func() error, err error)
type RedisTaskStorage ¶
type RedisTaskStorage struct { Client *goredislib.Client RedSync *redsync.Redsync }
RedisTaskStorage stores tasks in the filesystem as JSON files.
func NewRedisTaskStorage ¶
func NewRedisTaskStorage(Addr string, Password string, DB int) *RedisTaskStorage
NewRedisTaskStorage creates a new RedisTaskStorage.
func (*RedisTaskStorage) AllTaskGroups ¶
func (storage *RedisTaskStorage) AllTaskGroups() (taskGroups []*TaskGroup, err error)
All TaskGroups returns all task groups.
func (*RedisTaskStorage) AllTasksInGroup ¶
func (storage *RedisTaskStorage) AllTasksInGroup(taskGroupId string) (tasks []*Task, err error)
All AllTasksInGroup returns all tasks within a group.
func (*RedisTaskStorage) AllTasksInList ¶
func (storage *RedisTaskStorage) AllTasksInList(path string) (tasks []*Task, err error)
func (*RedisTaskStorage) DeleteTask ¶
func (storage *RedisTaskStorage) DeleteTask(taskId string) (err error)
Delete task deletes a task by task id.
func (*RedisTaskStorage) DeleteTaskGroup ¶
func (storage *RedisTaskStorage) DeleteTaskGroup(taskGroupId string) (err error)
DeleteTaskGroup deletes a task group by task group id.
func (*RedisTaskStorage) FindTask ¶
func (storage *RedisTaskStorage) FindTask(taskId string) (task *Task, err error)
FindTask finds a task by task group id and task id.
func (*RedisTaskStorage) FindTaskAtPath ¶
func (storage *RedisTaskStorage) FindTaskAtPath(path string) (task *Task, err error)
func (*RedisTaskStorage) FindTaskGroup ¶
func (storage *RedisTaskStorage) FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error)
FindTaskGroup finds a task group by task group id.
func (*RedisTaskStorage) FindTaskGroupAtPath ¶
func (storage *RedisTaskStorage) FindTaskGroupAtPath(path string) (taskGroup *TaskGroup, err error)
func (*RedisTaskStorage) GetExpiration ¶
func (storage *RedisTaskStorage) GetExpiration() time.Duration
func (*RedisTaskStorage) GetLockExpiration ¶
func (storage *RedisTaskStorage) GetLockExpiration() time.Duration
func (*RedisTaskStorage) GetTaskChildren ¶
func (storage *RedisTaskStorage) GetTaskChildren(taskId string) (tasks []*Task, err error)
GetTaskChildren returns the children of a task.
func (*RedisTaskStorage) GetTaskParents ¶
func (storage *RedisTaskStorage) GetTaskParents(taskId string) (tasks []*Task, err error)
GetTaskParents returns the parents of a task.
func (*RedisTaskStorage) GetTasksInWorkgroup ¶
func (storage *RedisTaskStorage) GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error)
func (*RedisTaskStorage) GetTasksWithKey ¶
func (storage *RedisTaskStorage) GetTasksWithKey(key string) (tasks []*Task, err error)
func (*RedisTaskStorage) SaveTask ¶
func (storage *RedisTaskStorage) SaveTask(task *Task, create bool) (err error)
SaveTask saves a task.
func (*RedisTaskStorage) SaveTaskGroup ¶
func (storage *RedisTaskStorage) SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error)
SaveTaskGroup doesn't do anything for memory storage.
func (*RedisTaskStorage) TaskGroupKey ¶
func (storage *RedisTaskStorage) TaskGroupKey(taskGroupId string) string
TaskGroupKey returns the key for a task group.
func (*RedisTaskStorage) TaskGroupsPrefix ¶
func (storage *RedisTaskStorage) TaskGroupsPrefix() string
func (*RedisTaskStorage) TaskKey ¶
func (storage *RedisTaskStorage) TaskKey(taskId string) string
TaskKey returns the key for a task.
func (*RedisTaskStorage) TaskMutexKey ¶
func (storage *RedisTaskStorage) TaskMutexKey(taskId string) string
TaskMutexKey returns the key for a task's lock.
func (*RedisTaskStorage) TryLockTask ¶
func (storage *RedisTaskStorage) TryLockTask(taskId string) (unlocker func() error, err error)
type Task ¶
type Task struct { Id string `json:"id"` TaskGroupId string `json:"taskGroupId"` Name string `json:"name"` Worker string `json:"worker"` Workgroup string `json:"workgroup"` Key string `json:"key"` RemainingAttempts int `json:"remainingAttempts"` IsPaused bool `json:"isPaused"` IsComplete bool `json:"isComplete"` RunAfter time.Time `json:"runAfter"` IsSeed bool `json:"isSeed"` ErrorDelayInSeconds int `json:"errorDelayInSeconds"` Input interface{} `json:"input"` Output interface{} `json:"output"` Errors []string `json:"errors"` CreatedAt time.Time `json:"createdAt"` ParentIds []string `json:"parentIds"` BusyExecuting bool `json:"busyExecuting"` Storage TaskStorage `json:"-"` }
A Task represents a unit of work that can be completed by a worker. IMPORTANT! If you change task's fields, also update Task.ts in crew-go-javascript
func (*Task) CanExecute ¶
CanExecute determines if a Task is in a state where it can be executed.
type TaskClient ¶
type TaskClient interface {
Post(task *Task, parents []*Task) (response WorkerResponse, err error)
}
TaskClient defines the interface for delivering tasks to workers.
type TaskController ¶
type TaskController struct { Storage TaskStorage Client TaskClient Feed chan interface{} Throttler *Throttler Pending *sync.WaitGroup AbandonedCheckScheduler *gocron.Scheduler AbandonedCheckMutex *sync.Mutex }
TaskGroup represents a group of tasks.
func NewTaskController ¶
func NewTaskController(storage TaskStorage, client TaskClient, throttler *Throttler) *TaskController
NewTaskController returns a new TaskController.
func (*TaskController) CreateTask ¶
func (controller *TaskController) CreateTask(task *Task) (err error)
func (*TaskController) CreateTaskGroup ¶
func (controller *TaskController) CreateTaskGroup(taskGroup *TaskGroup) (err error)
func (*TaskController) DeleteTask ¶
func (controller *TaskController) DeleteTask(id string) (err error)
func (*TaskController) DeleteTaskGroup ¶
func (controller *TaskController) DeleteTaskGroup(id string) (err error)
func (*TaskController) EmitTaskFeedEvent ¶
func (controller *TaskController) EmitTaskFeedEvent(event string, task *Task)
func (*TaskController) EmitTaskGroupFeedEvent ¶
func (controller *TaskController) EmitTaskGroupFeedEvent(event string, taskGroup *TaskGroup)
func (*TaskController) Evaluate ¶
func (controller *TaskController) Evaluate(task *Task)
func (*TaskController) Execute ¶
func (controller *TaskController) Execute(taskToExecute *Task)
func (*TaskController) GetTask ¶
func (controller *TaskController) GetTask(id string) (task *Task, err error)
func (*TaskController) GetTaskGroup ¶
func (controller *TaskController) GetTaskGroup(id string) (taskGroup *TaskGroup, err error)
func (*TaskController) GetTaskGroupProgress ¶
func (controller *TaskController) GetTaskGroupProgress(id string) (completedPercent float64, err error)
func (*TaskController) GetTaskGroups ¶
func (*TaskController) GetTasksInGroup ¶
func (*TaskController) HandleExecuteError ¶
func (controller *TaskController) HandleExecuteError(task *Task, message string)
func (*TaskController) PauseOrResumeTaskGroup ¶
func (controller *TaskController) PauseOrResumeTaskGroup(id string, isPaused bool) (err error)
func (*TaskController) ResetTask ¶
func (controller *TaskController) ResetTask(task *Task, remainingAttempts int)
func (*TaskController) ResetTaskById ¶
func (controller *TaskController) ResetTaskById(id string, remainingAttempts int) (task *Task, err error)
func (*TaskController) ResetTaskGroup ¶
func (controller *TaskController) ResetTaskGroup(id string, remainingAttempts int) (err error)
func (*TaskController) RetryTaskById ¶
func (controller *TaskController) RetryTaskById(id string, remainingAttempts int) (task *Task, err error)
func (*TaskController) RetryTaskGroup ¶
func (controller *TaskController) RetryTaskGroup(id string, remainingAttempts int) (err error)
func (*TaskController) Shutdown ¶
func (controller *TaskController) Shutdown() (err error)
func (*TaskController) Startup ¶
func (controller *TaskController) Startup() (err error)
func (*TaskController) TriggerTaskEvaluate ¶
func (controller *TaskController) TriggerTaskEvaluate(id string) (err error)
func (*TaskController) UpdateTask ¶
func (controller *TaskController) UpdateTask(id string, update map[string]interface{}) (updatedTask *Task, err error)
func (*TaskController) UpdateTaskGroup ¶
func (controller *TaskController) UpdateTaskGroup(id string, update map[string]interface{}) (taskGroup *TaskGroup, err error)
type TaskFeedEvent ¶
type TaskGroup ¶
type TaskGroup struct { Id string `json:"id"` Name string `json:"name"` CreatedAt time.Time `json:"createdAt"` }
TaskGroup represents a group of tasks. IMPORTANT! If you change task's fields, also update TaskGroup.ts in crew-go-javascript
func NewTaskGroup ¶
NewTaskGroup creates a new TaskGroup.
type TaskGroupFeedEvent ¶
type TaskGroupWatcher ¶
type TaskGroupWatcher struct { TaskGroupId string Channel chan string RequestId string Socket *websocket.Conn }
TaskGroupWatcher is used to collect events from the task group controller and deliver them to a websocket.
type TaskStorage ¶
type TaskStorage interface { SaveTask(task *Task, create bool) (err error) FindTask(taskId string) (task *Task, err error) TryLockTask(taskId string) (unlocker func() error, err error) // UnlockTask(taskId string) (err error) DeleteTask(taskId string) (err error) GetTaskChildren(taskId string) (tasks []*Task, err error) GetTaskParents(taskId string) (tasks []*Task, err error) GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error) GetTasksWithKey(key string) (tasks []*Task, err error) SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error) AllTaskGroups() (taskGroups []*TaskGroup, err error) AllTasksInGroup(taskGroupId string) (tasks []*Task, err error) FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error) DeleteTaskGroup(taskGroupId string) (err error) }
TaskStorage defines the methods required for implementing crew's task storage interface.
type ThrottlePopQuery ¶
ThrottlePopQuery is a request to the throttler to notify that a worker is done.
type ThrottlePushQuery ¶
A ThrottlePushQuery is a request to the throttler to see if there is enough bandwidth for a worker to run.
type Throttler ¶
type Throttler struct { Push chan ThrottlePushQuery Pop chan ThrottlePopQuery }
type WorkerPayload ¶
type WorkerPayload struct { Input interface{} `json:"input"` Worker string `json:"worker"` Parents []WorkerPayloadParentResult `json:"parents"` TaskId string `json:"taskId"` }
WorkerPayload defines the input sent to a worker (post body).
type WorkerPayloadParentResult ¶
type WorkerPayloadParentResult struct { TaskId string `json:"taskId"` Worker string `json:"worker"` Input interface{} `json:"input"` Output interface{} `json:"output"` }
WorkerPayloadParentResult defines the schema for output from a worker.
type WorkerResponse ¶
type WorkerResponse struct { Output interface{} `json:"output"` Children []*ChildTask `json:"children"` WorkgroupDelayInSeconds int `json:"workgroupDelayInSeconds"` ChildrenDelayInSeconds int `json:"childrenDelayInSeconds"` Error interface{} `json:"error"` }
WorkerResponse defines the schema of output returned from workers.