Documentation
Overview
Package harmonytask implements a pure (no task logic), distributed task manager. This clean interface allows a task implementer to avoid being concerned with task scheduling and management entirely. It's based on the idea of tasks as small units of work, split off from other work because of hardware, parallelizability, reliability, or any other reason. Workers are greedy: they vacuum up their favorite jobs from a list. Once one task is accepted, harmonydb tries to get other task-runner machines to accept work (round robin) before trying again to accept.

Mental Model:
Things that block tasks:
  - task not registered for any running server
  - max was specified and reached
  - resource exhaustion
  - CanAccept() interface (per-task implementation) does not accept it

Ways tasks start:
  - DB read every 3 seconds
  - Task was added (to db) by this process

Ways tasks get added:
  - Async listener task (for chain, etc.)
  - Followers: tasks get added because another task completed

When Follower collectors run:
  - If both sides are process-local, then this process will pick it up.
  - If properly registered already, the HTTP endpoint will be tried to start it.
  - Otherwise, it will be found at the listen interval during the db scrape.

How duplicate tasks are avoided:
  - That's up to the task definition, but probably a unique key.
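The blocking conditions listed above can be pictured as an ordered admission check. The Go sketch below is a minimal illustration, not the engine's code: machine, taskType, admit, and the two-field Resources type are hypothetical stand-ins, and registration is checked from a single machine's point of view rather than across all running servers.

```go
package main

import "fmt"

// Resources is a hypothetical stand-in for resources.Resources.
type Resources struct {
	Cpu int
	Ram uint64
}

// fits reports whether need can be covered by the free resources r.
func (r Resources) fits(need Resources) bool {
	return need.Cpu <= r.Cpu && need.Ram <= r.Ram
}

// taskType holds the per-type facts the checks need (hypothetical).
type taskType struct {
	max       int       // zero or less means unrestricted
	cost      Resources // peak cost of one Do() call
	canAccept func() bool
}

// machine is a hypothetical view of one task-runner process.
type machine struct {
	registered map[string]*taskType // task types this server runs
	running    map[string]int       // running count per task type
	free       Resources            // still-unassigned resources
}

// admit returns "" if a task of the named type may start here, otherwise
// the first blocking reason, checked in the order of the list above.
func (m *machine) admit(name string) string {
	tt, ok := m.registered[name]
	if !ok {
		return "not registered"
	}
	if tt.max > 0 && m.running[name] >= tt.max {
		return "max reached"
	}
	if !m.free.fits(tt.cost) {
		return "resources exhausted"
	}
	if !tt.canAccept() {
		return "CanAccept declined"
	}
	return ""
}

func main() {
	demo := &machine{
		registered: map[string]*taskType{
			"Echo": {max: 1, cost: Resources{Cpu: 2, Ram: 1 << 30}, canAccept: func() bool { return true }},
		},
		running: map[string]int{},
		free:    Resources{Cpu: 4, Ram: 4 << 30},
	}
	fmt.Printf("%q\n", demo.admit("Echo")) // ""
	demo.running["Echo"] = 1
	fmt.Printf("%q\n", demo.admit("Echo")) // "max reached"
}
```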
To use:
  1. Implement TaskInterface for a new task.
  2. Have New() receive this & all other ACTIVE implementations.

As we are not expecting DBAs in this database, it's important to know what grows uncontrolled. The only growing harmony_* table is harmony_task_history (somewhat quickly). Its rows will need a clean-up once the task data could never be acted upon, but the design **requires** extraInfo tables to grow until the task's info could not possibly be used by a following task, including during a slow release rollout. This would normally be on the order of months.

Other possible enhancements include more collaborative coordination to assign a task to machines closer to the data.
Database Behavior

harmony_task

This is the list of work that has not been completed.
AddTaskFunc manages the additions, but is designed to have its transactions fail out when they overlap with a similar task already written. It's up to the TaskInterface implementer to detect this overlap via some other table it uses (since overlap can mean very different things).
harmony_task_history
This holds records of tasks that completed or saw too many retries. It also serves as input for subsequent (follower) tasks to kick off. This is not done machine-internally because a follower may not be on the same machine as the previous task.
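Follower triggering can be sketched as a scan over finished history rows that calls the callbacks registered for each completed task type. This is an illustration under stated assumptions: historyRow, followFn, and dispatchFollowers are hypothetical, TaskID stands in for harmonytask.TaskID, the AddTaskFunc argument that real Follows callbacks receive is omitted, and only successful completions are followed. In the engine such a scan would run periodically (compare FOLLOW_FREQUENCY) rather than once.

```go
package main

import "fmt"

// TaskID stands in for harmonytask.TaskID.
type TaskID int

// historyRow is a hypothetical slice of a harmony_task_history row.
type historyRow struct {
	ID     TaskID
	Name   string // task type name, matched against follower keys
	Result bool   // true if the task completed successfully
}

// followFn is a trimmed Follows callback (the real one also takes an AddTaskFunc).
type followFn func(TaskID) (bool, error)

// dispatchFollowers calls every follower registered under a completed task's
// type name and returns how many reported a successful trigger.
func dispatchFollowers(rows []historyRow, follows map[string][]followFn) int {
	started := 0
	for _, r := range rows {
		if !r.Result {
			continue // this sketch only follows successful completions
		}
		for _, f := range follows[r.Name] {
			ok, err := f(r.ID)
			if err != nil {
				fmt.Println("follower error:", err)
				continue
			}
			if ok {
				started++
			}
		}
	}
	return started
}

func main() {
	demoRows := []historyRow{{ID: 1, Name: "Parent", Result: true}, {ID: 2, Name: "Parent", Result: false}}
	demoFollows := map[string][]followFn{
		"Parent": {func(TaskID) (bool, error) { return true, nil }},
	}
	fmt.Println(dispatchFollowers(demoRows, demoFollows)) // 1
}
```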
harmony_task_machines
Managed by lib/harmony/resources, this is a reference to machines registered via that package. This registration does not obligate the machine to anything, but serves as a discovery mechanism. Paths are hostnames + ports, which are presumed to support HTTP, but this assumption is only used by the task system.
Constants
const (
	WorkSourcePoller   = "poller"
	WorkSourceRecover  = "recovered"
	WorkSourceIAmBored = "bored"
)
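A CanAccept implementation can branch on TaskEngine.WorkOrigin using these constants. The helper below is hypothetical and only shows the shape of such a switch; the constants are copied locally so the snippet compiles on its own, the descriptions are inferred from the constant names, and the page does not define what qualifies as "recovered" work, so that case is left generic.

```go
package main

import "fmt"

// Local copies of the documented WorkSource* constants.
const (
	WorkSourcePoller   = "poller"
	WorkSourceRecover  = "recovered"
	WorkSourceIAmBored = "bored"
)

// describeOrigin is a hypothetical helper showing a switch a CanAccept
// implementation could make on TaskEngine.WorkOrigin.
func describeOrigin(origin string) string {
	switch origin {
	case WorkSourcePoller:
		return "found by the periodic DB poll"
	case WorkSourceRecover:
		return "recovered work"
	case WorkSourceIAmBored:
		return "added by an IAmBored callback"
	default:
		return "unknown origin"
	}
}

func main() {
	fmt.Println(describeOrigin(WorkSourceIAmBored)) // added by an IAmBored callback
}
```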
Variables
var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often
var POLL_DURATION = time.Second * 3 // Poll for Work this frequently
These values are constants in practice; they are declared as variables only so that unit tests can change them.
var POLL_NEXT_DURATION = 100 * time.Millisecond // After scheduling a task, wait this long before scheduling another
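How the two poll intervals might combine in a scheduling loop: a short wait after a poll that scheduled something, the full interval otherwise. nextPollDelay and pollLoop are hypothetical helpers, and the poll callback (which reports whether a task was scheduled) is an assumption of this sketch rather than a documented API.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented defaults.
var (
	POLL_DURATION      = time.Second * 3
	POLL_NEXT_DURATION = 100 * time.Millisecond
)

// nextPollDelay picks the wait before the next poll: short after a poll
// that scheduled a task, the full interval otherwise.
func nextPollDelay(scheduled bool) time.Duration {
	if scheduled {
		return POLL_NEXT_DURATION
	}
	return POLL_DURATION
}

// pollLoop calls poll (which reports whether it scheduled a task) until stop
// is closed, waiting nextPollDelay between calls.
func pollLoop(stop <-chan struct{}, poll func() bool) {
	for {
		delay := nextPollDelay(poll())
		select {
		case <-stop:
			return
		case <-time.After(delay):
		}
	}
}

func main() {
	fmt.Println(nextPollDelay(true), nextPollDelay(false)) // 100ms 3s
}
```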
Functions
func SingletonTaskAdder (added in v1.27.0)
func SingletonTaskAdder(minInterval time.Duration, task TaskInterface) func(AddTaskFunc) error
Types
type AddTaskFunc
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error))
AddTaskFunc is responsible for adding a task's details ("extra info") to the DB. The extraInfo callback should return true if the task should be added, or false if it was already there. This is typically accomplished with a unique index on your details table, which causes a duplicate insert to fail. A non-nil error indicates not a conflict (which should be ignored) but a serious problem that needs to be logged with context.
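A minimal sketch of an extraInfo callback that follows this contract, using an in-memory map in place of a details table with a unique index. TaskID, fakeTx, errUniqueViolation, and extraInfoFor are hypothetical stand-ins. With a real PostgreSQL-compatible database a failed INSERT aborts the transaction, so `INSERT ... ON CONFLICT DO NOTHING` followed by a check of the affected-row count is a common alternative to catching the error.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskID stands in for harmonytask.TaskID.
type TaskID int

var errUniqueViolation = errors.New("unique violation")

// fakeTx stands in for *harmonydb.Tx; details models a table whose
// key column carries a UNIQUE index.
type fakeTx struct {
	details map[string]TaskID
}

func (tx *fakeTx) insertDetails(id TaskID, key string) error {
	if _, dup := tx.details[key]; dup {
		return errUniqueViolation
	}
	tx.details[key] = id
	return nil
}

// extraInfoFor builds the callback handed to an AddTaskFunc: (true, nil)
// commits, (false, nil) skips an expected duplicate, and a non-nil error
// is reserved for serious problems.
func extraInfoFor(key string) func(TaskID, *fakeTx) (bool, error) {
	return func(id TaskID, tx *fakeTx) (bool, error) {
		err := tx.insertDetails(id, key)
		switch {
		case err == nil:
			return true, nil
		case errors.Is(err, errUniqueViolation):
			return false, nil // already queued: not an error
		default:
			return false, fmt.Errorf("inserting details for task %d: %w", id, err)
		}
	}
}

func main() {
	demoTx := &fakeTx{details: map[string]TaskID{}}
	fmt.Println(extraInfoFor("sector-1")(1, demoTx)) // true <nil>
	fmt.Println(extraInfoFor("sector-1")(2, demoTx)) // false <nil>
}
```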
type TaskEngine
type TaskEngine struct {
	WorkOrigin string
	// contains filtered or unexported fields
}
func New
func New(
	db *harmonydb.DB,
	impls []TaskInterface,
	hostnameAndPort string) (*TaskEngine, error)
New creates all the task definitions. Note that TaskEngine knows nothing about the tasks themselves and serves as a generic container for common work.
func (*TaskEngine) GracefullyTerminate
func (e *TaskEngine) GracefullyTerminate()
GracefullyTerminate hangs until all present tasks have completed. Call this to exit the process cleanly. As some processes are long-running, passing a deadline will leave those still running unfinished, to be picked up later.
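The waiting behavior can be sketched with a sync.WaitGroup. engine, start, sleeper, and terminate are hypothetical; in particular, the deadline parameter is an assumption of this sketch, since the documented GracefullyTerminate() signature takes no arguments.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// engine is a hypothetical stand-in that tracks running tasks.
type engine struct {
	running sync.WaitGroup
}

// start runs work in its own goroutine and tracks it until it returns.
func (e *engine) start(work func()) {
	e.running.Add(1)
	go func() {
		defer e.running.Done()
		work()
	}()
}

// sleeper returns a task body that just sleeps for d.
func sleeper(d time.Duration) func() {
	return func() { time.Sleep(d) }
}

// terminate blocks until every started task has returned and reports true.
// With a positive deadline it stops waiting after that long and reports
// false; unfinished tasks are left to be picked up later.
func (e *engine) terminate(deadline time.Duration) bool {
	done := make(chan struct{})
	go func() {
		e.running.Wait()
		close(done) // this goroutine outlives a timed-out terminate
	}()
	if deadline <= 0 {
		<-done
		return true
	}
	select {
	case <-done:
		return true
	case <-time.After(deadline):
		return false
	}
}

func main() {
	demo := &engine{}
	demo.start(sleeper(10 * time.Millisecond))
	fmt.Println(demo.terminate(0)) // true
}
```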
func (*TaskEngine) ResourcesAvailable
func (e *TaskEngine) ResourcesAvailable() resources.Resources
ResourcesAvailable determines what resources are still unassigned.
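One plausible way to compute the unassigned resources is to subtract the peak Cost of every running task from the machine's totals. The three-field Resources type and the available function are hypothetical stand-ins, not the resources package's actual definitions.

```go
package main

import "fmt"

// Resources is a hypothetical three-field stand-in for resources.Resources.
type Resources struct {
	Cpu int
	Gpu float64
	Ram uint64
}

// available subtracts each running task's peak Cost from the machine total.
// It assumes the scheduler never over-assigns, so Ram cannot underflow.
func available(total Resources, runningCosts []Resources) Resources {
	free := total
	for _, c := range runningCosts {
		free.Cpu -= c.Cpu
		free.Gpu -= c.Gpu
		free.Ram -= c.Ram
	}
	return free
}

func main() {
	left := available(
		Resources{Cpu: 16, Gpu: 1, Ram: 64 << 30},
		[]Resources{{Cpu: 4, Ram: 8 << 30}, {Cpu: 2, Gpu: 1, Ram: 16 << 30}},
	)
	fmt.Println(left.Cpu, left.Gpu, left.Ram>>30) // 10 0 40
}
```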
type TaskInterface
type TaskInterface interface {
	// Do the task assigned. Call stillOwned before making single-writer-only
	// changes to ensure the work has not been stolen.
	// This is the ONLY function that should attempt to do the work, and must
	// ONLY be called by harmonytask.
	// Indicate if the task no longer needs scheduling with done=true, including
	// cases where it's past the deadline.
	Do(taskID TaskID, stillOwned func() bool) (done bool, err error)

	// CanAccept should return whether the task can run on this machine. It should
	// return nil if the task type is not allowed on this machine.
	// It should select the task it most wants to accomplish.
	// It is also responsible for determining & reserving disk space (including scratch).
	CanAccept([]TaskID, *TaskEngine) (*TaskID, error)

	// TypeDetails() returns static details about how this task behaves and
	// how this machine will run it. Read once at the beginning.
	TypeDetails() TaskTypeDetails

	// This listener will consume all external sources continuously for work.
	// Do() may also be called from a backlog of work. This must not
	// start doing the work (it still must be scheduled).
	// Note: Task de-duplication should happen in the extraInfo function by
	// returning false, typically by determining from the tx that the work
	// exists already. The easy way is to have a unique joint index
	// across all fields that will be common.
	// Adder should typically only add its own task type, but adding several
	// is possible for when one trigger starts two things.
	// Usage Example:
	// func (b *BazType) Adder(addTask AddTaskFunc) {
	// 	for {
	// 		bazMaker := <-bazChannel
	// 		addTask(func(t harmonytask.TaskID, txn *harmonydb.Tx) (bool, error) {
	// 			_, err := txn.Exec(`INSERT INTO bazInfoTable (taskID, qix, mot)
	// 				VALUES ($1,$2,$3)`, t, bazMaker.qix, bazMaker.mot)
	// 			if err != nil {
	// 				scream(err)
	// 				return false, nil
	// 			}
	// 			return true, nil
	// 		})
	// 	}
	// }
	Adder(AddTaskFunc)
}
TaskInterface must be implemented in order to have a task used by harmonytask.
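Following the two "To use" steps from the overview, step 1 might look like the sketch below. Every type here (TaskID, Tx, TaskEngine, AddTaskFunc, Resources, TaskTypeDetails, TaskInterface) is a trimmed, hypothetical local stand-in so the example compiles without the real packages, and EchoTask is an invented task; step 2 (passing the implementation to New along with a *harmonydb.DB) is omitted.

```go
package main

import "fmt"

// Trimmed, hypothetical stand-ins for the harmonytask, harmonydb, and
// resources types, so the sketch compiles on its own.
type TaskID int
type Tx struct{}
type TaskEngine struct{ WorkOrigin string }
type AddTaskFunc func(extraInfo func(TaskID, *Tx) (shouldCommit bool, seriousError error))
type Resources struct {
	Cpu int
	Ram uint64
}
type TaskTypeDetails struct {
	Max         int
	Name        string
	Cost        Resources
	MaxFailures uint
}
type TaskInterface interface {
	Do(taskID TaskID, stillOwned func() bool) (done bool, err error)
	CanAccept([]TaskID, *TaskEngine) (*TaskID, error)
	TypeDetails() TaskTypeDetails
	Adder(AddTaskFunc)
}

// EchoTask is a hypothetical task that prints messages fed to it.
type EchoTask struct {
	incoming chan string       // external work source
	payload  map[TaskID]string // stands in for an extraInfo table
}

var _ TaskInterface = (*EchoTask)(nil) // compile-time interface check

func (t *EchoTask) Do(id TaskID, stillOwned func() bool) (bool, error) {
	if !stillOwned() {
		// Another worker owns the task now; skip the single-writer change.
		// Returning done=false here is this sketch's choice.
		return false, nil
	}
	fmt.Println("echo:", t.payload[id])
	return true, nil
}

// CanAccept takes the first offered task; a real implementation would pick
// the one it most wants and reserve any disk space it needs.
func (t *EchoTask) CanAccept(ids []TaskID, _ *TaskEngine) (*TaskID, error) {
	if len(ids) == 0 {
		return nil, nil
	}
	id := ids[0]
	return &id, nil
}

func (t *EchoTask) TypeDetails() TaskTypeDetails {
	return TaskTypeDetails{Name: "Echo", Max: 2, Cost: Resources{Cpu: 1, Ram: 1 << 20}, MaxFailures: 3}
}

// Adder queues one task per incoming message until the channel is closed.
func (t *EchoTask) Adder(add AddTaskFunc) {
	for msg := range t.incoming {
		msg := msg // per-iteration copy (needed before Go 1.22)
		add(func(id TaskID, _ *Tx) (bool, error) {
			t.payload[id] = msg // record the task's "extra info"
			return true, nil
		})
	}
}

func main() {
	demo := &EchoTask{incoming: make(chan string, 1), payload: map[TaskID]string{}}
	demo.incoming <- "hi"
	close(demo.incoming)
	seq := TaskID(0)
	// A toy AddTaskFunc that hands out sequential IDs.
	demo.Adder(func(extra func(TaskID, *Tx) (bool, error)) {
		seq++
		extra(seq, &Tx{})
	})
	first, _ := demo.CanAccept([]TaskID{1}, &TaskEngine{})
	demo.Do(*first, func() bool { return true }) // prints "echo: hi"
}
```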
type TaskTypeDetails
type TaskTypeDetails struct {
	// Max is how many tasks of this type this machine can run.
	// Zero (default) or less means unrestricted.
	Max int

	// Name is the task name to be added to the task list.
	Name string

	// Peak costs to Do() the task.
	Cost resources.Resources

	// Max failure count before the job is dropped.
	// 0 = retry forever
	MaxFailures uint

	// Follow another task's completion via this task's creation.
	// The function should populate extraInfo from data
	// available from the previous task's tables, using the given TaskID.
	// It should also return success if the trigger succeeded.
	// NOTE: if refactoring tasks, see if your task is
	// necessary. Ex: Is the sector state correct for your stage to run?
	Follows map[string]func(TaskID, AddTaskFunc) (bool, error)

	// IAmBored is called (when populated) when there's capacity but no work.
	// Tasks added will be proposed to CanAccept() on this machine.
	// CanAccept() can read taskEngine's WorkOrigin string to learn about a task.
	// Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states.
	IAmBored func(AddTaskFunc) error
}
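Two of these fields encode rules that are easy to get backwards: a non-positive Max means no limit, and a MaxFailures of 0 means retry forever. The helpers below are hypothetical and only restate those comments as code; treating a job as dropped once its failure count reaches MaxFailures is this sketch's assumption about the exact boundary.

```go
package main

import "fmt"

// hasCapacity applies the Max rule: zero or less means unrestricted.
func hasCapacity(running, limit int) bool {
	return limit <= 0 || running < limit
}

// shouldDrop applies the MaxFailures rule: 0 means retry forever. Dropping
// once failures reach the limit is this sketch's assumption.
func shouldDrop(failures, maxFailures uint) bool {
	return maxFailures != 0 && failures >= maxFailures
}

func main() {
	fmt.Println(hasCapacity(100, 0), shouldDrop(100, 0)) // true false
}
```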