jobs

package
v0.113.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

package jobs

jobs package provides utilities to background job management.

A Job, at its core, is nothing more than a synchronous function.

func(ctx context.Context) error {
	return nil
}

Working with synchronous functions removes the complexity of thinking about how to run your application in your main. Your components become more stateless and focus on the domain rather than the lifecycle management, such as implementing a graceful async shutdown. This less stateful approach can help to make testing also easier.

Short-lived Jobs with Repeat

If your Job is a short-lived interaction, which is meant to be executed continuously between intervals, then you can use the jobs.WithRepeat to implement a continuous execution that stops on a shutdown signal.

job := jobs.WithRepeat(schedule.Interval(time.Second), func(ctx context.Context) error {
	// I'm a short-lived job, and I prefer to be constantly executed,
	// Repeat will keep repeating to me every second until the shutdown is signalled.
	return nil
})

ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

if err := job(ctx); err != nil {
	log.Println("ERROR", err.Error())
}
scheduling

In the schedule package, you can choose from various options on how would you like to schedule your job.

  • Schedule by time duration interval
schedule.Interval(time.Second) // schedule every second
  • Schedule on a Daily basis
schedule.Daily{Hour:12} // schedule every day at 12 o'clock
  • Schedule on a Monthly basis
// schedule every month at 12 o'clock on the third day
schedule.Monthly{Day: 3, Hour:12} 
Execution Order

If you wish to execute Jobs in a sequential order, use jobs.Sequence. It can express dependency between jobs if one should only execute if the previous one has already succeeded.

s := jobs.Sequence{
    func(ctx context.Context) error {
        // first job to execute
        return nil
    },
    func(ctx context.Context) error {
        // follow-up job to execute
        return nil
    },
}

err := s.Run(context.Background())

If you need to execute jobs concurrently, use jobs.Concurrence. It guarantees that if a job fails, you receive the error back. It also ensures that the jobs fail together as a unit, though signalling cancellation if any of the jobs has a failure.

c := jobs.Concurrence{
    func(ctx context.Context) error {
        return nil // It runs at the same time.
    },
    func(ctx context.Context) error {
        return nil // It runs at the same time.
    },
}

err := c.Run(context.Background())

Long-lived Jobs

If your job requires continuous work, you can use the received context as a parent context to get notified about a shutdown event. This allows simplicity in your code so you don't have to differentiate if you need to cancel operations because of a request cancellation or because of a shutdown event. You can still separate the two cancellation types by using background context.

func MyJob(signal context.Context) error {
	<-signal.Done() // work until shutdown signal
	return signal.Err() // returning the context error is not an issue.
}

Scheduled Jobs with Scheduler.WithSchedule

If you need cron-like background jobs with the guarantee that your background jobs are serialised across your application instances, and only one scheduled job can run at a time, then you may use jobs.Scheduler, which solves that for you.

m := schedule.Scheduler{
    LockerFactory: postgresql.NewLockerFactory[string](db),
    Repository:    postgresql.NewRepository[jobs.ScheduleState, string]{/* ... */},
}

job := m.WithSchedule("db maintenance", schedule.Interval(time.Hour*24*7), func(ctx context.Context) error {
    // this job is scheduled to run once at every seven days
    return nil
})

job := m.WithSchedule("db maintenance", schedule.Monthly{Day: 1}, func(ctx context.Context) error {
    // this job is scheduled to run once at every seven days
    return nil
})

Using components as Job with Graceful shutdown support

If your application components signal shutdown with a method interaction, like how http.Server do, then you can use jobs.WithShutdown to combine the entry-point method and the shutdown method into a single jobs.Job lambda expression. The graceful shutdown has a timeout, and the shutdown context will be cancelled afterwards.

srv := http.Server{
	Addr: "localhost:8080",
	Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTeapot)
	}),
}

httpServerJob := jobs.WithShutdown(srv.ListenAndServe, srv.Shutdown)

Notify shutdown signals to jobs

The jobs.WithSignalNotify will listen to the shutdown syscalls, and will cancel the context of your Job. Using jobs.WithSignalNotify is most suitable in the main function.

// HTTP server as a job
job := jobs.WithShutdown(srv.ListenAndServe, srv.Shutdown)

// Job will benotified about shutdown signals.
job = jobs.WithSignalNotify(job)

if err := job(context.Background()); err != nil {
	log.Println("ERROR", err.Error())
}

Running your jobs in main

The most convenient way to run your jobs in your main is by using jobs.Run. It combines Concurrent job execution with shutdown cancellation by signals.

jobs.Run(ctx, job1, job2, job3)

Documentation

Overview

Package jobs provides utilities to background job management to achieve simplicity.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run(ctx context.Context, jobs ...Job) error

Run helps to manage concurrent background Jobs in your main. Each Job will run in its own goroutine. If any of the Job encounters a failure, the other jobs will receive a cancellation signal.

Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/jobs"
	"log"
	"net/http"
)

func main() {
	simpleJob := func(signal context.Context) error {
		<-signal.Done() // work until shutdown signal
		return signal.Err()
	}

	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}

	httpServerJob := jobs.WithShutdown(srv.ListenAndServe, srv.Shutdown)

	if err := jobs.Run(context.Background(), simpleJob, httpServerJob); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

Types

type Concurrence added in v0.110.0

type Concurrence []Job

Concurrence is construct that allows you to execute a list of Job concurrently. If any of the Job fails with an error, all Job will receive cancellation signal.

Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/jobs"
)

func main() {
	err := jobs.Concurrence{
		func(ctx context.Context) error {
			// concurrent job 1
			return nil
		},
		func(ctx context.Context) error {
			// concurrent job 2
			return nil
		},
	}.Run(context.Background())
	_ = err
}
Output:

func (Concurrence) Run added in v0.110.0

func (c Concurrence) Run(ctx context.Context) error

type Job

type Job func(context.Context) error

Job is the basic unit of jobs package, that represents an executable work.

Job at its core, is nothing more than a synchronous function. Working with synchronous functions removes the complexity of thinking about how to run your application. Your components become more stateless and focus on the domain rather than the lifecycle management. This less stateful approach can help to make testing your Job also easier.

func OnError added in v0.108.0

func OnError[JFN genericJob](jfn JFN, fn func(error) error) Job
Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/jobs"
	"log"
)

func main() {
	jobWithErrorHandling := jobs.OnError(
		func(ctx context.Context) error { return nil },                          // job
		func(err error) error { log.Println("ERROR", err.Error()); return nil }, // error handling
	)
	_ = jobWithErrorHandling
}
Output:

func ToJob added in v0.106.0

func ToJob[JFN genericJob](fn JFN) Job

func WithRepeat added in v0.104.0

func WithRepeat[JFN genericJob](interval internal.Interval, jfn JFN) Job

WithRepeat will keep repeating a given Job until shutdown is signaled. It is most suitable for Job(s) meant to be short-lived and executed continuously until the shutdown signal.

Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/jobs"
	"github.com/adamluzsi/frameless/pkg/jobs/schedule"
	"log"
	"time"
)

func main() {
	job := jobs.WithRepeat(schedule.Interval(time.Second), func(ctx context.Context) error {
		// I'm a short-lived job, and prefer to be constantly executed,
		// Repeat will keep repeating me every second until shutdown is signaled.
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	if err := job(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

func WithShutdown added in v0.104.0

func WithShutdown[StartFn, StopFn genericJob](start StartFn, stop StopFn) Job

WithShutdown will combine the start and stop/shutdown function into a single Job function. It supports a graceful shutdown period; upon reaching the deadline, it will cancel the context passed to the shutdown function. WithShutdown makes it easy to use components with graceful shutdown support as a Job, such as the http.Server.

jobs.JobWithShutdown(srv.ListenAndServe, srv.Shutdown)
Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/jobs"
	"log"
	"net/http"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}

	httpServerJob := jobs.WithShutdown(srv.ListenAndServe, srv.Shutdown)
	_ = httpServerJob

	ctx, cancel := context.WithCancel(context.Background())
	// listen to a cancellation signal and then call the cancel func
	// or use ShutdownManager.
	_ = cancel

	if err := httpServerJob(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

func WithSignalNotify added in v0.111.0

func WithSignalNotify[JFN genericJob](jfn JFN, shutdownSignals ...os.Signal) Job
Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/jobs"
	"log"
	"net/http"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}

	job := jobs.WithShutdown(srv.ListenAndServe, srv.Shutdown)
	job = jobs.WithSignalNotify(job)

	if err := job(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Output:

func (Job) Run added in v0.111.0

func (fn Job) Run(ctx context.Context) error

type Runnable added in v0.110.0

type Runnable interface{ Run(context.Context) error }

type Sequence added in v0.110.0

type Sequence []Job

Sequence is construct that allows you to execute a list of Job sequentially. If any of the Job fails with an error, it breaks the sequential execution and the error is returned.

Example
package main

import (
	"context"
	"github.com/adamluzsi/frameless/pkg/jobs"
)

func main() {
	err := jobs.Sequence{
		func(ctx context.Context) error {
			// first job to execute
			return nil
		},
		func(ctx context.Context) error {
			// follow-up job to execute
			return nil
		},
	}.Run(context.Background())
	_ = err
}
Output:

func (Sequence) Run added in v0.110.0

func (s Sequence) Run(ctx context.Context) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL