geanstalkd

Version: v0.0.0-...-71312fb
Published: Nov 8, 2018 License: GPL-3.0 Imports: 5 Imported by: 4

README

geanstalkd

A beanstalkd clone written in Go. beanstalkd is a small distributed task queue that supports multiple task queues (called "tubes"), task priorities and task delays.

Why a reimplementation?

Don't get me wrong - beanstalkd is amazing, fast and stable! This is somewhat of a hobby project where I implement beanstalkd in Go, but long-term this project might also allow some neat stuff that beanstalkd doesn't want.

What differs from beanstalkd?

  • A single client can reserve multiple jobs.
  • Uses multiple cores.

Documentation

Constants

This section is empty.

Variables

var (
	ErrJobAlreadyExist   = errors.New("job already exists in registry")
	ErrJobMissing        = errors.New("job doesn't already exist")
	ErrQueueMissing      = errors.New("queue doesn't already exist")
	ErrEmptyRegistry     = errors.New("registry is empty")
	ErrEmptyQueue        = errors.New("queue is empty")
	ErrQueueAlreadyExist = errors.New("queue already exists")
)

TODO: Split these up into separate variable groups.

var (
	// ErrNoJobReady is returned when there is no job ready.
	ErrNoJobReady = errors.New("no job ready")
	// ErrNoJobDelayed is returned when there is no delayed job ready.
	ErrNoJobDelayed = errors.New("no delayed job ready")
)
var (
	ErrDraining = errors.New("server is draining. No new jobs can be added")
)

Common `Server` errors.
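
Because these are sentinel errors, callers can presumably test for them with `errors.Is`, even when the error has been wrapped with extra context. The following is a minimal sketch rather than code from this package: `ErrJobMissing` is redeclared locally so the example compiles on its own, and `lookup` and `isMissing` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrJobMissing mirrors the sentinel error documented above. It is redeclared
// here only so that this example is self-contained.
var ErrJobMissing = errors.New("job doesn't already exist")

// lookup is a hypothetical helper that always fails and wraps the sentinel
// error with additional context.
func lookup(id uint64) error {
	return fmt.Errorf("lookup of job %d failed: %w", id, ErrJobMissing)
}

// isMissing reports whether err is, or wraps, ErrJobMissing.
func isMissing(err error) bool {
	return errors.Is(err, ErrJobMissing)
}

func main() {
	if err := lookup(42); isMissing(err) {
		fmt.Println("job not found:", err)
	}
}
```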

Functions

func GenerateIds

func GenerateIds(ctx context.Context) <-chan JobID

GenerateIds returns a channel of strictly monotonically increasing job IDs. Generation stops as soon as ctx is done.
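
One way such a generator could be built is with a goroutine that feeds an unbuffered channel and exits when the context is cancelled. This is a sketch under assumptions, not the package's implementation: `generateIDs`, its `start` parameter and the `firstN` helper are hypothetical names, and the starting value of 1 is chosen only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// JobID mirrors the type documented in this package.
type JobID uint64

// generateIDs is a hypothetical stand-in for GenerateIds. It emits start,
// start+1, ... on the returned channel until ctx is done, then closes it.
func generateIDs(ctx context.Context, start JobID) <-chan JobID {
	ch := make(chan JobID)
	go func() {
		defer close(ch)
		for id := start; ; id++ {
			select {
			case ch <- id:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// firstN reads n IDs and then cancels the context so the generator
// goroutine exits.
func firstN(n int) []JobID {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := generateIDs(ctx, 1)
	ids := make([]JobID, 0, n)
	for i := 0; i < n; i++ {
		ids = append(ids, <-ch)
	}
	return ids
}

func main() {
	fmt.Println(firstN(3)) // prints [1 2 3]
}
```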

func Less

func Less(left, right Job) bool

Less implements the Job comparison of which Job to run first. A job which is "Less" is processed before another job.

Jobs are compared first by RunnableAt, then by Priority and finally by Job ID. Note that this ordering covers both the ready job heap and the delayed job heap, although it may make unnecessary comparisons. Injecting a dedicated job comparator per heap is a possible future optimization if it speeds things up.
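
The three-level comparison can be sketched as follows. This is an illustration, not the package's source: the `Job` struct is trimmed to the compared fields, `less` and `example` are hypothetical names, and the handling of a nil RunnableAt (a job with RunnableAt set sorts before one without) is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

type JobID uint64
type Priority uint64

// Job is a trimmed copy of the documented struct with only the compared fields.
type Job struct {
	ID         JobID
	RunnableAt *time.Time
	Priority   Priority
}

// less orders by RunnableAt, then Priority, then ID.
// Assumption: a job with RunnableAt set sorts before a job without it.
func less(left, right Job) bool {
	switch {
	case left.RunnableAt != nil && right.RunnableAt == nil:
		return true
	case left.RunnableAt == nil && right.RunnableAt != nil:
		return false
	case left.RunnableAt != nil && right.RunnableAt != nil &&
		!left.RunnableAt.Equal(*right.RunnableAt):
		return left.RunnableAt.Before(*right.RunnableAt)
	}
	if left.Priority != right.Priority {
		return left.Priority < right.Priority
	}
	return left.ID < right.ID
}

// example exercises each comparison level once.
func example() (byTime, byPriority, byID bool) {
	earlier, later := time.Unix(100, 0), time.Unix(200, 0)
	byTime = less(Job{ID: 9, RunnableAt: &earlier, Priority: 99}, Job{ID: 1, RunnableAt: &later})
	byPriority = less(Job{ID: 2, Priority: 0}, Job{ID: 1, Priority: 10})
	byID = less(Job{ID: 1, Priority: 10}, Job{ID: 3, Priority: 10})
	return
}

func main() {
	fmt.Println(example()) // prints true true true
}
```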

Types

type DelayService

type DelayService interface {
	Service
}

DelayService converts delayed jobs to READY state when their delay has passed.

Two possible implementations of this:

  • One in-memory. Probably using a heap or something.
  • One that continuously polls the lock service for the next job that should be transformed from DELAYED to READY. This would use much less memory.

Uses `LockService` and `StatisticsService`.

type Job

type Job struct {
	ID         JobID
	RunnableAt *time.Time
	TimeToRun  time.Duration
	Body       []byte
	Priority   Priority
}

Job is the structure containing all the metadata for a job.

func (Job) Copy

func (j Job) Copy() Job

Copy creates a new copy of the job.
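
The documentation does not say whether Copy duplicates the data behind `Body` and `RunnableAt`. Because a plain struct assignment would share both, a deep copy might look like the sketch below. The `deepCopy` method and the `copyIsIndependent` helper are hypothetical, and the `Job` struct is a trimmed copy of the documented one.

```go
package main

import (
	"fmt"
	"time"
)

// Job is a trimmed copy of the documented struct.
type Job struct {
	ID         uint64
	RunnableAt *time.Time
	Body       []byte
}

// deepCopy returns a copy that shares no memory with j. This is an assumption
// about what a safe copy looks like, not the package's actual Copy method.
func (j Job) deepCopy() Job {
	c := j
	if j.RunnableAt != nil {
		t := *j.RunnableAt
		c.RunnableAt = &t
	}
	if j.Body != nil {
		c.Body = append([]byte(nil), j.Body...)
	}
	return c
}

// copyIsIndependent mutates a copy and reports whether the original is intact.
func copyIsIndependent() bool {
	at := time.Unix(100, 0)
	orig := Job{ID: 1, RunnableAt: &at, Body: []byte("hello")}
	c := orig.deepCopy()
	c.Body[0] = 'j'
	*c.RunnableAt = time.Unix(200, 0)
	return string(orig.Body) == "hello" && orig.RunnableAt.Equal(time.Unix(100, 0))
}

func main() {
	fmt.Println(copyIsIndependent()) // prints true
}
```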

type JobID

type JobID uint64

JobID is the unique identifier of a job. Every new job gets assigned an incremented id.

type JobPriorityQueue

type JobPriorityQueue interface {
	// Notify the priority queue that the Job's priority might have changed and
	// that internal datastructures must be updated to reflect that. Returns
	// `ErrJobMissing` if the job was not in the queue.
	Update(*Job) error

	// Remove and return the Job with the highest priority. Returns
	// `ErrEmptyQueue` if there are no jobs in the queue.
	Pop() (*Job, error)

	// Return the highest priority Job without removing it. Returns nil and
	// `ErrEmptyQueue` if there are no jobs in the queue.
	Peek() (*Job, error)

	// Put a new Job on the queue. Returns `ErrJobAlreadyExist` if job already
	// exists.
	Push(*Job) error

	// RemoveByID removes a Job from the queue with a specific ID. Returns
	// `ErrQueueMissing` if the job was not in the queue.
	RemoveByID(JobID) error
}

A JobPriorityQueue is a queue which orders jobs according to a specific priority. The queue MAY be backed by a heap, but could equally be backed by a B-tree/LSM on disk. Must be thread-safe.

Jobs have the following priority:

  1. Jobs with RunnableAt. If two jobs have it defined, the one with the earliest value has higher precedence.
  2. Jobs with a lower priority value are returned before jobs with a higher priority value.
  3. Jobs with lower ID have higher precedence.
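
A heap-backed queue along these lines can be sketched with `container/heap` and a mutex. This is a partial illustration under assumptions, not the package's implementation: it covers only Push and Pop, orders by priority value and then ID (RunnableAt is left out for brevity), and `heapQueue`, `newHeapQueue` and `popOrder` are hypothetical names.

```go
package main

import (
	"container/heap"
	"errors"
	"fmt"
	"sync"
)

// Local copies of the documented sentinel errors, for self-containment.
var (
	ErrEmptyQueue      = errors.New("queue is empty")
	ErrJobAlreadyExist = errors.New("job already exists in registry")
)

// Job is trimmed to the fields this sketch compares.
type Job struct {
	ID       uint64
	Priority uint64
}

// jobHeap implements heap.Interface: lower priority value first, then lower ID.
type jobHeap []*Job

func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool {
	if h[i].Priority != h[j].Priority {
		return h[i].Priority < h[j].Priority
	}
	return h[i].ID < h[j].ID
}
func (h jobHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x interface{}) { *h = append(*h, x.(*Job)) }
func (h *jobHeap) Pop() interface{} {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// heapQueue guards the heap with a mutex so it is safe for concurrent use.
type heapQueue struct {
	mu   sync.Mutex
	jobs jobHeap
	seen map[uint64]bool
}

func newHeapQueue() *heapQueue { return &heapQueue{seen: map[uint64]bool{}} }

// Push adds a job, rejecting IDs that are already queued.
func (q *heapQueue) Push(j *Job) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.seen[j.ID] {
		return ErrJobAlreadyExist
	}
	q.seen[j.ID] = true
	heap.Push(&q.jobs, j)
	return nil
}

// Pop removes and returns the job that sorts first.
func (q *heapQueue) Pop() (*Job, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.jobs) == 0 {
		return nil, ErrEmptyQueue
	}
	j := heap.Pop(&q.jobs).(*Job)
	delete(q.seen, j.ID)
	return j, nil
}

// popOrder pushes three jobs and returns their IDs in pop order.
func popOrder() []uint64 {
	q := newHeapQueue()
	for _, j := range []*Job{{ID: 1, Priority: 10}, {ID: 2, Priority: 0}, {ID: 3, Priority: 10}} {
		_ = q.Push(j)
	}
	var ids []uint64
	for j, err := q.Pop(); err == nil; j, err = q.Pop() {
		ids = append(ids, j.ID)
	}
	return ids
}

func main() {
	fmt.Println(popOrder()) // prints [2 1 3]
}
```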

type JobRegistry

type JobRegistry interface {
	// Insert stores a new job in the registry. If a job with the same job
	// identifier already exists, `ErrJobAlreadyExist` is returned.
	Insert(*Job) error

	// Update the registry to reflect possible changes made to Job. Returns
	// `ErrJobMissing` if the Job could not be found.
	Update(*Job) error

	// GetByID returns the job with a given job identifier. If the job can't be
	// found, it returns `ErrJobMissing`.
	GetByID(JobID) (*Job, error)

	// DeleteByID deletes a job by job identifier. If the job can't be found,
	// it returns `ErrJobMissing`.
	DeleteByID(JobID) error

	// GetLargestID returns the JobID of the job with the highest ID. This
	// method is only called on initialization. Returns `ErrEmptyRegistry` if
	// the registry contains no jobs.
	GetLargestID() (JobID, error)
}

A JobRegistry stores and queries jobs. Must be thread-safe.
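
An in-memory registry with this method set could be a map guarded by a `sync.RWMutex`. The sketch below is an assumption about one possible implementation, not code from this package; `memRegistry` and `newMemRegistry` are hypothetical names, and the types and errors are redeclared locally so the example stands alone.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type JobID uint64

// Job is trimmed to the fields needed by this sketch.
type Job struct {
	ID   JobID
	Body []byte
}

// Local copies of the documented sentinel errors.
var (
	ErrJobAlreadyExist = errors.New("job already exists in registry")
	ErrJobMissing      = errors.New("job doesn't already exist")
	ErrEmptyRegistry   = errors.New("registry is empty")
)

// memRegistry is a hypothetical map-backed registry, safe for concurrent use.
type memRegistry struct {
	mu   sync.RWMutex
	jobs map[JobID]*Job
}

func newMemRegistry() *memRegistry { return &memRegistry{jobs: map[JobID]*Job{}} }

func (r *memRegistry) Insert(j *Job) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.jobs[j.ID]; ok {
		return ErrJobAlreadyExist
	}
	r.jobs[j.ID] = j
	return nil
}

func (r *memRegistry) Update(j *Job) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.jobs[j.ID]; !ok {
		return ErrJobMissing
	}
	r.jobs[j.ID] = j
	return nil
}

func (r *memRegistry) GetByID(id JobID) (*Job, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	j, ok := r.jobs[id]
	if !ok {
		return nil, ErrJobMissing
	}
	return j, nil
}

func (r *memRegistry) DeleteByID(id JobID) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.jobs[id]; !ok {
		return ErrJobMissing
	}
	delete(r.jobs, id)
	return nil
}

// GetLargestID scans all keys; acceptable if it only runs at initialization.
func (r *memRegistry) GetLargestID() (JobID, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if len(r.jobs) == 0 {
		return 0, ErrEmptyRegistry
	}
	var largest JobID
	for id := range r.jobs {
		if id > largest {
			largest = id
		}
	}
	return largest, nil
}

func main() {
	r := newMemRegistry()
	_ = r.Insert(&Job{ID: 3})
	_ = r.Insert(&Job{ID: 7})
	largest, _ := r.GetLargestID()
	fmt.Println(largest) // prints 7
}
```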

type LockService

type LockService struct {
	// contains filtered or unexported fields
}

LockService handles long-polling and locking to orchestrate `StorageService`.

func NewLockService

func NewLockService(storage *StorageService) *LockService

NewLockService creates a new `LockService` which delegates the actual storage to storage.

func (*LockService) Add

func (ls *LockService) Add(j *Job) error

Add adds a new job and notifies other goroutines that there is a new job available. If the storage returns an error, it is returned here.

func (*LockService) DeleteByID

func (ls *LockService) DeleteByID(id JobID) error

DeleteByID deletes a job with the given ID. If an error is returned, it has been relayed from the storage.Delete() call.

func (*LockService) Poll

func (ls *LockService) Poll(ctx context.Context) (*Job, error)

Poll polls for a new job. If no job is available it waits for one to become available, or until ctx is done. The returned error is either relayed from the storage's PopNextReady() call or comes from the context.
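
The long-polling behaviour described for Add and Poll can be illustrated with a slice guarded by a mutex plus a one-slot notification channel. This is a simplified sketch, not the LockService implementation: jobs are plain strings, and `queue`, `add`, `poll`, `pollWithTimeout` and `timedOut` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// queue is a hypothetical stand-in for LockService plus its storage.
type queue struct {
	mu     sync.Mutex
	jobs   []string
	notify chan struct{} // one-slot wake-up signal for waiting pollers
}

func newQueue() *queue { return &queue{notify: make(chan struct{}, 1)} }

// signal wakes at most one waiting poller without ever blocking.
func (q *queue) signal() {
	select {
	case q.notify <- struct{}{}:
	default:
	}
}

// add stores a job and notifies a waiting poller.
func (q *queue) add(j string) {
	q.mu.Lock()
	q.jobs = append(q.jobs, j)
	q.mu.Unlock()
	q.signal()
}

// poll returns the next job, waiting until one arrives or ctx is done.
func (q *queue) poll(ctx context.Context) (string, error) {
	for {
		q.mu.Lock()
		if len(q.jobs) > 0 {
			j := q.jobs[0]
			q.jobs = q.jobs[1:]
			remaining := len(q.jobs)
			q.mu.Unlock()
			if remaining > 0 {
				q.signal() // pass the wake-up on to another poller
			}
			return j, nil
		}
		q.mu.Unlock()
		select {
		case <-q.notify: // possibly stale; the loop re-checks the slice
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// pollWithTimeout polls with a deadline given in milliseconds.
func pollWithTimeout(q *queue, ms int) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	return q.poll(ctx)
}

// timedOut reports whether err came from an expired context deadline.
func timedOut(err error) bool { return errors.Is(err, context.DeadlineExceeded) }

func main() {
	q := newQueue()
	go func() {
		time.Sleep(10 * time.Millisecond)
		q.add("job-1")
	}()
	fmt.Println(pollWithTimeout(q, 1000))
}
```

A caller that must not wait forever passes a context with a deadline, as `pollWithTimeout` does; an expired deadline surfaces as the context's error rather than a storage error.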

type Priority

type Priority uint64

Priority is the priority of a job. When jobs are being polled for, jobs with a smaller priority value have higher priority than jobs with a larger priority value.

type Server

type Server struct {
	Storage *LockService

	// TODO: Investigate if a sync.RWMutex will be useful.
	Ids <-chan (JobID)
	// contains filtered or unexported fields
}

Server is the facade through which all interactions from the net layer to geanstalkd go.

func (*Server) Add

func (s *Server) Add(j *Job) error

Add adds a new job to this Server.

func (*Server) BuildJob

func (s *Server) BuildJob(pri Priority, at time.Time, ttr time.Duration, jobdata []byte) Job

BuildJob constructs a new job with an ID unique to this Server.

func (*Server) DeleteByID

func (s *Server) DeleteByID(id JobID) error

DeleteByID deletes a job with the given ID from this Server.

type Service

type Service interface {
	io.Closer
}

Service is the generic interface for all services that are closeable.

type StatisticsService

type StatisticsService interface {
}

StatisticsService keeps track of statistics.

type StorageService

type StorageService struct {
	Jobs       JobRegistry
	ReadyQueue JobPriorityQueue
	DelayQueue JobPriorityQueue
}

StorageService stores jobs. All operations are atomic in terms of storage. Calls to all of its functions are non-blocking.

func (*StorageService) Add

func (s *StorageService) Add(j *Job) error

Add adds a new job to the storage service. Returns ErrJobAlreadyExist if a job with the given ID has already been added.

func (*StorageService) DeleteByID

func (s *StorageService) DeleteByID(id JobID) error

DeleteByID deletes a job with the given ID. Returns ErrJobMissing if the job could not be found.

func (*StorageService) PeekNextDelayed

func (s *StorageService) PeekNextDelayed() (*Job, error)

PeekNextDelayed returns the next delayed job. Returns `ErrNoJobReady` if there are no jobs ready.

func (*StorageService) PopNextReady

func (s *StorageService) PopNextReady() (*Job, error)

PopNextReady returns the next ready job. Returns ErrNoJobReady if no job is ready.

func (*StorageService) Read

func (s *StorageService) Read(id JobID) (*Job, error)

Read queries a preexisting job by ID. Returns ErrJobMissing if the job could not be found.

func (*StorageService) Update

func (s *StorageService) Update(j *Job) error

Update updates a preexisting job's metadata. Returns ErrJobMissing if the job could not be found.

type TTRService

type TTRService interface {
	Service

	Reserve(Job)
	Delete(Job)
	Touch(Job)
}

TTRService keeps track of jobs that are reserved and puts them back in READY state if they are not `touch`ed or `delete`d.

type Tube

type Tube string

Tube is a queue.

type TubePriorityQueue

type TubePriorityQueue interface {
	// Peek returns the JobPriorityQueue with highest priority. Returns
	// ErrEmptyQueue if the queue is empty.
	Peek() (JobPriorityQueue, error)

	// Push adds a new JobPriorityQueue to the queue.
	Push(name Tube, queue JobPriorityQueue) error

	// FixByTube notifies the TubePriorityQueue the tube might have been
	// updated. Must be called every time the tube changes. Returns
	// ErrQueueMissing if the tube couldn't be found.
	FixByTube(Tube) error

	// RemoveByTube removes the JobPriorityQueue for the given tube.
	// Returns ErrQueueMissing if the tube couldn't be found.
	RemoveByTube(Tube) error
}

TubePriorityQueue is a queue which orders `JobPriorityQueue`s according to a specific priority. The queue MAY be backed by a heap, but could equally be backed by a B-tree/LSM on disk. Must be thread-safe.

Tubes' priority is based on their Peek() return value and uses the same ordering defined for JobPriorityQueue.

Directories

Path Synopsis
cmd
Package net implements all network communication.
Package testing contains common test functions and reusable tests.
