Documentation ¶
Index ¶
- Variables
- type Config
- type Controller
- type ExecutionStatus
- type Job
- type JobExecution
- type Repository
- func (r *Repository) All() ([]Job, error)
- func (r *Repository) AssembleInfrastructure() error
- func (r *Repository) Begin() (*sqlx.Tx, error)
- func (r *Repository) ClaimFor(workerID uuid.UUID) error
- func (r *Repository) GetFor(workerID uuid.UUID) ([]Job, error)
- func (r *Repository) GetOne(id uuid.UUID) (Job, error)
- func (r *Repository) Heartbeat(workerID uuid.UUID) error
- func (r *Repository) Store(job Job) error
- func (r *Repository) StoreAll(jobs []Job) error
- type Service
Constants ¶
This section is empty.
Variables ¶
var ( JobConditionalErr = errors.New("") JobNotFound = fmt.Errorf("Job not found%w", JobConditionalErr) JobNotActive = fmt.Errorf("Job is not active%w", JobConditionalErr) JobNoStartTime = fmt.Errorf("Job has no start time%w", JobConditionalErr) OneShotJobUsed = fmt.Errorf("Job is one-shot and has already ran or been scheduled%w", JobConditionalErr) ExecutionAlreadyWaiting = fmt.Errorf("Job already has an execution waiting%w", JobConditionalErr) JobNotDue = errors.New("Job not due") JobNotMine = errors.New("Job does not belong to this worker") JobTaskUnknown = func(taskName string) error { return errors.New(fmt.Sprintf("Job's task is unknown: %s", taskName)) } UnknownJobStatus = func(status ExecutionStatus) error { return errors.New(fmt.Sprintf("Job has unknown status: %s", status)) } )
Functions ¶
This section is empty.
Types ¶
type Config ¶
func BuildConfig ¶
type Controller ¶
type Controller struct { LoopSeconds float64 // contains filtered or unexported fields }
Controller is the job controller, which runs control loops
func NewController ¶
func NewController(r *Repository) *Controller
NewController returns a new controller.
func (*Controller) Block ¶
func (c *Controller) Block(seconds int)
Block optionally runs the control loop, while blocking.
func (*Controller) FinishTaskForJob ¶
func (c *Controller) FinishTaskForJob(jobID uuid.UUID) error
finishTaskForJob records a job as finished.
func (*Controller) JobFinishingMiddleware ¶
func (c *Controller) JobFinishingMiddleware(next bus.CommandHandler) bus.CommandHandler
JobFinishingMiddleware hooks into the bus's command execution stack and allows it to report to the controller about the jobs execution status when it passes through. Should be inserted ABOVE recovery middleware so that panics don't stop job status being reported
func (*Controller) RegisterQueueAction ¶
func (c *Controller) RegisterQueueAction(qa queueAction)
registerQueueAction attaches the QA callback
func (*Controller) Run ¶
func (c *Controller) Run(done chan bool)
Run asynchronously runs the control loop, receiving a done signal.
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus is the current status of one Job Execution.
const ( WAITING ExecutionStatus = "waiting" PROCESSING ExecutionStatus = "processing" COMPLETE ExecutionStatus = "complete" NONE ExecutionStatus = "none" )
type Job ¶
type Job struct { ID uuid.UUID Name string Frequency int SystemJob bool `json:"system_job" db:"system_job"` Task []byte UserID uuid.UUID `json:"user_id" db:"user_id"` Worker uuid.UUID Heartbeat time.Time Active bool StartAt time.Time `json:"start_at" db:"start_at"` Executions []JobExecution `db:"-"` CreatedAt time.Time `json:"created_at" db:"created_at"` }
Job is a domain entity for a Job, delayed execution task.
func (Job) NextExecution ¶
func (j Job) NextExecution() *JobExecution
NextExecution returns the next execution
func (Job) NextExecutionStatus ¶
func (j Job) NextExecutionStatus() ExecutionStatus
NextExecutionStatus returns the status of the next (aka currently pending) execution, or NONE
func (*Job) ScheduleNextExecution ¶
ScheduleNextExecution creates a the next execution
func (*Job) ScheduleNow ¶
ScheduleNow modifies the job after it's been scheduled/queued
type JobExecution ¶
type JobExecution struct { ID uuid.UUID JobID uuid.UUID `json:"job_id" db:"job_id"` Status ExecutionStatus Next time.Time Job Job `db:"-"` CreatedAt time.Time `json:"created_at" db:"created_at"` ScheduledAt time.Time `json:"scheduled_at" db:"scheduled_at"` CompletedAt time.Time `json:"completed_at" db:"completed_at"` }
JobExecution is a domain entity for one run of a Job.
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository handles DB access to the Job aggregate
func NewRepository ¶
func NewRepository(c Config, db *sqlx.DB) *Repository
NewRepository returns a job repository.
func (*Repository) AssembleInfrastructure ¶
func (r *Repository) AssembleInfrastructure() error
AssembleInfrastructure creates the tables for the repository.
func (*Repository) Begin ¶
func (r *Repository) Begin() (*sqlx.Tx, error)
Begin starts a transaction.
func (*Repository) ClaimFor ¶
func (r *Repository) ClaimFor(workerID uuid.UUID) error
ClaimFor claims any unclaimed/abandoned jobs for a worker
func (*Repository) GetFor ¶
func (r *Repository) GetFor(workerID uuid.UUID) ([]Job, error)
GetFor retrieves a worker's jobs.
func (*Repository) GetOne ¶
func (r *Repository) GetOne(id uuid.UUID) (Job, error)
GetOne retrieves one Job aggregate.
func (*Repository) Heartbeat ¶
func (r *Repository) Heartbeat(workerID uuid.UUID) error
Heartbeat updates the heartbeat for all of a worker's jobs.
func (*Repository) Store ¶
func (r *Repository) Store(job Job) error
Store stores a job in the repository
func (*Repository) StoreAll ¶
func (r *Repository) StoreAll(jobs []Job) error
StoreAll stores a slice of Job aggregates.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func (*Service) AttachRouter ¶
func (s *Service) AttachRouter(qa queueAction)
func (*Service) Controller ¶
func (s *Service) Controller() *Controller