cronx

v1.6.0
Published: May 10, 2022 License: BSD-3-Clause Imports: 21 Imported by: 0

README

Cronx is a library for managing cron jobs. It includes live monitoring of the current schedule and the state of active jobs, which can be output as JSON or rendered with an HTML template.

Installation

To install the cronx package, you first need Go installed (version 1.14+ is required) and your Go workspace set up. Then use the Go command below to install cronx.

go get -u -v github.com/rizalgowandy/cronx

Import it in your code:

package main

import "github.com/rizalgowandy/cronx"

Quick Start

Check the example here.

Run the binary:

make run | jq -R -r '. as $line | try fromjson catch $line'

Then, browse to:

cronx

Available Status

  • Down => Job failed to be registered.
  • Up => Job has just been created.
  • Running => Job is currently running.
  • Idle => Job is waiting for its next execution time.
  • Error => Job failed on its last run.

Schedule Specification Format

Schedule

Field name    Mandatory?  Allowed values   Allowed special characters
Seconds       Optional    0-59             * / , -
Minutes       Yes         0-59             * / , -
Hours         Yes         0-23             * / , -
Day of month  Yes         1-31             * / , - ?
Month         Yes         1-12 or JAN-DEC  * / , -
Day of week   Yes         0-6 or SUN-SAT   * / , - ?

Predefined schedules

Entry                   Description                                 Equivalent
@yearly (or @annually)  Run once a year, midnight, Jan. 1st         0 0 0 1 1 *
@monthly                Run once a month, midnight, first of month  0 0 0 1 * *
@weekly                 Run once a week, midnight between Sat/Sun   0 0 0 * * 0
@daily (or @midnight)   Run once a day, midnight                    0 0 0 * * *
@hourly                 Run once an hour, beginning of hour         0 0 * * * *
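
The table above can be read as a plain lookup from descriptor to six-field spec. The following is a minimal sketch of that reading, not the library's parser; `predefined` and `expandDescriptor` are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// predefined mirrors the "Predefined schedules" table above.
// It is an illustration only; the real parser handles descriptors itself.
var predefined = map[string]string{
	"@yearly":   "0 0 0 1 1 *",
	"@annually": "0 0 0 1 1 *",
	"@monthly":  "0 0 0 1 * *",
	"@weekly":   "0 0 0 * * 0",
	"@daily":    "0 0 0 * * *",
	"@midnight": "0 0 0 * * *",
	"@hourly":   "0 0 * * * *",
}

// expandDescriptor (hypothetical helper) returns the six-field equivalent of a
// predefined schedule, or the input unchanged if it is not a known descriptor.
func expandDescriptor(spec string) string {
	if eq, ok := predefined[spec]; ok {
		return eq
	}
	return spec
}

func main() {
	for _, s := range []string{"@daily", "@hourly", "0 */10 * * * *"} {
		fmt.Printf("%-16s => %s\n", s, expandDescriptor(s))
	}
}
```

Note that every equivalent starts with a seconds field, which matches the optional Seconds column in the schedule table.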

Intervals

@every <duration>

For example, "@every 1h30m10s" would indicate a schedule that activates after 1 hour, 30 minutes, 10 seconds, and then every interval after that.

Please refer to this link for more detail.

Interceptor / Middleware

An interceptor, commonly known as middleware, is an operation that wraps the execution of another operation. This library lets you add multiple middlewares that are executed before or after the real job. This means you can log the running job, send telemetry, or protect the application from going down because of a panic by adding middlewares. A middleware is declared once and executed on all registered jobs, which reduces code duplication in each job implementation.

Adding Interceptor / Middleware

package main

import (
	"github.com/rizalgowandy/cronx"
	"github.com/rizalgowandy/cronx/interceptor"
)

func main() {
	// Create cron middleware.
	// The order is important.
	// The first one will be executed first.
	middleware := cronx.Chain(
		interceptor.RequestID,           // Inject request id to context.
		interceptor.Recover(),           // Auto recover from panic.
		interceptor.Logger(),            // Log start and finish process.
		interceptor.DefaultWorkerPool(), // Limit concurrent running job.
	)

	// Create the manager with the middleware applied to every job.
	cronx.NewManager(cronx.WithInterceptor(middleware))
}

Check all available interceptors here.
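
To illustrate why the order matters, the sketch below composes interceptors so that the first one becomes the outermost wrapper. It is an assumption about how such a chain can work, not the library's implementation; `Job`, `Handler`, and `Interceptor` are local stand-ins that mirror the documented signatures, and `chain`, `trace`, and `runDemo` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Job, Handler and Interceptor are local stand-ins that mirror the shapes
// documented for cronx; they are not the library's own types.
type Job struct{ Name string }

type Handler func(ctx context.Context, job *Job) error

type Interceptor func(ctx context.Context, job *Job, handler Handler) error

// chain composes interceptors so that the first one is the outermost wrapper.
func chain(interceptors ...Interceptor) Interceptor {
	return func(ctx context.Context, job *Job, handler Handler) error {
		// Wrap from the last interceptor to the first.
		for i := len(interceptors) - 1; i >= 0; i-- {
			next, ic := handler, interceptors[i]
			handler = func(ctx context.Context, job *Job) error {
				return ic(ctx, job, next)
			}
		}
		return handler(ctx, job)
	}
}

// trace (hypothetical) records when it runs before and after the job.
func trace(name string, log *[]string) Interceptor {
	return func(ctx context.Context, job *Job, handler Handler) error {
		*log = append(*log, name+":before")
		err := handler(ctx, job)
		*log = append(*log, name+":after")
		return err
	}
}

// runDemo runs a no-op job through two tracing interceptors.
func runDemo() []string {
	var log []string
	mw := chain(trace("first", &log), trace("second", &log))
	_ = mw(context.Background(), &Job{Name: "demo"}, func(ctx context.Context, job *Job) error {
		log = append(log, "job")
		return nil
	})
	return log
}

func main() {
	fmt.Println(runDemo())
}
```

In this sketch the first interceptor sees the job before every other interceptor and is the last to finish, which is why something like panic recovery is usually placed near the front of the chain.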

Custom Interceptor / Middleware

package main

import (
	"context"
	"time"

	"github.com/rizalgowandy/cronx"
)

// Sleep is a middleware that sleeps for 10 seconds after the job has been executed.
func Sleep() cronx.Interceptor {
	return func(ctx context.Context, job *cronx.Job, handler cronx.Handler) error {
		err := handler(ctx, job)
		time.Sleep(10 * time.Second)
		return err
	}
}

For more examples, check here.

FAQ

What are the available commands?

Here is the list of commonly used commands.

// Schedule sets a job to run at specific time.
// Example:
//  @every 5m
//  0 */10 * * * * => every 10m
Schedule(spec string, job JobItf) error

// ScheduleFunc adds a func to the Cron to be run on the given schedule.
ScheduleFunc(spec, name string, cmd func(ctx context.Context) error) error

// Schedules sets a job to run multiple times at specific time.
// Symbol */,-? should never be used as separator character.
// These symbols are reserved for cron specification.
//
// Example:
//  Spec		: "0 0 1 * * *#0 0 2 * * *#0 0 3 * * *"
//  Separator	: "#"
//  This input schedules the job to run 3 times.
Schedules(spec, separator string, job JobItf) error

// SchedulesFunc adds a func to the Cron to be run on the given schedules.
SchedulesFunc(spec, separator, name string, cmd func(ctx context.Context) error) error

Go here to see the list of available commands.
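
The multi-schedule form is easier to see with the splitting step written out. The sketch below is one possible reading of the comment on Schedules, not the library's code; `splitSpecs` is a hypothetical helper, and treating a space as reserved is an extra assumption because spaces separate the cron fields.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitSpecs (hypothetical helper) splits a combined spec into individual
// schedules and rejects separators that clash with cron syntax.
// Rejecting the space character is an assumption added for this sketch.
func splitSpecs(spec, separator string) ([]string, error) {
	if separator == "" || strings.ContainsAny(separator, "*/,-? ") {
		return nil, errors.New("separator is empty or reserved for cron specification")
	}
	var out []string
	for _, s := range strings.Split(spec, separator) {
		if s = strings.TrimSpace(s); s != "" {
			out = append(out, s)
		}
	}
	return out, nil
}

func main() {
	specs, err := splitSpecs("0 0 1 * * *#0 0 2 * * *#0 0 3 * * *", "#")
	if err != nil {
		panic(err)
	}
	for i, s := range specs {
		fmt.Printf("schedule %d of %d: %s\n", i+1, len(specs), s)
	}
}
```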

What are the available interceptors?

Go here to see the available interceptors.

Can I use my own router without starting the built-in router?

Yes, you can. This library is very modular.

Here's an example of using gin.

package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/rizalgowandy/cronx"
)

func main() {
	// Since we want to create a custom HTTP server,
	// do not forget to shut down the cron gracefully here.
	manager := cronx.NewManager()
	defer manager.Stop()

	// An example using gin as the router.
	r := gin.Default()
	r.GET("/custom-path", func(c *gin.Context) {
		c.JSON(http.StatusOK, manager.GetInfo())
	})

	// Start your own server.
	r.Run()
}

Can I still get the built-in template if I use my own router?

Yes, you can.

package main

import (
	// Adjust the echo import to the version your project uses.
	"github.com/labstack/echo/v4"
	"github.com/rizalgowandy/cronx"
	"github.com/rizalgowandy/cronx/page"
)

func main() {
	// Since we want to create a custom HTTP server,
	// do not forget to shut down the cron gracefully here.
	manager := cronx.NewManager()
	defer manager.Stop()

	// GetStatusTemplate will return the built-in status page template.
	index, _ := page.GetStatusTemplate()

	// An example using echo as the router.
	e := echo.New()
	e.GET("/jobs", func(c echo.Context) error {
		// Serve the template to the writer and pass the current status data.
		return index.Execute(c.Response().Writer, manager.GetStatusData())
	})
}

My server is located in the US, but my users are in Jakarta. Can I change the cron timezone?

Yes, you can. By default, the cron timezone follows the server's local timezone via time.Local. If the server is located in the US, it uses the US timezone. If the server is located in Singapore, it uses the Singapore timezone.

package main

import (
	"time"

	"github.com/rizalgowandy/cronx"
)

func main() {
	loc := func() *time.Location { // Change timezone to Jakarta.
		jakarta, err := time.LoadLocation("Asia/Jakarta")
		if err != nil {
			// Fall back to a fixed UTC+7 zone if the tz database is unavailable.
			secondsEastOfUTC := int((7 * time.Hour).Seconds())
			jakarta = time.FixedZone("WIB", secondsEastOfUTC)
		}
		return jakarta
	}()

	// Create a manager that uses the custom location.
	cronx.NewManager(cronx.WithLocation(loc))
}

My job requires certain information like current wave number, how can I get this information?

This kind of information is stored in the job metadata, which is automatically stored inside the context.

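package main

import (
	"context"
	"errors"

	"github.com/rizalgowandy/cronx"
)

// logx and fn are external helper packages; their imports are omitted here.

type subscription struct{}

func (subscription) Run(ctx context.Context) error {
	md, ok := cronx.GetJobMetadata(ctx)
	if !ok {
		return errors.New("cannot get job metadata")
	}
	logx.INF(ctx, logx.KV{"job": fn.Name(), "metadata": md}, "subscription is running")
	return nil
}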

Documentation

Constants

const (
	SortKeyID      sortx.Key = "id"
	SortKeyName    sortx.Key = "name"
	SortKeyStatus  sortx.Key = "status"
	SortKeyPrevRun sortx.Key = "prev_run"
	SortKeyNextRun sortx.Key = "next_run"
	SortKeyLatency sortx.Key = "latency"
)
const (
	// CtxKeyJobMetadata is context for cron job metadata.
	CtxKeyJobMetadata = contextKey("cron-job-metadata")
)

Context key for standardized context value.

const (
	QueryParamSort = "sort"
)
const SleepDuration = time.Second * 10

SleepDuration defines the duration to sleep the server if the defined address is busy.

Variables

var (
	// Support the v1 where the first parameter is second.
	DefaultParser = cron.NewParser(
		cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)
	DefaultInterceptors = Chain()
	DefaultLocation     = time.Local
	DefaultStorage      = storage.NewNoopClient()
)

Default configuration for the manager.

Functions

func GetJobName added in v1.1.0

func GetJobName(job JobItf) (name string)

GetJobName returns the job name by reflecting on the job.

func NewServer

func NewServer(manager *Manager, address string) (*http.Server, error)

NewServer creates a new HTTP server.

  • / => current server status.
  • /jobs => current jobs as frontend html.
  • /api/jobs => current jobs as json.

func NewSideCarServer

func NewSideCarServer(manager *Manager, address string)

NewSideCarServer creates a new side car HTTP server. The HTTP server is started automatically.

  • / => current server status.
  • /jobs => current jobs as frontend html.
  • /api/jobs => current jobs as json.

func NewStatusDataSorter added in v1.4.0

func NewStatusDataSorter(key sortx.Key, order sortx.Order, data []StatusData) sort.Interface
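
Because NewStatusDataSorter returns a sort.Interface, its result is presumably meant to be handed to sort.Sort. The sketch below shows that pattern with a simplified local type; `statusRow`, `byName`, and `sortedNames` are hypothetical and do not reflect how the library orders its data internally.

```go
package main

import (
	"fmt"
	"sort"
)

// statusRow is a simplified, local stand-in for StatusData.
type statusRow struct {
	ID   int
	Name string
}

// byName implements sort.Interface, ordering rows by job name.
type byName []statusRow

func (s byName) Len() int           { return len(s) }
func (s byName) Less(i, j int) bool { return s[i].Name < s[j].Name }
func (s byName) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// sortedNames sorts the rows in place and returns the names in order.
func sortedNames(rows []statusRow) []string {
	sort.Sort(byName(rows))
	names := make([]string, len(rows))
	for i, r := range rows {
		names[i] = r.Name
	}
	return names
}

func main() {
	rows := []statusRow{{3, "subscription"}, {1, "billing"}, {2, "payment"}}
	fmt.Println(sortedNames(rows))

	// Descending order only needs the standard wrapper.
	sort.Sort(sort.Reverse(byName(rows)))
	fmt.Println(rows[0].Name)
}
```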

func SetJobMetadata

func SetJobMetadata(ctx context.Context, meta JobMetadata) context.Context

SetJobMetadata stores current job metadata inside current context.

Types

type Func

type Func func(ctx context.Context) error

Func is a type to allow callers to wrap a raw func. Example:

manager.Schedule("@every 5m", cronx.Func(myFunc))

func (Func) Run

func (f Func) Run(ctx context.Context) error

type FuncJob added in v1.2.0

type FuncJob struct {
	// contains filtered or unexported fields
}

FuncJob is a type to allow callers to wrap a raw func with name. Example:

manager.ScheduleFunc("@every 5m", "random name", myFunc)

func NewFuncJob added in v1.2.0

func NewFuncJob(name string, cmd Func) *FuncJob

NewFuncJob creates a wrapper type for a raw func with name.

func (FuncJob) Run added in v1.2.0

func (f FuncJob) Run(ctx context.Context) error

type Handler

type Handler func(ctx context.Context, job *Job) error

Handler is the handler definition to run a job.

type HistoryData added in v1.6.0

type HistoryData struct {
	Data       []storage.History   `db:"data" json:"data"`
	Pagination pagination.Response `db:"pagination" json:"pagination"`
}

type Interceptor

type Interceptor func(ctx context.Context, job *Job, handler Handler) error

Interceptor is the middleware that will be executed before the current handler.

func Chain

func Chain(interceptors ...Interceptor) Interceptor

Chain returns a single interceptor from multiple interceptors.

type Job

type Job struct {
	JobMetadata

	Name    string     `db:"name" json:"name"`
	Status  StatusCode `db:"status" json:"status"`
	Latency string     `db:"latency" json:"latency"`
	Error   string     `db:"error" json:"error"`
	// contains filtered or unexported fields
}

func NewJob

func NewJob(manager *Manager, job JobItf, waveNumber, totalWave int64) *Job

NewJob creates a new job with default status and name.

func (*Job) RecordHistory added in v1.6.0

func (j *Job) RecordHistory(ctx context.Context, start, finish time.Time)

func (*Job) Run

func (j *Job) Run()

Run executes the current job operation.

func (*Job) UpdateStatus

func (j *Job) UpdateStatus() StatusCode

UpdateStatus updates the current job status to the latest.

type JobItf

type JobItf interface {
	Run(ctx context.Context) error
}

type JobMetadata

type JobMetadata struct {
	EntryID    cron.EntryID `db:"entry_id" json:"entry_id"`
	Wave       int64        `db:"wave" json:"wave"`
	TotalWave  int64        `db:"total_wave" json:"total_wave"`
	IsLastWave bool         `db:"is_last_wave" json:"is_last_wave"`
}

func GetJobMetadata

func GetJobMetadata(ctx context.Context) (JobMetadata, bool)

GetJobMetadata returns the job metadata from the current context, and a boolean reporting whether it exists.

type Manager added in v1.1.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager controls all the underlying jobs.

func NewManager added in v1.1.0

func NewManager(opts ...Option) *Manager

NewManager creates a command controller with a specific config.

func (*Manager) GetEntries added in v1.1.0

func (m *Manager) GetEntries() []cron.Entry

GetEntries returns all the current registered jobs.

func (*Manager) GetEntry added in v1.1.0

func (m *Manager) GetEntry(id cron.EntryID) *cron.Entry

GetEntry returns a snapshot of the given entry, or nil if it couldn't be found.

func (*Manager) GetHistoryData added in v1.6.0

func (m *Manager) GetHistoryData(
	ctx context.Context,
	req pagination.Request,
) (HistoryData, error)

GetHistoryData returns run histories for the history page.

func (*Manager) GetInfo added in v1.1.0

func (m *Manager) GetInfo() map[string]interface{}

GetInfo returns command controller basic information.

func (*Manager) GetStatusData added in v1.1.0

func (m *Manager) GetStatusData(param ...string) StatusPageData

GetStatusData returns all jobs status for status page.

func (*Manager) Remove added in v1.1.0

func (m *Manager) Remove(id cron.EntryID)

Remove removes a specific job from running. Get the EntryID from the list of job entries returned by manager.GetEntries(). If the job is in the middle of running, it will be removed once the process has finished.

func (*Manager) Schedule added in v1.1.0

func (m *Manager) Schedule(spec string, job JobItf) error

Schedule sets a job to run at specific time. Example:

@every 5m
0 */10 * * * * => every 10m

func (*Manager) ScheduleFunc added in v1.2.0

func (m *Manager) ScheduleFunc(spec, name string, cmd func(ctx context.Context) error) error

ScheduleFunc adds a func to the Cron to be run on the given schedule.

func (*Manager) Schedules added in v1.1.0

func (m *Manager) Schedules(spec, separator string, job JobItf) error

Schedules sets a job to run multiple times at specific times. The symbols */,-? should never be used as the separator character, because they are reserved for cron specification.

Example:

Spec		: "0 0 1 * * *#0 0 2 * * *#0 0 3 * * *"
Separator	: "#"
This input schedules the job to run 3 times.

func (*Manager) SchedulesFunc added in v1.2.0

func (m *Manager) SchedulesFunc(
	spec, separator, name string,
	cmd func(ctx context.Context) error,
) error

SchedulesFunc adds a func to the Cron to be run on the given schedules.

func (*Manager) Start added in v1.1.0

func (m *Manager) Start()

Start starts the jobs so that they run at their next scheduled time.

func (*Manager) Stop added in v1.1.0

func (m *Manager) Stop()

Stop stops active jobs from running at the next scheduled time.

type Option added in v1.1.0

type Option func(*Manager)

Option represents a modification to the default behavior of the manager.

func WithAutoStartDisabled added in v1.2.0

func WithAutoStartDisabled() Option

WithAutoStartDisabled prevents the cron job from actually running.

func WithInterceptor added in v1.1.0

func WithInterceptor(interceptors ...Interceptor) Option

WithInterceptor specifies job wrappers (interceptors) to apply to all jobs added to this cron.

func WithLocation added in v1.1.0

func WithLocation(loc *time.Location) Option

WithLocation overrides the timezone of the cron instance.

func WithLowPriorityDownJobs added in v1.3.0

func WithLowPriorityDownJobs() Option

WithLowPriorityDownJobs puts the down jobs at the bottom of the list.

func WithParser added in v1.1.0

func WithParser(p cron.Parser) Option

WithParser overrides the parser used for interpreting job schedules.

func WithStorage added in v1.6.0

func WithStorage(client storage.Client) Option

WithStorage determines the reader and writer for historical data.

type ServerController

type ServerController struct {
	// Manager controls all the underlying job.
	Manager *Manager
}

ServerController is http server controller.

func (*ServerController) APIHistories added in v1.6.0

func (c *ServerController) APIHistories(ctx echo.Context) error

APIHistories returns run histories as json.

func (*ServerController) APIJobs

func (c *ServerController) APIJobs(ctx echo.Context) error

APIJobs returns job status as json.

func (*ServerController) HealthCheck

func (c *ServerController) HealthCheck(ctx echo.Context) error

HealthCheck returns server status.

func (*ServerController) Jobs

func (c *ServerController) Jobs(ctx echo.Context) error

Jobs returns job status as a frontend template.

type Sort added in v1.6.0

type Sort struct {
	Query   string            `db:"query" json:"query"`
	Columns map[string]string `db:"columns" json:"columns"`
}

type StatusCode

type StatusCode string

StatusCode describes current job status.

const (
	// StatusCodeUp describes that current job has just been created.
	StatusCodeUp StatusCode = "UP"
	// StatusCodeIdle describes that current job is waiting for next execution time.
	StatusCodeIdle StatusCode = "IDLE"
	// StatusCodeRunning describes that current job is currently running.
	StatusCodeRunning StatusCode = "RUNNING"
	// StatusCodeDown describes that current job has failed to be registered.
	StatusCodeDown StatusCode = "DOWN"
	// StatusCodeError describes that last run has failed.
	StatusCodeError StatusCode = "ERROR"
)

func (StatusCode) String added in v1.6.0

func (s StatusCode) String() string

type StatusData

type StatusData struct {
	// ID is unique per job.
	ID cron.EntryID `db:"id" json:"id"`
	// Job defines current job.
	Job *Job `db:"job" json:"job"`
	// Next defines the next schedule to execute current job.
	Next time.Time `db:"next" json:"next"`
	// Prev defines the last run of the current job.
	Prev time.Time `db:"prev" json:"prev"`
}

StatusData defines current job status.

type StatusPageData added in v1.5.0

type StatusPageData struct {
	Data []StatusData `db:"data" json:"data"`
	Sort Sort         `db:"sort" json:"sort"`
}
