Documentation ¶
Index ¶
- type Id
- type Pool
- type PoolStatus
- type QueueState
- type Task
- func (t *Task) Cancel()
- func (t *Task) CheckExit() bool
- func (t *Task) ClearAndRecompute()
- func (t *Task) ClearTimeout()
- func (t *Task) ExeTime() time.Duration
- func (t *Task) ExitIfSignaled()
- func (t *Task) Fail(err error)
- func (t *Task) GetChildTaskPool() *TaskPool
- func (t *Task) GetMeta() TaskMetadata
- func (t *Task) GetResult(resultKey TaskResultKey) any
- func (t *Task) GetResults() TaskResult
- func (t *Task) GetSignalChan() chan int
- func (t *Task) GetTaskPool() *TaskPool
- func (t *Task) JobName() string
- func (t *Task) Manipulate(fn func(meta TaskMetadata) error) error
- func (t *Task) OnResult(callback func(TaskResult))
- func (t *Task) Q(tp *TaskPool) *Task
- func (t *Task) ReadError() error
- func (t *Task) ReqNoErr(err error)
- func (t *Task) SetChildTaskPool(pool *TaskPool)
- func (t *Task) SetCleanup(cleanup TaskHandler)
- func (t *Task) SetErrorCleanup(cleanup TaskHandler)
- func (t *Task) SetPostAction(action func(TaskResult))
- func (t *Task) SetResult(results TaskResult)
- func (t *Task) SetTimeout(timeout time.Time)
- func (t *Task) Status() (bool, TaskExitStatus)
- func (t *Task) Success(msg ...any)
- func (t *Task) SwLap(label string)
- func (t *Task) TaskId() Id
- func (t *Task) Wait() *Task
- type TaskExitStatus
- type TaskHandler
- type TaskInterface
- type TaskMetadata
- type TaskPool
- func (tp *TaskPool) AddCleanup(fn func(pool Pool))
- func (tp *TaskPool) AddError(t *Task)
- func (tp *TaskPool) Cancel()
- func (tp *TaskPool) CreatedInTask() *Task
- func (tp *TaskPool) Errors() []*Task
- func (tp *TaskPool) GetRootPool() *TaskPool
- func (tp *TaskPool) GetWorkerPool() *WorkerPool
- func (tp *TaskPool) ID() Id
- func (tp *TaskPool) IsGlobal() bool
- func (tp *TaskPool) IsRoot() bool
- func (tp *TaskPool) LockExit()
- func (tp *TaskPool) MarkGlobal()
- func (tp *TaskPool) QueueTask(t *Task) (err error)
- func (tp *TaskPool) RemoveTask(taskId Id)
- func (tp *TaskPool) SignalAllQueued()
- func (tp *TaskPool) Status() PoolStatus
- func (tp *TaskPool) UnlockExit()
- func (tp *TaskPool) Wait(supplementWorker bool, task ...*Task)
- type TaskResult
- type TaskResultKey
- type TaskService
- type WorkerPool
- func (wp *WorkerPool) AddHit(time time.Time, target *Task)
- func (wp *WorkerPool) DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error)
- func (wp *WorkerPool) GetTask(taskId Id) *Task
- func (wp *WorkerPool) GetTaskPool(tpId Id) *TaskPool
- func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *TaskPool
- func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task
- func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) *TaskPool
- func (wp *WorkerPool) RegisterJob(jobName string, fn TaskHandler)
- func (wp *WorkerPool) Run()
- func (wp *WorkerPool) Status() (int, int, int, int, int)
- func (wp *WorkerPool) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Pool ¶
type Pool interface { ID() Id QueueTask(*Task) error MarkGlobal() IsGlobal() bool SignalAllQueued() CreatedInTask() *Task Wait(bool, ...*Task) Cancel() IsRoot() bool Status() PoolStatus AddCleanup(fn func(Pool)) GetRootPool() *TaskPool GetWorkerPool() *WorkerPool LockExit() UnlockExit() RemoveTask(Id) Errors() []*Task AddError(t *Task) }
type PoolStatus ¶
type PoolStatus struct { // The count of tasks that have completed on this task pool. // Complete *DOES* include failed tasks Complete int64 // The count of failed tasks on this task pool Failed int // The count of all tasks that have been queued on this task pool Total int64 // Percent to completion of all tasks Progress float64 // How long the pool has been alive Runtime time.Duration }
type QueueState ¶
type QueueState string
const ( PreQueued QueueState = "pre-queued" InQueue QueueState = "in-queue" Executing QueueState = "executing" Exited QueueState = "exited" )
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func (*Task) Cancel ¶
func (t *Task) Cancel()
Cancel Unknowable if this is the last operation of a task, so t.success() will not have an effect after a task is cancelled. t.error() may override the exit status in special cases, such as a timeout, which is both an error and a reason for cancellation.
Cancellations are always external to the task. From within the body of the task, either error or success should be called. If a task finds itself not required to continue, success should be returned
func (*Task) ClearAndRecompute ¶
func (t *Task) ClearAndRecompute()
func (*Task) ClearTimeout ¶
func (t *Task) ClearTimeout()
func (*Task) ExitIfSignaled ¶
func (t *Task) ExitIfSignaled()
ExitIfSignaled should be used intermittently to check if the task should exit. If the task should exit, it panics back to the top of safety work
func (*Task) Fail ¶
Fail will set the error on the task, and then panic with ErrTaskError, which informs the worker recovery function to exit the task with the error that is set, and not treat it as a real panic.
func (*Task) GetChildTaskPool ¶
func (*Task) GetMeta ¶
func (t *Task) GetMeta() TaskMetadata
func (*Task) GetResult ¶
func (t *Task) GetResult(resultKey TaskResultKey) any
func (*Task) GetResults ¶
func (t *Task) GetResults() TaskResult
func (*Task) GetSignalChan ¶
func (*Task) GetTaskPool ¶
func (*Task) Manipulate ¶
func (t *Task) Manipulate(fn func(meta TaskMetadata) error) error
Manipulate is used to change the metadata of a task while it is running. This can be useful to have a task be waiting for input from a client, and this function can be used to send that data to the task via a chan, for example.
func (*Task) OnResult ¶
func (t *Task) OnResult(callback func(TaskResult))
OnResult takes a function to be run when the task result changes
func (*Task) Q ¶
Q queues task on given taskPool tp, if tp is nil, will default to the global task pool. Essentially an alias for tp.QueueTask(t), so you can NewTask(...).Q(). Returns the given task to further support this
func (*Task) SetChildTaskPool ¶
func (*Task) SetCleanup ¶
func (t *Task) SetCleanup(cleanup TaskHandler)
SetCleanup takes a function to be run after the task has completed, no matter the exit status. Many cleanup functions can be registered to run in sequence after the task completes. The cleanup functions are run in the order they are registered. Modifications to the task state should not be made in the cleanup functions (i.e. read-only), as the task has already completed, and may result in a deadlock. If the task has already completed, this function will NOT be called. Therefore, it is only safe to call SetCleanup() from inside of a task handler. If you want to register a function from outside the task handler, or to run after the task has completed successfully, use t.SetPostAction() instead.
func (*Task) SetErrorCleanup ¶
func (t *Task) SetErrorCleanup(cleanup TaskHandler)
SetErrorCleanup works the same as t.SetCleanup(), but only runs if the task errors
func (*Task) SetPostAction ¶
func (t *Task) SetPostAction(action func(TaskResult))
SetPostAction takes a function to be run after the task has successfully completed with the task results as the input of the function
func (*Task) SetResult ¶
func (t *Task) SetResult(results TaskResult)
func (*Task) SetTimeout ¶
func (*Task) Status ¶
func (t *Task) Status() (bool, TaskExitStatus)
Status returns a boolean representing if a task has completed, and a string describing its exit type, if completed.
type TaskExitStatus ¶
type TaskExitStatus string
const ( TaskSuccess TaskExitStatus = "success" TaskCanceled TaskExitStatus = "cancelled" TaskError TaskExitStatus = "error" TaskNoStatus TaskExitStatus = "" )
type TaskHandler ¶
type TaskHandler func(*Task)
type TaskInterface ¶
type TaskInterface interface { TaskId() Id JobName() string GetTaskPool() *TaskPool GetChildTaskPool() *TaskPool Status() (bool, TaskExitStatus) GetMeta() TaskMetadata GetResult(string) any GetResults() TaskResult Q(pool *TaskPool) *Task Wait() *Task Cancel() SwLap(string) ClearTimeout() ReadError() any ClearAndRecompute() SetPostAction(action func(TaskResult)) SetCleanup(cleanup func()) SetErrorCleanup(cleanup func()) ExeTime() time.Duration }
type TaskMetadata ¶
type TaskMetadata interface { JobName() string MetaString() string FormatToResult() TaskResult Verify() error }
type TaskPool ¶
type TaskPool struct {
// contains filtered or unexported fields
}
func (*TaskPool) AddCleanup ¶
func (*TaskPool) CreatedInTask ¶
func (*TaskPool) GetRootPool ¶
func (*TaskPool) GetWorkerPool ¶
func (tp *TaskPool) GetWorkerPool() *WorkerPool
func (*TaskPool) MarkGlobal ¶
func (tp *TaskPool) MarkGlobal()
MarkGlobal specifies the work queue as being a "global" one
func (*TaskPool) RemoveTask ¶
func (*TaskPool) SignalAllQueued ¶
func (tp *TaskPool) SignalAllQueued()
func (*TaskPool) Status ¶
func (tp *TaskPool) Status() PoolStatus
func (*TaskPool) UnlockExit ¶
func (tp *TaskPool) UnlockExit()
func (*TaskPool) Wait ¶
Wait Parks the thread on the work queue until all the tasks have been queued and finish. **If you never call tp.SignalAllQueued(), the waiters will never wake up** Make sure that you SignalAllQueued before parking here if it is the only thread loading tasks. If you are parking a thread that is currently executing a task, you can pass that task in as well, and that task will also listen for exit events.
type TaskResult ¶
type TaskResult map[TaskResultKey]any
func (TaskResult) ToMap ¶
func (tr TaskResult) ToMap() map[string]any
type TaskResultKey ¶
type TaskResultKey string
type TaskService ¶
type TaskService interface { RegisterJob(jobName string, fn TaskHandler) NewTaskPool(replace bool, createdBy *Task) *TaskPool GetTaskPoolByJobName(jobName string) *TaskPool GetTasksByJobName(jobName string) []*Task GetTask(taskId Id) *Task GetTaskPool(Id) *TaskPool DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error) }
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶
func NewWorkerPool(initWorkers int, logLevel int) *WorkerPool
func (*WorkerPool) DispatchJob ¶
func (wp *WorkerPool) DispatchJob(jobName string, meta TaskMetadata, pool *TaskPool) (*Task, error)
func (*WorkerPool) GetTask ¶
func (wp *WorkerPool) GetTask(taskId Id) *Task
func (*WorkerPool) GetTaskPool ¶
func (wp *WorkerPool) GetTaskPool(tpId Id) *TaskPool
func (*WorkerPool) GetTaskPoolByJobName ¶
func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *TaskPool
func (*WorkerPool) GetTasksByJobName ¶
func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task
func (*WorkerPool) NewTaskPool ¶
func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) *TaskPool
NewTaskPool `replace` spawns a temporary replacement thread on the parent worker pool. This prevents a deadlock when the queue fills up while adding many tasks, and none are being executed
`parent` allows chaining of task pools for floating updates to the top. This makes it possible for clients to subscribe to a single task, and get notified about all of the sub-updates of that task See taskPool.go
func (*WorkerPool) RegisterJob ¶
func (wp *WorkerPool) RegisterJob(jobName string, fn TaskHandler)
RegisterJob adds a template for a repeatable job that can be called upon later in the program
func (*WorkerPool) Run ¶
func (wp *WorkerPool) Run()
Run launches the standard threads for this worker pool
func (*WorkerPool) Status ¶
Status returns the count of tasks in the queue, the total number of tasks accepted, number of busy workers, and the total number of live workers in the worker pool
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()