scheduler

package
v0.5.1
Published: Dec 9, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

const (
	MsgTypeReactrJobErr    = "reactr.joberr" // any kind of error from a job run
	MsgTypeReactrRunErr    = "reactr.runerr" // specifically a RunErr returned from a Wasm Runnable
	MsgTypeReactrResult    = "reactr.result"
	MsgTypeReactrNilResult = "reactr.nil"
)

MsgTypeReactrJobErr and others are Grav message types used for Scheduler jobs

Variables

var (
	ErrJobTimeout = errors.New("job timeout")
)

ErrJobTimeout and others are errors related to workers

Functions

This section is empty.

Types

type ChangeEvent

type ChangeEvent int

ChangeEvent represents a change relevant to a worker

const (
	ChangeTypeStart ChangeEvent = iota
	ChangeTypeStop  ChangeEvent = iota
)

ChangeTypeStart and others represent types of changes

type Ctx

type Ctx struct {
	Context context.Context
	// contains filtered or unexported fields
}

Ctx is a Job context

func (*Ctx) AddVar

func (c *Ctx) AddVar(name, value string)

AddVar adds an FFI variable to the context

func (*Ctx) Do

func (c *Ctx) Do(job Job) *Result

Do runs a new job

func (*Ctx) HasFFIResult

func (c *Ctx) HasFFIResult() bool

HasFFIResult returns true if the Ctx has a current FFI result

func (*Ctx) SetFFIResult

func (c *Ctx) SetFFIResult(result []byte, err error) (*FFIResult, error)

func (*Ctx) UseFFIResult

func (c *Ctx) UseFFIResult() (*FFIResult, error)

func (*Ctx) UseVars

func (c *Ctx) UseVars() ([]FFIVariable, error)

UseVars returns the list of variables that the Wasm module has set on this Ctx. They are ordered and named. Since the variables can only be used by one host call, they are cleared after being returned.
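
The ordered, named, use-once behaviour described for UseVars can be pictured with a small stand-alone sketch. It does not import this package: `varStore`, `addVar` and `useVars` are hypothetical stand-ins, and the real Ctx may differ (for instance, the documented UseVars also returns an error).

```go
package main

import "fmt"

// ffiVariable mirrors the documented FFIVariable shape (Name, Value).
type ffiVariable struct {
	Name  string
	Value interface{}
}

// varStore is a hypothetical stand-in for the variable storage on a Ctx.
type varStore struct {
	vars []ffiVariable
}

// addVar appends a variable, preserving insertion order.
func (s *varStore) addVar(name, value string) {
	s.vars = append(s.vars, ffiVariable{Name: name, Value: value})
}

// useVars returns the stored variables and clears them, so a second
// call sees an empty list.
func (s *varStore) useVars() []ffiVariable {
	out := s.vars
	s.vars = nil
	return out
}

func main() {
	s := &varStore{}
	s.addVar("id", "42")
	s.addVar("name", "ada")

	// First call: both variables, in the order they were added.
	for i, v := range s.useVars() {
		fmt.Println(i, v.Name, v.Value)
	}

	// Second call: the variables were consumed by the first call.
	fmt.Println(len(s.useVars()))
}
```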

type DoFunc

type DoFunc func(Job) *Result

DoFunc describes a function to schedule work

type FFIResult

type FFIResult struct {
	Result []byte
	Err    error
}

FFIResult is the result of an FFI host function call

func (*FFIResult) FFISize

func (r *FFIResult) FFISize() int32

FFISize returns the "size" of the result (positive int32 for a successful result, negative for error result)
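
One way to read the sign convention is sketched below. The docs only state that the size is positive for success and negative for an error; using the byte length of the result, and the negated byte length of the error message, is an assumption made for illustration. The `ffiResult` type and `newFailure` helper are local stand-ins, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ffiResult mirrors the documented FFIResult fields.
type ffiResult struct {
	Result []byte
	Err    error
}

// newFailure is a hypothetical helper that builds a failed result.
func newFailure(msg string) *ffiResult {
	return &ffiResult{Err: errors.New(msg)}
}

// size uses an ASSUMED encoding: the byte length of the result on
// success, and the negated byte length of the error message on failure.
func (r *ffiResult) size() int32 {
	if r.Err != nil {
		return -int32(len(r.Err.Error()))
	}
	return int32(len(r.Result))
}

func main() {
	ok := &ffiResult{Result: []byte("hello")}
	bad := newFailure("boom")

	// A caller can branch on the sign to tell success from failure.
	fmt.Println(ok.size(), bad.size())
}
```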

type FFIVariable

type FFIVariable struct {
	Name  string
	Value interface{}
}

FFIVariable is a variable that a Wasm Runnable can store host-side to be used in a host call such as a DB query. FFIVariables are both ordered and named, and are stored on the instance itself.

type Group

type Group struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Group represents a group of job results

func NewGroup

func NewGroup() *Group

NewGroup creates a new Group

func (*Group) Add

func (g *Group) Add(result *Result)

Add adds a job result to the group

func (*Group) Wait

func (g *Group) Wait() error

Wait waits for all results to come in and returns an error if any arise
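
The Add and Wait pattern can be sketched without this package as follows. The `result`, `run` and `group` names are hypothetical stand-ins, and which error the real Wait returns when several jobs fail is not stated in these docs; this sketch simply returns the last one it sees.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result is a hypothetical stand-in for *Result: a one-shot channel
// that eventually delivers a job's error (nil on success).
type result struct {
	errChan chan error
}

// run starts work on its own goroutine and returns a result for it.
func run(work func() error) *result {
	r := &result{errChan: make(chan error, 1)}
	go func() { r.errChan <- work() }()
	return r
}

// group mirrors the documented Group: an embedded mutex plus results.
type group struct {
	sync.Mutex
	results []*result
}

// add records a result so that wait can block on it later.
func (g *group) add(r *result) {
	g.Lock()
	defer g.Unlock()
	g.results = append(g.results, r)
}

// wait blocks until every result has arrived and returns the last
// error seen, or nil if all jobs succeeded.
func (g *group) wait() error {
	g.Lock()
	defer g.Unlock()

	var lastErr error
	for _, r := range g.results {
		if err := <-r.errChan; err != nil {
			lastErr = err
		}
	}
	return lastErr
}

// demo runs three jobs, one of which fails, and waits for all of them.
func demo() error {
	g := &group{}
	g.add(run(func() error { return nil }))
	g.add(run(func() error { return errors.New("job 2 failed") }))
	g.add(run(func() error { return nil }))
	return g.wait()
}

func main() {
	fmt.Println(demo())
}
```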

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job describes a job to be done

func NewJob

func NewJob(jobType string, data interface{}) Job

NewJob creates a new job

func (Job) Bytes

func (j Job) Bytes() []byte

Bytes returns the []byte value of the job's data

func (Job) Data

func (j Job) Data() interface{}

Data returns the "raw" data for the job

func (Job) Int

func (j Job) Int() int

Int returns the int value of the job's data

func (Job) Req

func (j Job) Req() *request.CoordinatedRequest

Req returns the Coordinated request attached to the Job

func (Job) String

func (j Job) String() string

String returns the string value of a job's data

func (Job) UUID

func (j Job) UUID() string

func (Job) Unmarshal

func (j Job) Unmarshal(target interface{}) error

Unmarshal unmarshals the job's data into a struct
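
The typed accessors above (String, Int, Unmarshal) read naturally as type assertions over the job's untyped data. The sketch below illustrates that idea with a local `job` stand-in; the zero-value fallbacks and the choice to decode JSON are assumptions, since these docs do not say what the real methods do when the data has a different type or encoding.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// job is a hypothetical stand-in holding a type name and untyped data.
type job struct {
	jobType string
	data    interface{}
}

// str returns the data as a string, or "" if it is not one (assumed).
func (j job) str() string {
	if s, ok := j.data.(string); ok {
		return s
	}
	return ""
}

// integer returns the data as an int, or 0 if it is not one (assumed).
func (j job) integer() int {
	if i, ok := j.data.(int); ok {
		return i
	}
	return 0
}

// unmarshal decodes JSON held as []byte or string into target.
func (j job) unmarshal(target interface{}) error {
	switch d := j.data.(type) {
	case []byte:
		return json.Unmarshal(d, target)
	case string:
		return json.Unmarshal([]byte(d), target)
	default:
		return errors.New("job data is not []byte or string")
	}
}

// user is an example payload type.
type user struct {
	Name string `json:"name"`
}

func main() {
	greet := job{jobType: "greet", data: "hello"}
	count := job{jobType: "count", data: 7}
	fmt.Println(greet.str(), count.integer())

	var u user
	payload := job{jobType: "user", data: `{"name":"ada"}`}
	err := payload.unmarshal(&u)
	fmt.Println(u.Name, err)
}
```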

type JobFunc

type JobFunc func(interface{}) *Result

JobFunc is a function that runs a job of a predetermined type

type Option

type Option func(workerOpts) workerOpts

Option is a function that modifies workerOpts

func Autoscale

func Autoscale(max int) Option

Autoscale returns an Option that enables autoscaling and sets the max number of threads

func MaxRetries

func MaxRetries(count int) Option

MaxRetries returns an Option to set the worker maximum retry count

func PoolSize

func PoolSize(size int) Option

PoolSize returns an Option to set the worker pool size

func PreWarm

func PreWarm() Option

PreWarm sets the worker to pre-warm itself to minimize cold start time. If not enabled, the worker will "warm up" when it receives its first job.

func RetrySeconds

func RetrySeconds(secs int) Option

RetrySeconds returns an Option to set the worker retry seconds

func TimeoutSeconds

func TimeoutSeconds(timeout int) Option

TimeoutSeconds returns an Option with the job timeout seconds set
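
Option follows the functional-options pattern: each constructor returns a function that takes the current options and returns a modified copy. The sketch below shows the pattern in isolation. The fields of `workerOpts` are hypothetical, since the real struct is unexported and not shown here, and `apply` is an illustrative helper rather than anything this package exports.

```go
package main

import "fmt"

// workerOpts is a hypothetical options struct; the real one is
// unexported and its fields are not shown in these docs.
type workerOpts struct {
	poolSize       int
	timeoutSeconds int
	numRetries     int
}

// option has the same shape as the documented Option type.
type option func(workerOpts) workerOpts

// poolSize returns an option that sets the pool size.
func poolSize(size int) option {
	return func(o workerOpts) workerOpts {
		o.poolSize = size
		return o
	}
}

// timeoutSeconds returns an option that sets the job timeout.
func timeoutSeconds(timeout int) option {
	return func(o workerOpts) workerOpts {
		o.timeoutSeconds = timeout
		return o
	}
}

// apply folds the options over a set of defaults, left to right, so
// later options win over earlier ones.
func apply(defaults workerOpts, options ...option) workerOpts {
	for _, opt := range options {
		defaults = opt(defaults)
	}
	return defaults
}

func main() {
	opts := apply(workerOpts{poolSize: 1}, poolSize(4), timeoutSeconds(30))
	fmt.Printf("%+v\n", opts)
}
```

Because each option receives and returns a value rather than a pointer, options cannot mutate shared state, which keeps a variadic call like the documented Register(jobType, runner, options...) free of side effects on the defaults.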

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result describes a result

func (*Result) Discard

func (r *Result) Discard()

Discard returns immediately and discards the eventual result, so that its memory is not retained

func (*Result) Then

func (r *Result) Then() (interface{}, error)

Then returns the result or error from a Result

func (*Result) ThenDo

func (r *Result) ThenDo(do ResultFunc)

ThenDo accepts a callback function to be called asynchronously when the result completes.

func (*Result) ThenInt

func (r *Result) ThenInt() (int, error)

ThenInt returns the result as an int, or the error, from a Result

func (*Result) ThenJSON

func (r *Result) ThenJSON(out interface{}) error

ThenJSON unmarshals the result or returns the error from a Result
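
The difference between the blocking Then family and the asynchronous ThenDo can be sketched with a channel-backed future. Everything below is a local stand-in (`result`, `resolve`, `then`, `thenInt`, `thenDo`) written under the assumption that a Result behaves like a one-shot future; it is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// outcome pairs a value with an error.
type outcome struct {
	val interface{}
	err error
}

// result is a hypothetical stand-in for Result, backed by a buffered
// channel that carries exactly one outcome.
type result struct {
	ch chan outcome
}

func newResult() *result { return &result{ch: make(chan outcome, 1)} }

// resolve delivers the outcome; it must be called exactly once.
func (r *result) resolve(val interface{}, err error) {
	r.ch <- outcome{val, err}
}

// then blocks until the outcome arrives.
func (r *result) then() (interface{}, error) {
	o := <-r.ch
	return o.val, o.err
}

// thenInt blocks like then and asserts that the value is an int.
func (r *result) thenInt() (int, error) {
	val, err := r.then()
	if err != nil {
		return 0, err
	}
	i, ok := val.(int)
	if !ok {
		return 0, errors.New("result is not an int")
	}
	return i, nil
}

// thenDo returns immediately and invokes do on another goroutine
// once the outcome is available.
func (r *result) thenDo(do func(interface{}, error)) {
	go func() { do(r.then()) }()
}

func main() {
	// Blocking style.
	r := newResult()
	go r.resolve(42, nil)
	n, err := r.thenInt()
	fmt.Println(n, err)

	// Callback style: thenDo does not block the caller.
	done := make(chan string)
	r2 := newResult()
	r2.thenDo(func(val interface{}, err error) { done <- fmt.Sprint(val) })
	r2.resolve("hello", nil)
	fmt.Println(<-done)
}
```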

func (*Result) UUID

func (r *Result) UUID() string

UUID returns the result/job's UUID

type ResultFunc

type ResultFunc func(interface{}, error)

ResultFunc is a result callback function.

type RunErr

type RunErr struct {
	Code    int    `json:"code,omitempty"`
	Message string `json:"message,omitempty"`
}

RunErr represents an error returned from a Wasm Runnable. It lives in the rt package to avoid import cycles.

func (RunErr) Error

func (r RunErr) Error() string

Error returns the stringified JSON representation of the error
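
Given the struct tags shown above, a JSON-stringifying Error method could look like the following sketch. `runErr` is a local copy of the documented fields and tags; the method body is an assumption about how the stringification is done, not the package's source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// runErr mirrors the documented RunErr fields and JSON tags.
type runErr struct {
	Code    int    `json:"code,omitempty"`
	Message string `json:"message,omitempty"`
}

// Error returns the error as a JSON string. Marshaling a struct of an
// int and a string cannot fail, so the marshal error is ignored.
func (r runErr) Error() string {
	b, _ := json.Marshal(r)
	return string(b)
}

func main() {
	var err error = runErr{Code: 404, Message: "not found"}
	fmt.Println(err) // {"code":404,"message":"not found"}

	// omitempty drops zero-valued fields from the output.
	fmt.Println(runErr{Message: "oops"}.Error()) // {"message":"oops"}
}
```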

func (RunErr) ToVKErr

func (r RunErr) ToVKErr() vk.Error

ToVKErr converts a RunErr to a VKError

type Runnable

type Runnable interface {
	// Run is the entrypoint for jobs handled by a Runnable
	Run(Job, *Ctx) (interface{}, error)

	// OnChange is called when the worker using the Runnable instance is going to change.
	// OnChange will be called for things like startup and shutdown.
	OnChange(ChangeEvent) error
}

Runnable describes something that is runnable
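
A minimal implementation of the two-method interface might look like the sketch below. To stay self-contained it declares local stand-ins (`job`, `ctx`, `changeEvent`, `runnable`) with the same shapes as the documented types; `upper` is a hypothetical Runnable that upper-cases string job data.

```go
package main

import (
	"fmt"
	"strings"
)

// changeEvent mirrors the documented ChangeEvent constants.
type changeEvent int

const (
	changeTypeStart changeEvent = iota
	changeTypeStop
)

// job and ctx are hypothetical stand-ins for Job and Ctx.
type job struct{ data interface{} }
type ctx struct{}

// runnable has the same method set as the documented Runnable.
type runnable interface {
	Run(job, *ctx) (interface{}, error)
	OnChange(changeEvent) error
}

// upper is a Runnable that upper-cases string job data.
type upper struct{ started bool }

// Run handles a single job and returns its result.
func (u *upper) Run(j job, _ *ctx) (interface{}, error) {
	s, ok := j.data.(string)
	if !ok {
		return nil, fmt.Errorf("expected string data, got %T", j.data)
	}
	return strings.ToUpper(s), nil
}

// OnChange tracks worker startup and shutdown.
func (u *upper) OnChange(ev changeEvent) error {
	switch ev {
	case changeTypeStart:
		u.started = true
	case changeTypeStop:
		u.started = false
	}
	return nil
}

func main() {
	var r runnable = &upper{}
	_ = r.OnChange(changeTypeStart)

	out, err := r.Run(job{data: "hello"}, &ctx{})
	fmt.Println(out, err)
}
```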

type ScalerMetrics

type ScalerMetrics struct {
	TotalThreadCount int                      `json:"totalThreadCount"`
	TotalJobCount    int                      `json:"totalJobCount"`
	Workers          map[string]WorkerMetrics `json:"workers"`
}

ScalerMetrics holds internal metrics about the scaler

type Schedule

type Schedule interface {
	Check() *Job
	Done() bool
}

Schedule is a type that returns an *optional* job if there is something that should be scheduled. Reactr will poll the Check() method at regular intervals to see if work is available.

func After

func After(seconds int, jobFunc func() Job) Schedule

After returns a Schedule that will schedule the job provided by jobFunc once, the given number of seconds after creation

func Every

func Every(seconds int, jobFunc func() Job) Schedule

Every returns a Schedule that will schedule the job provided by jobFunc repeatedly, at an interval of the given number of seconds
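
Because Schedule is an interface, custom schedules are possible alongside After and Every. The sketch below shows the Check/Done contract with a hypothetical `times` schedule that yields a job on each of its first n polls. The `job` and `schedule` types are local stand-ins, and `poll` is a simplified loop standing in for the scheduler's timed polling.

```go
package main

import "fmt"

// job is a hypothetical stand-in for Job.
type job struct {
	jobType string
	data    interface{}
}

// schedule has the same method set as the documented Schedule.
type schedule interface {
	Check() *job
	Done() bool
}

// times yields a job on each of the first n polls, then reports done.
type times struct {
	remaining int
	jobFunc   func() job
}

// Check returns a job if one is due, or nil if there is nothing to do.
func (t *times) Check() *job {
	if t.remaining <= 0 {
		return nil
	}
	t.remaining--
	j := t.jobFunc()
	return &j
}

// Done reports whether the schedule will never produce another job.
func (t *times) Done() bool { return t.remaining <= 0 }

// poll stands in for the scheduler's polling loop (the real one polls
// at regular intervals) and returns how many jobs were produced.
func poll(s schedule) int {
	count := 0
	for !s.Done() {
		if j := s.Check(); j != nil {
			count++
		}
	}
	return count
}

func main() {
	s := &times{remaining: 3, jobFunc: func() job { return job{jobType: "ping"} }}
	fmt.Println(poll(s))
}
```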

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler represents the main control object

func New

func New() *Scheduler

New returns a Scheduler ready to accept Jobs

func NewWithLogger

func NewWithLogger(log *vlog.Logger) *Scheduler

NewWithLogger returns a Scheduler with a custom logger

func (*Scheduler) DeRegister

func (r *Scheduler) DeRegister(jobType string) error

DeRegister stops the workers for a given jobType and removes it

func (*Scheduler) Do

func (r *Scheduler) Do(job Job) *Result

Do schedules a job to be worked on and returns a result object

func (*Scheduler) IsRegistered

func (r *Scheduler) IsRegistered(jobType string) bool

IsRegistered returns true if the instance has a worker registered for the given jobType

func (*Scheduler) Job

func (r *Scheduler) Job(jobType string, data interface{}) Job

Job is a shorter alias for NewJob

func (*Scheduler) Listen

func (r *Scheduler) Listen(pod *bus.Pod, msgType string)

Listen causes Scheduler to listen for messages of the given type and trigger the job of the same type. The message's data is passed to the runnable as the job data. The job's result is then emitted as a message. If an error occurs, it is logged and an error is sent. If the result is nil, nothing is sent.

func (*Scheduler) ListenAndRun

func (r *Scheduler) ListenAndRun(pod *bus.Pod, msgType string, run func(bus.Message, interface{}, error))

ListenAndRun subscribes Scheduler to a messageType and calls `run` for each job result

func (*Scheduler) Metrics

func (r *Scheduler) Metrics() ScalerMetrics

Metrics returns a snapshot in time describing Scheduler's internals

func (*Scheduler) Register

func (r *Scheduler) Register(jobType string, runner Runnable, options ...Option) JobFunc

Register registers a Runnable with the Scheduler and returns a shortcut function to run those jobs

func (*Scheduler) Schedule

func (r *Scheduler) Schedule(s Schedule)

Schedule adds a new Schedule to the instance. Scheduler will 'watch' the Schedule and Do any jobs when the Schedule indicates it's needed.

type WorkerMetrics

type WorkerMetrics struct {
	TargetThreadCount int     `json:"targetThreadCount"`
	ThreadCount       int     `json:"threadCount"`
	JobCount          int     `json:"jobCount"`
	JobRate           float64 `json:"jobRate"`
}

WorkerMetrics holds metrics about a worker
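
The struct tags on ScalerMetrics and WorkerMetrics determine the JSON shape of a metrics snapshot. The sketch below marshals local copies of the two structs with made-up sample values (the "resize" job type and all numbers are illustrative only); `render` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workerMetrics mirrors the documented WorkerMetrics fields and tags.
type workerMetrics struct {
	TargetThreadCount int     `json:"targetThreadCount"`
	ThreadCount       int     `json:"threadCount"`
	JobCount          int     `json:"jobCount"`
	JobRate           float64 `json:"jobRate"`
}

// scalerMetrics mirrors the documented ScalerMetrics fields and tags.
type scalerMetrics struct {
	TotalThreadCount int                      `json:"totalThreadCount"`
	TotalJobCount    int                      `json:"totalJobCount"`
	Workers          map[string]workerMetrics `json:"workers"`
}

// render encodes a snapshot as compact JSON, or "" on failure.
func render(m scalerMetrics) string {
	b, err := json.Marshal(m)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	m := scalerMetrics{
		TotalThreadCount: 2,
		TotalJobCount:    10,
		Workers: map[string]workerMetrics{
			// Workers is keyed by job type; "resize" is a sample name.
			"resize": {TargetThreadCount: 2, ThreadCount: 2, JobCount: 10, JobRate: 1.5},
		},
	}
	fmt.Println(render(m))
}
```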
