job

package
v0.0.0-...-ff244ab Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2021 License: AGPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// WorkerType is the key in JSON for the type of worker
	WorkerType = "worker"
)

Variables

View Source
var (
	// ErrClosed is using a closed system
	ErrClosed = errors.New("jobs: closed")
	// ErrNotFoundJob is used when the job could not be found
	ErrNotFoundJob = errors.New("jobs: not found")
	// ErrQueueClosed is used to indicate the queue is closed
	ErrQueueClosed = errors.New("jobs: queue is closed")
	// ErrUnknownWorker the asked worker does not exist
	ErrUnknownWorker = errors.New("jobs: could not find worker")
	// ErrMessageNil is used for an nil message
	ErrMessageNil = errors.New("jobs: message is nil")
	// ErrMessageUnmarshal is used when unmarshalling a message causes an error
	ErrMessageUnmarshal = errors.New("jobs: message unmarshal")
	// ErrAbort can be used to abort the execution of the job without causing
	// errors.
	ErrAbort = errors.New("jobs: abort")

	// ErrUnknownTrigger is used when the trigger type is not recognized
	ErrUnknownTrigger = errors.New("Unknown trigger type")
	// ErrNotFoundTrigger is used when the trigger was not found
	ErrNotFoundTrigger = errors.New("Trigger with specified ID does not exist")
	// ErrMalformedTrigger is used to indicate the trigger is unparsable
	ErrMalformedTrigger = echo.NewHTTPError(http.StatusBadRequest, "Trigger unparsable")
)

Functions

func AddWorker

func AddWorker(conf *WorkerConfig)

AddWorker adds a new worker to global list of available workers.

func GetWorkersNamesList

func GetWorkersNamesList() []string

GetWorkersNamesList returns the names of the configured workers

func SystemStart

func SystemStart(b Broker, s Scheduler, workersList WorkersList) error

SystemStart initializes and starts the global jobs system with the given broker, scheduler instances and workers list.

Types

type Broker

type Broker interface {
	StartWorkers(workersList WorkersList) error
	ShutdownWorkers(ctx context.Context) error

	// PushJob will push try to push a new job from the specified job request.
	// This method is asynchronous.
	PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error)

	// WorkerQueueLen returns the total element in the queue of the specified
	// worker type.
	WorkerQueueLen(workerType string) (int, error)
	// WorkersTypes returns the list of registered workers types.
	WorkersTypes() []string
}

Broker interface is used to represent a job broker associated to a particular domain. A broker can be used to create jobs that are pushed in the job system.

func NewMemBroker

func NewMemBroker() Broker

NewMemBroker creates a new in-memory broker system.

The in-memory implementation of the job system has the specifity that workers are actually launched by the broker at its creation.

type ErrBadTrigger

type ErrBadTrigger struct {
	Err error
}

ErrBadTrigger is an error conveying the information of a trigger that is not valid, and could be deleted.

func (ErrBadTrigger) Error

func (e ErrBadTrigger) Error() string

type Event

type Event json.RawMessage

Event is a json encoded value of a realtime.Event

func NewEvent

func NewEvent(data *realtime.Event) (Event, error)

NewEvent return a json encoded realtime.Event

func (Event) Unmarshal

func (e Event) Unmarshal(evt interface{}) error

Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.

type EventTrigger

type EventTrigger struct {
	*TriggerInfos
	// contains filtered or unexported fields
}

EventTrigger implements Trigger for realtime triggered events

func NewEventTrigger

func NewEventTrigger(infos *TriggerInfos) (*EventTrigger, error)

NewEventTrigger returns a new instance of EventTrigger given the specified options.

func (*EventTrigger) DocType

func (t *EventTrigger) DocType() string

DocType implements the permission.Matcher interface

func (*EventTrigger) ID

func (t *EventTrigger) ID() string

ID implements the permission.Matcher interface

func (*EventTrigger) Infos

func (t *EventTrigger) Infos() *TriggerInfos

Infos implements the Infos method of the Trigger interface.

func (*EventTrigger) Match

func (t *EventTrigger) Match(key, value string) bool

Match implements the permission.Matcher interface

func (*EventTrigger) Schedule

func (t *EventTrigger) Schedule() <-chan *JobRequest

Schedule implements the Schedule method of the Trigger interface.

func (*EventTrigger) Type

func (t *EventTrigger) Type() string

Type implements the Type method of the Trigger interface.

func (*EventTrigger) Unschedule

func (t *EventTrigger) Unschedule()

Unschedule implements the Unschedule method of the Trigger interface.

type Job

type Job struct {
	JobID       string      `json:"_id,omitempty"`
	JobRev      string      `json:"_rev,omitempty"`
	Domain      string      `json:"domain"`
	Prefix      string      `json:"prefix,omitempty"`
	WorkerType  string      `json:"worker"`
	TriggerID   string      `json:"trigger_id,omitempty"`
	Message     Message     `json:"message"`
	Event       Event       `json:"event"`
	Manual      bool        `json:"manual_execution,omitempty"`
	Debounced   bool        `json:"debounced,omitempty"`
	Options     *JobOptions `json:"options,omitempty"`
	State       State       `json:"state"`
	QueuedAt    time.Time   `json:"queued_at"`
	StartedAt   time.Time   `json:"started_at"`
	FinishedAt  time.Time   `json:"finished_at"`
	Error       string      `json:"error,omitempty"`
	ForwardLogs bool        `json:"forward_logs,omitempty"`
}

Job contains all the metadata informations of a Job. It can be marshalled in JSON.

func Get

func Get(db prefixer.Prefixer, jobID string) (*Job, error)

Get returns the informations about a job.

func GetJobs

func GetJobs(db prefixer.Prefixer, triggerID string, limit int) ([]*Job, error)

GetJobs returns the jobs launched by the given trigger.

func GetJobsBeforeDate

func GetJobsBeforeDate(db prefixer.Prefixer, date time.Time) ([]*Job, error)

GetJobsBeforeDate returns alls jobs queued before the specified date

func GetLastsJobs

func GetLastsJobs(db prefixer.Prefixer, workerType string) ([]*Job, error)

GetLastsJobs returns the N lasts job of each state for an instance/worker type pair

func GetQueuedJobs

func GetQueuedJobs(db prefixer.Prefixer, workerType string) ([]*Job, error)

GetQueuedJobs returns the list of jobs which states is "queued" or "running"

func NewJob

func NewJob(db prefixer.Prefixer, req *JobRequest) *Job

NewJob creates a new Job instance from a job request.

func (*Job) Ack

func (j *Job) Ack() error

Ack sets the job infos state to Done an sends the new job infos on the channel.

func (*Job) AckConsumed

func (j *Job) AckConsumed() error

AckConsumed sets the job infos state to Running an sends the new job infos on the channel.

func (*Job) Clone

func (j *Job) Clone() couchdb.Doc

Clone implements the couchdb.Doc interface

func (*Job) Create

func (j *Job) Create() error

Create creates the job in couchdb

func (*Job) DBPrefix

func (j *Job) DBPrefix() string

DBPrefix implements the prefixer.Prefixer interface.

func (*Job) DocType

func (j *Job) DocType() string

DocType implements the couchdb.Doc interface

func (*Job) DomainName

func (j *Job) DomainName() string

DomainName implements the prefixer.Prefixer interface.

func (*Job) ID

func (j *Job) ID() string

ID implements the couchdb.Doc interface

func (*Job) Logger

func (j *Job) Logger() *logrus.Entry

Logger returns a logger associated with the job domain

func (*Job) Match

func (j *Job) Match(key, value string) bool

Match implements the permission.Matcher interface

func (*Job) Nack

func (j *Job) Nack(err error) error

Nack sets the job infos state to Errored, set the specified error has the error field and sends the new job infos on the channel.

func (*Job) Rev

func (j *Job) Rev() string

Rev implements the couchdb.Doc interface

func (*Job) SetID

func (j *Job) SetID(id string)

SetID implements the couchdb.Doc interface

func (*Job) SetRev

func (j *Job) SetRev(rev string)

SetRev implements the couchdb.Doc interface

func (*Job) Update

func (j *Job) Update() error

Update updates the job in couchdb

type JobErrorCheckerHook

type JobErrorCheckerHook func(err error) bool

JobErrorCheckerHook is an optional method called at the beginning of the job execution to prevent a retry according to the previous error (specifically useful in the retries loop)

type JobOptions

type JobOptions struct {
	MaxExecCount int           `json:"max_exec_count"`
	MaxExecTime  time.Duration `json:"max_exec_time"`
	Timeout      time.Duration `json:"timeout"`
}

JobOptions struct contains the execution properties of the jobs.

type JobRequest

type JobRequest struct {
	WorkerType  string
	TriggerID   string
	Trigger     Trigger
	Message     Message
	Event       Event
	Manual      bool
	Debounced   bool
	ForwardLogs bool
	Admin       bool
	Options     *JobOptions
}

JobRequest struct is used to represent a new job request.

func (*JobRequest) DocType

func (jr *JobRequest) DocType() string

DocType implements the permission.Matcher interface

func (*JobRequest) ID

func (jr *JobRequest) ID() string

ID implements the permission.Matcher interface

func (*JobRequest) Match

func (jr *JobRequest) Match(key, value string) bool

Match implements the permission.Matcher interface

type JobSystem

type JobSystem interface {
	Broker
	Scheduler
	utils.Shutdowner
}

JobSystem is a pair of broker, scheduler linked together.

func System

func System() JobSystem

System returns the global job system.

type Message

type Message json.RawMessage

Message is a json encoded job message.

func NewMessage

func NewMessage(data interface{}) (Message, error)

NewMessage returns a json encoded data

func (Message) MarshalJSON

func (m Message) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler on Message.

func (Message) Unmarshal

func (m Message) Unmarshal(msg interface{}) error

Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler on Message. It should be retro- compatible with the old Message representation { Data, Type }.

type Scheduler

type Scheduler interface {
	StartScheduler(broker Broker) error
	ShutdownScheduler(ctx context.Context) error
	PollScheduler(now int64) error
	AddTrigger(trigger Trigger) error
	GetTrigger(db prefixer.Prefixer, id string) (Trigger, error)
	DeleteTrigger(db prefixer.Prefixer, id string) error
	GetAllTriggers(db prefixer.Prefixer) ([]Trigger, error)
	CleanRedis() error
	RebuildRedis(db prefixer.Prefixer) error
}

Scheduler interface is used to represent a scheduler that is responsible to listen respond to triggers jobs requests and send them to the broker.

func NewMemScheduler

func NewMemScheduler() Scheduler

NewMemScheduler creates a new in-memory scheduler that will load all registered triggers and schedule their work.

type State

type State string

State represent the state of a job.

const (
	// Queued state
	Queued State = "queued"
	// Running state
	Running State = "running"
	// Done state
	Done State = "done"
	// Errored state
	Errored State = "errored"
)

type Trigger

type Trigger interface {
	prefixer.Prefixer
	Type() string
	Infos() *TriggerInfos
	// Schedule should return a channel on which the trigger can send job
	// requests when it decides to.
	Schedule() <-chan *JobRequest
	// Unschedule should be used to clean the trigger states and should close
	// the returns jobs channel.
	Unschedule()
}

Trigger interface is used to represent a trigger.

func NewTrigger

func NewTrigger(db prefixer.Prefixer, infos TriggerInfos, data interface{}) (Trigger, error)

NewTrigger creates the trigger associates with the specified trigger options.

type TriggerInfos

type TriggerInfos struct {
	TID          string        `json:"_id,omitempty"`
	TRev         string        `json:"_rev,omitempty"`
	Domain       string        `json:"domain"`
	Prefix       string        `json:"prefix,omitempty"`
	Type         string        `json:"type"`
	WorkerType   string        `json:"worker"`
	Arguments    string        `json:"arguments"`
	Debounce     string        `json:"debounce"`
	Options      *JobOptions   `json:"options"`
	Message      Message       `json:"message"`
	CurrentState *TriggerState `json:"current_state,omitempty"`
}

TriggerInfos is a struct containing all the options of a trigger.

func (*TriggerInfos) Clone

func (t *TriggerInfos) Clone() couchdb.Doc

Clone implements the couchdb.Doc interface

func (*TriggerInfos) DBPrefix

func (t *TriggerInfos) DBPrefix() string

DBPrefix implements the prefixer.Prefixer interface.

func (*TriggerInfos) DocType

func (t *TriggerInfos) DocType() string

DocType implements the couchdb.Doc interface

func (*TriggerInfos) DomainName

func (t *TriggerInfos) DomainName() string

DomainName implements the prefixer.Prefixer interface.

func (*TriggerInfos) ID

func (t *TriggerInfos) ID() string

ID implements the couchdb.Doc interface

func (*TriggerInfos) JobRequest

func (t *TriggerInfos) JobRequest() *JobRequest

JobRequest returns a job request associated with the scheduler informations.

func (*TriggerInfos) JobRequestWithEvent

func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)

JobRequestWithEvent returns a job request associated with the scheduler informations associated to the specified realtime event.

func (*TriggerInfos) Match

func (t *TriggerInfos) Match(key, value string) bool

Match implements the permission.Matcher interface

func (*TriggerInfos) Rev

func (t *TriggerInfos) Rev() string

Rev implements the couchdb.Doc interface

func (*TriggerInfos) SetID

func (t *TriggerInfos) SetID(id string)

SetID implements the couchdb.Doc interface

func (*TriggerInfos) SetRev

func (t *TriggerInfos) SetRev(rev string)

SetRev implements the couchdb.Doc interface

type TriggerState

type TriggerState struct {
	TID                 string     `json:"trigger_id"`
	Status              State      `json:"status"`
	LastSuccess         *time.Time `json:"last_success,omitempty"`
	LastSuccessfulJobID string     `json:"last_successful_job_id,omitempty"`
	LastExecution       *time.Time `json:"last_execution,omitempty"`
	LastExecutedJobID   string     `json:"last_executed_job_id,omitempty"`
	LastFailure         *time.Time `json:"last_failure,omitempty"`
	LastFailedJobID     string     `json:"last_failed_job_id,omitempty"`
	LastError           string     `json:"last_error,omitempty"`
	LastManualExecution *time.Time `json:"last_manual_execution,omitempty"`
	LastManualJobID     string     `json:"last_manual_job_id,omitempty"`
}

TriggerState represent the current state of the trigger

func GetTriggerState

func GetTriggerState(db prefixer.Prefixer, triggerID string) (*TriggerState, error)

GetTriggerState returns the state of the trigger, calculated from the last launched jobs.

type Worker

type Worker struct {
	Type string
	Conf *WorkerConfig
	// contains filtered or unexported fields
}

Worker is a unit of work that will consume from a queue and execute the do method for each jobs it pulls.

func NewWorker

func NewWorker(conf *WorkerConfig) *Worker

NewWorker creates a new instance of Worker with the given configuration.

func (*Worker) Shutdown

func (w *Worker) Shutdown(ctx context.Context) error

Shutdown is used to close the worker, waiting for all tasks to end

func (*Worker) Start

func (w *Worker) Start(jobs chan *Job) error

Start is used to start the worker consumption of messages from its queue.

type WorkerBeforeHook

type WorkerBeforeHook func(job *Job) (bool, error)

WorkerBeforeHook is an optional method that is always called before the job is being pushed into the queue. It can be useful to skip the job beforehand.

type WorkerCommit

type WorkerCommit func(ctx *WorkerContext, errjob error) error

WorkerCommit is an optional method that is always called once after the execution of the WorkerFunc.

type WorkerConfig

type WorkerConfig struct {
	WorkerInit   WorkerInitFunc
	WorkerStart  WorkerStartFunc
	WorkerFunc   WorkerFunc
	WorkerCommit WorkerCommit
	WorkerType   string
	BeforeHook   WorkerBeforeHook
	ErrorHook    JobErrorCheckerHook
	Concurrency  int
	MaxExecCount int
	AdminOnly    bool
	Timeout      time.Duration
	RetryDelay   time.Duration
}

WorkerConfig is the configuration parameter of a worker defined by the job system. It contains parameters of the worker along with the worker main function that perform the work against a job's message.

func GetWorkersList

func GetWorkersList() ([]*WorkerConfig, error)

GetWorkersList returns a list of all activated workers, configured as defined by the configuration file.

func (*WorkerConfig) Clone

func (w *WorkerConfig) Clone() *WorkerConfig

Clone clones the worker config

type WorkerContext

type WorkerContext struct {
	context.Context
	// contains filtered or unexported fields
}

WorkerContext is a context.Context passed to the worker for each job execution and contains specific values from the job.

func NewWorkerContext

func NewWorkerContext(workerID string, job *Job) *WorkerContext

NewWorkerContext returns a context.Context usable by a worker. TODO : Save the right prefix

func (*WorkerContext) Cookie

func (c *WorkerContext) Cookie() interface{}

Cookie returns the cookie associated with the worker context.

func (*WorkerContext) ID

func (c *WorkerContext) ID() string

ID returns a unique identifier for the worker context.

func (*WorkerContext) Logger

func (c *WorkerContext) Logger() *logrus.Entry

Logger return the logger associated with the worker context.

func (*WorkerContext) Manual

func (c *WorkerContext) Manual() bool

Manual returns if the job was started manually

func (*WorkerContext) NoRetry

func (c *WorkerContext) NoRetry() bool

NoRetry returns the no-retry flag.

func (*WorkerContext) SetNoRetry

func (c *WorkerContext) SetNoRetry()

SetNoRetry set the no-retry flag to prevent a retry on the next execution.

func (*WorkerContext) TriggerID

func (c *WorkerContext) TriggerID() (string, bool)

TriggerID returns the possible trigger identifier responsible for launching the job.

func (*WorkerContext) UnmarshalEvent

func (c *WorkerContext) UnmarshalEvent(v interface{}) error

UnmarshalEvent unmarshals the event contained in the worker context.

func (*WorkerContext) UnmarshalMessage

func (c *WorkerContext) UnmarshalMessage(v interface{}) error

UnmarshalMessage unmarshals the message contained in the worker context.

func (*WorkerContext) WithCookie

func (c *WorkerContext) WithCookie(cookie interface{}) *WorkerContext

WithCookie returns a clone of the context with a new cookie value.

func (*WorkerContext) WithTimeout

func (c *WorkerContext) WithTimeout(timeout time.Duration) (*WorkerContext, context.CancelFunc)

WithTimeout returns a clone of the context with a different deadline.

type WorkerFunc

type WorkerFunc func(ctx *WorkerContext) error

WorkerFunc represent the work function that a worker should implement.

type WorkerInitFunc

type WorkerInitFunc func() error

WorkerInitFunc is called at the start of the worker system, only once. It is not called before every job process. It can be useful to initialize a global variable used by the worker.

type WorkerStartFunc

type WorkerStartFunc func(ctx *WorkerContext) (*WorkerContext, error)

WorkerStartFunc is optionally called at the beginning of the each job process and can produce a context value.

type WorkersList

type WorkersList []*WorkerConfig

WorkersList is a map associating a worker type with its acutal configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL