task_state_machine

package
v0.0.0-...-dfed899 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2024 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsRunnableTaskStatus

func IsRunnableTaskStatus(status api.TaskStatus) bool

IsRunnableTaskStatus returns whether the given status is considered "runnable". In other words, workers are allowed to keep running such tasks.

Types

type ChangeBroadcaster

type ChangeBroadcaster interface {
	// BroadcastJobUpdate sends the job update to SocketIO clients.
	BroadcastJobUpdate(jobUpdate api.EventJobUpdate)

	// BroadcastTaskUpdate sends the task update to SocketIO clients.
	BroadcastTaskUpdate(jobUpdate api.EventTaskUpdate)
}

type LogStorage

type LogStorage interface {
	WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error
}

LogStorage writes to task logs.

type PersistenceService

type PersistenceService interface {
	SaveTask(ctx context.Context, task *persistence.Task) error
	SaveTaskStatus(ctx context.Context, t *persistence.Task) error
	SaveTaskActivity(ctx context.Context, t *persistence.Task) error
	SaveJobStatus(ctx context.Context, j *persistence.Job) error

	JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error)
	CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) (numInStatus, numTotal int, err error)

	// UpdateJobsTaskStatuses updates the status & activity of the tasks of `job`.
	UpdateJobsTaskStatuses(ctx context.Context, job *persistence.Job,
		taskStatus api.TaskStatus, activity string) error

	// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
	// limited to those tasks with status in `statusesToUpdate`.
	UpdateJobsTaskStatusesConditional(ctx context.Context, job *persistence.Job,
		statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error

	FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error)
	FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error)
	FetchTasksOfWorkerInStatusOfJob(context.Context, *persistence.Worker, api.TaskStatus, *persistence.Job) ([]*persistence.Task, error)
}

type StateMachine

type StateMachine struct {
	// contains filtered or unexported fields
}

StateMachine handles task and job status changes.

func NewStateMachine

func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine

func (*StateMachine) CheckStuck

func (sm *StateMachine) CheckStuck(ctx context.Context)

CheckStuck finds jobs that are 'stuck' in their current status. This is meant to run at startup of Flamenco Manager, and checks to see if there are any jobs in a status that a human will not be able to fix otherwise.

func (*StateMachine) JobStatusChange

func (sm *StateMachine) JobStatusChange(
	ctx context.Context,
	job *persistence.Job,
	newJobStatus api.JobStatus,
	reason string,
) error

JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.

func (*StateMachine) RequeueActiveTasksOfWorker

func (sm *StateMachine) RequeueActiveTasksOfWorker(
	ctx context.Context,
	worker *persistence.Worker,
	reason string,
) error

RequeueActiveTasksOfWorker re-queues all active tasks (should be max one) of this worker.

`reason`: a string that can be appended to text like "Task requeued because "

func (*StateMachine) RequeueFailedTasksOfWorkerOfJob

func (sm *StateMachine) RequeueFailedTasksOfWorkerOfJob(
	ctx context.Context,
	worker *persistence.Worker,
	job *persistence.Job,
	reason string,
) error

RequeueFailedTasksOfWorkerOfJob re-queues all failed tasks of this worker on this job.

`reason`: a string that can be appended to text like "Task requeued because "

func (*StateMachine) TaskStatusChange

func (sm *StateMachine) TaskStatusChange(
	ctx context.Context,
	task *persistence.Task,
	newTaskStatus api.TaskStatus,
) error

TaskStatusChange updates the task's status to the new one. `task` is expected to still have its original status, and have a filled `Job` pointer.

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL