Documentation ¶
Index ¶
- func IsRunnableTaskStatus(status api.TaskStatus) bool
- type ChangeBroadcaster
- type LogStorage
- type PersistenceService
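- type StateMachine
- func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine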
- func (sm *StateMachine) CheckStuck(ctx context.Context)
- func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, ...) error
- func (sm *StateMachine) RequeueActiveTasksOfWorker(ctx context.Context, worker *persistence.Worker, reason string) error
- func (sm *StateMachine) RequeueFailedTasksOfWorkerOfJob(ctx context.Context, worker *persistence.Worker, job *persistence.Job, ...) error
- func (sm *StateMachine) TaskStatusChange(ctx context.Context, task *persistence.Task, newTaskStatus api.TaskStatus) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsRunnableTaskStatus ¶
func IsRunnableTaskStatus(status api.TaskStatus) bool
IsRunnableTaskStatus returns whether the given status is considered "runnable". In other words, workers are allowed to keep running such tasks.
Types ¶
type ChangeBroadcaster ¶
type ChangeBroadcaster interface {
	// BroadcastJobUpdate sends the job update to SocketIO clients.
	BroadcastJobUpdate(jobUpdate api.EventJobUpdate)

	// BroadcastTaskUpdate sends the task update to SocketIO clients.
	BroadcastTaskUpdate(jobUpdate api.EventTaskUpdate)
}
type LogStorage ¶
type LogStorage interface {
WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error
}
LogStorage writes to task logs.
type PersistenceService ¶
type PersistenceService interface {
	SaveTask(ctx context.Context, task *persistence.Task) error
	SaveTaskStatus(ctx context.Context, t *persistence.Task) error
	SaveTaskActivity(ctx context.Context, t *persistence.Task) error
	SaveJobStatus(ctx context.Context, j *persistence.Job) error

	JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error)
	CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) (numInStatus, numTotal int, err error)

	// UpdateJobsTaskStatuses updates the status & activity of the tasks of `job`.
	UpdateJobsTaskStatuses(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus, activity string) error

	// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
	// limited to those tasks with status in `statusesToUpdate`.
	UpdateJobsTaskStatusesConditional(ctx context.Context, job *persistence.Job, statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error

	FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error)
	FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error)
	FetchTasksOfWorkerInStatusOfJob(context.Context, *persistence.Worker, api.TaskStatus, *persistence.Job) ([]*persistence.Task, error)
}
type StateMachine ¶
type StateMachine struct {
// contains filtered or unexported fields
}
StateMachine handles task and job status changes.
func NewStateMachine ¶
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine
func (*StateMachine) CheckStuck ¶
func (sm *StateMachine) CheckStuck(ctx context.Context)
CheckStuck finds jobs that are 'stuck' in their current status. This is meant to run at startup of Flamenco Manager, and checks to see if there are any jobs in a status that a human will not be able to fix otherwise.
func (*StateMachine) JobStatusChange ¶
func (sm *StateMachine) JobStatusChange( ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, reason string, ) error
JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.
func (*StateMachine) RequeueActiveTasksOfWorker ¶
func (sm *StateMachine) RequeueActiveTasksOfWorker( ctx context.Context, worker *persistence.Worker, reason string, ) error
RequeueActiveTasksOfWorker re-queues all active tasks of this worker (there should be at most one).
`reason`: a string that can be appended to text like "Task requeued because "
func (*StateMachine) RequeueFailedTasksOfWorkerOfJob ¶
func (sm *StateMachine) RequeueFailedTasksOfWorkerOfJob( ctx context.Context, worker *persistence.Worker, job *persistence.Job, reason string, ) error
RequeueFailedTasksOfWorkerOfJob re-queues all failed tasks of this worker on this job.
`reason`: a string that can be appended to text like "Task requeued because "
func (*StateMachine) TaskStatusChange ¶
func (sm *StateMachine) TaskStatusChange( ctx context.Context, task *persistence.Task, newTaskStatus api.TaskStatus, ) error
TaskStatusChange updates the task's status to the new one. `task` is expected to still have its original status, and have a filled `Job` pointer.