core

package
v0.6.121

This package is not in the latest version of its module.
Published: Mar 29, 2022 License: Apache-2.0 Imports: 17 Imported by: 1

Documentation

Overview

Package core is the core package for the scheduler. It includes:

  • the scheduler interface
  • a scheduler implementation using gocron https://github.com/robfig/cron
  • an updater, which updates the schedules in the scheduler by periodically reading them from the DB
  • a snapshot runner, which snapshots the schedules with their last execution times so that the snapshot can be used as a checkpoint in case of a crash. After a crash, the scheduler replays the schedules from the last recorded snapshot. It relies on the idempotency of admin to fail an execution if one with the same scheduled time already exists.

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetCatchUpTimes

func GetCatchUpTimes(s models.SchedulableEntity, from time.Time, to time.Time) ([]time.Time, error)

GetCatchUpTimes finds the list of timestamps to be caught up on for schedule s between from and to.

func GetScheduledTime

func GetScheduledTime(s models.SchedulableEntity, fromTime time.Time) (time.Time, error)

GetScheduledTime finds the next scheduled time for both cron and fixed-rate scheduled entities, given fromTime.

Types

type GoCronJob

type GoCronJob struct {
	// contains filtered or unexported fields
}

GoCronJob provides a wrapper around the go cron library's job function.

func (*GoCronJob) Run

func (g *GoCronJob) Run(t time.Time)

type GoCronScheduler

type GoCronScheduler struct {
	// contains filtered or unexported fields
}

GoCronScheduler provides scheduler functionality using the https://github.com/robfig/cron library.

func (*GoCronScheduler) AddCronJob

func (g *GoCronScheduler) AddCronJob(ctx context.Context, job *GoCronJob) error

AddCronJob adds the job to the cron store.

func (*GoCronScheduler) AddFixedIntervalJob

func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJob) error

AddFixedIntervalJob adds the fixed interval job to the job store.

func (*GoCronScheduler) BootStrapSchedulesFromSnapShot

func (g *GoCronScheduler) BootStrapSchedulesFromSnapShot(ctx context.Context, schedules []models.SchedulableEntity,
	snapshot snapshoter.Snapshot)

BootStrapSchedulesFromSnapShot initializes the scheduler from a previous snapshot of the schedule executions.

func (*GoCronScheduler) CalculateSnapshot

func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snapshot

CalculateSnapshot creates a snapshot of the existing state of the schedules run by the scheduler which can be used in case of failure.

func (*GoCronScheduler) CatchUpSingleSchedule

func (g *GoCronScheduler) CatchUpSingleSchedule(ctx context.Context, s models.SchedulableEntity, fromTime time.Time, toTime time.Time) error

CatchUpSingleSchedule catches up the schedule s from fromTime to toTime

func (*GoCronScheduler) CatchupAll

func (g *GoCronScheduler) CatchupAll(ctx context.Context, until time.Time) bool

CatchupAll catches up all the schedules in the job store up to the until time.

func (*GoCronScheduler) DeScheduleJob

func (g *GoCronScheduler) DeScheduleJob(ctx context.Context, schedule models.SchedulableEntity)

DeScheduleJob removes a scheduled job using the implemented scheduler.

func (*GoCronScheduler) GetTimedFuncWithSchedule

func (g *GoCronScheduler) GetTimedFuncWithSchedule() TimedFuncWithSchedule

GetTimedFuncWithSchedule returns the job function with scheduled time parameter

func (*GoCronScheduler) RemoveCronJob

func (g *GoCronScheduler) RemoveCronJob(ctx context.Context, job *GoCronJob)

RemoveCronJob removes the job from the cron store

func (*GoCronScheduler) RemoveFixedIntervalJob

func (g *GoCronScheduler) RemoveFixedIntervalJob(ctx context.Context, job *GoCronJob)

RemoveFixedIntervalJob removes the fixed interval job from the job store.

func (*GoCronScheduler) ScheduleJob

func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity,
	funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error

ScheduleJob schedules a job using the implemented scheduler.

func (*GoCronScheduler) UpdateSchedules

func (g *GoCronScheduler) UpdateSchedules(ctx context.Context, schedules []models.SchedulableEntity)

UpdateSchedules updates all the schedules in the scheduler's job store.

type Scheduler

type Scheduler interface {
	// ScheduleJob schedules a job using the implemented scheduler
	ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastT *time.Time) error
	// DeScheduleJob removes a scheduled job using the implemented scheduler
	DeScheduleJob(ctx context.Context, s models.SchedulableEntity)
	// BootStrapSchedulesFromSnapShot initializes the scheduler from a previous snapshot of the schedule executions
	BootStrapSchedulesFromSnapShot(ctx context.Context, schedules []models.SchedulableEntity, snapshot snapshoter.Snapshot)
	// UpdateSchedules updates all the schedules in the scheduler's job store
	UpdateSchedules(ctx context.Context, s []models.SchedulableEntity)
	// CalculateSnapshot creates a snapshot of the existing state of the schedules run by the scheduler, which can be used in case of failure.
	CalculateSnapshot(ctx context.Context) snapshoter.Snapshot
	// CatchupAll catches up all the schedules in the scheduler's job store up to the until time
	CatchupAll(ctx context.Context, until time.Time) bool
}

Scheduler is the main scheduler interface for scheduling and descheduling jobs, updating the schedules, calculating a snapshot of the schedules, bootstrapping the scheduler from a snapshot, and catching up missed schedules.

func NewGoCronScheduler

func NewGoCronScheduler(ctx context.Context, schedules []models.SchedulableEntity, scope promutils.Scope,
	snapshot snapshoter.Snapshot, rateLimiter *rate.Limiter, executor executor.Executor) Scheduler

type Snapshotrunner

type Snapshotrunner struct {
	// contains filtered or unexported fields
}

Snapshotrunner snapshots the scheduler state and saves it to the DB. It is invoked periodically from the scheduledExecutor.

func NewSnapshotRunner

func NewSnapshotRunner(snapshoter sImpl.Persistence, scheduler Scheduler) Snapshotrunner

func (Snapshotrunner) Run

func (u Snapshotrunner) Run(ctx context.Context)

type TimedFuncWithSchedule

type TimedFuncWithSchedule func(ctx context.Context, s models.SchedulableEntity, t time.Time) error

type Updater

type Updater struct {
	// contains filtered or unexported fields
}

Updater updates the scheduler with the latest state of the schedules from the DB.

func (Updater) UpdateGoCronSchedules

func (u Updater) UpdateGoCronSchedules(ctx context.Context)
