Documentation ¶
Index ¶
- type Id
- type Pool
- type PoolStatus
- type QueueState
- type Task
- func (t *Task) Cancel()
- func (t *Task) CheckExit() bool
- func (t *Task) ClearAndRecompute()
- func (t *Task) ClearTimeout()
- func (t *Task) ExeTime() time.Duration
- func (t *Task) ExitIfSignaled()
- func (t *Task) Fail(err error)
- func (t *Task) GetChildTaskPool() *TaskPool
- func (t *Task) GetMeta() TaskMetadata
- func (t *Task) GetResult(resultKey string) any
- func (t *Task) GetResults() TaskResult
- func (t *Task) GetSignalChan() chan int
- func (t *Task) GetTaskPool() *TaskPool
- func (t *Task) JobName() string
- func (t *Task) Manipulate(fn func(meta TaskMetadata) error) error
- func (t *Task) OnResult(callback func(TaskResult))
- func (t *Task) Q(tp *TaskPool) *Task
- func (t *Task) ReadError() error
- func (t *Task) ReqNoErr(err error)
- func (t *Task) SetChildTaskPool(pool *TaskPool)
- func (t *Task) SetCleanup(cleanup TaskHandler)
- func (t *Task) SetErrorCleanup(cleanup TaskHandler)
- func (t *Task) SetPostAction(action func(TaskResult))
- func (t *Task) SetResult(results TaskResult)
- func (t *Task) SetTimeout(timeout time.Time)
- func (t *Task) Status() (bool, TaskExitStatus)
- func (t *Task) Success(msg ...any)
- func (t *Task) SwLap(label string)
- func (t *Task) TaskId() Id
- func (t *Task) Wait() *Task
- type TaskExitStatus
- type TaskHandler
- type TaskInterface
- type TaskMetadata
- type TaskPool
- func (tp *TaskPool) AddCleanup(fn func(pool Pool))
- func (tp *TaskPool) AddError(t *Task)
- func (tp *TaskPool) Cancel()
- func (tp *TaskPool) CreatedInTask() *Task
- func (tp *TaskPool) Errors() []*Task
- func (tp *TaskPool) GetRootPool() *TaskPool
- func (tp *TaskPool) GetWorkerPool() *WorkerPool
- func (tp *TaskPool) ID() Id
- func (tp *TaskPool) IsGlobal() bool
- func (tp *TaskPool) IsRoot() bool
- func (tp *TaskPool) LockExit()
- func (tp *TaskPool) MarkGlobal()
- func (tp *TaskPool) QueueTask(t *Task) (err error)
- func (tp *TaskPool) RemoveTask(taskId Id)
- func (tp *TaskPool) SignalAllQueued()
- func (tp *TaskPool) Status() PoolStatus
- func (tp *TaskPool) UnlockExit()
- func (tp *TaskPool) Wait(supplementWorker bool)
- type TaskResult
- type TaskService
- type WorkerPool
- func (wp *WorkerPool) AddHit(time time.Time, target *Task)
- func (wp *WorkerPool) DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error)
- func (wp *WorkerPool) GetTask(taskId Id) *Task
- func (wp *WorkerPool) GetTaskPool(tpId Id) *TaskPool
- func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *TaskPool
- func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task
- func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) *TaskPool
- func (wp *WorkerPool) RegisterJob(jobName string, fn TaskHandler)
- func (wp *WorkerPool) Run()
- func (wp *WorkerPool) Status() (int, int, int, int, int)
- func (wp *WorkerPool) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Pool ¶
type Pool interface { ID() Id QueueTask(*Task) error MarkGlobal() IsGlobal() bool SignalAllQueued() CreatedInTask() *Task Wait(bool) Cancel() IsRoot() bool Status() PoolStatus AddCleanup(fn func(Pool)) GetRootPool() *TaskPool GetWorkerPool() *WorkerPool LockExit() UnlockExit() RemoveTask(Id) Errors() []*Task AddError(t *Task) }
type PoolStatus ¶
type PoolStatus struct { // The count of tasks that have completed on this task pool. // Complete *DOES* include failed tasks Complete int64 // The count of failed tasks on this task pool Failed int // The count of all tasks that have been queued on this task pool Total int64 // Percent to completion of all tasks Progress float64 // How long the pool has been alive Runtime time.Duration }
type QueueState ¶
type QueueState string
const ( PreQueued QueueState = "pre-queued" InQueue QueueState = "in-queue" Executing QueueState = "executing" Exited QueueState = "exited" )
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func (*Task) Cancel ¶
func (t *Task) Cancel()
Cancel Unknowable if this is the last operation of a task, so t.success() will not have an effect after a task is cancelled. t.error() may override the exit status in special cases, such as a timeout, which is both an error and a reason for cancellation.
Cancellations are always external to the task. From within the body of the task, either error or success should be called. If a task finds itself not required to continue, success should be returned
func (*Task) ClearAndRecompute ¶
func (t *Task) ClearAndRecompute()
func (*Task) ClearTimeout ¶
func (t *Task) ClearTimeout()
func (*Task) ExitIfSignaled ¶
func (t *Task) ExitIfSignaled()
ExitIfSignaled should be used intermittently to check if the task should exit. If the task should exit, it panics back to the top of safety work
func (*Task) Fail ¶
Fail will set the error on the tsak, and then panic with ErrTaskError, which informs the worker recovery function to exit the task with the error that is set, and not treat it as a real panic.
func (*Task) GetChildTaskPool ¶
func (*Task) GetMeta ¶
func (t *Task) GetMeta() TaskMetadata
func (*Task) GetResults ¶
func (t *Task) GetResults() TaskResult
func (*Task) GetSignalChan ¶
func (*Task) GetTaskPool ¶
func (*Task) Manipulate ¶
func (t *Task) Manipulate(fn func(meta TaskMetadata) error) error
Manipulate is used to change the metadata of a task while it is running. This can be useful to have a task be waiting for input from a client, and this function can be used to send that data to the task via a chan, for example.
func (*Task) OnResult ¶
func (t *Task) OnResult(callback func(TaskResult))
OnResult takes a function to be run when the task result changes
func (*Task) Q ¶
Q queues task on given taskPool tp, if tp is nil, will default to the global task pool. Essentially an alias for tp.QueueTask(t), so you can NewTask(...).Q(). Returns the given task to further support this
func (*Task) SetChildTaskPool ¶
func (*Task) SetCleanup ¶
func (t *Task) SetCleanup(cleanup TaskHandler)
func (*Task) SetErrorCleanup ¶
func (t *Task) SetErrorCleanup(cleanup TaskHandler)
Pass a function to run if the task throws an error, in theory to cleanup any half-processed state that could litter if not finished
func (*Task) SetPostAction ¶
func (t *Task) SetPostAction(action func(TaskResult))
SetPostAction takes a function to be run after the task has successfully completed with the task results as the input of the function
func (*Task) SetResult ¶
func (t *Task) SetResult(results TaskResult)
func (*Task) SetTimeout ¶
func (*Task) Status ¶
func (t *Task) Status() (bool, TaskExitStatus)
Status returns a boolean representing if a task has completed, and a string describing its exit type, if completed.
type TaskExitStatus ¶
type TaskExitStatus = string
const ( TaskSuccess TaskExitStatus = "success" TaskCanceled TaskExitStatus = "cancelled" TaskError TaskExitStatus = "error" TaskNoStatus TaskExitStatus = "" )
type TaskHandler ¶
type TaskHandler func(*Task)
type TaskInterface ¶
type TaskInterface interface { TaskId() Id JobName() string GetTaskPool() *TaskPool GetChildTaskPool() *TaskPool Status() (bool, TaskExitStatus) GetMeta() TaskMetadata GetResult(string) any GetResults() TaskResult Q(pool *TaskPool) *Task Wait() *Task Cancel() SwLap(string) ClearTimeout() ReadError() any ClearAndRecompute() SetPostAction(action func(TaskResult)) SetCleanup(cleanup func()) SetErrorCleanup(cleanup func()) ExeTime() time.Duration }
type TaskMetadata ¶
type TaskMetadata interface { JobName() string MetaString() string FormatToResult() TaskResult Verify() error }
type TaskPool ¶
type TaskPool struct {
// contains filtered or unexported fields
}
func (*TaskPool) AddCleanup ¶
func (*TaskPool) CreatedInTask ¶
func (*TaskPool) GetRootPool ¶
func (*TaskPool) GetWorkerPool ¶
func (tp *TaskPool) GetWorkerPool() *WorkerPool
func (*TaskPool) MarkGlobal ¶
func (tp *TaskPool) MarkGlobal()
MarkGlobal specifies the work queue as being a "global" one
func (*TaskPool) RemoveTask ¶
func (*TaskPool) SignalAllQueued ¶
func (tp *TaskPool) SignalAllQueued()
func (*TaskPool) Status ¶
func (tp *TaskPool) Status() PoolStatus
func (*TaskPool) UnlockExit ¶
func (tp *TaskPool) UnlockExit()
type TaskResult ¶
type TaskService ¶
type TaskService interface { RegisterJob(jobName string, fn TaskHandler) NewTaskPool(replace bool, createdBy *Task) *TaskPool GetTaskPoolByJobName(jobName string) *TaskPool GetTasksByJobName(jobName string) []*Task GetTask(taskId Id) *Task GetTaskPool(Id) *TaskPool DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error) }
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶
func NewWorkerPool(initWorkers int, logLevel int) *WorkerPool
func (*WorkerPool) DispatchJob ¶
func (wp *WorkerPool) DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error)
func (*WorkerPool) GetTask ¶
func (wp *WorkerPool) GetTask(taskId Id) *Task
func (*WorkerPool) GetTaskPool ¶
func (wp *WorkerPool) GetTaskPool(tpId Id) *TaskPool
func (*WorkerPool) GetTaskPoolByJobName ¶
func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *TaskPool
func (*WorkerPool) GetTasksByJobName ¶
func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task
func (*WorkerPool) NewTaskPool ¶
func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) *TaskPool
NewTaskPool `replace` spawns a temporary replacement thread on the parent worker pool. This prevents a deadlock when the queue fills up while adding many tasks, and none are being executed
`parent` allows chaining of task pools for floating updates to the top. This makes it possible for clients to subscribe to a single task, and get notified about all of the sub-updates of that task See taskPool.go
func (*WorkerPool) RegisterJob ¶
func (wp *WorkerPool) RegisterJob(jobName string, fn TaskHandler)
RegisterJob adds a template for a repeatable job that can be called upon later in the program
func (*WorkerPool) Run ¶
func (wp *WorkerPool) Run()
Run launches the standard threads for this worker pool
func (*WorkerPool) Status ¶
Status returns the count of tasks in the queue, the total number of tasks accepted, number of busy workers, and the total number of live workers in the worker pool
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()