Documentation ¶
Index ¶
- Constants
- Variables
- func AddWorker(conf *WorkerConfig)
- func GetCounterTypeFromWorkerType(workerType string) (limits.CounterType, error)
- func GetWorkersNamesList() []string
- func SetRedisTimeoutForTest()
- func SystemStart(b Broker, s Scheduler, workersList WorkersList) error
- type AtTrigger
- type Broker
- type ClientTrigger
- type CronTrigger
- type DumpFilePather
- type ErrBadTrigger
- type Event
- type EventTrigger
- type Job
- func FilterByWorkerAndState(jobs []*Job, workerType string, state State, limit int) []*Job
- func FilterJobsBeforeDate(jobs []*Job, date time.Time) []*Job
- func Get(db prefixer.Prefixer, jobID string) (*Job, error)
- func GetAllJobs(db prefixer.Prefixer) ([]*Job, error)
- func GetJobs(db prefixer.Prefixer, triggerID string, limit int) ([]*Job, error)
- func GetLastsJobs(jobs []*Job, workerType string) ([]*Job, error)
- func GetQueuedJobs(db prefixer.Prefixer, workerType string) ([]*Job, error)
- func NewJob(db prefixer.Prefixer, req *JobRequest) *Job
- func (j *Job) Ack() error
- func (j *Job) AckConsumed() error
- func (j *Job) Clone() couchdb.Doc
- func (j *Job) Create() error
- func (j *Job) DBPrefix() string
- func (j *Job) DocType() string
- func (j *Job) DomainName() string
- func (j *Job) Fetch(field string) []string
- func (j *Job) ID() string
- func (j *Job) Logger() *logger.Entry
- func (j *Job) Nack(errorMessage string) error
- func (j *Job) Rev() string
- func (j *Job) SetID(id string)
- func (j *Job) SetRev(rev string)
- func (j *Job) Update() error
- func (j *Job) WaitUntilDone(db prefixer.Prefixer) error
- type JobErrorCheckerHook
- type JobOptions
- type JobRequest
- type JobSystem
- type Message
- type Payload
- type Scheduler
- type State
- type Trigger
- type TriggerInfos
- func (t *TriggerInfos) Clone() couchdb.Doc
- func (t *TriggerInfos) DBPrefix() string
- func (t *TriggerInfos) DocType() string
- func (t *TriggerInfos) DomainName() string
- func (t *TriggerInfos) Fetch(field string) []string
- func (t *TriggerInfos) ID() string
- func (t *TriggerInfos) JobRequest() *JobRequest
- func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)
- func (t *TriggerInfos) Rev() string
- func (t *TriggerInfos) SetID(id string)
- func (t *TriggerInfos) SetRev(rev string)
- type TriggerState
- type WebhookTrigger
- func (w *WebhookTrigger) CombineRequest() string
- func (w *WebhookTrigger) Fire(payload Payload)
- func (w *WebhookTrigger) Infos() *TriggerInfos
- func (w *WebhookTrigger) Schedule() <-chan *JobRequest
- func (w *WebhookTrigger) SetCallback(cb firer)
- func (w *WebhookTrigger) Type() string
- func (w *WebhookTrigger) Unschedule()
- type Worker
- type WorkerBeforeHook
- type WorkerCommit
- type WorkerConfig
- type WorkerContext
- func (c *WorkerContext) Cookie() interface{}
- func (c *WorkerContext) ID() string
- func (c *WorkerContext) Logger() *logger.Entry
- func (c *WorkerContext) Manual() bool
- func (c *WorkerContext) NoRetry() bool
- func (c *WorkerContext) SetNoRetry()
- func (c *WorkerContext) TriggerID() (string, bool)
- func (c *WorkerContext) UnmarshalEvent(v interface{}) error
- func (c *WorkerContext) UnmarshalMessage(v interface{}) error
- func (c *WorkerContext) UnmarshalPayload() (map[string]interface{}, error)
- func (c *WorkerContext) WithCookie(cookie interface{}) *WorkerContext
- func (c *WorkerContext) WithTimeout(timeout time.Duration) (*WorkerContext, context.CancelFunc)
- type WorkerFunc
- type WorkerInitFunc
- type WorkerStartFunc
- type WorkersList
Constants ¶
const DocTypeVersionTrigger = "1"
DocTypeVersionTrigger represents the doctype version. Each time this document structure is modified, update this value
const SchedKey = "scheduling"
SchedKey is the the key of the sorted set in redis used for triggers currently being executed
const TriggersKey = "triggers"
TriggersKey is the the key of the sorted set in redis used for triggers waiting to be activated
Variables ¶
var ( // ErrClosed is using a closed system ErrClosed = errors.New("jobs: closed") // ErrNotFoundJob is used when the job could not be found ErrNotFoundJob = errors.New("jobs: not found") // ErrQueueClosed is used to indicate the queue is closed ErrQueueClosed = errors.New("jobs: queue is closed") // ErrUnknownWorker the asked worker does not exist ErrUnknownWorker = errors.New("jobs: could not find worker") // ErrMessageNil is used for an nil message ErrMessageNil = errors.New("jobs: message is nil") // ErrMessageUnmarshal is used when unmarshalling a message causes an error ErrMessageUnmarshal = errors.New("jobs: message unmarshal") // ErrAbort can be used to abort the execution of the job without causing // errors. ErrAbort = errors.New("jobs: abort") // ErrUnknownTrigger is used when the trigger type is not recognized ErrUnknownTrigger = errors.New("Unknown trigger type") // ErrNotFoundTrigger is used when the trigger was not found ErrNotFoundTrigger = errors.New("Trigger with specified ID does not exist") // ErrMalformedTrigger is used to indicate the trigger is unparsable ErrMalformedTrigger = echo.NewHTTPError(http.StatusBadRequest, "Trigger unparsable") // ErrNotCronTrigger is used when a @cron trigger is expected, but it is // not the case ErrNotCronTrigger = errors.New("Invalid type for trigger (@cron expected)") )
Functions ¶
func AddWorker ¶
func AddWorker(conf *WorkerConfig)
AddWorker adds a new worker to global list of available workers.
func GetCounterTypeFromWorkerType ¶
func GetCounterTypeFromWorkerType(workerType string) (limits.CounterType, error)
GetCounterTypeFromWorkerType returns the CounterTypeFromWorkerType
func GetWorkersNamesList ¶
func GetWorkersNamesList() []string
GetWorkersNamesList returns the names of the configured workers
func SetRedisTimeoutForTest ¶
func SetRedisTimeoutForTest()
SetRedisTimeoutForTest is used by unit test to avoid waiting 10 seconds on cleanup.
func SystemStart ¶
func SystemStart(b Broker, s Scheduler, workersList WorkersList) error
SystemStart initializes and starts the global jobs system with the given broker, scheduler instances and workers list.
Types ¶
type AtTrigger ¶
type AtTrigger struct { *TriggerInfos // contains filtered or unexported fields }
AtTrigger implements the @at trigger type. It schedules a job at a specified time in the future.
func NewAtTrigger ¶
func NewAtTrigger(infos *TriggerInfos) (*AtTrigger, error)
NewAtTrigger returns a new instance of AtTrigger given the specified options.
func NewInTrigger ¶
func NewInTrigger(infos *TriggerInfos) (*AtTrigger, error)
NewInTrigger returns a new instance of AtTrigger given the specified options as @in.
func (*AtTrigger) CombineRequest ¶
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*AtTrigger) Infos ¶
func (a *AtTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*AtTrigger) Schedule ¶
func (a *AtTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*AtTrigger) Unschedule ¶
func (a *AtTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type Broker ¶
type Broker interface { StartWorkers(workersList WorkersList) error ShutdownWorkers(ctx context.Context) error // PushJob will push try to push a new job from the specified job request. // This method is asynchronous. PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error) // WorkerQueueLen returns the total element in the queue of the specified // worker type. WorkerQueueLen(workerType string) (int, error) // WorkerIsReserved returns true if the given worker type is reserved // (ie clients should not push jobs to it, only the stack). WorkerIsReserved(workerType string) (bool, error) // WorkersTypes returns the list of registered workers types. WorkersTypes() []string }
Broker interface is used to represent a job broker associated to a particular domain. A broker can be used to create jobs that are pushed in the job system.
func NewMemBroker ¶
func NewMemBroker() Broker
NewMemBroker creates a new in-memory broker system.
The in-memory implementation of the job system has the specifity that workers are actually launched by the broker at its creation.
func NewRedisBroker ¶
func NewRedisBroker(client redis.UniversalClient) Broker
NewRedisBroker creates a new broker that will use redis to distribute the jobs among several cozy-stack processes.
type ClientTrigger ¶
type ClientTrigger struct {
*TriggerInfos
}
ClientTrigger implements the @webhook triggers. It schedules a job when an HTTP request is made at this webhook.
func NewClientTrigger ¶
func NewClientTrigger(infos *TriggerInfos) (*ClientTrigger, error)
NewClientTrigger returns a new instance of ClientTrigger.
func (*ClientTrigger) CombineRequest ¶
func (c *ClientTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*ClientTrigger) Infos ¶
func (c *ClientTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*ClientTrigger) Schedule ¶
func (c *ClientTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*ClientTrigger) Type ¶
func (c *ClientTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*ClientTrigger) Unschedule ¶
func (c *ClientTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type CronTrigger ¶
type CronTrigger struct { *TriggerInfos // contains filtered or unexported fields }
CronTrigger implements the @cron trigger type. It schedules recurring jobs with the weird but very used Cron syntax.
func NewCronTrigger ¶
func NewCronTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewCronTrigger returns a new instance of CronTrigger given the specified options.
func NewEveryTrigger ¶
func NewEveryTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewEveryTrigger returns an new instance of CronTrigger given the specified options as @every.
func (*CronTrigger) CombineRequest ¶
func (c *CronTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*CronTrigger) Infos ¶
func (c *CronTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*CronTrigger) NextExecution ¶
func (c *CronTrigger) NextExecution(last time.Time) time.Time
NextExecution returns the next time when a job should be fired for this trigger
func (*CronTrigger) Schedule ¶
func (c *CronTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*CronTrigger) Type ¶
func (c *CronTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*CronTrigger) Unschedule ¶
func (c *CronTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type DumpFilePather ¶
type DumpFilePather struct{}
DumpFilePather is a struct made for calling the Path method of a FileDoc and relying on the cached fullpath of this document (not trying to rebuild it)
type ErrBadTrigger ¶
type ErrBadTrigger struct {
Err error
}
ErrBadTrigger is an error conveying the information of a trigger that is not valid, and could be deleted.
func (ErrBadTrigger) Error ¶
func (e ErrBadTrigger) Error() string
type Event ¶
type Event json.RawMessage
Event is a json encoded value of a realtime.Event.
type EventTrigger ¶
type EventTrigger struct { *TriggerInfos // contains filtered or unexported fields }
EventTrigger implements Trigger for realtime triggered events
func NewEventTrigger ¶
func NewEventTrigger(infos *TriggerInfos) (*EventTrigger, error)
NewEventTrigger returns a new instance of EventTrigger given the specified options.
func (*EventTrigger) CombineRequest ¶
func (t *EventTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*EventTrigger) Infos ¶
func (t *EventTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*EventTrigger) Schedule ¶
func (t *EventTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*EventTrigger) Type ¶
func (t *EventTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*EventTrigger) Unschedule ¶
func (t *EventTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type Job ¶
type Job struct { JobID string `json:"_id,omitempty"` JobRev string `json:"_rev,omitempty"` Domain string `json:"domain"` Prefix string `json:"prefix,omitempty"` WorkerType string `json:"worker"` TriggerID string `json:"trigger_id,omitempty"` Message Message `json:"message"` Event Event `json:"event"` Payload Payload `json:"payload,omitempty"` Manual bool `json:"manual_execution,omitempty"` Debounced bool `json:"debounced,omitempty"` Options *JobOptions `json:"options,omitempty"` State State `json:"state"` QueuedAt time.Time `json:"queued_at"` StartedAt time.Time `json:"started_at"` FinishedAt time.Time `json:"finished_at"` Error string `json:"error,omitempty"` ForwardLogs bool `json:"forward_logs,omitempty"` }
Job contains all the metadata informations of a Job. It can be marshalled in JSON.
func FilterByWorkerAndState ¶
FilterByWorkerAndState filters a job slice by its workerType and State
func FilterJobsBeforeDate ¶
FilterJobsBeforeDate returns alls jobs queued before the specified date
func GetAllJobs ¶
GetAllJobs returns the list of all the jobs on the given instance.
func GetLastsJobs ¶
GetLastsJobs returns the N lasts job of each state for an instance/worker type pair
func GetQueuedJobs ¶
GetQueuedJobs returns the list of jobs which states is "queued" or "running"
func NewJob ¶
func NewJob(db prefixer.Prefixer, req *JobRequest) *Job
NewJob creates a new Job instance from a job request.
func (*Job) AckConsumed ¶
AckConsumed sets the job infos state to Running an sends the new job infos on the channel.
func (*Job) DomainName ¶
DomainName implements the prefixer.Prefixer interface.
type JobErrorCheckerHook ¶
JobErrorCheckerHook is an optional method called at the beginning of the job execution to prevent a retry according to the previous error (specifically useful in the retries loop)
type JobOptions ¶
type JobOptions struct { MaxExecCount int `json:"max_exec_count"` Timeout time.Duration `json:"timeout"` }
JobOptions struct contains the execution properties of the jobs.
type JobRequest ¶
type JobRequest struct { WorkerType string TriggerID string Trigger Trigger Message Message Event Event Payload Payload Manual bool Debounced bool ForwardLogs bool Options *JobOptions }
JobRequest struct is used to represent a new job request.
func (*JobRequest) DocType ¶
func (jr *JobRequest) DocType() string
DocType implements the permission.Getter interface
func (*JobRequest) Fetch ¶
func (jr *JobRequest) Fetch(field string) []string
Fetch implements the permission.Fetcher interface
func (*JobRequest) ID ¶
func (jr *JobRequest) ID() string
ID implements the permission.Getter interface
type JobSystem ¶
type JobSystem interface { Broker Scheduler utils.Shutdowner }
JobSystem is a pair of broker, scheduler linked together.
type Message ¶
type Message json.RawMessage
Message is a json encoded job message.
func NewMessage ¶
NewMessage returns a json encoded data
func (Message) MarshalJSON ¶
MarshalJSON implements json.Marshaler on Message.
func (Message) Unmarshal ¶
Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.
func (*Message) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler on Message. It should be retro- compatible with the old Message representation { Data, Type }.
type Scheduler ¶
type Scheduler interface { StartScheduler(broker Broker) error ShutdownScheduler(ctx context.Context) error PollScheduler(now int64) error AddTrigger(trigger Trigger) error GetTrigger(db prefixer.Prefixer, id string) (Trigger, error) UpdateCron(db prefixer.Prefixer, trigger Trigger, arguments string) error DeleteTrigger(db prefixer.Prefixer, id string) error GetAllTriggers(db prefixer.Prefixer) ([]Trigger, error) HasTrigger(db prefixer.Prefixer, infos TriggerInfos) bool CleanRedis() error RebuildRedis(db prefixer.Prefixer) error }
Scheduler interface is used to represent a scheduler that is responsible to listen respond to triggers jobs requests and send them to the broker.
func NewMemScheduler ¶
func NewMemScheduler() Scheduler
NewMemScheduler creates a new in-memory scheduler that will load all registered triggers and schedule their work.
func NewRedisScheduler ¶
func NewRedisScheduler(client redis.UniversalClient) Scheduler
NewRedisScheduler creates a new scheduler that use redis to synchronize with other cozy-stack processes to schedule jobs.
type Trigger ¶
type Trigger interface { prefixer.Prefixer permission.Fetcher Type() string Infos() *TriggerInfos // Schedule should return a channel on which the trigger can send job // requests when it decides to. Schedule() <-chan *JobRequest // Unschedule should be used to clean the trigger states and should close // the returns jobs channel. Unschedule() CombineRequest() string }
Trigger interface is used to represent a trigger.
func NewTrigger ¶
func NewTrigger(db prefixer.Prefixer, infos TriggerInfos, data interface{}) (Trigger, error)
NewTrigger creates the trigger associates with the specified trigger options.
type TriggerInfos ¶
type TriggerInfos struct { TID string `json:"_id,omitempty"` TRev string `json:"_rev,omitempty"` Domain string `json:"domain"` Prefix string `json:"prefix,omitempty"` Type string `json:"type"` WorkerType string `json:"worker"` Arguments string `json:"arguments"` Debounce string `json:"debounce"` Options *JobOptions `json:"options"` Message Message `json:"message"` CurrentState *TriggerState `json:"current_state,omitempty"` Metadata *metadata.CozyMetadata `json:"cozyMetadata,omitempty"` }
TriggerInfos is a struct containing all the options of a trigger.
func (*TriggerInfos) Clone ¶
func (t *TriggerInfos) Clone() couchdb.Doc
Clone implements the couchdb.Doc interface
func (*TriggerInfos) DBPrefix ¶
func (t *TriggerInfos) DBPrefix() string
DBPrefix implements the prefixer.Prefixer interface.
func (*TriggerInfos) DocType ¶
func (t *TriggerInfos) DocType() string
DocType implements the couchdb.Doc interface
func (*TriggerInfos) DomainName ¶
func (t *TriggerInfos) DomainName() string
DomainName implements the prefixer.Prefixer interface.
func (*TriggerInfos) Fetch ¶
func (t *TriggerInfos) Fetch(field string) []string
Fetch implements the permission.Fetcher interface
func (*TriggerInfos) ID ¶
func (t *TriggerInfos) ID() string
ID implements the couchdb.Doc interface
func (*TriggerInfos) JobRequest ¶
func (t *TriggerInfos) JobRequest() *JobRequest
JobRequest returns a job request associated with the scheduler informations.
func (*TriggerInfos) JobRequestWithEvent ¶
func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)
JobRequestWithEvent returns a job request associated with the scheduler informations associated to the specified realtime event.
func (*TriggerInfos) Rev ¶
func (t *TriggerInfos) Rev() string
Rev implements the couchdb.Doc interface
func (*TriggerInfos) SetID ¶
func (t *TriggerInfos) SetID(id string)
SetID implements the couchdb.Doc interface
func (*TriggerInfos) SetRev ¶
func (t *TriggerInfos) SetRev(rev string)
SetRev implements the couchdb.Doc interface
type TriggerState ¶
type TriggerState struct { TID string `json:"trigger_id"` Status State `json:"status"` LastSuccess *time.Time `json:"last_success,omitempty"` LastSuccessfulJobID string `json:"last_successful_job_id,omitempty"` LastExecution *time.Time `json:"last_execution,omitempty"` LastExecutedJobID string `json:"last_executed_job_id,omitempty"` LastFailure *time.Time `json:"last_failure,omitempty"` LastFailedJobID string `json:"last_failed_job_id,omitempty"` LastError string `json:"last_error,omitempty"` LastManualExecution *time.Time `json:"last_manual_execution,omitempty"` LastManualJobID string `json:"last_manual_job_id,omitempty"` }
TriggerState represent the current state of the trigger
func GetTriggerState ¶
func GetTriggerState(db prefixer.Prefixer, triggerID string) (*TriggerState, error)
GetTriggerState returns the state of the trigger, calculated from the last launched jobs.
type WebhookTrigger ¶
type WebhookTrigger struct { *TriggerInfos // contains filtered or unexported fields }
WebhookTrigger implements the @webhook triggers. It schedules a job when an HTTP request is made at this webhook.
func NewWebhookTrigger ¶
func NewWebhookTrigger(infos *TriggerInfos) (*WebhookTrigger, error)
NewWebhookTrigger returns a new instance of WebhookTrigger.
func (*WebhookTrigger) CombineRequest ¶
func (w *WebhookTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*WebhookTrigger) Fire ¶
func (w *WebhookTrigger) Fire(payload Payload)
Fire is called with a payload when the webhook has been requested.
func (*WebhookTrigger) Infos ¶
func (w *WebhookTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*WebhookTrigger) Schedule ¶
func (w *WebhookTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*WebhookTrigger) SetCallback ¶
func (w *WebhookTrigger) SetCallback(cb firer)
SetCallback registers a struct to be called when the webhook is fired.
func (*WebhookTrigger) Type ¶
func (w *WebhookTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*WebhookTrigger) Unschedule ¶
func (w *WebhookTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type Worker ¶
type Worker struct { Type string Conf *WorkerConfig // contains filtered or unexported fields }
Worker is a unit of work that will consume from a queue and execute the do method for each jobs it pulls.
func NewWorker ¶
func NewWorker(conf *WorkerConfig) *Worker
NewWorker creates a new instance of Worker with the given configuration.
type WorkerBeforeHook ¶
WorkerBeforeHook is an optional method that is always called before the job is being pushed into the queue. It can be useful to skip the job beforehand.
type WorkerCommit ¶
type WorkerCommit func(ctx *WorkerContext, errjob error) error
WorkerCommit is an optional method that is always called once after the execution of the WorkerFunc.
type WorkerConfig ¶
type WorkerConfig struct { WorkerInit WorkerInitFunc WorkerStart WorkerStartFunc WorkerFunc WorkerFunc WorkerCommit WorkerCommit WorkerType string BeforeHook WorkerBeforeHook ErrorHook JobErrorCheckerHook Concurrency int MaxExecCount int Reserved bool // true when the clients must not push jobs for this worker Timeout time.Duration RetryDelay time.Duration }
WorkerConfig is the configuration parameter of a worker defined by the job system. It contains parameters of the worker along with the worker main function that perform the work against a job's message.
func GetWorkersList ¶
func GetWorkersList() ([]*WorkerConfig, error)
GetWorkersList returns a list of all activated workers, configured as defined by the configuration file.
func (*WorkerConfig) Clone ¶
func (w *WorkerConfig) Clone() *WorkerConfig
Clone clones the worker config
type WorkerContext ¶
type WorkerContext struct { context.Context Instance *instance.Instance // contains filtered or unexported fields }
WorkerContext is a context.Context passed to the worker for each job execution and contains specific values from the job.
func NewWorkerContext ¶
func NewWorkerContext(workerID string, job *Job, inst *instance.Instance) *WorkerContext
NewWorkerContext returns a context.Context usable by a worker.
func (*WorkerContext) Cookie ¶
func (c *WorkerContext) Cookie() interface{}
Cookie returns the cookie associated with the worker context.
func (*WorkerContext) ID ¶
func (c *WorkerContext) ID() string
ID returns a unique identifier for the worker context.
func (*WorkerContext) Logger ¶
func (c *WorkerContext) Logger() *logger.Entry
Logger return the logger associated with the worker context.
func (*WorkerContext) Manual ¶
func (c *WorkerContext) Manual() bool
Manual returns if the job was started manually
func (*WorkerContext) NoRetry ¶
func (c *WorkerContext) NoRetry() bool
NoRetry returns the no-retry flag.
func (*WorkerContext) SetNoRetry ¶
func (c *WorkerContext) SetNoRetry()
SetNoRetry set the no-retry flag to prevent a retry on the next execution.
func (*WorkerContext) TriggerID ¶
func (c *WorkerContext) TriggerID() (string, bool)
TriggerID returns the possible trigger identifier responsible for launching the job.
func (*WorkerContext) UnmarshalEvent ¶
func (c *WorkerContext) UnmarshalEvent(v interface{}) error
UnmarshalEvent unmarshals the event contained in the worker context.
func (*WorkerContext) UnmarshalMessage ¶
func (c *WorkerContext) UnmarshalMessage(v interface{}) error
UnmarshalMessage unmarshals the message contained in the worker context.
func (*WorkerContext) UnmarshalPayload ¶
func (c *WorkerContext) UnmarshalPayload() (map[string]interface{}, error)
UnmarshalPayload unmarshals the payload contained in the worker context.
func (*WorkerContext) WithCookie ¶
func (c *WorkerContext) WithCookie(cookie interface{}) *WorkerContext
WithCookie returns a clone of the context with a new cookie value.
func (*WorkerContext) WithTimeout ¶
func (c *WorkerContext) WithTimeout(timeout time.Duration) (*WorkerContext, context.CancelFunc)
WithTimeout returns a clone of the context with a different deadline.
type WorkerFunc ¶
type WorkerFunc func(ctx *WorkerContext) error
WorkerFunc represent the work function that a worker should implement.
type WorkerInitFunc ¶
type WorkerInitFunc func() error
WorkerInitFunc is called at the start of the worker system, only once. It is not called before every job process. It can be useful to initialize a global variable used by the worker.
type WorkerStartFunc ¶
type WorkerStartFunc func(ctx *WorkerContext) (*WorkerContext, error)
WorkerStartFunc is optionally called at the beginning of the each job process and can produce a context value.
type WorkersList ¶
type WorkersList []*WorkerConfig
WorkersList is a map associating a worker type with its acutal configuration.