Documentation
Overview ¶
Package queue provides an in-memory queue structure suitable for the safe and low latency implementation of a real job queue.
It's like beanstalkd, but faster, with the ability to query the queue for desired items, reject duplicates, and wait on dependencies.
Like beanstalkd, when you add items to the queue, they move between different sub-queues:
Items start in the delay queue. After an item's delay time has elapsed, it automatically moves to the ready queue. From there you can Reserve() an item to get the highest priority one (or, for those with equal priority, the oldest: fifo), which switches it from the ready queue to the run queue. Items can also have dependencies, in which case they start in the dependency queue and only move to the ready queue (bypassing the delay queue) once all their dependencies have been Remove()d from the queue. Items can also belong to a reservation group, in which case you can Reserve() an item in a desired group.
In the run queue the item starts a time-to-release (ttr) countdown; when that runs out the item is placed back on the ready queue. This is to handle a process Reserving an item but then crashing before it deals with the item; with it back on the ready queue, some other process can pick it up.
To stop it going back to the ready queue you either Remove() the item (you dealt with the item successfully), Touch() it to give yourself more time to handle the item, or you Bury() the item (the item can't be dealt with until the user takes some action). When you know you have a transient problem preventing you from handling the item right now, you can manually Release() the item back to the delay queue.
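The moves between sub-queues described above can be summarised as a small transition table. This is only an illustrative sketch, not the package's implementation: the `State` type, the `transitions` map and the event labels "delay expires", "ttr expires" and "dependencies removed" are invented here, and the table only covers the moves the overview mentions (the real package may allow others, such as removing an item that is not running).

```go
package main

import "fmt"

// State names mirror the values the documentation lists for an item's state.
type State string

const (
	Delay     State = "delay"
	Ready     State = "ready"
	Run       State = "run"
	Bury      State = "bury"
	Dependent State = "dependent"
	Removed   State = "removed"
)

// transitions maps each state to the events that leave it. Method names are
// taken from the overview; the three "expires"/"removed" labels are invented
// for this sketch to stand for the automatic moves.
var transitions = map[State]map[string]State{
	Delay:     {"delay expires": Ready},
	Dependent: {"dependencies removed": Ready},
	Ready:     {"Reserve": Run},
	Run: {
		"ttr expires": Ready,
		"Touch":       Run,
		"Release":     Delay,
		"Bury":        Bury,
		"Remove":      Removed,
	},
	Bury: {"Kick": Ready},
}

// Next returns the state reached from s via event, or false if the overview
// describes no such move.
func Next(s State, event string) (State, bool) {
	to, ok := transitions[s][event]
	return to, ok
}

func main() {
	s := Delay
	events := []string{"delay expires", "Reserve", "Release", "delay expires", "Reserve", "Remove"}
	for _, ev := range events {
		next, ok := Next(s, ev)
		if !ok {
			fmt.Printf("%s: %q is not a described move\n", s, ev)
			return
		}
		fmt.Printf("%s --%s--> %s\n", s, ev, next)
		s = next
	}
}
```

Walking the example events shows an item being reserved, released back to the delay queue because of a transient problem, then reserved again and removed.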
import "github.com/VertebrateResequencing/wr/queue"

q := queue.New("myQueue")
q.SetReadyAddedCallback(func(queuename string, allitemdata []interface{}) {
	for _, item := range allitemdata {
		// cast item to the original type, then arrange to do something now
		// you know that the item is ready to be processed
		_ = item
	}
})

// add an item to the queue
ttr := 30 * time.Second
item, err := q.Add("uuid", "group", "item data", 0, 0*time.Second, ttr)

// get it back out
item, err = q.Get("uuid")

// reserve the next item
item, err = q.Reserve()

// or reserve the next item in a particular group
item, err = q.Reserve("group")

// q.Touch("uuid") every < ttr seconds if you might take longer than ttr to
// process the item

// say you successfully handled the item
err = q.Remove("uuid")
Index ¶
- Variables
- type Error
- type Item
- type ItemDef
- type ItemStats
- type Queue
- func (queue *Queue) Add(key string, reserveGroup string, data interface{}, priority uint8, ...) (item *Item, err error)
- func (queue *Queue) AddMany(items []*ItemDef) (added int, dups int, err error)
- func (queue *Queue) AllItems() (items []*Item)
- func (queue *Queue) Bury(key string) (err error)
- func (queue *Queue) Destroy() (err error)
- func (queue *Queue) Get(key string) (item *Item, err error)
- func (queue *Queue) GetRunningData() (data []interface{})
- func (queue *Queue) HasDependents(key string) (has bool, err error)
- func (queue *Queue) Kick(key string) (err error)
- func (queue *Queue) Release(key string) (err error)
- func (queue *Queue) Remove(key string) (err error)
- func (queue *Queue) Reserve(reserveGroup ...string) (item *Item, err error)
- func (queue *Queue) SetChangedCallback(callback changedCallback)
- func (queue *Queue) SetDelay(key string, delay time.Duration) (err error)
- func (queue *Queue) SetReadyAddedCallback(callback readyAddedCallback)
- func (queue *Queue) SetReserveGroup(key string, newGroup string) (err error)
- func (queue *Queue) Stats() *Stats
- func (queue *Queue) Touch(key string) (err error)
- func (queue *Queue) TriggerReadyAddedCallback()
- func (queue *Queue) Update(key string, reserveGroup string, data interface{}, priority uint8, ...) (err error)
- type Stats
Constants ¶
This section is empty.
Variables ¶
var (
	ErrQueueClosed   = errors.New("queue closed")
	ErrNothingReady  = errors.New("ready queue is empty")
	ErrAlreadyExists = errors.New("already exists")
	ErrNotFound      = errors.New("not found")
	ErrNotReady      = errors.New("not ready")
	ErrNotRunning    = errors.New("not running")
	ErrNotBuried     = errors.New("not buried")
)
These are the typical errors that queue operations can report; they appear in the Err field of Error.
Functions ¶
This section is empty.
Types ¶
type Error ¶
type Error struct {
	Queue string // the queue's Name
	Op    string // name of the method
	Item  string // the item's key
	Err   error  // one of our Err vars
}
Error records an error and the operation, item and queue that caused it.
type Item ¶
type Item struct {
	Key          string
	ReserveGroup string
	Data         interface{}
	// contains filtered or unexported fields
}
Item holds the information about each item in our queue, and has thread-safe functions to update properties as we switch between sub-queues. The 'state' property can have one of the values 'delay', 'ready', 'run', 'bury', 'removed' or 'dependent'.
func (*Item) Dependencies ¶ added in v0.2.0
Dependencies returns the keys of the other items we are dependent upon. Note, do not add these back during a queue.Update(), or you could end up adding back dependencies that already got resolved, leaving you in a permanent dependent state; use UnresolvedDependencies() for that purpose instead.
func (*Item) UnresolvedDependencies ¶ added in v0.3.0
UnresolvedDependencies returns the keys of the other items we are still dependent upon.
type ItemDef ¶
type ItemDef struct {
	Key          string
	ReserveGroup string
	Data         interface{}
	Priority     uint8 // highest priority is 255
	Delay        time.Duration
	TTR          time.Duration
	Dependencies []string
}
ItemDef makes it possible to supply a slice of Add() args to AddMany().
type ItemStats ¶
type ItemStats struct {
	State     string
	Reserves  uint32
	Timeouts  uint32
	Releases  uint32
	Buries    uint32
	Kicks     uint32
	Age       time.Duration
	Remaining time.Duration
	Priority  uint8
	Delay     time.Duration
	TTR       time.Duration
}
ItemStats holds information about the Item's state. The 'state' property can have one of the values 'delay', 'ready', 'run', 'bury', 'removed' or 'dependent'. Remaining is the time remaining in the current sub-queue. This will be a duration of zero for all but the delay and run states. In the delay state it tells you how long before it can be reserved, and in the run state it tells you how long before it will be released automatically.
type Queue ¶
type Queue struct {
	Name string
	// contains filtered or unexported fields
}
Queue is a synchronized map of items that can shift to different sub-queues, automatically depending on their delay or ttr expiring, or manually by calling certain methods.
func (*Queue) Add ¶
func (queue *Queue) Add(key string, reserveGroup string, data interface{}, priority uint8, delay time.Duration, ttr time.Duration, deps ...[]string) (item *Item, err error)
Add is a thread-safe way to add new items to the queue. After delay, an item switches to the ready sub-queue, from where it can be Reserve()d. Once it is reserved, you have ttr to Remove() the item, otherwise it gets released back to the ready sub-queue.
The priority determines which item will be next to be Reserve()d, with priority 255 (the max) items coming before lower priority ones (0 being the lowest). Items with the same priority number are Reserve()d on a fifo basis.
reserveGroup can be left as an empty string, but specifying it lets you provide the same value to Reserve() to get the next item with the given reserveGroup.
The final argument to Add() is an optional slice of item ids on which this item depends: this item will first enter the dependency sub-queue and only transfer to the ready sub-queue when the items with these ids get Remove()d from the queue.
Add() returns an item, which may have already existed (in which case, nothing was actually added or changed).
func (*Queue) AddMany ¶
AddMany is like Add(), except that you supply a slice of *ItemDef, and it returns the number that were actually added and the number of items that were not added because they were duplicates of items already in the queue. If an error occurs, nothing will have been added.
func (*Queue) AllItems ¶
AllItems returns the items in the queue. NB: You should NOT do anything to these items - use for read-only purposes.
func (*Queue) Bury ¶
Bury is a thread-safe way to switch an item in the run sub-queue to the bury sub-queue, for when the item can't be dealt with until the user takes some action and changes something.
func (*Queue) Destroy ¶
Destroy shuts down a queue, destroying any contents. You can't do anything useful with it after that.
func (*Queue) GetRunningData ¶
func (queue *Queue) GetRunningData() (data []interface{})
GetRunningData gets all the item.Data of items currently in the run sub-queue.
func (*Queue) HasDependents ¶ added in v0.2.0
HasDependents tells you if the item with the given key has any other items depending upon it. You'd want to check this before Remove()ing this item if you're removing it because it was undesired as opposed to complete, as Remove() always triggers dependent items to become ready.
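To see why the check matters, here is a small model of dependency tracking in which removing a key releases anything that was waiting only on it. The `depTracker` type and its methods are hypothetical and exist only for this sketch; they are not the package's API.

```go
package main

import (
	"fmt"
	"sort"
)

// depTracker is a hypothetical model: waiting maps each dependent item's key
// to the set of keys it is still waiting on.
type depTracker struct {
	waiting map[string]map[string]bool
}

func newDepTracker() *depTracker {
	return &depTracker{waiting: make(map[string]map[string]bool)}
}

// add registers key as depending on deps; with no deps nothing is tracked.
func (d *depTracker) add(key string, deps ...string) {
	if len(deps) == 0 {
		return
	}
	set := make(map[string]bool, len(deps))
	for _, dep := range deps {
		set[dep] = true
	}
	d.waiting[key] = set
}

// hasDependents reports whether any item is still waiting on key.
func (d *depTracker) hasDependents(key string) bool {
	for _, set := range d.waiting {
		if set[key] {
			return true
		}
	}
	return false
}

// remove resolves key for every dependent and returns, sorted, the keys with
// no unresolved dependencies left, i.e. those that would become ready.
func (d *depTracker) remove(key string) []string {
	var ready []string
	for dependent, set := range d.waiting {
		if !set[key] {
			continue
		}
		delete(set, key)
		if len(set) == 0 {
			delete(d.waiting, dependent)
			ready = append(ready, dependent)
		}
	}
	sort.Strings(ready)
	return ready
}

func main() {
	d := newDepTracker()
	d.add("report", "align", "call")

	fmt.Println(d.hasDependents("align")) // true: "report" waits on it
	fmt.Println(d.remove("align"))        // []: "call" is still unresolved
	fmt.Println(d.remove("call"))         // [report]
}
```

In this model, removing "align" because it was unwanted rather than finished would still count towards releasing "report", which is the situation the check is meant to catch.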
func (*Queue) Kick ¶
Kick is a thread-safe way to switch an item in the bury sub-queue to the ready sub-queue, for when a previously buried item can now be handled.
func (*Queue) Release ¶
Release is a thread-safe way to switch an item in the run sub-queue to the delay sub-queue, for when the item should be dealt with later, not now.
func (*Queue) Reserve ¶
Reserve is a thread-safe way to get the highest priority item in the queue (or, for those with equal priority, the oldest by time since the item was first Add()ed), switching it from the ready sub-queue to the run sub-queue, and in so doing starting its ttr countdown. By specifying the optional reserveGroup argument, you will get the next item that was added with the given ReserveGroup (conversely, if your items were added with ReserveGroups but you don't supply one here, you will not get an item).
You need to Remove() the item when you're done with it. If you're still doing something and ttr is approaching, Touch() it, otherwise it will be assumed you died and the item will be released back to the ready sub-queue automatically, to be handled by someone else that gets it from a Reserve() call. If you know you can't handle it right now, but someone else might be able to later, you can manually call Release(), which moves it to the delay sub-queue.
func (*Queue) SetChangedCallback ¶
func (queue *Queue) SetChangedCallback(callback changedCallback)
SetChangedCallback sets a callback that will be called when items move from one sub-queue to another. The callback receives the name of the moved-from sub-queue ('new' in the case of entering the queue for the first time), the name of the moved-to sub-queue ('removed' in the case of the item being removed from the queue), and a slice of item.Data of everything that moved in this way. The callback will be initiated in a goroutine.
func (*Queue) SetReadyAddedCallback ¶
func (queue *Queue) SetReadyAddedCallback(callback readyAddedCallback)
SetReadyAddedCallback sets a callback that will be called when new items have been added to the ready sub-queue. The callback will receive the name of the queue, and a slice of the Data properties of every item currently in the ready sub-queue. The callback will be initiated in a goroutine.
func (*Queue) SetReserveGroup ¶ added in v0.8.0
SetReserveGroup is a thread-safe way to change the ReserveGroup of an item.
func (*Queue) Stats ¶
Stats returns information about the number of items in the queue and each sub-queue.
func (*Queue) Touch ¶
Touch is a thread-safe way to extend the amount of time a Reserve()d item is allowed to run.
func (*Queue) TriggerReadyAddedCallback ¶
func (queue *Queue) TriggerReadyAddedCallback()
TriggerReadyAddedCallback allows you to manually trigger your readyAddedCallback at times when no new items have been added to the ready queue.
func (*Queue) Update ¶
func (queue *Queue) Update(key string, reserveGroup string, data interface{}, priority uint8, delay time.Duration, ttr time.Duration, deps ...[]string) (err error)
Update is a thread-safe way to change the data, ReserveGroup, priority, delay, ttr or dependencies of an item. You must supply all of these as per Add() - just supply the old values of those you are not changing (except for dependencies, which remain optional). The old values can be found by getting the item with Get() (giving you item.Key, item.ReserveGroup, item.Data and item.UnresolvedDependencies()), and then calling item.Stats() to get stats.Priority, stats.Delay and stats.TTR.