taskman

package module
v0.2.2
Published: Jan 26, 2025 License: MIT Imports: 11 Imported by: 0

README

go-taskman

An efficient and scalable task manager for in-process task scheduling in Go applications. The package is designed to handle a large number of concurrently running recurring jobs while keeping the number of goroutines relatively low. A second design focus is near-simultaneous execution of tasks, achieved by grouping tasks into jobs.

Features

  • Defines the Task interface, which, once implemented, makes it easy to include existing structures in the manager.
  • Grouping of tasks into Jobs for near-simultaneous execution.
  • Utilizes a worker pool setup.
    • This allows the manager to limit the number of spawned goroutines to the number of workers in the pool, thus keeping memory usage down.
    • A priority queue is used to dispatch jobs for execution in the worker pool. The queue is a min heap, ordered by shortest time until next execution.
  • Dynamic worker pool scaling.
    • The worker pool scales based on the state of the queue:
      • Largest parallel execution of tasks
      • Tasks executed per second
      • Average task execution time
    • The scaling algorithm is designed to optimize for worker availability, and as such errs on the safe side when it comes to scaling down.
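
The min heap ordering described above can be sketched with the standard library's container/heap. This is a minimal illustration, not the package's actual queue: the queuedJob and jobQueue types and the helper functions are hypothetical names introduced here, and only the ordering by next execution time is taken from the feature list.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// queuedJob is a hypothetical stand-in for a scheduled job; it carries only
// the fields needed to order the queue.
type queuedJob struct {
	id       string
	nextExec time.Time
}

// jobQueue implements heap.Interface as a min heap ordered by nextExec, so
// the job that is due soonest is always popped first.
type jobQueue []*queuedJob

func (q jobQueue) Len() int           { return len(q) }
func (q jobQueue) Less(i, j int) bool { return q[i].nextExec.Before(q[j].nextExec) }
func (q jobQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }

func (q *jobQueue) Push(x any) { *q = append(*q, x.(*queuedJob)) }

func (q *jobQueue) Pop() any {
	old := *q
	n := len(old)
	job := old[n-1]
	old[n-1] = nil // drop the reference so the job can be garbage collected
	*q = old[:n-1]
	return job
}

// sampleQueue builds a queue of three jobs, pushed out of order.
func sampleQueue() *jobQueue {
	now := time.Now()
	q := &jobQueue{}
	heap.Push(q, &queuedJob{id: "slow", nextExec: now.Add(30 * time.Second)})
	heap.Push(q, &queuedJob{id: "fast", nextExec: now.Add(1 * time.Second)})
	heap.Push(q, &queuedJob{id: "medium", nextExec: now.Add(10 * time.Second)})
	return q
}

// popOrder drains the queue and returns the job IDs in dispatch order.
func popOrder(q *jobQueue) []string {
	var ids []string
	for q.Len() > 0 {
		ids = append(ids, heap.Pop(q).(*queuedJob).id)
	}
	return ids
}

func main() {
	fmt.Println(popOrder(sampleQueue()))
}
```

With a heap, the dispatcher only ever needs to inspect the first element to know how long it can sleep before the next job is due.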

Install

go get github.com/jakobilobi/go-taskman

Usage

The most basic usage is to add a function directly, along with the cadence at which its execution should recur. In this case, a job ID is returned to allow the caller to later modify or remove the job.

// Assumes the package is imported under its package name, taskman
manager := taskman.New()
defer manager.Stop()

jobID, err := manager.ScheduleFunc(
    func() error {
        log.Printf("Executing the function")
        return nil
    },
    10*time.Second,
)
// Handle the err and do something with the job ID

Full usage of the package involves implementing the Task interface, and adding tasks to the manager in a Job.

// Make an arbitrary struct implement the Task interface
type SomeStruct struct {
	ID string
}

func (s SomeStruct) Execute() error {
	log.Printf("Executing SomeStruct with ID: %s", s.ID)
	return nil
}

...

// Utilize the implementation when adding a Job
// (assumes the package is imported under its package name, taskman)
manager := taskman.New()
defer manager.Stop()

// A job with two tasks and a cadence of 10 seconds, set to have its first execution immediately
job := taskman.Job{
    Cadence:  10 * time.Second,
    ID:       "job1",
    NextExec: time.Now(),
    Tasks: []taskman.Task{
        SomeStruct{ID: "task1"},
        SomeStruct{ID: "task2"},
    },
}

err := manager.ScheduleJob(job)
// Handle the err

Contributing

For contributions, please open a GitHub issue with your questions and suggestions. Before submitting an issue, have a look at the existing TODO list to see if your idea is already in the works.

Documentation

Types

type Job

type Job struct {
	Cadence time.Duration // Time between executions of the job
	Tasks   []Task        // Tasks in the job

	ID       string    // Unique ID for the job
	NextExec time.Time // The next time the job should be executed
	// contains filtered or unexported fields
}

Job is a container for a group of tasks, with a unique ID and a cadence for scheduling.

type SimpleTask added in v0.2.0

type SimpleTask struct {
	// contains filtered or unexported fields
}

SimpleTask is a task that executes a function.

func (SimpleTask) Execute added in v0.2.0

func (st SimpleTask) Execute() error

Execute executes the function and returns the error.

type Task

type Task interface {
	Execute() error
}

Task is an interface for tasks that can be executed.
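
As an illustration of implementing this interface with a stateful type, the sketch below uses a pointer receiver and an atomic counter. The Task interface is redeclared locally so the example stands alone; counterTask and executeN are hypothetical names, not part of the package. The atomic counter is a precaution based on the assumption that executions of the same task could overlap.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Task mirrors the interface documented above, redeclared here so the
// example compiles without the package.
type Task interface {
	Execute() error
}

// counterTask is a hypothetical task that records how often it has run.
// It uses a pointer receiver because Execute mutates the struct.
type counterTask struct {
	name string
	runs atomic.Int64
}

// Execute increments the run counter and never fails.
func (c *counterTask) Execute() error {
	c.runs.Add(1)
	return nil
}

// Compile-time check that *counterTask satisfies Task.
var _ Task = (*counterTask)(nil)

// executeN runs the task n times and stops at the first error.
func executeN(t Task, n int) error {
	for i := 0; i < n; i++ {
		if err := t.Execute(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c := &counterTask{name: "heartbeat"}
	if err := executeN(c, 3); err != nil {
		fmt.Println("execution failed:", err)
		return
	}
	fmt.Printf("%s ran %d times\n", c.name, c.runs.Load())
}
```

Note that a task with a pointer receiver must be added to a job as a pointer (for example &counterTask{...}), since only the pointer type satisfies the interface.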

type TaskManager added in v0.2.0

type TaskManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TaskManager manages task scheduling and execution. Tasks are scheduled within Jobs, and the manager dispatches scheduled jobs to a worker pool for execution.

func New added in v0.2.0

func New() *TaskManager

New creates, starts and returns a new TaskManager.

func (*TaskManager) ErrorChannel added in v0.2.0

func (tm *TaskManager) ErrorChannel() <-chan error

ErrorChannel returns a read-only channel for reading errors from task execution.
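
One way to consume such a channel is a dedicated loop that hands each error to a callback. The sketch below is self-contained and uses a locally created channel as a stand-in for the one returned by ErrorChannel; consumeErrors and demo are hypothetical helpers. The documentation does not state whether the manager closes the channel on Stop, so the loop also accepts a separate done signal.

```go
package main

import (
	"errors"
	"fmt"
)

// consumeErrors passes every error received on errCh to handle. It returns
// when errCh is closed or when done is closed, whichever happens first.
func consumeErrors(errCh <-chan error, done <-chan struct{}, handle func(error)) {
	for {
		select {
		case err, ok := <-errCh:
			if !ok {
				return
			}
			handle(err)
		case <-done:
			return
		}
	}
}

// demo feeds two errors through a stand-in channel and returns their messages.
func demo() []string {
	// Stand-in for the channel returned by ErrorChannel().
	errCh := make(chan error, 2)
	errCh <- errors.New("task A failed")
	errCh <- errors.New("task B failed")
	close(errCh)

	var msgs []string
	consumeErrors(errCh, make(chan struct{}), func(err error) {
		msgs = append(msgs, err.Error())
	})
	return msgs
}

func main() {
	for _, msg := range demo() {
		fmt.Println("task error:", msg)
	}
}
```

In an application, consumeErrors would typically run in its own goroutine, with done closed once the manager has been stopped.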

func (*TaskManager) RemoveJob added in v0.2.0

func (tm *TaskManager) RemoveJob(jobID string) error

RemoveJob removes a job from the TaskManager.

func (*TaskManager) ReplaceJob added in v0.2.0

func (tm *TaskManager) ReplaceJob(newJob Job) error

ReplaceJob replaces a job in the TaskManager's queue with a new job, if their IDs match. The new job's NextExec will be overwritten by the old job's, to preserve the TaskManager's schedule. Use this function to update a job's tasks without changing its schedule.

func (*TaskManager) ScheduleFunc added in v0.2.0

func (tm *TaskManager) ScheduleFunc(function func() error, cadence time.Duration) (string, error)

ScheduleFunc takes a function and adds it to the TaskManager in a Job. Creates and returns a randomized ID, used to identify the Job within the task manager.

func (*TaskManager) ScheduleJob added in v0.2.0

func (tm *TaskManager) ScheduleJob(job Job) error

ScheduleJob adds a job to the TaskManager. A job is a group of tasks that are scheduled to execute at a regular interval. The tasks in the job are executed in parallel, and the job's cadence determines when the job is executed. Unlike the other Schedule functions, ScheduleJob returns only an error; the caller supplies the job ID in the Job itself. Job requirements:

  • Cadence must be greater than 0
  • Job must have at least one task
  • NextExec must not be more than one cadence old; set it to time.Now() for instant execution
  • Job must have an ID, unique within the TaskManager
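
The job requirements can be checked on the caller's side before scheduling. The following is a sketch under stated assumptions, not the package's own validation: Job and Task are redeclared locally with only the exported fields, validateJob, noopTask and exampleJob are hypothetical names, and "more than one cadence old" is read as NextExec lying before now minus Cadence. The uniqueness of the ID is not checked, since that depends on the manager's internal state.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Task and Job mirror the exported definitions documented above.
type Task interface {
	Execute() error
}

type Job struct {
	Cadence  time.Duration
	Tasks    []Task
	ID       string
	NextExec time.Time
}

// validateJob checks the documented requirements that can be verified
// without access to the manager. ID uniqueness is deliberately left out.
func validateJob(job Job, now time.Time) error {
	switch {
	case job.Cadence <= 0:
		return errors.New("cadence must be greater than 0")
	case len(job.Tasks) == 0:
		return errors.New("job must have at least one task")
	case job.NextExec.Before(now.Add(-job.Cadence)):
		return errors.New("NextExec must not be more than one cadence old")
	case job.ID == "":
		return errors.New("job must have an ID")
	}
	return nil
}

// noopTask is a placeholder task used to build a valid example job.
type noopTask struct{}

func (noopTask) Execute() error { return nil }

// exampleJob returns a job that satisfies all checked requirements.
func exampleJob() Job {
	return Job{
		Cadence:  10 * time.Second,
		ID:       "job1",
		NextExec: time.Now(),
		Tasks:    []Task{noopTask{}},
	}
}

func main() {
	job := exampleJob()
	fmt.Println("valid job:", validateJob(job, time.Now()))

	job.NextExec = time.Now().Add(-2 * job.Cadence)
	fmt.Println("stale job:", validateJob(job, time.Now()))
}
```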

func (*TaskManager) ScheduleTask added in v0.2.0

func (tm *TaskManager) ScheduleTask(task Task, cadence time.Duration) (string, error)

ScheduleTask takes a Task and adds it to the TaskManager in a Job. Creates and returns a randomized ID, used to identify the Job within the task manager.

func (*TaskManager) ScheduleTasks added in v0.2.0

func (tm *TaskManager) ScheduleTasks(tasks []Task, cadence time.Duration) (string, error)

ScheduleTasks takes a slice of Task and adds them to the TaskManager in a Job. Creates and returns a randomized ID, used to identify the Job within the task manager.

func (*TaskManager) Stop added in v0.2.0

func (tm *TaskManager) Stop()

Stop signals the TaskManager to stop processing tasks and exit. Note: blocks until the TaskManager, including all workers, has completely stopped.
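
The blocking behavior described here is commonly built from a stop signal plus a sync.WaitGroup. The sketch below shows that general pattern with a hypothetical pool type; it is not the TaskManager's implementation, and all names in it are introduced for illustration only.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical fixed-size worker pool.
type pool struct {
	jobs chan func()
	done chan struct{}
	wg   sync.WaitGroup
	once sync.Once
}

// newPool starts the given number of workers.
func newPool(workers int) *pool {
	p := &pool{jobs: make(chan func()), done: make(chan struct{})}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go p.worker()
	}
	return p
}

// worker runs jobs until the stop signal is observed.
func (p *pool) worker() {
	defer p.wg.Done()
	for {
		select {
		case <-p.done:
			return
		case job := <-p.jobs:
			job()
		}
	}
}

// Stop signals all workers to exit and blocks until each one has returned,
// which includes finishing any job a worker has already picked up.
func (p *pool) Stop() {
	p.once.Do(func() { close(p.done) })
	p.wg.Wait()
}

// runDemo hands four jobs to two workers, stops the pool, and returns the
// number of jobs that completed.
func runDemo() int64 {
	var completed atomic.Int64
	p := newPool(2)
	for i := 0; i < 4; i++ {
		// The channel is unbuffered, so each send returns only after a
		// worker has received the job.
		p.jobs <- func() { completed.Add(1) }
	}
	p.Stop()
	return completed.Load()
}

func main() {
	fmt.Println("completed jobs:", runDemo())
}
```

Because Stop waits on the WaitGroup, a caller that defers Stop can rely on no worker goroutine outliving the call. The sync.Once makes repeated calls to Stop safe in this sketch; whether the TaskManager tolerates repeated calls is not stated in its documentation.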
