cronx

package module
v1.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2023 License: BSD-3-Clause Imports: 24 Imported by: 0

README

Go Doc Release Go Report Card Build Status Sourcegraph

logo

Cronx is a library to manage cron jobs, a cron manager library. It includes a live monitoring of current schedule and state of active jobs that can be outputted as JSON or HTML template.

Installation

In order to install cronx package, you need to install Go and set your Go workspace first.

You first need Go installed (version >=1.15 is required), then you can use the below Go command to install cronx.

go get -v github.com/rizalgowandy/cronx

Import it in your code:

package main

import "github.com/rizalgowandy/cronx"

Quick Start

Check the example here.

Run docker:

docker-compose up

Run the binary:

make run | jq -R -r '. as $line | try fromjson catch $line'

Then, browse to:

cronx

Available Status

  • Down => Job fails to be registered.
  • Up => Job has just been created.
  • Running => Job is currently running.
  • Success => Job succeeds on the last run, waiting for next run.
  • Error => Job fails on the last run.

Schedule Specification Format

Schedule

Field name Mandatory? Allowed values Allowed special characters
Seconds Optional 0-59 * / , -
Minutes Yes 0-59 * / , -
Hours Yes 0-23 * / , -
Day of month Yes 1-31 * / , - ?
Month Yes 1-12 or JAN-DEC * / , -
Day of week Yes 0-6 or SUN-SAT * / , - ?

Predefined schedules

Entry Description Equivalent
@yearly (or @annually) Run once a year, midnight, Jan. 1st 0 0 0 1 1 *
@monthly Run once a month, midnight, first of month 0 0 0 1 * *
@weekly Run once a week, midnight between Sat/Sun 0 0 0 * * 0
@daily (or @midnight) Run once a day, midnight 0 0 0 * * *
@hourly Run once an hour, beginning of hour 0 0 * * * *

Intervals

@every <duration>

For example, "@every 1h30m10s" would indicate a schedule that activates after 1 hour, 30 minutes, 10 seconds, and then every interval after that.

Please refer to this link for more detail.

Interceptor / Middleware

Interceptor or commonly known as middleware is an operation that commonly executed before any of other operation. This library has the capability to add multiple middlewares that will be executed before or after the real job. It means you can log the running job, send telemetry, or protect the application from going down because of panic by adding middlewares. The idea of a middleware is to be declared once, and be executed on all registered jobs. Hence, reduce the code duplication on each job implementation.

Adding Interceptor / Middleware

package main

import (
	"github.com/rizalgowandy/cronx"
	"github.com/rizalgowandy/cronx/interceptor"
)

func main() {
	// Create cron middleware.
	// The order is important.
	// The first one will be executed first.
	middleware := cronx.Chain(
		interceptor.RequestID,           // Inject request id to context.
		interceptor.Recover(),           // Auto recover from panic.
		interceptor.Logger(),            // Log start and finish process.
		interceptor.DefaultWorkerPool(), // Limit concurrent running job.
	)

	cronx.NewManager(cronx.WithInterceptor(middleware))
}

Check all available interceptors here.

Custom Interceptor / Middleware

package main

import (
	"context"
	"time"

	"github.com/rizalgowandy/cronx"
)

// Sleep is a middleware that sleep a few second after job has been executed.
func Sleep() cronx.Interceptor {
	return func(ctx context.Context, job *cronx.Job, handler cronx.Handler) error {
		err := handler(ctx, job)
		time.Sleep(10 * time.Second)
		return err
	}
}

For more example check here.

FAQ

What are the available commands?

Here the list of commonly used commands.

package main

import (
	"context"

	"github.com/rizalgowandy/cronx"
)

// Schedule sets a job to run at specific time.
// Example:
//  @every 5m
//  0 */10 * * * * => every 10m
func Schedule(spec string, job cronx.JobItf) error

// ScheduleFunc adds a func to the Cron to be run on the given schedule.
func ScheduleFunc(spec, name string, cmd func(ctx context.Context) error) error

// Schedules sets a job to run multiple times at specific time.
// Symbol */,-? should never be used as separator character.
// These symbols are reserved for cron specification.
//
// Example:
//  Spec		: "0 0 1 * * *#0 0 2 * * *#0 0 3 * * *
//  Separator	: "#"
//  This input schedules the job to run 3 times.
func Schedules(spec, separator string, job cronx.JobItf) error

// SchedulesFunc adds a func to the Cron to be run on the given schedules.
func SchedulesFunc(spec, separator, name string, cmd func(ctx context.Context) error) error

Go here to see the list of available commands.

What are the available interceptors?

Go here to see the available interceptors.

Can I use my own router without starting the built-in router?

Yes, you can. This library is very modular.

Here's an example of using gin.

package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/rizalgowandy/cronx"
)

func main() {
	// Since we want to create custom HTTP server.
	// Do not forget to shut down the cron gracefully manually here.
	manager := cronx.NewManager()
	defer manager.Stop()

	// An example using gin as the router.
	r := gin.Default()
	r.GET("/custom-path", func(c *gin.Context) {
		c.JSON(http.StatusOK, manager.GetInfo())
	})

	// Start your own server.
	r.Run()
}

Can I still get the built-in template if I use my own router?

Yes, you can.

package main

import (
	"github.com/labstack/echo/v4"
	"github.com/rizalgowandy/cronx"
	"github.com/rizalgowandy/cronx/page"
)

func main() {
	// Since we want to create custom HTTP server.
	// Do not forget to shut down the cron gracefully manually here.
	manager := cronx.NewManager()
	defer manager.Stop()

	// An example using echo as the router.
	e := echo.New()
	index, _ := page.GetStatusTemplate()
	e.GET("/jobs", func(context echo.Context) error {
		// Serve the template to the writer and pass the current status data.
		return index.Execute(context.Response().Writer, manager.GetStatusData(ctx.QueryParam(cronx.QueryParamSort)))
	})
}

Server is located in the US, but my user is in Jakarta, can I change the cron timezone?

Yes, you can. By default, the cron timezone will follow the server location timezone using time.Local. If you placed the server in the US, it will use the US timezone. If you placed the server in the SG, it will use the SG timezone.

package main

import (
	"time"

	"github.com/rizalgowandy/cronx"
)

func main() {
	loc := func() *time.Location { // Change timezone to Jakarta.
		jakarta, err := time.LoadLocation("Asia/Jakarta")
		if err != nil {
			secondsEastOfUTC := int((7 * time.Hour).Seconds())
			jakarta = time.FixedZone("WIB", secondsEastOfUTC)
		}
		return jakarta
	}()

	// Create a custom config.
	cronx.NewManager(cronx.WithLocation(loc))
}

My job requires certain information like current wave number, how can I get this information?

This kind of information is stored inside metadata, which stored automatically inside context.

package main

import (
	"context"
	"errors"

	"github.com/rizalgowandy/cronx"
	"github.com/rizalgowandy/gdk/pkg/logx"
)

type subscription struct{}

func (subscription) Run(ctx context.Context) error {
	md, ok := cronx.GetJobMetadata(ctx)
	if !ok {
		return errors.New("cannot job metadata")
	}
	logx.INF(ctx, md, "subscription is running")
	return nil
}

Documentation

Index

Constants

View Source
const (
	SortKeyID      sortx.Key = "id"
	SortKeyName    sortx.Key = "name"
	SortKeyStatus  sortx.Key = "status"
	SortKeyPrevRun sortx.Key = "prev_run"
	SortKeyNextRun sortx.Key = "next_run"
	SortKeyLatency sortx.Key = "latency"
)
View Source
const (
	// CtxKeyJobMetadata is context for cron job metadata.
	CtxKeyJobMetadata = contextKey("cron-job-metadata")
)

Context key for standardized context value.

View Source
const (
	QueryParamSort = "sort"
)
View Source
const SleepDuration = time.Second * 10

SleepDuration defines the duration to sleep the server if the defined address is busy.

Variables

View Source
var (
	// DefaultParser supports the v1 where the first parameter is second.
	DefaultParser = cron.NewParser(
		cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)
	DefaultInterceptors = Chain()
	DefaultLocation     = time.Local
	DefaultStorage      = storage.NewNoopClient()
	DefaultAlerter      = NewAlerter()
)

Default configuration for the manager.

Functions

func GetJobName added in v1.1.0

func GetJobName(job JobItf) (name string)

GetJobName return the Job name by reflect the job

func NewServer

func NewServer(manager *Manager, address string) (*http.Server, error)

NewServer creates a new HTTP server. - / => current server status. - /jobs => current jobs as frontend html. - /api/jobs => current jobs as json.

func NewSideCarServer

func NewSideCarServer(manager *Manager, address string)

NewSideCarServer creates a new sidecar HTTP server. HTTP server will be start automatically. - / => current server status. - /jobs => current jobs as frontend html. - /api/jobs => current jobs as json.

func NewStatusDataSorter added in v1.4.0

func NewStatusDataSorter(key sortx.Key, order sortx.Order, data []StatusData) sort.Interface

func SetJobMetadata

func SetJobMetadata(ctx context.Context, meta JobMetadata) context.Context

SetJobMetadata stores current job metadata inside current context.

Types

type Alerter added in v1.8.0

type Alerter struct{}

func NewAlerter added in v1.8.0

func NewAlerter() *Alerter

func (*Alerter) NotifyHighLatency added in v1.8.0

func (a *Alerter) NotifyHighLatency(
	ctx context.Context,
	job *Job,
	prev, next time.Time,
	latency, maxLatency time.Duration,
)

type AlerterItf added in v1.8.0

type AlerterItf interface {
	NotifyHighLatency(
		ctx context.Context,
		job *Job,
		prev, next time.Time,
		latency, maxLatency time.Duration,
	)
}

type Func

type Func func(ctx context.Context) error

Func is a type to allow callers to wrap a raw func. Example:

manager.Schedule("@every 5m", cronx.Func(myFunc))

func (Func) Run

func (f Func) Run(ctx context.Context) error

type FuncJob added in v1.2.0

type FuncJob struct {
	// contains filtered or unexported fields
}

FuncJob is a type to allow callers to wrap a raw func with name. Example:

manager.ScheduleFunc("@every 5m", "random name", myFunc)

func NewFuncJob added in v1.2.0

func NewFuncJob(name string, cmd Func) *FuncJob

NewFuncJob creates a wrapper type for a raw func with name.

func (FuncJob) Run added in v1.2.0

func (f FuncJob) Run(ctx context.Context) error

type Handler

type Handler func(ctx context.Context, job *Job) error

Handler is the handler definition to run a job.

type HistoryPageData added in v1.7.0

type HistoryPageData struct {
	Data       []storage.History `json:"data"`
	Pagination Response          `json:"pagination"`
	Sort       pagination.Sort   `json:"sort"`
}

type Interceptor

type Interceptor func(ctx context.Context, job *Job, handler Handler) error

Interceptor is the middleware that will be executed before the current handler.

func Chain

func Chain(interceptors ...Interceptor) Interceptor

Chain returns a single interceptor from multiple interceptors.

type Job

type Job struct {
	JobMetadata

	Name    string     `json:"name"`
	Status  StatusCode `json:"status"`
	Latency string     `json:"latency"`
	Error   string     `json:"error"`
	PrevRun time.Time  `json:"-"`
	NextRun time.Time  `json:"-"`
	// contains filtered or unexported fields
}

func NewJob

func NewJob(manager *Manager, job JobItf, waveNumber, totalWave int64) *Job

NewJob creates a new job with default status and name.

func (*Job) RecordHistory added in v1.6.0

func (j *Job) RecordHistory(ctx context.Context, start, finish time.Time)

func (*Job) Run

func (j *Job) Run()

Run executes the current job operation.

func (*Job) UpdateStatus

func (j *Job) UpdateStatus() StatusCode

UpdateStatus updates the current job status to the latest.

type JobItf

type JobItf interface {
	Run(ctx context.Context) error
}

type JobMetadata

type JobMetadata struct {
	EntryID    cron.EntryID `json:"entry_id"`
	Wave       int64        `json:"wave"`
	TotalWave  int64        `json:"total_wave"`
	IsLastWave bool         `json:"is_last_wave"`
}

func GetJobMetadata

func GetJobMetadata(ctx context.Context) (JobMetadata, bool)

GetJobMetadata returns job metadata from current context, and status if it exists or not.

type Manager added in v1.1.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager controls all the underlying job.

func NewManager added in v1.1.0

func NewManager(opts ...Option) *Manager

NewManager create a command controller with a specific config.

func (*Manager) GetEntries added in v1.1.0

func (m *Manager) GetEntries() []cron.Entry

GetEntries returns all the current registered jobs.

func (*Manager) GetEntry added in v1.1.0

func (m *Manager) GetEntry(id cron.EntryID) *cron.Entry

GetEntry returns a snapshot of the given entry, or nil if it couldn't be found.

func (*Manager) GetHistoryData added in v1.6.0

func (m *Manager) GetHistoryData(
	ctx context.Context,
	req *Request,
) (HistoryPageData, error)

GetHistoryData returns run histories for history page.

func (*Manager) GetInfo added in v1.1.0

func (m *Manager) GetInfo() map[string]interface{}

GetInfo returns command controller basic information.

func (*Manager) GetStatusData added in v1.1.0

func (m *Manager) GetStatusData(sortQuery string) StatusPageData

GetStatusData returns all jobs status for status page.

func (*Manager) Remove added in v1.1.0

func (m *Manager) Remove(id cron.EntryID)

Remove removes a specific job from running. Get EntryID from the list job entries manager.GetEntries(). If job is in the middle of running, once the process is finished it will be removed.

func (*Manager) Schedule added in v1.1.0

func (m *Manager) Schedule(spec string, job JobItf) error

Schedule sets a job to run at specific time. Example:

@every 5m
0 */10 * * * * => every 10m

func (*Manager) ScheduleFunc added in v1.2.0

func (m *Manager) ScheduleFunc(spec, name string, cmd func(ctx context.Context) error) error

ScheduleFunc adds a func to the Cron to be run on the given schedule.

func (*Manager) Schedules added in v1.1.0

func (m *Manager) Schedules(spec, separator string, job JobItf) error

Schedules sets a job to run multiple times at specific time. Symbol */,-? should never be used as separator character. These symbols are reserved for cron specification.

Example:

Spec		: "0 0 1 * * *#0 0 2 * * *#0 0 3 * * *
Separator	: "#"
This input schedules the job to run 3 times.

func (*Manager) SchedulesFunc added in v1.2.0

func (m *Manager) SchedulesFunc(
	spec, separator, name string,
	cmd func(ctx context.Context) error,
) error

SchedulesFunc adds a func to the Cron to be run on the given schedules.

func (*Manager) Start added in v1.1.0

func (m *Manager) Start()

Start starts jobs from running at the next scheduled time.

func (*Manager) Stop added in v1.1.0

func (m *Manager) Stop()

Stop stops active jobs from running at the next scheduled time.

type Option added in v1.1.0

type Option func(*Manager)

Option represents a modification to the default behavior of the manager.

func WithAlerter added in v1.10.0

func WithAlerter(client AlerterItf) Option

WithAlerter determines the alerter used to send notification for high latency job run detected.

func WithAutoStartDisabled added in v1.2.0

func WithAutoStartDisabled() Option

WithAutoStartDisabled prevent the cron job from actually running.

func WithInterceptor added in v1.1.0

func WithInterceptor(interceptors ...Interceptor) Option

WithInterceptor specifies Job wrappers to apply to all jobs added to this cron.

func WithLocation added in v1.1.0

func WithLocation(loc *time.Location) Option

WithLocation overrides the timezone of the cron instance.

func WithLowPriorityDownJobs added in v1.3.0

func WithLowPriorityDownJobs() Option

WithLowPriorityDownJobs puts the down jobs at the bottom of the list.

func WithParser added in v1.1.0

func WithParser(p cron.ScheduleParser) Option

WithParser overrides the parser used for interpreting job schedules.

func WithStorage added in v1.6.0

func WithStorage(client storage.Client) Option

WithStorage determines the reader and writer for historical data.

type Request added in v1.7.0

type Request struct {

	// Sort of the resources in the response e.g. sort=id:desc,created_at:desc
	// Sort is optional.
	Sort string `query:"sort"           form:"sort"           json:"sort"           xml:"sort"`
	// Limit number of results per call.
	// Limit is optional.
	Limit int `query:"limit"          form:"limit"          json:"limit"          xml:"limit"`
	// StartingAfter is a cursor for use in pagination.
	// StartingAfter is a resource ID that defines your place in the list.
	// StartingAfter is optional.
	StartingAfter *int64 `query:"starting_after" form:"starting_after" json:"starting_after" xml:"starting_after"`
	// EndingBefore is cursor for use in pagination.
	// EndingBefore is a resource ID that defines your place in the list.
	// EndingBefore is optional.
	EndingBefore *int64 `query:"ending_before"  form:"ending_before"  json:"ending_before"  xml:"ending_before"`
	// contains filtered or unexported fields
}

Request is a parameter to return list of data with pagination. Request is optional, most fields automatically filled by system. If you already have a response with pagination, you can generate pagination request directly to traverse next or prev page.

func (*Request) QueryParams added in v1.7.0

func (r *Request) QueryParams() map[string]string

func (*Request) URI added in v1.7.0

func (r *Request) URI(req *url.URL) *string

func (*Request) Validate added in v1.7.0

func (r *Request) Validate() error

type Response added in v1.7.0

type Response struct {
	Sort          string  `query:"sort"           form:"sort"           json:"sort"           xml:"sort"`
	StartingAfter *int64  `query:"starting_after" form:"starting_after" json:"starting_after" xml:"starting_after"`
	EndingBefore  *int64  `query:"ending_before"  form:"ending_before"  json:"ending_before"  xml:"ending_before"`
	Total         int     `query:"total"          form:"total"          json:"total"          xml:"total"`
	Yielded       int     `query:"yielded"        form:"yielded"        json:"yielded"        xml:"yielded"`
	Limit         int     `query:"limit"          form:"limit"          json:"limit"          xml:"limit"`
	PreviousURI   *string `query:"previous_uri"   form:"previous_uri"   json:"previous_uri"   xml:"previous_uri"`
	NextURI       *string `query:"next_uri"       form:"next_uri"       json:"next_uri"       xml:"next_uri"`
	// CursorRange returns cursors for starting after and ending before.
	// Format: [starting_after, ending_before].
	CursorRange []int64 `query:"cursor_range"   form:"cursor_range"   json:"cursor_range"   xml:"cursor_range"`
}

func (*Response) HasNextPage added in v1.7.0

func (r *Response) HasNextPage() bool

HasNextPage returns true if next page exists and can be traversed.

func (*Response) HasPrevPage added in v1.7.0

func (r *Response) HasPrevPage() bool

HasPrevPage returns true if prev page exists and can be traversed.

func (*Response) NextPageCursor added in v1.7.0

func (r *Response) NextPageCursor() *int64

NextPageCursor returns cursor to be used as starting after value.

func (*Response) NextPageRequest added in v1.7.0

func (r *Response) NextPageRequest() *Request

NextPageRequest returns pagination request for the next page result.

func (*Response) PrevPageCursor added in v1.7.0

func (r *Response) PrevPageCursor() *int64

PrevPageCursor returns cursor to be used as ending before value.

func (*Response) PrevPageRequest added in v1.7.0

func (r *Response) PrevPageRequest() *Request

PrevPageRequest returns pagination request for the prev page result.

type ServerController

type ServerController struct {
	// Manager controls all the underlying job.
	Manager *Manager
}

ServerController is http server controller.

func (*ServerController) APIHistories added in v1.6.0

func (c *ServerController) APIHistories(ctx echo.Context) error

APIHistories returns run histories as json.

func (*ServerController) APIJobs

func (c *ServerController) APIJobs(ctx echo.Context) error

APIJobs returns job status as json.

func (*ServerController) HealthCheck

func (c *ServerController) HealthCheck(ctx echo.Context) error

HealthCheck returns server status.

func (*ServerController) Histories added in v1.8.0

func (c *ServerController) Histories(ctx echo.Context) error

Histories return job history as frontend template.

func (*ServerController) Jobs

func (c *ServerController) Jobs(ctx echo.Context) error

Jobs return job status as frontend template.

type StatusCode

type StatusCode string

StatusCode describes current job status.

const (
	// StatusCodeUp describes that current job has just been created.
	StatusCodeUp StatusCode = "UP"
	// StatusCodeSuccess describes that current job is waiting for next execution time.
	StatusCodeSuccess StatusCode = "SUCCESS"
	// StatusCodeRunning describes that current job is currently running.
	StatusCodeRunning StatusCode = "RUNNING"
	// StatusCodeDown describes that current job has failed to be registered.
	StatusCodeDown StatusCode = "DOWN"
	// StatusCodeError describes that last run has failed.
	StatusCodeError StatusCode = "ERROR"
)

func (StatusCode) String added in v1.6.0

func (s StatusCode) String() string

type StatusData

type StatusData struct {
	// ID is unique per job.
	ID cron.EntryID `json:"id"`
	// Job defines current job.
	Job *Job `json:"job"`
	// Next defines the next schedule to execute current job.
	Next time.Time `json:"next"`
	// Prev defines the last run of the current job.
	Prev time.Time `json:"prev"`
}

StatusData defines current job status.

type StatusPageData added in v1.5.0

type StatusPageData struct {
	Data []StatusData    `json:"data"`
	Sort pagination.Sort `json:"sort"`
}

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL