gokugen

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

README

gokugen Godoc

go+刻限(kokugen)

Gokugen is in-memory scheduler and middleware applicator for it.

Term 刻限(kokugen) is japanese word equivalent of appointed time, scheduled time, or due.

Caveats

  • Do not use untested packages.
  • This is not stable release. Public APIs may or may not change without any notification.

Idea

The idea is based on this article(written in japanese).

./scheduler contains similar but modified implementation.

Differences
  • It removes cancelled tasks from min-heap at every one minute.
  • It passes an instance of context.Context to a task, which would be Done if scheduler is ended and/or the task is cancelled.
  • It has a countermeasurement for abnormally-returned work (i.e. calling runtime.Goexit or panicking). But not tested yet!
  • Task cancellations are controlled by Cancel method of a struct instance returned from Schedule.
  • Cancellation of scheduler is controlled by context.Context.
Additonal Properties

See below packages section.

Architecture

simplified architecture.

simplified_architecture

TODO

  • Reimplement funtionality
    • in-memory shceduler
    • single node task storage middleware
    • cron-like interface
  • Implement multi node task storage middleware
  • Refactoring
  • example package
  • Add detailed careful test.

Packages

./scheduler

Scheduler is in-memory scheduler.

With WorkerPool, scheduler limits how many tasks can be concurrently worked on. And with min-heap backed TaskQueue, task retrieval complexity is O(log n) where n is number of currently scheduled task.

See ./example/simple/main.go for exmpale usage.

Illustrative code sample
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ngicks/gokugen/scheduler"
)

func main() {
	// 1st arg is worker num inittially created
	// 2nd arg is max of internal min-heap. 0 is unlimited.
	var initialWorkerNun, queueMax uint
	sched := scheduler.NewScheduler(initialWorkerNun, queueMax)

	ctx, cancel := context.WithCancel(context.Background())
	// Start starts scheduler.
	go sched.Start(ctx)

	var then time.Time
	var work scheduler.WorkFn
	task := scheduler.NewTask(then, work)
	controller, err := sched.Schedule(task)
	if err != nil {
		// scheduling failed.
		// Queue max or trying to schedule after scheduler already ended.
		panic(err)
	}

	// You can check if task is cancelled
	controller.IsCancelled()
	// You can check if task is done
	controller.IsDone()
	// You can cancel
	cancelled := controller.Cancel()
	if !cancelled {
		fmt.Println("task is already cancelled")
	}

	// You can check how many workers are actively working on task.
	fmt.Println(sched.ActiveWorkerNum())
	// You can increase the number of Workers in WorkerPool
	sched.AddWorker(5)
	// You can decrease the number of Workers in WorkerPool.
	sched.RemoveWorker(5)

	// some time later...

	// cancel ctx before calling End.
	cancel()
	// Call End to tear down scheduler and all internal objects.
	// and to wait until all goroutines terminate.
	sched.End()
}
./heap

Min-heap with added Exclude and Peek method.

Used in scheduler as TaskQueue.

./cron

Cron package contains Row, cron row like struct, and rescheduler for Row.

See ./example/cron/main.go for exmpale usage.

Illustrative code sample
package main

import (
	"context"
	"time"

	"github.com/ngicks/gokugen"
	"github.com/ngicks/gokugen/cron"
	"github.com/ngicks/gokugen/scheduler"
)

func main() {
	scheduler := gokugen.NewMiddlewareApplicator(scheduler.NewScheduler(5, 0))

	ctx, cancel := context.WithCancel(context.Background())
	defer func() {
		cancel()
		scheduler.Scheduler().End()
	}()
	go scheduler.Scheduler().Start(ctx)

	// Do command every year, Jan, Feb, Mar, every day, 12:30.
	row := cron.Builder{}.
		Month(1, 2, 3).
		Day().
		Hour(12).
		Minute(30).
		Command([]string{"command"}).
		Build()
	// whence is time when scheduling target starts from.
	var whence time.Time
	// reshedule occurs if shouldReschedule returns true.
	var shouldReschedule func(workErr error, callCount int) bool
	// workRegistry is used to retrieve work function associated to command
	var workRegisry interface {
		Load(key string) (value cron.WorkFnWParam, ok bool)
	}
	controller := cron.NewCronLikeRescheduler(
		row,
		whence,
		shouldReschedule,
		scheduler,
		workRegisry,
	)

	// Scheduling starts.
	// After task is done, row will be recheduled for next time matched to row's configuration.
	err := controller.Schedule()
	if err != nil {
		// somehow Schedule error
		panic(err)
	}

	// some time later...

	// Cancell cancells current task and rescheduling.
	controller.Cancel()
}
./task_storage

TaskStorage provides middlewares that stores task information to external persistent data storage.

See ./example/persistent_shceduler/main.go for example usage.

Illustrative code sample
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ngicks/gokugen"
	"github.com/ngicks/gokugen/scheduler"
	taskstorage "github.com/ngicks/gokugen/task_storage"
)

func main() {
	scheduler := gokugen.NewMiddlewareApplicator(scheduler.NewScheduler(5, 0))

	// Repository interface.
	// External data storage is manipulated through this interface.
	var repository taskstorage.RepositoryUpdater
	// When Sync-ing, this cb is used to determine task should be restored
	// and re-scheduled in internal scheduler.
	// (e.g. ignore tasks if they are too old and overdue.)
	var shouldRestore func(taskstorage.TaskInfo) bool
	// workRegistry is used to retrieve work function associated to WorkId.
	// Using `impl/workregistry`.ParamUnmarshaller is safe for almost all users.(untested)
	var workRegisry interface {
		Load(key string) (value taskstorage.WorkFnWParam, ok bool)
	}
	// Context wrapper applicator function used in Sync.
	// In Sync newly created ctx is used to reschedule.
	// So without this function context wrapper
	// that should be applied in upper user code is totally ignored.
	var syncCtxWrapper func(gokugen.SchedulerContext) gokugen.SchedulerContext

	taskStorage := taskstorage.NewSingleNodeTaskStorage(
		repository,
		shouldRestore,
		workRegisry,
		syncCtxWrapper,
	)

	// Correct usage is as middleware.
	scheduler.Use(taskStorage.Middleware(true)...)

	// Sync syncs itnernal state with external.
	// Normally TaskStorage does it reversely through middlewares,
	// mirroring internal state to external data storage.
	// But after rebooting system, or repository is changed externally,
	// Sync is needed to fetch back external data.
	rescheduled, schedulingErr, err := taskStorage.Sync(scheduler.Schedule)
	if err != nil {
		panic(err)
	}

	for taskId, taskController := range rescheduled {
		fmt.Printf(
			"id = %s, is scheduled for = %s\n",
			taskId,
			taskController.GetScheduledTime().Format(time.RFC3339Nano),
		)
	}
	for taskId, schedulingErr := range schedulingErr {
		fmt.Printf("id = %s, err = %s\n", taskId, schedulingErr)
	}

	ctx, cancel := context.WithCancel(context.Background())
	go scheduler.Scheduler().Start(ctx)

	var scheduleTarget time.Time
	task, err := scheduler.Schedule(
		// To store correct data to external repository,
		// WorkId, Param is additionally needed.
		gokugen.BuildContext(
			scheduleTarget,
			nil,
			nil,
			gokugen.WithWorkId("func1"),
			gokugen.WithParam([]string{"param", "param"}),
		),
	)
	if err != nil {
		panic(err)
	}

	// This is wrapped scheduler.TaskController.
	task.IsCancelled()

	// some time later...

	// cancel ctx and tear down scheduler.
	cancel()
	scheduler.Scheduler().End()
}
./example

example contains some example executables.

./impl

impl contains some helper implementations.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrValueNotFound = errors.New("value not found")
)

Functions

func GetParam

func GetParam(ctx SchedulerContext) (any, error)

func GetTaskId

func GetTaskId(ctx SchedulerContext) (string, error)

GetTaskId gets task id from ctx. This may be heavy or cause error. If task id is not set, GetTaskId returns a wrapped ErrValueNotFound.

func GetWorkId

func GetWorkId(ctx SchedulerContext) (string, error)

GetWorkId gets task id from ctx. This may be heavy or cause error. If work id is not set, GetWorkId returns a wrapped ErrValueNotFound.

Types

type MiddlewareApplicator

type MiddlewareApplicator[T Scheduler] struct {
	// contains filtered or unexported fields
}

func NewMiddlewareApplicator

func NewMiddlewareApplicator[T Scheduler](scheduler T) *MiddlewareApplicator[T]

func (*MiddlewareApplicator[T]) Schedule

func (s *MiddlewareApplicator[T]) Schedule(ctx SchedulerContext) (Task, error)

Schedule schedules ctx to inner scheduler with middlewares applied. Middlewares will be called in first-in-first-applied order.

func (*MiddlewareApplicator[T]) Scheduler

func (ma *MiddlewareApplicator[T]) Scheduler() T

Scheduler is getter of inner sheculer.

func (*MiddlewareApplicator[T]) Use

func (s *MiddlewareApplicator[T]) Use(mw ...MiddlewareFunc)

Use registers MiddlewareFunc. First registered one will be invoked first.

type MiddlewareFunc

type MiddlewareFunc = func(handler ScheduleHandlerFn) ScheduleHandlerFn

type Option

type Option func(ctx SchedulerContext) SchedulerContext

func WithParam

func WithParam(param any) Option

func WithParamLoader

func WithParamLoader(loader func() (any, error)) Option

func WithTaskId

func WithTaskId(taskId string) Option

func WithWorkFn

func WithWorkFn(workFn WorkFn) Option

func WithWorkFnWrapper

func WithWorkFnWrapper(wrapper WorkFnWrapper) Option

func WithWorkId

func WithWorkId(workId string) Option

type PlainContext

type PlainContext struct {
	// contains filtered or unexported fields
}

PlainContext is intended to be a base context of SchedulerContext.

func (*PlainContext) ScheduledTime

func (ctx *PlainContext) ScheduledTime() time.Time

func (*PlainContext) Value

func (ctx *PlainContext) Value(key any) (any, error)

func (*PlainContext) Work

func (ctx *PlainContext) Work() WorkFn

type ScheduleHandlerFn

type ScheduleHandlerFn = func(ctx SchedulerContext) (Task, error)

type Scheduler

type Scheduler interface {
	Schedule(task *scheduler.Task) (*scheduler.TaskController, error)
}

type SchedulerContext

type SchedulerContext interface {
	ScheduledTime() time.Time
	Work() WorkFn
	Value(key any) (any, error)
}

SchedulerContext is minimal set of data relevant to scheduling and middlewares.

func BuildContext

func BuildContext(scheduledTime time.Time, workFn WorkFn, valMap map[any]any, options ...Option) SchedulerContext

BuildContext builds a new SchedulerContext. workFn, valMap can be nil. Add options to set additional values to the ctx.

func NewPlainContext

func NewPlainContext(scheduledTime time.Time, workFn WorkFn, values map[any]any) SchedulerContext

NewPlainContext creates a new PlainContext instance. But recommendation here is to use BuildContext instead.

func WrapContext

func WrapContext(parent SchedulerContext, options ...Option) (ctx SchedulerContext)

WrapContext wrapps parent with options.

func WrapWithParam

func WrapWithParam(parent SchedulerContext, param any) SchedulerContext

func WrapWithParamLoader

func WrapWithParamLoader(parent SchedulerContext, loader func() (any, error)) SchedulerContext

func WrapWithTaskId

func WrapWithTaskId(parent SchedulerContext, taskId string) SchedulerContext

func WrapWithWorkFn

func WrapWithWorkFn(parent SchedulerContext, workFn WorkFn) SchedulerContext

func WrapWithWorkFnWrapper

func WrapWithWorkFnWrapper(parent SchedulerContext, wrapper WorkFnWrapper) SchedulerContext

func WrapWithWorkId

func WrapWithWorkId(parent SchedulerContext, workId string) SchedulerContext

type Task

type Task interface {
	Cancel() (cancelled bool)
	CancelWithReason(err error) (cancelled bool)
	GetScheduledTime() time.Time
	IsCancelled() bool
	IsDone() bool
}

type WorkFn

type WorkFn = func(taskCtx context.Context, scheduled time.Time) (any, error)

type WorkFnWParam

type WorkFnWParam = func(taskCtx context.Context, scheduled time.Time, param any) (any, error)

type WorkFnWrapper

type WorkFnWrapper = func(self SchedulerContext, workFn WorkFn) WorkFn

Directories

Path Synopsis
Package mock_gokugen is a generated GoMock package.
Package mock_gokugen is a generated GoMock package.
__mock
Package mock_common is a generated GoMock package.
Package mock_common is a generated GoMock package.
__mock
Package mock_cron is a generated GoMock package.
Package mock_cron is a generated GoMock package.
example
impl
middleware
log
log/__mock
Package mock_log is a generated GoMock package.
Package mock_log is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL