Documentation ¶
Index ¶
- Constants
- Variables
- func AddWorker(conf *WorkerConfig)
- func GetWorkersNamesList() []string
- func SystemStart(b Broker, s Scheduler, workersList WorkersList) error
- type Broker
- type ErrBadTrigger
- type Event
- type EventTrigger
- type Job
- func Get(db prefixer.Prefixer, jobID string) (*Job, error)
- func GetJobs(db prefixer.Prefixer, triggerID string, limit int) ([]*Job, error)
- func GetJobsBeforeDate(db prefixer.Prefixer, date time.Time) ([]*Job, error)
- func GetLastsJobs(db prefixer.Prefixer, workerType string) ([]*Job, error)
- func GetQueuedJobs(db prefixer.Prefixer, workerType string) ([]*Job, error)
- func NewJob(db prefixer.Prefixer, req *JobRequest) *Job
- func (j *Job) Ack() error
- func (j *Job) AckConsumed() error
- func (j *Job) Clone() couchdb.Doc
- func (j *Job) Create() error
- func (j *Job) DBPrefix() string
- func (j *Job) DocType() string
- func (j *Job) DomainName() string
- func (j *Job) ID() string
- func (j *Job) Logger() *logrus.Entry
- func (j *Job) Match(key, value string) bool
- func (j *Job) Nack(err error) error
- func (j *Job) Rev() string
- func (j *Job) SetID(id string)
- func (j *Job) SetRev(rev string)
- func (j *Job) Update() error
- type JobErrorCheckerHook
- type JobOptions
- type JobRequest
- type JobSystem
- type Message
- type Scheduler
- type State
- type Trigger
- type TriggerInfos
- func (t *TriggerInfos) Clone() couchdb.Doc
- func (t *TriggerInfos) DBPrefix() string
- func (t *TriggerInfos) DocType() string
- func (t *TriggerInfos) DomainName() string
- func (t *TriggerInfos) ID() string
- func (t *TriggerInfos) JobRequest() *JobRequest
- func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)
- func (t *TriggerInfos) Match(key, value string) bool
- func (t *TriggerInfos) Rev() string
- func (t *TriggerInfos) SetID(id string)
- func (t *TriggerInfos) SetRev(rev string)
- type TriggerState
- type Worker
- type WorkerBeforeHook
- type WorkerCommit
- type WorkerConfig
- type WorkerContext
- func (c *WorkerContext) Cookie() interface{}
- func (c *WorkerContext) ID() string
- func (c *WorkerContext) Logger() *logrus.Entry
- func (c *WorkerContext) Manual() bool
- func (c *WorkerContext) NoRetry() bool
- func (c *WorkerContext) SetNoRetry()
- func (c *WorkerContext) TriggerID() (string, bool)
- func (c *WorkerContext) UnmarshalEvent(v interface{}) error
- func (c *WorkerContext) UnmarshalMessage(v interface{}) error
- func (c *WorkerContext) WithCookie(cookie interface{}) *WorkerContext
- func (c *WorkerContext) WithTimeout(timeout time.Duration) (*WorkerContext, context.CancelFunc)
- type WorkerFunc
- type WorkerInitFunc
- type WorkerStartFunc
- type WorkersList
Constants ¶
const (
// WorkerType is the key in JSON for the type of worker
WorkerType = "worker"
)
Variables ¶
var ( // ErrClosed is using a closed system ErrClosed = errors.New("jobs: closed") // ErrNotFoundJob is used when the job could not be found ErrNotFoundJob = errors.New("jobs: not found") // ErrQueueClosed is used to indicate the queue is closed ErrQueueClosed = errors.New("jobs: queue is closed") // ErrUnknownWorker the asked worker does not exist ErrUnknownWorker = errors.New("jobs: could not find worker") // ErrMessageNil is used for an nil message ErrMessageNil = errors.New("jobs: message is nil") // ErrMessageUnmarshal is used when unmarshalling a message causes an error ErrMessageUnmarshal = errors.New("jobs: message unmarshal") // ErrAbort can be used to abort the execution of the job without causing // errors. ErrAbort = errors.New("jobs: abort") // ErrUnknownTrigger is used when the trigger type is not recognized ErrUnknownTrigger = errors.New("Unknown trigger type") // ErrNotFoundTrigger is used when the trigger was not found ErrNotFoundTrigger = errors.New("Trigger with specified ID does not exist") // ErrMalformedTrigger is used to indicate the trigger is unparsable ErrMalformedTrigger = echo.NewHTTPError(http.StatusBadRequest, "Trigger unparsable") )
Functions ¶
func AddWorker ¶
func AddWorker(conf *WorkerConfig)
AddWorker adds a new worker to global list of available workers.
func GetWorkersNamesList ¶
func GetWorkersNamesList() []string
GetWorkersNamesList returns the names of the configured workers
func SystemStart ¶
func SystemStart(b Broker, s Scheduler, workersList WorkersList) error
SystemStart initializes and starts the global jobs system with the given broker, scheduler instances and workers list.
Types ¶
type Broker ¶
type Broker interface { StartWorkers(workersList WorkersList) error ShutdownWorkers(ctx context.Context) error // PushJob will push try to push a new job from the specified job request. // This method is asynchronous. PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error) // WorkerQueueLen returns the total element in the queue of the specified // worker type. WorkerQueueLen(workerType string) (int, error) // WorkersTypes returns the list of registered workers types. WorkersTypes() []string }
Broker interface is used to represent a job broker associated to a particular domain. A broker can be used to create jobs that are pushed in the job system.
func NewMemBroker ¶
func NewMemBroker() Broker
NewMemBroker creates a new in-memory broker system.
The in-memory implementation of the job system has the specifity that workers are actually launched by the broker at its creation.
type ErrBadTrigger ¶
type ErrBadTrigger struct {
Err error
}
ErrBadTrigger is an error conveying the information of a trigger that is not valid, and could be deleted.
func (ErrBadTrigger) Error ¶
func (e ErrBadTrigger) Error() string
type Event ¶
type Event json.RawMessage
Event is a json encoded value of a realtime.Event
type EventTrigger ¶
type EventTrigger struct { *TriggerInfos // contains filtered or unexported fields }
EventTrigger implements Trigger for realtime triggered events
func NewEventTrigger ¶
func NewEventTrigger(infos *TriggerInfos) (*EventTrigger, error)
NewEventTrigger returns a new instance of EventTrigger given the specified options.
func (*EventTrigger) DocType ¶
func (t *EventTrigger) DocType() string
DocType implements the permission.Matcher interface
func (*EventTrigger) ID ¶
func (t *EventTrigger) ID() string
ID implements the permission.Matcher interface
func (*EventTrigger) Infos ¶
func (t *EventTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*EventTrigger) Match ¶
func (t *EventTrigger) Match(key, value string) bool
Match implements the permission.Matcher interface
func (*EventTrigger) Schedule ¶
func (t *EventTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*EventTrigger) Type ¶
func (t *EventTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*EventTrigger) Unschedule ¶
func (t *EventTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type Job ¶
type Job struct { JobID string `json:"_id,omitempty"` JobRev string `json:"_rev,omitempty"` Domain string `json:"domain"` Prefix string `json:"prefix,omitempty"` WorkerType string `json:"worker"` TriggerID string `json:"trigger_id,omitempty"` Message Message `json:"message"` Event Event `json:"event"` Manual bool `json:"manual_execution,omitempty"` Debounced bool `json:"debounced,omitempty"` Options *JobOptions `json:"options,omitempty"` State State `json:"state"` QueuedAt time.Time `json:"queued_at"` StartedAt time.Time `json:"started_at"` FinishedAt time.Time `json:"finished_at"` Error string `json:"error,omitempty"` ForwardLogs bool `json:"forward_logs,omitempty"` }
Job contains all the metadata informations of a Job. It can be marshalled in JSON.
func GetJobsBeforeDate ¶
GetJobsBeforeDate returns alls jobs queued before the specified date
func GetLastsJobs ¶
GetLastsJobs returns the N lasts job of each state for an instance/worker type pair
func GetQueuedJobs ¶
GetQueuedJobs returns the list of jobs which states is "queued" or "running"
func NewJob ¶
func NewJob(db prefixer.Prefixer, req *JobRequest) *Job
NewJob creates a new Job instance from a job request.
func (*Job) AckConsumed ¶
AckConsumed sets the job infos state to Running an sends the new job infos on the channel.
func (*Job) DomainName ¶
DomainName implements the prefixer.Prefixer interface.
type JobErrorCheckerHook ¶
JobErrorCheckerHook is an optional method called at the beginning of the job execution to prevent a retry according to the previous error (specifically useful in the retries loop)
type JobOptions ¶
type JobOptions struct { MaxExecCount int `json:"max_exec_count"` MaxExecTime time.Duration `json:"max_exec_time"` Timeout time.Duration `json:"timeout"` }
JobOptions struct contains the execution properties of the jobs.
type JobRequest ¶
type JobRequest struct { WorkerType string TriggerID string Trigger Trigger Message Message Event Event Manual bool Debounced bool ForwardLogs bool Admin bool Options *JobOptions }
JobRequest struct is used to represent a new job request.
func (*JobRequest) DocType ¶
func (jr *JobRequest) DocType() string
DocType implements the permission.Matcher interface
func (*JobRequest) ID ¶
func (jr *JobRequest) ID() string
ID implements the permission.Matcher interface
func (*JobRequest) Match ¶
func (jr *JobRequest) Match(key, value string) bool
Match implements the permission.Matcher interface
type JobSystem ¶
type JobSystem interface { Broker Scheduler utils.Shutdowner }
JobSystem is a pair of broker, scheduler linked together.
type Message ¶
type Message json.RawMessage
Message is a json encoded job message.
func NewMessage ¶
NewMessage returns a json encoded data
func (Message) MarshalJSON ¶
MarshalJSON implements json.Marshaler on Message.
func (Message) Unmarshal ¶
Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.
func (*Message) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler on Message. It should be retro- compatible with the old Message representation { Data, Type }.
type Scheduler ¶
type Scheduler interface { StartScheduler(broker Broker) error ShutdownScheduler(ctx context.Context) error PollScheduler(now int64) error AddTrigger(trigger Trigger) error GetTrigger(db prefixer.Prefixer, id string) (Trigger, error) DeleteTrigger(db prefixer.Prefixer, id string) error GetAllTriggers(db prefixer.Prefixer) ([]Trigger, error) CleanRedis() error RebuildRedis(db prefixer.Prefixer) error }
Scheduler interface is used to represent a scheduler that is responsible to listen respond to triggers jobs requests and send them to the broker.
func NewMemScheduler ¶
func NewMemScheduler() Scheduler
NewMemScheduler creates a new in-memory scheduler that will load all registered triggers and schedule their work.
type Trigger ¶
type Trigger interface { prefixer.Prefixer Type() string Infos() *TriggerInfos // Schedule should return a channel on which the trigger can send job // requests when it decides to. Schedule() <-chan *JobRequest // Unschedule should be used to clean the trigger states and should close // the returns jobs channel. Unschedule() }
Trigger interface is used to represent a trigger.
func NewTrigger ¶
func NewTrigger(db prefixer.Prefixer, infos TriggerInfos, data interface{}) (Trigger, error)
NewTrigger creates the trigger associates with the specified trigger options.
type TriggerInfos ¶
type TriggerInfos struct { TID string `json:"_id,omitempty"` TRev string `json:"_rev,omitempty"` Domain string `json:"domain"` Prefix string `json:"prefix,omitempty"` Type string `json:"type"` WorkerType string `json:"worker"` Arguments string `json:"arguments"` Debounce string `json:"debounce"` Options *JobOptions `json:"options"` Message Message `json:"message"` CurrentState *TriggerState `json:"current_state,omitempty"` }
TriggerInfos is a struct containing all the options of a trigger.
func (*TriggerInfos) Clone ¶
func (t *TriggerInfos) Clone() couchdb.Doc
Clone implements the couchdb.Doc interface
func (*TriggerInfos) DBPrefix ¶
func (t *TriggerInfos) DBPrefix() string
DBPrefix implements the prefixer.Prefixer interface.
func (*TriggerInfos) DocType ¶
func (t *TriggerInfos) DocType() string
DocType implements the couchdb.Doc interface
func (*TriggerInfos) DomainName ¶
func (t *TriggerInfos) DomainName() string
DomainName implements the prefixer.Prefixer interface.
func (*TriggerInfos) ID ¶
func (t *TriggerInfos) ID() string
ID implements the couchdb.Doc interface
func (*TriggerInfos) JobRequest ¶
func (t *TriggerInfos) JobRequest() *JobRequest
JobRequest returns a job request associated with the scheduler informations.
func (*TriggerInfos) JobRequestWithEvent ¶
func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)
JobRequestWithEvent returns a job request associated with the scheduler informations associated to the specified realtime event.
func (*TriggerInfos) Match ¶
func (t *TriggerInfos) Match(key, value string) bool
Match implements the permission.Matcher interface
func (*TriggerInfos) Rev ¶
func (t *TriggerInfos) Rev() string
Rev implements the couchdb.Doc interface
func (*TriggerInfos) SetID ¶
func (t *TriggerInfos) SetID(id string)
SetID implements the couchdb.Doc interface
func (*TriggerInfos) SetRev ¶
func (t *TriggerInfos) SetRev(rev string)
SetRev implements the couchdb.Doc interface
type TriggerState ¶
type TriggerState struct { TID string `json:"trigger_id"` Status State `json:"status"` LastSuccess *time.Time `json:"last_success,omitempty"` LastSuccessfulJobID string `json:"last_successful_job_id,omitempty"` LastExecution *time.Time `json:"last_execution,omitempty"` LastExecutedJobID string `json:"last_executed_job_id,omitempty"` LastFailure *time.Time `json:"last_failure,omitempty"` LastFailedJobID string `json:"last_failed_job_id,omitempty"` LastError string `json:"last_error,omitempty"` LastManualExecution *time.Time `json:"last_manual_execution,omitempty"` LastManualJobID string `json:"last_manual_job_id,omitempty"` }
TriggerState represent the current state of the trigger
func GetTriggerState ¶
func GetTriggerState(db prefixer.Prefixer, triggerID string) (*TriggerState, error)
GetTriggerState returns the state of the trigger, calculated from the last launched jobs.
type Worker ¶
type Worker struct { Type string Conf *WorkerConfig // contains filtered or unexported fields }
Worker is a unit of work that will consume from a queue and execute the do method for each jobs it pulls.
func NewWorker ¶
func NewWorker(conf *WorkerConfig) *Worker
NewWorker creates a new instance of Worker with the given configuration.
type WorkerBeforeHook ¶
WorkerBeforeHook is an optional method that is always called before the job is being pushed into the queue. It can be useful to skip the job beforehand.
type WorkerCommit ¶
type WorkerCommit func(ctx *WorkerContext, errjob error) error
WorkerCommit is an optional method that is always called once after the execution of the WorkerFunc.
type WorkerConfig ¶
type WorkerConfig struct { WorkerInit WorkerInitFunc WorkerStart WorkerStartFunc WorkerFunc WorkerFunc WorkerCommit WorkerCommit WorkerType string BeforeHook WorkerBeforeHook ErrorHook JobErrorCheckerHook Concurrency int MaxExecCount int AdminOnly bool Timeout time.Duration RetryDelay time.Duration }
WorkerConfig is the configuration parameter of a worker defined by the job system. It contains parameters of the worker along with the worker main function that perform the work against a job's message.
func GetWorkersList ¶
func GetWorkersList() ([]*WorkerConfig, error)
GetWorkersList returns a list of all activated workers, configured as defined by the configuration file.
func (*WorkerConfig) Clone ¶
func (w *WorkerConfig) Clone() *WorkerConfig
Clone clones the worker config
type WorkerContext ¶
WorkerContext is a context.Context passed to the worker for each job execution and contains specific values from the job.
func NewWorkerContext ¶
func NewWorkerContext(workerID string, job *Job) *WorkerContext
NewWorkerContext returns a context.Context usable by a worker. TODO : Save the right prefix
func (*WorkerContext) Cookie ¶
func (c *WorkerContext) Cookie() interface{}
Cookie returns the cookie associated with the worker context.
func (*WorkerContext) ID ¶
func (c *WorkerContext) ID() string
ID returns a unique identifier for the worker context.
func (*WorkerContext) Logger ¶
func (c *WorkerContext) Logger() *logrus.Entry
Logger return the logger associated with the worker context.
func (*WorkerContext) Manual ¶
func (c *WorkerContext) Manual() bool
Manual returns if the job was started manually
func (*WorkerContext) NoRetry ¶
func (c *WorkerContext) NoRetry() bool
NoRetry returns the no-retry flag.
func (*WorkerContext) SetNoRetry ¶
func (c *WorkerContext) SetNoRetry()
SetNoRetry set the no-retry flag to prevent a retry on the next execution.
func (*WorkerContext) TriggerID ¶
func (c *WorkerContext) TriggerID() (string, bool)
TriggerID returns the possible trigger identifier responsible for launching the job.
func (*WorkerContext) UnmarshalEvent ¶
func (c *WorkerContext) UnmarshalEvent(v interface{}) error
UnmarshalEvent unmarshals the event contained in the worker context.
func (*WorkerContext) UnmarshalMessage ¶
func (c *WorkerContext) UnmarshalMessage(v interface{}) error
UnmarshalMessage unmarshals the message contained in the worker context.
func (*WorkerContext) WithCookie ¶
func (c *WorkerContext) WithCookie(cookie interface{}) *WorkerContext
WithCookie returns a clone of the context with a new cookie value.
func (*WorkerContext) WithTimeout ¶
func (c *WorkerContext) WithTimeout(timeout time.Duration) (*WorkerContext, context.CancelFunc)
WithTimeout returns a clone of the context with a different deadline.
type WorkerFunc ¶
type WorkerFunc func(ctx *WorkerContext) error
WorkerFunc represent the work function that a worker should implement.
type WorkerInitFunc ¶
type WorkerInitFunc func() error
WorkerInitFunc is called at the start of the worker system, only once. It is not called before every job process. It can be useful to initialize a global variable used by the worker.
type WorkerStartFunc ¶
type WorkerStartFunc func(ctx *WorkerContext) (*WorkerContext, error)
WorkerStartFunc is optionally called at the beginning of the each job process and can produce a context value.
type WorkersList ¶
type WorkersList []*WorkerConfig
WorkersList is a map associating a worker type with its acutal configuration.