Documentation
¶
Index ¶
- Constants
- Variables
- func ValidateAddJob(j *Job) error
- func ValidateID(id ID) error
- func ValidateName(name string) error
- func ValidatePayload(p []byte) error
- func ValidateResult(r []byte) error
- func ValidateRunJob(j *Job) error
- func ValidateScheduleJob(j *Job) error
- func ValidateTTL(ttl uint64) error
- func ValidateTTR(ttr uint32) error
- func ValidateTime(t time.Time) error
- func ValidateTimeout(t uint32) error
- type Adder
- type Completer
- type Controller
- func (c *Controller) Add(j *Job) error
- func (c *Controller) Complete(id ID, result []byte) error
- func (c *Controller) Delete(id ID) error
- func (c *Controller) Expire(id ID)
- func (c *Controller) ExpireFunc() func(ID)
- func (c *Controller) Fail(id ID, result []byte) error
- func (c *Controller) HandleExpire(f func(ID))
- func (c *Controller) HandleTimeoutAttempt(f func(ID))
- func (c *Controller) Lease(names []string, timeout uint32) (*Job, error)
- func (c *Controller) Run(j *Job, timeout uint32) (*RunResult, error)
- func (c *Controller) Schedule(j *Job) error
- func (c *Controller) StartAttempt(id ID) error
- func (c *Controller) Stats() Stats
- func (c *Controller) TimeoutAttempt(id ID)
- func (c *Controller) TimeoutAttemptFunc() func(ID)
- type ControllerInterface
- type Deleter
- type Failer
- type ID
- type Inspector
- type Iterator
- type Job
- type JobProxy
- type Leaser
- type QueueController
- func (c *QueueController) Add(j *Job) bool
- func (c *QueueController) Awake(j *Job) bool
- func (c *QueueController) Delete(j *Job) bool
- func (c *QueueController) Exists(j *Job) bool
- func (c *QueueController) Lease(name string) <-chan JobProxy
- func (c *QueueController) Queue(name string) QueueInterface
- func (c *QueueController) Queues() (map[string]QueueInterface, *sync.RWMutex)
- func (c *QueueController) Run(j *Job) bool
- func (c *QueueController) Schedule(j *Job) bool
- type QueueControllerInterface
- type QueueInterface
- type Registry
- type Result
- type RunRecord
- type RunResult
- type Runner
- type Scheduler
- type Stats
- type Timer
- type Wait
- type WorkQueue
Constants ¶
const (
	MaxName = 255
	// 1 MiB = 1,024 KiB = 1,048,576 bytes
	MaxPayload  = 1048576
	MaxResult   = 1048576
	MaxAttempts = 255
	MaxFails    = 255
	// 24 hours in ms
	MaxTTR = 86400000
	// 30 days in ms
	MaxTTL          = 2592000000
	MaxHardAttempts = 255
	TimeFormat      = "2006-01-02T15:04:05Z"
	// Max timeout for wait related cmds (lease).
	MaxTimeout = 86400000
)
const ( StateNew = 0 StateCompleted = 1 StateFailed = 2 StatePending = 3 StateLeased = 4 MaxRunRecAttempts = 1<<64 - 1 RunRecTTRTimerIdx = 0 RunRecTTLTimerIdx = 1 RunRecSchedTimerIdx = 2 )
Variables ¶
var ( ErrDuplicateJob = errors.New("Duplicate job") ErrDuplicateResult = errors.New("Duplicate result") ErrLeaseExpired = errors.New("Lease expired") ErrMaxAttemptsReached = errors.New("Max attempts reached") ErrAlreadyLeased = errors.New("Job already leased") ErrAlreadyProcessed = errors.New("Job already processed") ErrEnqueue = errors.New("Unable to enqueue") ErrNotFound = errors.New("Not found") ErrTimeout = errors.New("Timeout") ErrQueueOutOfSync = errors.New("Queue out of sync") )
var ( ErrInvalidID = errors.New("Invalid ID") ErrInvalidName = errors.New("Invalid Name") ErrInvalidPayload = errors.New("Invalid Payload") ErrInvalidResult = errors.New("Invalid Result") ErrInvalidMaxAttempts = errors.New("Invalid Max Attempts") ErrInvalidMaxFails = errors.New("Invalid Max Fails") ErrInvalidPriority = errors.New("Invalid Priority") ErrInvalidTTR = errors.New("Invalid TTR") ErrInvalidTTL = errors.New("Invalid TTL") ErrInvalidTime = errors.New("Invalid Time") ErrInvalidTimeout = errors.New("Invalid Timeout") )
var (
ErrDuplicate = errors.New("Duplicate")
)
Functions ¶
func ValidateAddJob ¶
func ValidateID ¶
func ValidateName ¶
func ValidatePayload ¶
func ValidateResult ¶
func ValidateRunJob ¶
func ValidateScheduleJob ¶
func ValidateTTL ¶
A valid TTL is non-zero, non-negative, and at most 2^64 - 1.
func ValidateTTR ¶
A valid TTR is non-zero, non-negative, and at most 2^32 - 1.
func ValidateTime ¶
A valid time is in UTC and greater than or equal to the current time.
func ValidateTimeout ¶
Types ¶
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
func NewController ¶
func NewController(reg *Registry, qc QueueControllerInterface) *Controller
func (*Controller) Add ¶
func (c *Controller) Add(j *Job) error
Adds a job to its named work queue with respect for <priority>. TTL timer starts immediately.
func (*Controller) Complete ¶
func (c *Controller) Complete(id ID, result []byte) error
Successfully complete a job with an optional result. Stops TTR timer.
func (*Controller) Delete ¶
func (c *Controller) Delete(id ID) error
Delete a job by ID regardless of existing state.
func (*Controller) Expire ¶
func (c *Controller) Expire(id ID)
Expire job by ID. Invoked by TTL timers and removes job regardless of state. See the expire() method for the implementation.
func (*Controller) ExpireFunc ¶
func (c *Controller) ExpireFunc() func(ID)
Return the current Expire callback.
func (*Controller) Fail ¶
func (c *Controller) Fail(id ID, result []byte) error
Fail a job with a result. Stops TTR timer.
func (*Controller) HandleExpire ¶
func (c *Controller) HandleExpire(f func(ID))
HandleExpire sets the func to be used for the Expire() method. This is primarily used to allow the proxying of a job controller in full, as the expire method is not invoked directly, but rather as a secondary call from "add" or "schedule". An example can be found in the cmdlog package.
func (*Controller) HandleTimeoutAttempt ¶
func (c *Controller) HandleTimeoutAttempt(f func(ID))
HandleTimeoutAttempt sets the func to be used for the TimeoutAttempt() method. This is primarily used to allow the proxying of a job controller in full as the TimeoutAttempt method is not invoked directly, but rather as a background call from "lease". Example can be found in the cmdlog package.
func (*Controller) Lease ¶
func (c *Controller) Lease(names []string, timeout uint32) (*Job, error)
Lease a job by name, blocking until <wait-timeout>. Multiple job names can be specified and they will be processed uniformly by random selection.
Returns a leased job on success within <wait-timeout> or a timeout error. TTR timer starts immediately on success.
func (*Controller) Run ¶
func (c *Controller) Run(j *Job, timeout uint32) (*RunResult, error)
Run a job, blocking until wait-timeout if no workers are available, or until TTR if a worker is processing.
Returns the job result on successful completion. TIMEOUT error is returned if no workers were available within <wait-timeout> or if the job failed to complete within TTR.
This is the synchronous form of "add job". All job-related data is deleted
func (*Controller) Schedule ¶
func (c *Controller) Schedule(j *Job) error
Schedules a job to run at a UTC time with respect for <priority>. TTL timer starts when the scheduled time is met.
func (*Controller) StartAttempt ¶
func (c *Controller) StartAttempt(id ID) error
Start an attempt by ID. Ensures jobs respect TTR & max attempts policy.
func (*Controller) Stats ¶
func (c *Controller) Stats() Stats
Stats returns the job controller's stats at the current time.
func (*Controller) TimeoutAttempt ¶
func (c *Controller) TimeoutAttempt(id ID)
Timeout job attempt by ID. See the "timeoutAttempt()" method for the implementation.
func (*Controller) TimeoutAttemptFunc ¶
func (c *Controller) TimeoutAttemptFunc() func(ID)
Return TimeoutAttempt callback.
type ControllerInterface ¶
type Inspector ¶
type Inspector struct {
// contains filtered or unexported fields
}
Inspector allows for deeper inspection of a WorkQueue.
func NewInspector ¶
type Job ¶
type Job struct {
	ID          ID
	Name        string    // Unique name of job
	Payload     []byte    // 1MB limit
	Priority    int32     // Priority from lowest to highest
	MaxAttempts uint8     // Num of allowed attempts
	MaxFails    uint8     // Num of allowed failures
	TTR         uint32    // time to run in ms
	TTL         uint64    // max time to live in ms
	Time        time.Time // Scheduled Time to Exec
	Created     time.Time
}
func NewEmptyJob ¶
func NewEmptyJob() *Job
NewEmptyJob returns a Job with its created time initialized.
func (*Job) Expiration ¶
type JobProxy ¶
JobProxy interface. A function that returns a job when invoked, following the comma-ok idiom. Used in WorkQueue leases.
type QueueController ¶
type QueueController struct {
// contains filtered or unexported fields
}
Queue controller owns/manages all queues. Almost all calls are proxied to the QueueInterface holder. Allows for management of any queue by name in a single location. Friendly for future queue call interception.
func NewQueueController ¶
func NewQueueController() *QueueController
func (*QueueController) Add ¶
func (c *QueueController) Add(j *Job) bool
Add a job. Automatically creates the queue if it does not exist.
func (*QueueController) Awake ¶
func (c *QueueController) Awake(j *Job) bool
Awake an existing scheduled job
func (*QueueController) Delete ¶
func (c *QueueController) Delete(j *Job) bool
Delete a job. Automatically creates the queue if it does not exist.
func (*QueueController) Exists ¶
func (c *QueueController) Exists(j *Job) bool
Verify if a job exists by object.
func (*QueueController) Lease ¶
func (c *QueueController) Lease(name string) <-chan JobProxy
Return a job lease by name
func (*QueueController) Queue ¶
func (c *QueueController) Queue(name string) QueueInterface
Return a queue by name, creating it if it does not exist.
func (*QueueController) Queues ¶
func (c *QueueController) Queues() (map[string]QueueInterface, *sync.RWMutex)
Return all queues as a map with an explicit sync.RWMutex. All calls to the map require locking.
func (*QueueController) Run ¶
func (c *QueueController) Run(j *Job) bool
Run a job. @FYI: When durability is added, run cmds will not be durable. Automatically creates the queue if it does not exist.
func (*QueueController) Schedule ¶
func (c *QueueController) Schedule(j *Job) bool
Schedule a job. Automatically creates the queue if it does not exist.
type QueueInterface ¶
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry holds all Job Run Records
func NewRegistry ¶
func NewRegistry() *Registry
func (*Registry) Add ¶
Add a RunRecord indexed by attached job id. Returns false on duplicate or invalid job id.
type RunRecord ¶
type RunRecord struct {
	Attempts uint64    // Number of attempts made
	State    uint8     // One of state constants
	Fails    uint8     // Number of explicit failures
	Job      *Job      // attached job object
	Result   Result    // Result of job
	Wait     Wait      // Wait channel for job result
	Timers   [3]*Timer // Timer container for scheduled time, TTR, TTL
	Mu       sync.RWMutex
}
RunRecord encapsulates the metadata for the execution of jobs. This detached run record allows Job objects to stay immutable after creation.
type RunResult ¶
RunResult is sent over the Wait channel. Allows readers to wait for a result and determine the success.
type Stats ¶
type Stats struct {
	// Number of incomplete jobs expired by TTL.
	EvictedJobs uint64
}
Stats represents job controller specific stats.
type Timer ¶
type Timer struct {
	C            <-chan time.Time
	Cancellation chan struct{}
	// Read only timer deadline, useful for approximations.
	// Slightly behind actual timer as the calculation is:
	// NOW + Duration.
	Deadline time.Time
	// contains filtered or unexported fields
}
Timer container encapsulates the original time.Timer with support for Cancellation.
type WorkQueue ¶
type WorkQueue struct {
// contains filtered or unexported fields
}
WorkQueue encapsulates 3 separate queues for a single type of job. Queues implement skiplists with different compare functions.
func (*WorkQueue) Add ¶
Add a job. Returns false on duplicate job. Returns true on success and sends a job proxy to signal leasers.