Documentation ¶
Index ¶
- Constants
- Variables
- type AutomatedInstanceJobParams
- type Cleaner
- type DatabaseStore
- func (s *DatabaseStore) GC() error
- func (s *DatabaseStore) GetJob(inst flux.InstanceID, id JobID) (Job, error)
- func (s *DatabaseStore) Heartbeat(id JobID) error
- func (s *DatabaseStore) NextJob(queues []string) (Job, error)
- func (s *DatabaseStore) PutJob(inst flux.InstanceID, job Job) (JobID, error)
- func (s *DatabaseStore) PutJobIgnoringDuplicates(inst flux.InstanceID, job Job) (JobID, error)
- func (s *DatabaseStore) Transaction(f func(*DatabaseStore) error) error
- func (s *DatabaseStore) UpdateJob(job Job) error
- type Handler
- type Job
- type JobID
- type JobPopper
- type JobReadPusher
- type JobStore
- type JobUpdater
- type JobWritePopper
- type ReleaseJobParams
- type Worker
Constants ¶
const ( // DefaultQueue is the queue to use if none is set. DefaultQueue = "default" // ReleaseJob is the method for a release job ReleaseJob = "release" // AutomatedInstanceJob is the method for a check automated instance job AutomatedInstanceJob = "automated_instance" // PriorityBackground is priority for background jobs PriorityBackground = 100 // PriorityInteractive is priority for interactive jobs PriorityInteractive = 200 )
const StatusQueued = "Queued."
Variables ¶
var ( // This is a user-facing error ErrNoSuchJob = flux.Missing{&flux.BaseError{ Help: `The release you requested does not exist. This may mean that it has expired, or that you have mistyped the release ID.`, Err: errors.New("no such release job found"), }} ErrNoJobAvailable = errors.New("no job available") ErrUnknownJobMethod = errors.New("unknown job method") ErrJobAlreadyQueued = errors.New("job is already queued") ErrNoResultExpected = errors.New("no result expected") )
var (
ErrNoHandlerForJob = fmt.Errorf("no handler for job type")
)
Functions ¶
This section is empty.
Types ¶
type AutomatedInstanceJobParams ¶
type AutomatedInstanceJobParams struct {
InstanceID flux.InstanceID
}
AutomatedInstanceJobParams are the params for an automated_instance job
type DatabaseStore ¶
type DatabaseStore struct {
// contains filtered or unexported fields
}
DatabaseStore is a job store backed by a sql.DB.
func NewDatabaseStore ¶
func NewDatabaseStore(driver, datasource string, oldest time.Duration) (*DatabaseStore, error)
NewDatabaseStore returns a usable DatabaseStore. The DB should have a jobs table.
func (*DatabaseStore) GC ¶
func (s *DatabaseStore) GC() error
func (*DatabaseStore) GetJob ¶
func (s *DatabaseStore) GetJob(inst flux.InstanceID, id JobID) (Job, error)
func (*DatabaseStore) Heartbeat ¶
func (s *DatabaseStore) Heartbeat(id JobID) error
func (*DatabaseStore) NextJob ¶
func (s *DatabaseStore) NextJob(queues []string) (Job, error)
Take the next job from specified queues. If queues is nil, all queues are used.
func (*DatabaseStore) PutJob ¶
func (s *DatabaseStore) PutJob(inst flux.InstanceID, job Job) (JobID, error)
PutJob schedules a job to run. Users should set the Queue, Method, Params, and ScheduledAt fields of the job. If ScheduledAt is nil, the job will run immediately. If job Key is not blank, it will be checked for any other unfinished duplicate jobs.
func (*DatabaseStore) PutJobIgnoringDuplicates ¶
func (s *DatabaseStore) PutJobIgnoringDuplicates(inst flux.InstanceID, job Job) (JobID, error)
PutJobIgnoringDuplicates schedules a job to run. Key field and any duplicates are ignored.
func (*DatabaseStore) Transaction ¶
func (s *DatabaseStore) Transaction(f func(*DatabaseStore) error) error
func (*DatabaseStore) UpdateJob ¶
func (s *DatabaseStore) UpdateJob(job Job) error
type Job ¶
type Job struct { Instance flux.InstanceID `json:"instanceID"` ID JobID `json:"id"` // To be set when scheduling the job Queue string `json:"queue"` Method string `json:"method"` Params interface{} `json:"params"` ScheduledAt time.Time `json:"scheduled_at"` Priority int `json:"priority"` // Key is an optional field, and can be used to create jobs iff a pending // job with the same key doesn't exist. Key string `json:"key,omitempty"` // To be used by the worker Submitted time.Time `json:"submitted"` Claimed time.Time `json:"claimed,omitempty"` Heartbeat time.Time `json:"heartbeat,omitempty"` Finished time.Time `json:"finished,omitempty"` Log []string `json:"log,omitempty"` Result interface{} `json:"result"` // may be updated to reflect progress Status string `json:"status"` Done bool `json:"done"` Success bool `json:"success"` // only makes sense after done is true Error *flux.BaseError `json:"error,omitempty"` }
Job describes a worker job
func (*Job) UnmarshalJSON ¶
type JobReadPusher ¶
type JobReadPusher interface { GetJob(flux.InstanceID, JobID) (Job, error) PutJob(flux.InstanceID, Job) (JobID, error) PutJobIgnoringDuplicates(flux.InstanceID, Job) (JobID, error) }
type JobStore ¶
type JobStore interface { JobReadPusher JobWritePopper GC() error }
func InstrumentedJobStore ¶
type JobWritePopper ¶
type JobWritePopper interface { JobUpdater JobPopper }
type ReleaseJobParams ¶
type ReleaseJobParams struct { flux.ReleaseSpec Cause flux.ReleaseCause }
ReleaseJobParams are the params for a release job
func (ReleaseJobParams) Spec ¶
func (params ReleaseJobParams) Spec() flux.ReleaseSpec
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker grabs jobs from the job store and executes them.
func NewWorker ¶
NewWorker returns a usable worker pulling jobs from the JobPopper. Run Work in its own goroutine to start execution.