harmonytask

package
v1.27.0
Published: May 28, 2024 License: Apache-2.0, MIT Imports: 12 Imported by: 0

Documentation

Overview

Package harmonytask implements a pure (no task logic), distributed task manager. This clean interface lets a task implementer avoid scheduling and management concerns entirely. It is based on the idea of tasks as small units of work, split from other work by hardware, parallelizability, reliability, or any other reason. Workers are greedy: they vacuum up their favorite jobs from a list. Once one task is accepted, harmonydb tries to get other task runner machines to accept work (round robin) before trying again to accept.

Mental Model:

Things that block tasks:
	- task not registered for any running server
	- max was specified and reached
	- resource exhaustion
	- CanAccept() interface (per-task implementation) does not accept it.
Ways tasks start:
	- DB Read every 3 seconds
	- Task was added (to db) by this process
Ways tasks get added:
	- Async Listener task (for chain, etc)
	- Followers: Tasks get added because another task completed
When Follower collectors run:
	- If both sides are process-local, then this process will pick it up.
	- If properly registered already, the http endpoint will be tried to start it.
	- Otherwise, at the listen interval during db scrape it will be found.
How duplicate tasks are avoided:
	- that's up to the task definition, but probably a unique key
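
The "things that block tasks" list can be read as a chain of checks. The sketch below is only an illustration of that reading: the candidate struct, its field names, and the order of the checks are assumptions made for this example, not part of harmonytask.

```go
package main

import "fmt"

// candidate is a hypothetical summary of one task type's situation on
// this machine. None of these fields exist in harmonytask.
type candidate struct {
	registered   bool // some running server registered this task type
	max          int  // per-machine limit; zero or less means unrestricted
	running      int  // tasks of this type currently running here
	hasResources bool // enough resources remain for the task's peak cost
	accepted     bool // outcome of the task's own CanAccept() decision
}

// blockedBy returns the first reason the task cannot start, or "" if
// nothing blocks it. The check order is an assumption of this sketch.
func blockedBy(c candidate) string {
	switch {
	case !c.registered:
		return "not registered"
	case c.max > 0 && c.running >= c.max:
		return "max reached"
	case !c.hasResources:
		return "resource exhaustion"
	case !c.accepted:
		return "CanAccept declined"
	}
	return ""
}

func main() {
	full := candidate{registered: true, max: 2, running: 2, hasResources: true, accepted: true}
	free := candidate{registered: true, hasResources: true, accepted: true}
	fmt.Printf("%q %q\n", blockedBy(full), blockedBy(free))
}
```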

To use:

1. Implement TaskInterface for a new task.
2. Have New() receive this and all other ACTIVE implementations.

As we are not expecting DBAs in this database, it's important to know what grows uncontrolled. The only growing harmony_* table is harmony_task_history (somewhat quickly). Its rows will need a clean-up once the task data could never be acted upon. However, the design **requires** extraInfo tables to grow until the task's info could not possibly be used by a following task, including during a slow release rollout. Such data would normally be on the order of months old.

Other possible enhancements include more collaborative coordination to assign a task to machines closer to the data.

Database Behavior

harmony_task

This is the list of work that has not been completed.

AddTaskFunc manages the additions, but is designed to have its
transactions failed-out on overlap with a similar task already written.
It's up to the TaskInterface implementer to discover this overlap via
some other table it uses (since overlap can mean very different things).

harmony_task_history

This holds transactions that completed or saw too many retries. It also
serves as input for subsequent (follower) tasks to kick off. This is not
done machine-internally because a follower may not be on the same machine
as the previous task.
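
As a rough illustration of how history rows could feed follower tasks, the sketch below walks a list of completed tasks and offers each one to the follower registered under its task name. The historyRow type, its fields, the followFunc type, and the choice to skip unsuccessful rows are all assumptions for this example; the real table layout and trigger rules are not described here.

```go
package main

import "fmt"

// TaskID mirrors the package's TaskID type for this standalone sketch.
type TaskID int

// historyRow is a hypothetical, simplified harmony_task_history record.
type historyRow struct {
	ID      TaskID
	Name    string
	Success bool
}

// followFunc stands in for a value of TaskTypeDetails.Follows, with the
// AddTaskFunc argument left out to keep the sketch self-contained.
type followFunc func(prev TaskID) (added bool, err error)

// runFollowers offers every successful row to the follower registered
// under that row's task name and counts how many follow-up tasks were added.
// Skipping unsuccessful rows is an assumption of this sketch.
func runFollowers(history []historyRow, follows map[string]followFunc) (int, error) {
	count := 0
	for _, row := range history {
		follow, ok := follows[row.Name]
		if !ok || !row.Success {
			continue
		}
		wasAdded, err := follow(row.ID)
		if err != nil {
			return count, err
		}
		if wasAdded {
			count++
		}
	}
	return count, nil
}

func main() {
	follows := map[string]followFunc{
		"seal": func(prev TaskID) (bool, error) { return true, nil },
	}
	history := []historyRow{{1, "seal", true}, {2, "seal", false}, {3, "other", true}}
	n, err := runFollowers(history, follows)
	fmt.Println(n, err)
}
```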

harmony_task_machines

Managed by lib/harmony/resources, this is a reference to machines registered
via the resources. This registration does not obligate the machine to
anything, but serves as a discovery mechanism. Paths are hostnames + ports
which are presumed to support http, but this assumption is only used by
the task system.

Index

Constants

const (
	WorkSourcePoller   = "poller"
	WorkSourceRecover  = "recovered"
	WorkSourceIAmBored = "bored"
)

Variables

var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often
var POLL_DURATION = time.Second * 3 // Poll for Work this frequently

Consts (except for unit test)

var POLL_NEXT_DURATION = 100 * time.Millisecond // After scheduling a task, wait this long before scheduling another

Functions

func SingletonTaskAdder added in v1.27.0

func SingletonTaskAdder(minInterval time.Duration, task TaskInterface) func(AddTaskFunc) error

Types

type AddTaskFunc

type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error))

AddTaskFunc is responsible for adding a task's details ("extra info") to the DB. The extraInfo callback should return true if the task should be added and false if it was already there. This is typically accomplished with a "unique" index on your details table that would cause the insert to fail. A non-nil error indicates that, instead of a conflict (which should be ignored), there is a serious problem that needs to be logged with context.
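
The conflict-versus-serious-error distinction can be sketched without a database. In the example below, fakeTx is a hypothetical stand-in for *harmonydb.Tx in which a map plays the role of a details table with a unique index. The names fakeTx, errDuplicate, and extraInfoFor are invented for illustration, and how a real unique-index violation is detected is left open.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskID mirrors the package's TaskID type for this standalone sketch.
type TaskID int

// fakeTx is a hypothetical stand-in for *harmonydb.Tx: the map acts as a
// details table with a unique index on the sector number.
type fakeTx struct{ bySector map[int]TaskID }

// errDuplicate models a unique-index violation (an assumption of this sketch).
var errDuplicate = errors.New("unique constraint violated")

// insert fails with errDuplicate when the sector already has a task.
func (tx *fakeTx) insert(sector int, id TaskID) error {
	if _, exists := tx.bySector[sector]; exists {
		return errDuplicate
	}
	tx.bySector[sector] = id
	return nil
}

// extraInfoFor builds a callback shaped like the one AddTaskFunc expects.
func extraInfoFor(sector int) func(TaskID, *fakeTx) (shouldCommit bool, seriousError error) {
	return func(id TaskID, tx *fakeTx) (bool, error) {
		err := tx.insert(sector, id)
		switch {
		case errors.Is(err, errDuplicate):
			return false, nil // conflict: the work already exists, so skip quietly
		case err != nil:
			return false, err // anything else is a serious problem
		}
		return true, nil
	}
}

func main() {
	tx := &fakeTx{bySector: map[int]TaskID{}}
	for i, sector := range []int{7, 7} {
		commit, err := extraInfoFor(sector)(TaskID(i+1), tx)
		fmt.Println(commit, err)
	}
}
```

The second call for the same sector reports shouldCommit=false with a nil error, which is the "already there" case described above.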

type TaskEngine

type TaskEngine struct {
	WorkOrigin string
	// contains filtered or unexported fields
}

func New

func New(
	db *harmonydb.DB,
	impls []TaskInterface,
	hostnameAndPort string) (*TaskEngine, error)

New creates all the task definitions. Note that TaskEngine knows nothing about the tasks themselves and serves as a generic container for common work.

func (*TaskEngine) GracefullyTerminate

func (e *TaskEngine) GracefullyTerminate()

GracefullyTerminate hangs until all present tasks have completed. Call this to cleanly exit the process. As some processes are long-running, passing a deadline will ignore those still running (to be picked-up later).

func (*TaskEngine) ResourcesAvailable

func (e *TaskEngine) ResourcesAvailable() resources.Resources

ResourcesAvailable determines what resources are still unassigned.
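
One way to picture "still unassigned" is as the machine total minus the peak cost of each running task. The sketch below assumes exactly that; the Resources struct here is a local stand-in with guessed fields (Cpu, Gpu, Ram), not the actual resources.Resources type, and the helpers available and fits are hypothetical.

```go
package main

import "fmt"

// Resources is a hypothetical stand-in for resources.Resources.
type Resources struct {
	Cpu int
	Gpu float64
	Ram uint64
}

// available subtracts the peak cost of every running task from the total.
// It assumes running costs never exceed the total (Ram is unsigned).
func available(total Resources, running []Resources) Resources {
	left := total
	for _, cost := range running {
		left.Cpu -= cost.Cpu
		left.Gpu -= cost.Gpu
		left.Ram -= cost.Ram
	}
	return left
}

// fits reports whether a task with the given peak cost can still start.
func fits(left, cost Resources) bool {
	return cost.Cpu <= left.Cpu && cost.Gpu <= left.Gpu && cost.Ram <= left.Ram
}

func main() {
	total := Resources{Cpu: 8, Gpu: 1, Ram: 64 << 30}
	running := []Resources{{Cpu: 2, Gpu: 1, Ram: 16 << 30}, {Cpu: 1, Ram: 8 << 30}}
	left := available(total, running)
	fmt.Println(left.Cpu, left.Gpu, left.Ram>>30)
	fmt.Println(fits(left, Resources{Cpu: 1, Gpu: 1}))
}
```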

type TaskID

type TaskID int

type TaskInterface

type TaskInterface interface {
	// Do the task assigned. Call stillOwned before making single-writer-only
	// changes to ensure the work has not been stolen.
	// This is the ONLY function that should attempt to do the work, and must
	// ONLY be called by harmonytask.
	// Indicate if the task no-longer needs scheduling with done=true including
	// cases where it's past the deadline.
	Do(taskID TaskID, stillOwned func() bool) (done bool, err error)

	// CanAccept should return if the task can run on this machine. It should
	// return nil if the task type is not allowed on this machine.
	// It should select the task it most wants to accomplish.
	// It is also responsible for determining & reserving disk space (including scratch).
	CanAccept([]TaskID, *TaskEngine) (*TaskID, error)

	// TypeDetails() returns static details about how this task behaves and
	// how this machine will run it. Read once at the beginning.
	TypeDetails() TaskTypeDetails

	// This listener will consume all external sources continuously for work.
	// Do() may also be called from a backlog of work. This must not
	// start doing the work (it still must be scheduled).
	// Note: Task de-duplication should happen in ExtraInfoFunc by
	//  returning false, typically by determining from the tx that the work
	//  exists already. The easy way is to have a unique joint index
	//  across all fields that will be common.
	// Adder should typically only add its own task type, but multiple
	//   is possible for when 1 trigger starts 2 things.
	// Usage Example:
	// func (b *BazType) Adder(addTask AddTaskFunc) {
	//	for {
	//		bazMaker := <-bazChannel
	//		addTask(func(id TaskID, tx *harmonydb.Tx) (bool, error) {
	//			_, err := tx.Exec(`INSERT INTO bazInfoTable (taskID, qix, mot)
	//				VALUES ($1,$2,$3)`, id, bazMaker.qix, bazMaker.mot)
	//			if err != nil {
	//				scream(err)
	//				return false, err
	//			}
	//			return true, nil
	//		})
	//	}
	// }
	Adder(AddTaskFunc)
}

TaskInterface must be implemented in order to have a task used by harmonytask.
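
To make the Do and CanAccept contracts more concrete, here is a minimal standalone sketch. It does not import harmonytask: TaskID is redeclared locally, CanAccept drops the *TaskEngine argument, and the demoTask type, its "prefer the lowest ID" policy, and the choice to return an error on lost ownership are all assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskID mirrors the package's TaskID type for this standalone sketch.
type TaskID int

// demoTask is a hypothetical task that records the IDs it finished.
type demoTask struct{ written []TaskID }

// Do performs the work and checks ownership before the single-writer step.
func (d *demoTask) Do(id TaskID, stillOwned func() bool) (done bool, err error) {
	// ... expensive, repeatable work would happen here ...
	if !stillOwned() {
		// Another machine took the task over: skip the write.
		return false, errors.New("task ownership lost")
	}
	d.written = append(d.written, id)
	return true, nil
}

// CanAccept picks the task this machine most wants, or nil to decline.
// The real method also receives a *TaskEngine, omitted in this sketch.
func (d *demoTask) CanAccept(ids []TaskID) (*TaskID, error) {
	if len(ids) == 0 {
		return nil, nil
	}
	best := ids[0]
	for _, id := range ids[1:] {
		if id < best {
			best = id // assumed policy: prefer the lowest (oldest) ID
		}
	}
	return &best, nil
}

func main() {
	d := &demoTask{}
	pick, _ := d.CanAccept([]TaskID{9, 4, 6})
	done, err := d.Do(*pick, func() bool { return true })
	fmt.Println(*pick, done, err)
}
```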

type TaskTypeDetails

type TaskTypeDetails struct {
	// Max returns how many tasks this machine can run of this type.
	// Zero (default) or less means unrestricted.
	Max int

	// Name is the task name to be added to the task list.
	Name string

	// Peak costs to Do() the task.
	Cost resources.Resources

	// Max Failure count before the job is dropped.
	// 0 = retry forever
	MaxFailures uint

	// Follow another task's completion via this task's creation.
	// The function should populate extraInfo from data
	// available from the previous task's tables, using the given TaskID.
	// It should also return success if the trigger succeeded.
	// NOTE: if refactoring tasks, see if your task is
	// necessary. Ex: Is the sector state correct for your stage to run?
	Follows map[string]func(TaskID, AddTaskFunc) (bool, error)

	// IAmBored is called (when populated) when there's capacity but no work.
	// Tasks added will be proposed to CanAccept() on this machine.
	// CanAccept() can read taskEngine's WorkOrigin string to learn about a task.
	// Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states.
	IAmBored func(AddTaskFunc) error
}
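
The zero-value rules for Max and MaxFailures are easy to misread, so the sketch below spells them out as two small predicates. The helper names underMax and shouldRetry are hypothetical, and the exact boundary (dropping a task once failures reach MaxFailures rather than exceed it) is an assumption of this example.

```go
package main

import "fmt"

// underMax reports whether another task of this type may start here.
// Zero or less means unrestricted, as documented for Max.
func underMax(max, running int) bool {
	return max <= 0 || running < max
}

// shouldRetry reports whether a failed task is scheduled again.
// Zero means retry forever, as documented for MaxFailures. Dropping at
// failures == maxFailures is an assumption of this sketch.
func shouldRetry(maxFailures, failures uint) bool {
	return maxFailures == 0 || failures < maxFailures
}

func main() {
	fmt.Println(underMax(0, 100), underMax(2, 2))
	fmt.Println(shouldRetry(0, 1000), shouldRetry(3, 3))
}
```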
