tasker

package
v0.150.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

README

package tasker

tasker package provides utilities to background task management.

A tasker.Task, at its core, is nothing more than a synchronous function.

func(ctx context.Context) error {
	return nil
}

Working with synchronous functions removes the complexity of thinking about how to run your application in your main. Your components become more stateless and focus on the domain rather than the lifecycle management, such as implementing a graceful async shutdown. This less stateful approach can help to make testing also easier.

Short-lived Jobs with Repeat

If your Job is a short-lived interaction, which is meant to be executed continuously between intervals, then you can use the tasker.WithRepeat to implement a continuous execution that stops on a shutdown signal.

task := tasker.WithRepeat(schedule.Interval(time.Second), func(ctx context.Context) error {
	// I'm a short-lived task, and I prefer to be constantly executed,
	// Repeat will keep repeating to me every second until the shutdown is signalled.
	return nil
})

ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

if err := task(ctx); err != nil {
	logger.Error(ctx, err.Error())
}
scheduling

In the schedule package, you can choose from various options on how would you like to schedule your task.

  • Schedule by time duration interval
schedule.Interval(time.Second) // schedule every second
  • Schedule on a Daily basis
schedule.Daily{Hour:12} // schedule every day at 12 o'clock
  • Schedule on a Monthly basis
// schedule every month at 12 o'clock on the third day
schedule.Monthly{Day: 3, Hour:12} 
Execution Order

If you wish to execute Jobs in a sequential order, use tasker.Sequence. It can express dependency between tasks if one should only execute if the previous one has already succeeded.

s := tasker.Sequence(
    func(ctx context.Context) error {
        // first task to execute
        return nil
    },
    func(ctx context.Context) error {
        // follow-up task to execute
        return nil
    },
)

err := s.Run(context.Background())

If you need to execute tasks concurrently, use tasker.Concurrence. It guarantees that if a task fails, you receive the error back. It also ensures that the tasks fail together as a unit, though signalling cancellation if any of the tasks has a failure.

c := tasker.Concurrence(
    func(ctx context.Context) error {
        return nil // It runs at the same time.
    },
    func(ctx context.Context) error {
        return nil // It runs at the same time.
    },
)

err := c.Run(context.Background())

You can model dependency between tasks by mixing "Sequence" and "Concurrence".

task := tasker.Sequence(
	tasker.Concurrence( // group 1 which is a prerequisite to group 2
		func(ctx context.Context) error { return nil /* some migration task 1 */ },
		func(ctx context.Context) error { return nil /* some migration task 2 */ },
	),
	tasker.Concurrence( // group 2 which depends on group 1 success
		func(ctx context.Context) error { return nil /* a task which depending on a completed migration 1 */ },
		func(ctx context.Context) error { return nil /* a task which depending on a completed migration 2 */ },
		func(ctx context.Context) error { return nil /* a task which depending on a completed migration 3 */ },
	),
)

tasker.Main(context.Background(), task)

Long-lived Jobs

If your task requires continuous work, you can use the received context as a parent context to get notified about a shutdown event. This allows simplicity in your code so you don't have to differentiate if you need to cancel operations because of a request cancellation or because of a shutdown event. You can still separate the two cancellation types by using background context.

func MyJob(signal context.Context) error {
	<-signal.Done() // work until shutdown signal
	return signal.Err() // returning the context error is not an issue.
}

Cron-like scheduled Tasks with Scheduler.WithSchedule

If you need cron-like background tasks with the guarantee that your background tasks are serialised across your application instances, and only one scheduled task can run at a time, then you may use tasker.Scheduler, which solves that for you.

package main

import (
	"context"
	"os"
	"database/sql"

	"github.com/adamluzsi/frameless/adapters/postgresql"
	"github.com/adamluzsi/frameless/pkg/contexts"
	"github.com/adamluzsi/frameless/pkg/logger"
	"github.com/adamluzsi/frameless/pkg/tasker"
	"github.com/adamluzsi/frameless/pkg/tasker/schedule"
)

func main() {
	ctx := context.Background()

	db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
	if err != nil {
		logger.Error(ctx, "error during postgres db opening", logger.ErrField(err))
		os.Exit(1)
	}

	scheduler := schedule.Scheduler{
		LockerFactory: &postgresql.LockerFactory[string]{DB: db},
		Repository:    &postgresql.TaskerScheduleStateRepository{DB: db},
	}

	task1 := scheduler.WithSchedule("my scheduled task", schedule.Monthly{Day: 1}, func(ctx context.Context) error {
		// this task will only run in one instance every month, on the first day.
		return nil
	})

	if err := tasker.Main(ctx, task1); err != nil {
		logger.Error(ctx, "error during the application run", logger.ErrField(err))
		os.Exit(1)
	}
}

Using components as Job with Graceful shutdown support

If your application components signal shutdown with a method interaction, like how http.Server do, then you can use tasker.WithShutdown to combine the entry-point method and the shutdown method into a single tasker.Job lambda expression. The graceful shutdown has a timeout, and the shutdown context will be cancelled afterwards.

srv := http.Server{
	Addr: "localhost:8080",
	Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTeapot)
	}),
}

httpServerTask := tasker.WithShutdown(
	tasker.IgnoreError(srv.ListenAndServe, http.ErrServerClosed), 
	srv.Shutdown,
)

Notify shutdown signals to tasks

The tasker.WithSignalNotify will listen to the shutdown syscalls, and will cancel the context of your Task. Using tasker.WithSignalNotify is most suitable from the main function.

// The task will be notified about shutdown signal call as context cancellation.
task := tasker.WithSignalNotify(MyTask)

if err := task(context.Background()); err != nil {
	logger.Error(ctx, err.Error())
}

Running your tasks in main

The most convenient way to run your tasks in your main is by using tasker.Main. It combines Concurrent task execution with shutdown cancellation by signals.

tasker.Main(ctx, task1, task2, task3)

Documentation

Overview

Package tasker provides utilities to background task management to achieve simplicity.

Example (SequenceMixedWithConcurrence)
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/tasker"
)

func main() {
	_ = tasker.Sequence(
		tasker.Concurrence(
			func() { /* migration task 1 */ },
			func() { /* migration task 2 */ },
		),
		tasker.Concurrence(
			func() { /* task dependent on migrations */ },
			func() { /* task dependent on migrations */ },
			func() { /* task dependent on migrations */ },
		),
	)(context.Background())
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Main

func Main(ctx context.Context, tasks ...Task) error

Main helps to manage concurrent background Tasks in your main. Each Task will run in its own goroutine. If any of the Task encounters a failure, the other tasker will receive a cancellation signal.

Types

type Interval

type Interval interface {
	UntilNext(lastRanAt time.Time) time.Duration
}

type Runnable

type Runnable interface{ Run(context.Context) error }

type Task

type Task func(context.Context) error

Task is the basic unit of tasker package, that represents an executable work.

Task at its core, is nothing more than a synchronous function. Working with synchronous functions removes the complexity of thinking about how to run your application. Your components become more stateless and focus on the domain rather than the lifecycle management. This less stateful approach can help to make testing your Task also easier.

func Concurrence

func Concurrence[TFN genericTask](tfns ...TFN) Task

Concurrence is construct that allows you to execute a list of Task concurrently. If any of the Task fails with an error, all Task will receive cancellation signal.

Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/tasker"
)

func main() {
	err := tasker.Concurrence(
		func(ctx context.Context) error {
			// concurrent task 1
			return nil
		},
		func(ctx context.Context) error {
			// concurrent task 2
			return nil
		},
	).Run(context.Background())
	_ = err
}
Output:

func IgnoreError

func IgnoreError[TFN genericTask](tfn TFN, errsToIgnore ...error) Task

func OnError

func OnError[TFN genericTask, EHFN genericErrorHandler](tfn TFN, ehfn EHFN) Task
Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/logger"
	"github.com/adamluzsi/frameless/pkg/tasker"
)

func main() {
	withErrorHandling := tasker.OnError(
		func(ctx context.Context) error { return nil },                                            // task
		func(ctx context.Context, err error) error { logger.Error(ctx, err.Error()); return nil }, // error handling
	)
	_ = withErrorHandling
}
Output:

func Sequence

func Sequence[TFN genericTask](tfns ...TFN) Task
Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/tasker"
)

func main() {
	err := tasker.Sequence(
		func(ctx context.Context) error {
			// first task to execute
			return nil
		},
		func(ctx context.Context) error {
			// follow-up task to execute
			return nil
		},
	).Run(context.Background())
	_ = err
}
Output:

func ToTask

func ToTask[TFN genericTask](tfn TFN) Task

func WithRepeat

func WithRepeat[TFN genericTask](interval Interval, tfn TFN) Task

WithRepeat will keep repeating a given Task until shutdown is signaled. It is most suitable for Task(s) meant to be short-lived and executed continuously until the shutdown signal.

Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/tasker"
	"github.com/adamluzsi/frameless/pkg/tasker/schedule"
	"log"
	"time"
)

func main() {
	task := tasker.WithRepeat(schedule.Interval(time.Second), func(ctx context.Context) error {
		// I'm a short-lived task, and prefer to be constantly executed,
		// Repeat will keep repeating me every second until shutdown is signaled.
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	if err := task(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

func WithShutdown

func WithShutdown[StartFn, StopFn genericTask](start StartFn, stop StopFn) Task

WithShutdown will combine the start and stop/shutdown function into a single Task function. It supports a graceful shutdown period; upon reaching the deadline, it will cancel the context passed to the shutdown function. WithShutdown makes it easy to use components with graceful shutdown support as a Task, such as the http.Server.

tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown)
Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/tasker"
	"log"
)

func main() {
	task := tasker.WithShutdown(
		func(ctx context.Context) error {
			// start working
			<-ctx.Done()
			return nil
		},
		func(ctx context.Context) error {
			// graceful stop for work
			<-ctx.Done()
			return nil
		},
	)

	ctx, cancel := context.WithCancel(context.Background())
	// listen to a cancellation signal and then call the cancel func
	// or use ShutdownManager.
	_ = cancel

	if err := task(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

Example (HttpServer)
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/tasker"
	"log"
	"net/http"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}
	httpServerTask := tasker.WithShutdown(
		tasker.IgnoreError(srv.ListenAndServe, http.ErrServerClosed),
		srv.Shutdown,
	)

	if err := tasker.WithSignalNotify(httpServerTask)(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

func WithSignalNotify

func WithSignalNotify[TFN genericTask](tfn TFN, shutdownSignals ...os.Signal) Task
Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/tasker"
	"log"
	"net/http"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}

	task := tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown)
	task = tasker.WithSignalNotify(task)

	if err := task(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

func (Task) Run

func (fn Task) Run(ctx context.Context) error

Run method supplies Runnable interface for Task.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL