task

package
v0.0.0-...-cbbc4cb
Warning

This package is not in the latest version of its module.

Published: Nov 30, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Id

type Id = string

type Pool

type Pool interface {
	ID() Id

	QueueTask(*Task) error

	MarkGlobal()
	IsGlobal() bool
	SignalAllQueued()

	CreatedInTask() *Task

	Wait(bool, ...*Task)
	Cancel()

	IsRoot() bool
	Status() PoolStatus
	AddCleanup(fn func(Pool))

	GetRootPool() *TaskPool
	GetWorkerPool() *WorkerPool

	LockExit()
	UnlockExit()

	RemoveTask(Id)

	Errors() []*Task
	AddError(t *Task)
}

type PoolStatus

type PoolStatus struct {
	// The count of tasks that have completed on this task pool.
	// Complete *DOES* include failed tasks
	Complete int64

	// The count of failed tasks on this task pool
	Failed int

	// The count of all tasks that have been queued on this task pool
	Total int64

	// Percent to completion of all tasks
	Progress float64

	// How long the pool has been alive
	Runtime time.Duration
}
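
The sketch below shows one way the Progress field could relate to Complete and Total. The poolStatus type is a local stand-in that mirrors the documented fields, and progressPercent is a hypothetical helper. The assumption that Progress is expressed on a 0 to 100 scale is not confirmed by the documentation above.

```go
package main

import (
	"fmt"
	"time"
)

// poolStatus mirrors the documented PoolStatus fields. It is a local
// stand-in for illustration, not the package's own type.
type poolStatus struct {
	Complete int64
	Failed   int
	Total    int64
	Progress float64
	Runtime  time.Duration
}

// progressPercent is a hypothetical helper: completed tasks (failed ones
// included, as documented for Complete) as a percentage of all queued tasks.
func progressPercent(complete, total int64) float64 {
	if total == 0 {
		return 0 // nothing queued yet; avoid dividing by zero
	}
	return float64(complete) / float64(total) * 100
}

func main() {
	s := poolStatus{Complete: 3, Failed: 1, Total: 4}
	s.Progress = progressPercent(s.Complete, s.Total)
	fmt.Printf("%.0f%% done, %d failed\n", s.Progress, s.Failed) // 75% done, 1 failed
}
```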

type QueueState

type QueueState string
const (
	PreQueued QueueState = "pre-queued"
	InQueue   QueueState = "in-queue"
	Executing QueueState = "executing"
	Exited    QueueState = "exited"
)

type Task

type Task struct {
	// contains filtered or unexported fields
}

func (*Task) Cancel

func (t *Task) Cancel()

Cancel cancels the task. It is unknowable whether Cancel is the last operation of a task, so t.success() will not have an effect after a task is cancelled. t.error() may still override the exit status in special cases, such as a timeout, which is both an error and a reason for cancellation.

Cancellations are always external to the task. From within the body of the task, either error or success should be called. If a task finds that it no longer needs to continue, it should return success.
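
The precedence rules described for Cancel can be pictured as a small state machine. This is only a sketch under assumptions: the task type and its cancel, success, and fail methods are local stand-ins, the exact override rules are inferred from the prose above, and the real package presumably also guards the status with a lock.

```go
package main

import "fmt"

type exitStatus string

const (
	noStatus  exitStatus = ""
	succeeded exitStatus = "success"
	cancelled exitStatus = "cancelled"
	errored   exitStatus = "error"
)

// task is a stand-in holding only an exit status (single goroutine only).
type task struct {
	status exitStatus
}

// cancel marks the task cancelled unless it has already exited.
func (t *task) cancel() {
	if t.status == noStatus {
		t.status = cancelled
	}
}

// success only takes effect when no exit status has been recorded yet,
// so it cannot undo a cancellation.
func (t *task) success() {
	if t.status == noStatus {
		t.status = succeeded
	}
}

// fail may replace a cancellation (for example on a timeout), but it does
// not overwrite a success in this sketch.
func (t *task) fail() {
	if t.status == noStatus || t.status == cancelled {
		t.status = errored
	}
}

func main() {
	a := &task{}
	a.cancel()
	a.success()
	fmt.Println(a.status) // cancelled

	b := &task{}
	b.cancel()
	b.fail()
	fmt.Println(b.status) // error
}
```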

func (*Task) CheckExit

func (t *Task) CheckExit() bool

func (*Task) ClearAndRecompute

func (t *Task) ClearAndRecompute()

func (*Task) ClearTimeout

func (t *Task) ClearTimeout()

func (*Task) ExeTime

func (t *Task) ExeTime() time.Duration

func (*Task) ExitIfSignaled

func (t *Task) ExitIfSignaled()

ExitIfSignaled should be called intermittently to check whether the task should exit. If it should, ExitIfSignaled panics, unwinding back to the top of safety work.
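
A minimal sketch of the check-and-unwind pattern described for ExitIfSignaled, assuming a signal channel and a recovering wrapper around the handler. All names here (task, errExit, exitIfSignaled, runGuarded, processItems) are hypothetical stand-ins; the real package exposes a chan int through GetSignalChan, and its internals may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errExit is a hypothetical sentinel used to unwind a task told to stop.
var errExit = errors.New("task exit signalled")

// task is a stand-in with only what this sketch needs.
type task struct {
	signal chan struct{} // closed when the task should stop
}

// exitIfSignaled panics with errExit once the signal channel is closed,
// and returns immediately otherwise.
func (t *task) exitIfSignaled() {
	select {
	case <-t.signal:
		panic(errExit)
	default:
	}
}

// runGuarded plays the role of the worker's recovery wrapper: it runs the
// handler and reports whether the handler unwound via errExit.
func runGuarded(t *task, handler func(*task)) (exited bool) {
	defer func() {
		if r := recover(); r != nil {
			if err, ok := r.(error); ok && errors.Is(err, errExit) {
				exited = true
				return
			}
			panic(r) // not ours: re-raise a genuine panic
		}
	}()
	handler(t)
	return false
}

// processItems checks for an exit signal between units of work and counts
// the units it finished. It simulates an external cancel after two units.
func processItems(t *task, n int, done *int) {
	for i := 0; i < n; i++ {
		t.exitIfSignaled()
		*done++
		if *done == 2 {
			close(t.signal)
		}
	}
}

func main() {
	t := &task{signal: make(chan struct{})}
	done := 0
	exited := runGuarded(t, func(t *task) { processItems(t, 5, &done) })
	fmt.Println(exited, done) // true 2
}
```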

func (*Task) Fail

func (t *Task) Fail(err error)

Fail will set the error on the task, and then panic with ErrTaskError, which informs the worker recovery function to exit the task with the error that is set, and not treat it as a real panic.

func (*Task) GetChildTaskPool

func (t *Task) GetChildTaskPool() *TaskPool

func (*Task) GetMeta

func (t *Task) GetMeta() TaskMetadata

func (*Task) GetResult

func (t *Task) GetResult(resultKey TaskResultKey) any

func (*Task) GetResults

func (t *Task) GetResults() TaskResult

func (*Task) GetSignalChan

func (t *Task) GetSignalChan() chan int

func (*Task) GetTaskPool

func (t *Task) GetTaskPool() *TaskPool

func (*Task) JobName

func (t *Task) JobName() string

func (*Task) Manipulate

func (t *Task) Manipulate(fn func(meta TaskMetadata) error) error

Manipulate is used to change the metadata of a task while it is running. This is useful when a running task is waiting for input from a client: the function passed to Manipulate can deliver that data to the task via a chan, for example.
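
To illustrate the channel use case mentioned for Manipulate, here is a sketch with stand-in types. The uploadMeta type, its chunks channel, and the sendChunk helper are hypothetical, and the trimmed taskMetadata interface carries only one of the documented methods. Any locking the real method performs is not shown.

```go
package main

import (
	"errors"
	"fmt"
)

// taskMetadata is a trimmed stand-in for the documented TaskMetadata interface.
type taskMetadata interface {
	JobName() string
}

// uploadMeta is a hypothetical metadata type carrying a channel that the
// running task reads client input from.
type uploadMeta struct {
	chunks chan string
}

func (m *uploadMeta) JobName() string { return "upload" }

// task is a stand-in that only holds its metadata.
type task struct {
	meta taskMetadata
}

// manipulate hands the task's metadata to fn and returns fn's error.
func (t *task) manipulate(fn func(meta taskMetadata) error) error {
	return fn(t.meta)
}

// sendChunk delivers client data to the task through the metadata channel.
func sendChunk(t *task, chunk string) error {
	return t.manipulate(func(meta taskMetadata) error {
		m, ok := meta.(*uploadMeta)
		if !ok {
			return errors.New("unexpected metadata type")
		}
		m.chunks <- chunk
		return nil
	})
}

func main() {
	meta := &uploadMeta{chunks: make(chan string, 1)} // buffered so the send does not block
	t := &task{meta: meta}
	if err := sendChunk(t, "part-1"); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println(<-meta.chunks) // part-1
}
```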

func (*Task) OnResult

func (t *Task) OnResult(callback func(TaskResult))

OnResult takes a function to be run when the task result changes.

func (*Task) Q

func (t *Task) Q(tp *TaskPool) *Task

Q queues the task on the given task pool tp. If tp is nil, it defaults to the global task pool. It is essentially an alias for tp.QueueTask(t) that allows chaining, as in NewTask(...).Q(). To support this, Q returns the task it was called on.
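
The chaining behavior of Q can be sketched as follows. The task and taskPool types, the globalPool variable, and the lowercase method names are stand-ins for illustration. How the real Q handles an error from QueueTask is not documented, so this sketch simply discards it.

```go
package main

import "fmt"

// task and taskPool are minimal stand-ins for illustration.
type task struct {
	name string
}

type taskPool struct {
	queued []*task
}

// globalPool is a hypothetical package-level default pool.
var globalPool = &taskPool{}

func (tp *taskPool) queueTask(t *task) error {
	tp.queued = append(tp.queued, t)
	return nil
}

// q queues t on tp, falling back to globalPool when tp is nil, and returns
// t so that construction and queueing can be chained in one expression.
func (t *task) q(tp *taskPool) *task {
	if tp == nil {
		tp = globalPool
	}
	_ = tp.queueTask(t) // error handling in the real method is undocumented
	return t
}

func main() {
	t := (&task{name: "scan"}).q(nil)
	fmt.Println(t.name, len(globalPool.queued)) // scan 1
}
```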

func (*Task) ReadError

func (t *Task) ReadError() error

func (*Task) ReqNoErr

func (t *Task) ReqNoErr(err error)

ReqNoErr is a wrapper around t.Fail that only fails the task if err is not nil.
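
The sentinel-panic mechanism described for Fail, and the ReqNoErr wrapper around it, can be sketched like this. The names errTaskError, errDiskFull, task, fail, reqNoErr, and runTask are all local stand-ins, and the status strings returned by runTask are an assumption made for the example, not the package's recovery logic.

```go
package main

import (
	"errors"
	"fmt"
)

// errTaskError stands in for the package's ErrTaskError sentinel.
var errTaskError = errors.New("task error")

// errDiskFull is an example failure used by main.
var errDiskFull = errors.New("disk full")

type task struct {
	err error
}

// fail records err on the task, then panics with the sentinel so the
// wrapper can tell a deliberate failure from a genuine panic.
func (t *task) fail(err error) {
	t.err = err
	panic(errTaskError)
}

// reqNoErr fails the task only when err is non-nil.
func (t *task) reqNoErr(err error) {
	if err != nil {
		t.fail(err)
	}
}

// runTask runs handler and classifies how it ended: "success", "error"
// (deliberate failure via the sentinel), or "panic" (anything else).
func runTask(t *task, handler func(*task)) (status string) {
	defer func() {
		r := recover()
		if r == nil {
			status = "success"
			return
		}
		if err, ok := r.(error); ok && errors.Is(err, errTaskError) {
			status = "error"
			return
		}
		status = "panic"
	}()
	handler(t)
	return status
}

func main() {
	failed := &task{}
	status := runTask(failed, func(t *task) { t.reqNoErr(errDiskFull) })
	fmt.Println(status, failed.err) // error disk full

	fmt.Println(runTask(&task{}, func(t *task) { t.reqNoErr(nil) })) // success

	fmt.Println(runTask(&task{}, func(*task) {
		var m map[string]int
		m["x"] = 1 // write to a nil map: a genuine runtime panic
	})) // panic
}
```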

func (*Task) SetChildTaskPool

func (t *Task) SetChildTaskPool(pool *TaskPool)

func (*Task) SetCleanup

func (t *Task) SetCleanup(cleanup TaskHandler)

SetCleanup takes a function to be run after the task has completed, no matter the exit status. Multiple cleanup functions can be registered; they run in sequence, in the order they were registered.

Cleanup functions should treat the task as read-only. The task has already completed, and modifying its state may result in a deadlock.

If the task has already completed when SetCleanup is called, the function will NOT be run. It is therefore only safe to call SetCleanup() from inside a task handler. To register a function from outside the task handler, or one that runs only after the task has completed successfully, use t.SetPostAction() instead.
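
The ordering and "too late" rules for SetCleanup can be sketched with stand-in types. The task type, runTask, and cleanupOrder are hypothetical, and the bool returned by setCleanup exists only in this sketch to make the ignored registration visible.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a stand-in that tracks registered cleanups and completion.
type task struct {
	cleanups []func(*task)
	done     bool
}

// setCleanup registers fn to run after the handler returns. Registrations
// made after completion are ignored, which the sketch reports as false.
func (t *task) setCleanup(fn func(*task)) bool {
	if t.done {
		return false
	}
	t.cleanups = append(t.cleanups, fn)
	return true
}

// runTask runs the handler, marks the task complete, then runs cleanups in
// registration order regardless of whether the handler returned an error.
func runTask(t *task, handler func(*task) error) error {
	err := handler(t)
	t.done = true
	for _, fn := range t.cleanups {
		fn(t)
	}
	return err
}

// cleanupOrder records which cleanups ran and in what order.
func cleanupOrder() []string {
	var order []string
	t := &task{}
	_ = runTask(t, func(t *task) error {
		t.setCleanup(func(*task) { order = append(order, "close file") })
		t.setCleanup(func(*task) { order = append(order, "remove temp dir") })
		return errors.New("handler failed") // cleanups still run
	})
	// Registered after completion: never runs.
	t.setCleanup(func(*task) { order = append(order, "too late") })
	return order
}

func main() {
	fmt.Println(cleanupOrder()) // [close file remove temp dir]
}
```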

func (*Task) SetErrorCleanup

func (t *Task) SetErrorCleanup(cleanup TaskHandler)

SetErrorCleanup works the same as t.SetCleanup(), but the cleanup only runs if the task errors.

func (*Task) SetPostAction

func (t *Task) SetPostAction(action func(TaskResult))

SetPostAction takes a function to be run after the task has completed successfully. The task results are passed as the input to the function.

func (*Task) SetResult

func (t *Task) SetResult(results TaskResult)

func (*Task) SetTimeout

func (t *Task) SetTimeout(timeout time.Time)

func (*Task) Status

func (t *Task) Status() (bool, TaskExitStatus)

Status returns a boolean reporting whether the task has completed and, if it has, a TaskExitStatus describing its exit type.

func (*Task) Success

func (t *Task) Success(msg ...any)

func (*Task) SwLap

func (t *Task) SwLap(label string)

SwLap adds a lap to the task's stopwatch.

func (*Task) TaskId

func (t *Task) TaskId() Id

func (*Task) Wait

func (t *Task) Wait() *Task

Wait blocks until the task is finished. "Finished" can mean success, failure, or cancellation.

type TaskExitStatus

type TaskExitStatus string
const (
	TaskSuccess  TaskExitStatus = "success"
	TaskCanceled TaskExitStatus = "cancelled"
	TaskError    TaskExitStatus = "error"
	TaskNoStatus TaskExitStatus = ""
)

type TaskHandler

type TaskHandler func(*Task)

type TaskInterface

type TaskInterface interface {
	TaskId() Id
	JobName() string
	GetTaskPool() *TaskPool
	GetChildTaskPool() *TaskPool
	Status() (bool, TaskExitStatus)
	GetMeta() TaskMetadata
	GetResult(string) any
	GetResults() TaskResult

	Q(pool *TaskPool) *Task

	Wait() *Task
	Cancel()

	SwLap(string)
	ClearTimeout()

	ReadError() any
	ClearAndRecompute()
	SetPostAction(action func(TaskResult))
	SetCleanup(cleanup func())
	SetErrorCleanup(cleanup func())

	ExeTime() time.Duration
}

type TaskMetadata

type TaskMetadata interface {
	JobName() string
	MetaString() string
	FormatToResult() TaskResult
	Verify() error
}

type TaskPool

type TaskPool struct {
	// contains filtered or unexported fields
}

func (*TaskPool) AddCleanup

func (tp *TaskPool) AddCleanup(fn func(pool Pool))

func (*TaskPool) AddError

func (tp *TaskPool) AddError(t *Task)

func (*TaskPool) Cancel

func (tp *TaskPool) Cancel()

func (*TaskPool) CreatedInTask

func (tp *TaskPool) CreatedInTask() *Task

func (*TaskPool) Errors

func (tp *TaskPool) Errors() []*Task

func (*TaskPool) GetRootPool

func (tp *TaskPool) GetRootPool() *TaskPool

func (*TaskPool) GetWorkerPool

func (tp *TaskPool) GetWorkerPool() *WorkerPool

func (*TaskPool) ID

func (tp *TaskPool) ID() Id

func (*TaskPool) IsGlobal

func (tp *TaskPool) IsGlobal() bool

func (*TaskPool) IsRoot

func (tp *TaskPool) IsRoot() bool

func (*TaskPool) LockExit

func (tp *TaskPool) LockExit()

func (*TaskPool) MarkGlobal

func (tp *TaskPool) MarkGlobal()

MarkGlobal specifies the work queue as being a "global" one.

func (*TaskPool) QueueTask

func (tp *TaskPool) QueueTask(t *Task) (err error)

func (*TaskPool) RemoveTask

func (tp *TaskPool) RemoveTask(taskId Id)

func (*TaskPool) SignalAllQueued

func (tp *TaskPool) SignalAllQueued()

func (*TaskPool) Status

func (tp *TaskPool) Status() PoolStatus

func (*TaskPool) UnlockExit

func (tp *TaskPool) UnlockExit()

func (*TaskPool) Wait

func (tp *TaskPool) Wait(supplementWorker bool, task ...*Task)

Wait parks the calling thread on the work queue until all tasks have been queued and have finished. **If you never call tp.SignalAllQueued(), the waiters will never wake up.** Make sure to call SignalAllQueued before parking here if this is the only thread loading tasks. If you are parking a thread that is currently executing a task, you can pass that task in as well, and it will also listen for exit events.
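
The queue, signal, then wait ordering described above can be sketched with the standard library alone. The pool type here is a stand-in built on sync.WaitGroup and a channel; it is not how TaskPool is implemented, and it omits the supplementWorker and task arguments of the real Wait.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a stand-in that tracks running tasks and an "all queued" signal.
type pool struct {
	wg        sync.WaitGroup
	allQueued chan struct{}
	once      sync.Once
}

func newPool() *pool {
	return &pool{allQueued: make(chan struct{})}
}

// queueTask runs fn on its own goroutine and tracks it in the wait group.
func (p *pool) queueTask(fn func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		fn()
	}()
}

// signalAllQueued tells waiters that no more tasks will be added.
func (p *pool) signalAllQueued() {
	p.once.Do(func() { close(p.allQueued) })
}

// wait blocks until signalAllQueued has been called and every queued task
// has finished. Without the signal it would block forever.
func (p *pool) wait() {
	<-p.allQueued
	p.wg.Wait()
}

// runBatch queues n tasks, signals, waits, and returns how many finished.
func runBatch(n int) int64 {
	p := newPool()
	var finished int64
	for i := 0; i < n; i++ {
		p.queueTask(func() { atomic.AddInt64(&finished, 1) })
	}
	p.signalAllQueued() // must happen before wait on the loading thread
	p.wait()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runBatch(3)) // 3
}
```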

type TaskResult

type TaskResult map[TaskResultKey]any

func (TaskResult) ToMap

func (tr TaskResult) ToMap() map[string]any

type TaskResultKey

type TaskResultKey string

type TaskService

type TaskService interface {
	RegisterJob(jobName string, fn TaskHandler)
	NewTaskPool(replace bool, createdBy *Task) *TaskPool
	GetTaskPoolByJobName(jobName string) *TaskPool
	GetTasksByJobName(jobName string) []*Task

	GetTask(taskId Id) *Task
	GetTaskPool(Id) *TaskPool

	DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error)
}

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(initWorkers int, logLevel int) *WorkerPool

func (*WorkerPool) AddHit

func (wp *WorkerPool) AddHit(time time.Time, target *Task)

func (*WorkerPool) DispatchJob

func (wp *WorkerPool) DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error)

func (*WorkerPool) GetTask

func (wp *WorkerPool) GetTask(taskId Id) *Task

func (*WorkerPool) GetTaskPool

func (wp *WorkerPool) GetTaskPool(tpId Id) *TaskPool

func (*WorkerPool) GetTaskPoolByJobName

func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *TaskPool

func (*WorkerPool) GetTasksByJobName

func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task

func (*WorkerPool) NewTaskPool

func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) *TaskPool

NewTaskPool creates a new task pool on this worker pool.

`replace` spawns a temporary replacement thread on the parent worker pool. This prevents a deadlock when the queue fills up while many tasks are being added and none are being executed.

`parent` allows chaining of task pools so that updates float up to the top. This makes it possible for clients to subscribe to a single task and get notified about all of the sub-updates of that task. See taskPool.go.

func (*WorkerPool) RegisterJob

func (wp *WorkerPool) RegisterJob(jobName string, fn TaskHandler)

RegisterJob adds a template for a repeatable job that can be called upon later in the program.
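
The register-then-dispatch idea can be sketched as a map from job names to handlers. The workerPool, task, and taskHandler types below are stand-ins, the job name "scan_directory" is made up, and this dispatchJob runs the handler synchronously and takes no metadata or pool, unlike the documented DispatchJob.

```go
package main

import (
	"errors"
	"fmt"
)

// task, taskHandler, and workerPool are minimal stand-ins for illustration.
type task struct {
	jobName string
	result  string
}

type taskHandler func(*task)

type workerPool struct {
	jobs map[string]taskHandler
}

func newWorkerPool() *workerPool {
	return &workerPool{jobs: map[string]taskHandler{}}
}

// registerJob stores fn under jobName so it can be dispatched later.
func (wp *workerPool) registerJob(jobName string, fn taskHandler) {
	wp.jobs[jobName] = fn
}

// dispatchJob looks up the handler for jobName and runs it on a new task.
// The sketch runs it inline; a real pool would queue it for a worker.
func (wp *workerPool) dispatchJob(jobName string) (*task, error) {
	fn, ok := wp.jobs[jobName]
	if !ok {
		return nil, errors.New("no job registered as " + jobName)
	}
	t := &task{jobName: jobName}
	fn(t)
	return t, nil
}

func main() {
	wp := newWorkerPool()
	wp.registerJob("scan_directory", func(t *task) { t.result = "scanned" })

	t, err := wp.dispatchJob("scan_directory")
	fmt.Println(t.result, err) // scanned <nil>

	_, err = wp.dispatchJob("missing")
	fmt.Println(err) // no job registered as missing
}
```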

func (*WorkerPool) Run

func (wp *WorkerPool) Run()

Run launches the standard threads for this worker pool.

func (*WorkerPool) Status

func (wp *WorkerPool) Status() (int, int, int, int, int)

Status returns the count of tasks in the queue, the total number of tasks accepted, number of busy workers, and the total number of live workers in the worker pool

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()
