Documentation ¶
Index ¶
- Variables
- func GenerateIds(ctx context.Context) <-chan JobID
- func Less(left, right Job) bool
- type DelayService
- type Job
- type JobID
- type JobPriorityQueue
- type JobRegistry
- type LockService
- type Priority
- type Server
- type Service
- type StatisticsService
- type StorageService
- func (s *StorageService) Add(j *Job) error
- func (s *StorageService) DeleteByID(id JobID) error
- func (s *StorageService) PeekNextDelayed() (*Job, error)
- func (s *StorageService) PopNextReady() (*Job, error)
- func (s *StorageService) Read(id JobID) (*Job, error)
- func (s *StorageService) Update(j *Job) error
- type TTRService
- type Tube
- type TubePriorityQueue
Constants ¶
This section is empty.
Variables ¶
var ( ErrJobAlreadyExist = errors.New("job already exists in registry") ErrJobMissing = errors.New("job doesn't already exist") ErrQueueMissing = errors.New("queue doesn't already exist") ErrEmptyRegistry = errors.New("registry is empty") ErrEmptyQueue = errors.New("queue is empty") ErrQueueAlreadyExist = errors.New("queue already exists") )
TODO: Split these up into separate variable groups.
var ( // ErrNoJobReady is returned when there is no job ready. ErrNoJobReady = errors.New("no job ready") // ErrNoJobDelayed is returned when there is no delayed job ready. ErrNoJobDelayed = errors.New("no delayed job ready") )
var (
ErrDraining = errors.New("server is draining. No new jobs can be added")
)
Common `Server` errors.
Functions ¶
func GenerateIds ¶
GenerateIds returns a channel with strictly monotonically increasing job IDs. Generation stops are soon ctx is done.
func Less ¶
Less implements the Job comparison of which Job to run first. A job which is "Less" is processed before another job.
Jobs are first compared according to RunnableAt, then by Priority and last by Job ID. Notice that this covers both ready job heap as well as delayed job heap cases (but might make unnecessary comparisons, which is a future optimization to inject a job comparator if it speeds things up).
Types ¶
type DelayService ¶
type DelayService interface { Service }
DelayService converts delayed jobs to READY state when their delayed has passed.
Two possible implementations of this:
- One in-memory. Probably using a heap or something.
- One that that continuously polls the lock service for next job that should be transformed from DELAYED to READY. Will use much less memory.
Uses `LockService` and `StatisticsService`.
type Job ¶
type Job struct { ID JobID RunnableAt *time.Time TimeToRun time.Duration Body []byte Priority Priority }
Job is the structure containing all the metadata for a job.
type JobID ¶
type JobID uint64
JobID is the unique identifier of a job. Every new job gets assigned an incremented id.
type JobPriorityQueue ¶
type JobPriorityQueue interface { // Notify the priority queue that the Job's priority might have changed and // that internal datastructures must be updated to reflect that. Returns // `ErrJobMissing` if the job was not in the queue. Update(*Job) error // Remove and return the Job with the highest priority. Returns // `ErrEmptyQueue` if there are no jobs in the queue. Pop() (*Job, error) // Return the highest priority Job. Return nil if there are no jobs in the // queue. Returns `ErrEmptyQueue` if there are no jobs in the queue. Peek() (*Job, error) // Put a new Job on the queue. Returns `ErrJobAlreadyExist` if job already // exists. Push(*Job) error // RemoveByID removes a Job from the queue with a specific ID. Returns // `ErrQueueMissing` if the job was not in the queue. RemoveByID(JobID) error }
A JobPriorityQueue is a queue which orders jobs according to a specific priority. The queue MAY be backed by a heap, but could equally be backed by a B-tree/LSM on disk. Must be thread-safe.
Jobs have the following priority:
- Jobs with RunnableAt. If two jobs have it defined, the one with the earliest value has higher precedence.
- Jobs with the lower priority value are returned before higher priority value.
- Jobs with lower ID have higher precedence.
type JobRegistry ¶
type JobRegistry interface { // Insert stores a new job in the registry. If a job with the same job // identifier already exist, `ErrJobAlreadyExist` returned. Insert(*Job) error // Update the registry to reflect possible changes made to Job. Returns // `ErrJobMissing` if the Job could not be found. Update(*Job) error // GetByID returns the job with a given job identifier. If the job can't be // found, it returns `ErrJobMissing`. GetByID(JobID) (*Job, error) // DeleteByID deletes a job by job identitifer. If the job can't be found, // it returns `ErrJobMissing`. DeleteByID(JobID) error // GetLargestID returns the JobID of the job with the highest ID. This // method is only called on initialization. Returns `ErrEmptyRegistry` if // the registry contains no jobs. GetLargestID() (JobID, error) }
A JobRegistry stores and queries jobs. Must be thread-safe.
type LockService ¶
type LockService struct {
// contains filtered or unexported fields
}
LockService handles long-polling and locking to orchestrate `StorageService`.
func NewLockService ¶
func NewLockService(storage *StorageService) *LockService
NewLockService creates a new `LockService` which delegates the actual storage to storage.
func (*LockService) Add ¶
func (ls *LockService) Add(j *Job) error
Add adds a new job and notifies other goroutines that there is a new job available. If the storage returns an error, it is returned here.
func (*LockService) DeleteByID ¶
func (ls *LockService) DeleteByID(id JobID) error
DeleteByID deletes a job with the given ID. If an error is returned, it has been relayed from the storage.Delete() call.
func (*LockService) Poll ¶
func (ls *LockService) Poll(ctx context.Context) (*Job, error)
Poll polls a new job. If there is no job available it waits for one to become available, or until the ctx is Done. Error is either an error returned from the storage's PopNextReady() call, or an error returns from context.
type Priority ¶
type Priority uint64
Priority if the priority for a job. When jobs are being polled for, jobs with a smaller priority value has higher priority than larger priority values.
type Server ¶
type Server struct { Storage *LockService // TODO: Investigate if a sync.RWMutex will be useful. Ids <-chan (JobID) // contains filtered or unexported fields }
Server is the facade through which all interactions to geanstalk go from the net layer.
func (*Server) DeleteByID ¶
DeleteByID deletes a job with the given ID from this Server.
type StatisticsService ¶
type StatisticsService interface { }
StatisticsService keeps track of statistics.
type StorageService ¶
type StorageService struct { Jobs JobRegistry ReadyQueue JobPriorityQueue DelayQueue JobPriorityQueue }
StorageService stores jobs. All operations are atomic in terms of storage. Calls to all of its functions are non-blocking.
func (*StorageService) Add ¶
func (s *StorageService) Add(j *Job) error
Add adds a new job to the storage service. Returns ErrJobAlreadyExist if a job with the given ID has already been added.
func (*StorageService) DeleteByID ¶
func (s *StorageService) DeleteByID(id JobID) error
DeleteByID deletes a job with the given ID. Returns ErrJobMissing if the job could not be found.
func (*StorageService) PeekNextDelayed ¶
func (s *StorageService) PeekNextDelayed() (*Job, error)
PeekNextDelayed return the next ready job. Returns `ErrNoJobReady` if there are no jobs ready.
func (*StorageService) PopNextReady ¶
func (s *StorageService) PopNextReady() (*Job, error)
PopNextReady returns the next ready job. Returns ErrNoJobReady if no job is ready.
func (*StorageService) Read ¶
func (s *StorageService) Read(id JobID) (*Job, error)
Read queries a preexisting job by ID. Returns ErrJobMissing if the job could not be found.
func (*StorageService) Update ¶
func (s *StorageService) Update(j *Job) error
Update updates a preexisting job's metadata. Returns ErrJobMissing if the job could not be found.
type TTRService ¶
TTRService keeps track of jobs that are reserved and puts them back in READY state if they are not `touch`ed or `delete`d.
type TubePriorityQueue ¶
type TubePriorityQueue interface { // Peek returns the JobPriorityQueue with highest priority. Returns // ErrEmptyQueue if the queue is epty. Peek() (JobPriorityQueue, error) // Push adds a new JobPriorityQueue to the queue. Push(name Tube, queue JobPriorityQueue) error // FixByTube notifies the TubePriorityQueue the tube might have been // updated. Must be called everytime the tube changes. Returns // ErrQueueMissing if the tube couldn't be found. FixByTube(Tube) error // RemoveByTube returns the JobPriorityQueue for the equivalent tube. // Returns ErrQueueMissing if the tube couldn't be found. RemoveByTube(Tube) error }
TubePriorityQueue is a queue which orders `JobPriorityQueue`s according to a specific priority. The queue MAY be backed by a heap, but could equally be backed by a B-tree/LSM on disk. Must be thread-safe.
Tubes' priority is based on their Peek() return value and uses the same ordering defined for JobPriorityQueue.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd
|
|
Package net implements all network communication.
|
Package net implements all network communication. |
Package testing contains common test functions and reusable tests.
|
Package testing contains common test functions and reusable tests. |