etcdcron

package module
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Jul 22, 2024 License: MIT Imports: 26 Imported by: 1

README

Go Etcd Cron

This package implements a distributed, fault-tolerant cron scheduler that uses etcd as its backend. It is designed for distributed environments (such as Kubernetes), where multiple instances of the same process run concurrently and act on a shared set of cron jobs.

Jobs are scheduled on a distributed queue, where only one of the instances triggers each job. This ensures that a job is executed only once (more precisely, at least once). Multiple instances share the load of triggering jobs.

Jobs can be "one shot" or recurring. Recurring jobs can have a delayed start (DueTime), an expiry time (TTL), and a maximum number of runs (Repeats).

Getting started

import (
  "context"
  "time"

  "google.golang.org/protobuf/types/known/anypb"
  "google.golang.org/protobuf/types/known/wrapperspb"

  etcdcron "github.com/diagridio/go-etcd-cron"
  "github.com/diagridio/go-etcd-cron/api"
)

// client is an existing *clientv3.Client (go.etcd.io/etcd/client/v3)
// connected to your etcd cluster.
cron, err := etcdcron.New(etcdcron.Options{
  Client:         client,
  Namespace:      "abc",
  PartitionID:    0,
  PartitionTotal: 1,
  TriggerFn: func(context.Context, *api.TriggerRequest) bool {
    // Do something with your trigger here.
    // Return true if the trigger was successful, false otherwise.
    // Note, returning false will cause the job to be retried *immediately*.
    return true
  },
})
if err != nil {
  panic(err)
}

ctx := context.Background()

// TODO: Pass proper context and do something with returned error.
go cron.Run(ctx)

payload, _ := anypb.New(wrapperspb.String("hello"))
meta, _ := anypb.New(wrapperspb.String("world"))
tt := time.Now().Add(time.Second).Format(time.RFC3339)

// Add a one-shot job that triggers roughly one second from now.
err = cron.Add(ctx, "my-job", &api.Job{
  DueTime:  &tt,
  Payload:  payload,
  Metadata: meta,
})
if err != nil {
  panic(err)
}

API

Cron supports Add, Get, and Delete operations, which are keyed by job name.

A Job itself is made up of the following fields:

  • Schedule: A cron (repeated) expression that defines when the job should be triggered. Accepts a systemd-cron-like expression (* 30 9 * * 1-5) or an @every expression (@every 5m). For more info see ./proto/job.proto. Optional if DueTime is set.
  • DueTime: A "point in time" string representing when the job schedule should start, or the "one shot" time if no other scheduling fields are provided. Accepts an RFC3339 timestamp, a Go duration string (measured from now), or a non-repeating ISO8601 duration. Optional if Schedule is set.
  • TTL: Another "point in time" string representing when the job should expire. Must be later than DueTime if both are set. Optional.
  • Repeats: The number of times the job should be triggered. Must be greater than 0 if set. Optional.
  • Metadata: A protobuf Any message that can be used to store any additional information about the job which will be passed to the trigger function. Optional.
  • Payload: A protobuf Any message that can be used to store the main payload of the job which will be passed to the trigger function. Optional.

A job must have at least one of Schedule or DueTime set.

Leadership

The cron scheduler uses a partition key ownership model to ensure that, for each partition, only one instance of the scheduler is running at any given time. At boot, the replica attempts to claim its partition key. If the claim succeeds, it then checks that no other schedulers are running with a different partition total value. Once both checks pass, the scheduler begins to process jobs.

It is paramount that all replicas have the same partition total value. If they do not, the scheduler will not start until the leadership of partitions with different partition total values is resolved.

Leadership keys are attached to an etcd lease with a 20s TTL, so that stale leadership keys do not persist forever in the event of an (unlikely) crash.

Counter

An associated counter key tracks the current state of each scheduled job: the last trigger time (if it has triggered), the number of times it has been triggered, and the Partition ID of the job with the same name. Counters are lazily deleted in bulk by a garbage collector that runs every 180s, to reduce the extra load that individual deletions would put on job triggering.

The scheduler will never miss triggering a job. If the scheduler falls behind (for example, due to downtime), it catches up by triggering all missed jobs in immediate succession. Triggers for a single job are issued one after the other: the scheduler waits for the trigger response before scheduling the next one.

Testing

go test --race -v ./...

History

This is a fork of https://github.com/Scalingo/go-etcd-cron, which had been based on https://github.com/robfig/cron.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Interface

type Interface interface {
	// Run is a blocking function that runs the cron instance. It will return an
	// error if the instance is already running.
	// Returns when the given context is cancelled, after doing all cleanup.
	Run(ctx context.Context) error

	// Add adds a job to the cron instance.
	Add(ctx context.Context, name string, job *api.Job) error

	// Get gets a job from the cron instance.
	Get(ctx context.Context, name string) (*api.Job, error)

	// Delete deletes a job from the cron instance.
	Delete(ctx context.Context, name string) error
}

Interface is a cron interface. It schedules and manages jobs, which are stored in and informed from etcd, and calls a trigger function when a job is triggered. Jobs may be oneshot or recurring. Recurring jobs are scheduled to run at their next scheduled time. Oneshot jobs are scheduled to run once and are removed from the schedule after they are triggered.

func New

func New(opts Options) (Interface, error)

New creates a new cron instance.

type Options

type Options struct {
	// Log is the logger to use for logging.
	Log logr.Logger

	// Client is the etcd client to use for storing cron entries.
	Client *clientv3.Client

	// Namespace is the etcd namespace to use for storing cron entries.
	Namespace string

	// PartitionID is the partition ID to use for storing cron entries.
	PartitionID uint32

	// PartitionTotal is the total number of partitions to use for storing cron
	// entries.
	PartitionTotal uint32

	// TriggerFn is the function to call when a cron job is triggered.
	TriggerFn TriggerFunction
}

Options are the options for creating a new cron instance.

type TriggerFunction

type TriggerFunction func(context.Context, *api.TriggerRequest) bool

TriggerFunction is a function that is called when a cron job is triggered. Returning false will cause the job to be re-enqueued and triggered immediately.

Directories

Path Synopsis
internal
informer
Package mirror implements etcd mirroring operations.
key
