jobman

v0.2.4
Published: Feb 13, 2025 License: MIT Imports: 9 Imported by: 0

README

jobman

Easy and powerful background jobs for Go applications.

Install

go get -u github.com/melodyogonna/jobman

Usage

You can start using Jobman in two ways.

Using jobman with default options

The easiest way to start using jobman is to initialize it without any options. This sets up jobman with 5 workers and no storage support.

package main

import (
  "github.com/melodyogonna/jobman"
  "log"
)

type EmailData struct {
      email string
      templateId string
}

func HandleEmailJob(job jobman.Job) error {
  log.Print("Email job handler called")
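  data := job.Payload().(EmailData) // panics if the payload is not an EmailData
  log.Printf("sending template %s to %s", data.templateId, data.email)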
  // ...handle sending email
  return nil
}

func main(){
  jobman.Init() // Initialize jobman with default configurations
  jobman.RegisterHandlers("sendEmail", HandleEmailJob)

  job := jobman.GenericJob{
    JobType: "sendEmail",
    Data: EmailData{email:"johndoe@email.com", templateId: "testEmailTemplateId"},
  }
  jobman.WorkOn(job)
}

When a job is added to Jobman's job pool it'll be logged. For our example above, something like this is expected:

2025/02/13 15:19:15 worker: f222b4ab-1676-4245-bddd-30f06b902234 - handling job with type: NEWJOB
2025/02/13 15:19:15 2 handlers registered for job type: NEWJOB. Forwarding ...
2025/02/13 15:19:15 worker: bddfa149-ab61-4dcb-a100-3b023be30996 - handling job with type: sendEmail
2025/02/13 15:19:15 1 handlers registered for job type: SubmissionContentApproved. Forwarding ...

Using jobman with custom options

You can configure Jobman to use a custom storage backend, which allows you to create timed jobs. A timed job is associated with a future time at which it should be handled.


package main

import (
  "github.com/melodyogonna/jobman"
  "log"
  "os"
  "time"
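)

// EmailData is the payload carried by sendEmail jobs.
type EmailData struct {
  email      string
  templateId string
}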

func HandleEmailJob(job jobman.Job) error {
  log.Print("Email job handler called")
  // ...handle sending email
  return nil
}

func main(){
  jobman.InitWithOptions(jobman.SetupConfig{
    Backend: jobman.PostgresBackend(os.Getenv("DATABASE_URL")),
    WorkerSize: 10,
  }) // Initialize jobman with custom configurations
  jobman.RegisterHandlers("sendEmail", HandleEmailJob)

  job := jobman.GenericTimedJob{
    JobType: "sendEmail",
    When: time.Now().Add(time.Hour * 24),
    Data: EmailData{email:"johndoe@email.com", templateId: "testEmailTemplateId"},
  }
  jobman.WorkOn(job)
}

Jobman will panic if you ask it to work on a timed job without first setting up a backend.

Timed Jobs

Timed jobs have a scheduled time attached. Jobman saves these jobs using the specified backend until they are due.

Components

My goal with this library is to make something where simple parts compose together intuitively. To that end, Jobman's core has 3 composing parts:

  1. Poller
  2. Backend
  3. Job

Poller

The purpose of a poller is to check some external source for due jobs, then add any jobs found to a job pool. A poller has the interface:


type JobPool chan Job

type Poller interface {
	Poll(p JobPool)
}

The default poller checks whichever backend was passed during initialization once every minute. You can create a custom poller and pass it to Jobman during initialization:

package main

import (
  "context"
  "os"
  "time"

  "github.com/melodyogonna/jobman"
)

// customPoller checks for jobs every hour
type customPoller struct {
  storage jobman.Backend
}

// Poll satisfies jobman.Poller: it forwards every due job to the pool.
func (p customPoller) Poll(pool jobman.JobPool) {
  for {
    time.Sleep(time.Hour)
    // FindDue takes a context, as declared by the Backend interface
    due, err := p.storage.FindDue(context.Background())
    if err != nil {
      // do error handling
      return
    }
    for _, job := range due {
      pool <- job
    }
  }
}

func GetCustomPoller(backend jobman.Backend) jobman.Poller {
  return &customPoller{storage: backend}
}

func main(){
  jobman.InitWithOptions(jobman.SetupConfig{
    Backend: jobman.PostgresBackend(os.Getenv("DATABASE_URL")),
    WorkerSize: 10,
    Poller: GetCustomPoller(jobman.PostgresBackend(os.Getenv("DATABASE_URL"))),
  }) // Initialize jobman with custom configurations
}

Jobman will use the default 1-minute poller if you don't pass any during setup.

Roadmap

Some ideas I intend to implement in the future:

  • Job retries
  • Redis backend
  • Nats backend
  • Action hooks - To monitor different states of a job
  • More tests

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Init

func Init()

Init sets up Jobman with default options: 5 workers, no poller, and no backend. Timed jobs are disabled when jobman is initialized with default options, and sending one will cause a panic.

func InitWithOptions

func InitWithOptions(options SetupConfig)

InitWithOptions sets up Jobman with custom options. If you provide a backend but no poller then jobman will use the default poller, which polls the storage every minute.

func PostgresBackend

func PostgresBackend(url string) *postgresBackend

func RegisterHandlers added in v0.2.0

func RegisterHandlers(jobType string, handlers ...handler)

RegisterHandlers registers one or more functions that should be called when a job of jobType needs to be processed.

func WorkOn

func WorkOn(job Job)

WorkOn gives Jobman a job to work on. If the job is a TimedJob it'll be saved to the backend; otherwise it'll be handled immediately.

Types

type Backend

type Backend interface {
	// Save job, return an error if the job could not be saved
	Save(ctx context.Context, job TimedJob) error
	// Find due jobs.
	FindDue(ctx context.Context) ([]TimedJob, error)
	// Mark a job as complete
	MarkComplete(ctx context.Context, job TimedJob) error
}

Backend defines an interface for persisting jobs to an external persistent storage.

type GenericJob

type GenericJob struct {
	JobType string      `json:"job_type"`
	Data    any         `json:"data"`
	Opts    *JobOptions `json:"opts"`
}

func (GenericJob) Options

func (job GenericJob) Options() JobOptions

func (GenericJob) Payload

func (job GenericJob) Payload() any

func (GenericJob) Type

func (job GenericJob) Type() string

type GenericTimedJob

type GenericTimedJob struct {
	JobType string      `json:"job_type"`
	Data    any         `json:"data"` // This should be JSON serializable if we intend to save this in DB
	Opts    *JobOptions `json:"opts"`
	Id      int         `json:"id"`
	When    time.Time   `json:"when"`
}

GenericTimedJob is a default job that shouldn't get handled immediately. We need to enforce persistence when dealing with timed jobs, so it requires a way to mark it as complete.
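
The comment on Data says the payload should be JSON serializable. Assuming the backend encodes it with encoding/json (the struct tags suggest this, but it is an assumption), note that a struct with only unexported fields, such as the EmailData type in the README, encodes to an empty object. The sketch below demonstrates the difference; unexportedData, exportedData and encode are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unexportedData mirrors the README's EmailData: encoding/json ignores
// fields whose names start with a lowercase letter.
type unexportedData struct {
	email      string
	templateId string
}

// exportedData carries the same information in fields that json can see.
type exportedData struct {
	Email      string `json:"email"`
	TemplateID string `json:"template_id"`
}

// encode marshals v and returns the JSON text, or the error message.
func encode(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encode(unexportedData{email: "johndoe@email.com", templateId: "testEmailTemplateId"}))
	// {}
	fmt.Println(encode(exportedData{Email: "johndoe@email.com", TemplateID: "testEmailTemplateId"}))
	// {"email":"johndoe@email.com","template_id":"testEmailTemplateId"}
}
```

If timed jobs are stored this way, exporting the payload fields keeps the data intact across the save and load cycle.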

func (GenericTimedJob) ID

func (job GenericTimedJob) ID() any

func (GenericTimedJob) In

func (job GenericTimedJob) In() time.Time

func (GenericTimedJob) MarkCompleted

func (job GenericTimedJob) MarkCompleted() error

func (GenericTimedJob) Options

func (job GenericTimedJob) Options() JobOptions

func (GenericTimedJob) Payload

func (job GenericTimedJob) Payload() any

func (GenericTimedJob) Type

func (job GenericTimedJob) Type() string

type Job

type Job interface {
	// Retrieve job type
	Type() string

	// Retrieve Job data
	Payload() any

	Options() JobOptions
}

Job defines a standard job interface with properties a job should have.
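
Any type with these three methods can be used as a job. Below is a minimal sketch of a custom job type together with a handler that checks the payload with the comma-ok form of a type assertion, so that an unexpected payload produces an error rather than a panic. Job and JobOptions are redeclared locally so the snippet compiles on its own; welcomeEmail, recipient and handle are hypothetical names.

```go
package main

import "fmt"

// Local copies of the documented types.
type JobOptions struct {
	Retry bool `json:"retry"`
}

type Job interface {
	Type() string
	Payload() any
	Options() JobOptions
}

// welcomeEmail is a hypothetical job type with its own typed field.
type welcomeEmail struct {
	To string
}

func (welcomeEmail) Type() string        { return "sendEmail" }
func (j welcomeEmail) Payload() any      { return j.To }
func (welcomeEmail) Options() JobOptions { return JobOptions{} }

// Compile-time check that welcomeEmail satisfies Job.
var _ Job = welcomeEmail{}

// recipient extracts the address from a payload, reporting an error
// when the payload has an unexpected type.
func recipient(payload any) (string, error) {
	to, ok := payload.(string)
	if !ok {
		return "", fmt.Errorf("sendEmail: unexpected payload type %T", payload)
	}
	return to, nil
}

// handle has the same shape as the README's handlers: it takes a Job
// and returns an error.
func handle(job Job) error {
	to, err := recipient(job.Payload())
	if err != nil {
		return err
	}
	fmt.Println("sending welcome email to", to)
	return nil
}

func main() {
	if err := handle(welcomeEmail{To: "johndoe@email.com"}); err != nil {
		fmt.Println("handler failed:", err)
	}
}
```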

type JobOptions

type JobOptions struct {
	Retry bool `json:"retry"`
}

type JobPool

type JobPool chan Job

type Poller

type Poller interface {
	Poll(p JobPool)
}

Poller searches a job storage that it holds internally and forwards every job it finds to the job pool.

type RetryOptions

type RetryOptions struct{}

type SetupConfig

type SetupConfig struct {
	Backend    Backend
	Poller     Poller
	WorkerSize uint
}

type TimedJob

type TimedJob interface {
	Job

	// When job should be worked on
	In() time.Time

	// Mark job as complete after it has been handled. Persistent jobs need a way to be marked as complete
	MarkCompleted() error

	// Timed jobs require an identity.
	ID() any
}

TimedJob is a job that shouldn't be worked on immediately. Only timed jobs are persisted in external storage.

Directories

Path Synopsis
internal
