Documentation
¶
Overview ¶
Package queue provides a queue structure where you can add items to the queue that can then then switch between 4 sub-queues.
This package provides the functions for a server process to do the work of a jobqueue like beanstalkd. See the jobqueue package for the functions that allow interaction with clients on the network.
Items start in the delay queue. After the item's delay time, they automatically move to the ready queue. From there you can Reserve() an item to get the highest priority (or for those with equal priority, the oldest one (fifo)) which switches it from the ready queue to the run queue. Items can also have dependencies, in which case they start in the dependency queue and only move to the ready queue (bypassing the delay queue) once once all its dependencies have been Remove()d from the queue.
In the run queue the item starts a time-to-release (ttr) countdown; when that runs out the item is placed back on the ready queue. This is to handle a process Reserving an item but then crashing before it deals with the item; with it back on the ready queue, some other process can pick it up.
To stop it going back to the ready queue you either Remove() the item (you dealt with the item successfully), Touch() it to give yourself more time to handle the item, or you Bury() the item (the item can't be dealt with until the user takes some action). When you know you have a transient problem preventing you from handling the item right now, you can manually Release() the item back to the delay queue.
Index ¶
- Variables
- type Error
- type Item
- type ItemDef
- type ItemStats
- type Queue
- func (queue *Queue) Add(key string, data interface{}, priority uint8, delay time.Duration, ...) (item *Item, err error)
- func (queue *Queue) AddMany(items []*ItemDef) (added int, dups int, err error)
- func (queue *Queue) AllItems() (items []*Item)
- func (queue *Queue) Bury(key string) (err error)
- func (queue *Queue) Destroy() (err error)
- func (queue *Queue) Get(key string) (item *Item, err error)
- func (queue *Queue) GetRunningData() (data []interface{})
- func (queue *Queue) HasDependents(key string) (has bool, err error)
- func (queue *Queue) Kick(key string) (err error)
- func (queue *Queue) Release(key string) (err error)
- func (queue *Queue) Remove(key string) (err error)
- func (queue *Queue) Reserve() (item *Item, err error)
- func (queue *Queue) ReserveFiltered(filter ReserveFilter) (item *Item, err error)
- func (queue *Queue) SetChangedCallback(callback changedCallback)
- func (queue *Queue) SetDelay(key string, delay time.Duration) (err error)
- func (queue *Queue) SetReadyAddedCallback(callback readyAddedCallback)
- func (queue *Queue) Stats() *Stats
- func (queue *Queue) Touch(key string) (err error)
- func (queue *Queue) TriggerReadyAddedCallback()
- func (queue *Queue) Update(key string, data interface{}, priority uint8, delay time.Duration, ...) (err error)
- type ReserveFilter
- type Stats
Constants ¶
This section is empty.
Variables ¶
var ( ErrQueueClosed = errors.New("queue closed") ErrNothingReady = errors.New("ready queue is empty") ErrAlreadyExists = errors.New("already exists") ErrNotFound = errors.New("not found") ErrNotReady = errors.New("not ready") ErrNotRunning = errors.New("not running") ErrNotBuried = errors.New("not buried") )
queue has some typical errors
Functions ¶
This section is empty.
Types ¶
type Error ¶
type Error struct { Queue string // the queue's Name Op string // name of the method Item string // the item's key Err error // one of our Err vars }
Error records an error and the operation, item and queue that caused it.
type Item ¶
type Item struct { Key string Data interface{} // contains filtered or unexported fields }
Item holds the information about each item in our queue, and has thread-safe functions to update properties as we switch between sub-queues. The 'state' property can have one of the values 'delay', 'ready', 'run', 'bury', 'removed' or 'dependent'.
func (*Item) Dependencies ¶ added in v0.2.0
Dependencies returns the keys of the other items we are dependent upon.
type ItemDef ¶
type ItemDef struct { Key string Data interface{} Priority uint8 // highest priority is 255 Delay time.Duration TTR time.Duration Dependencies []string }
ItemDef makes it possible to supply a slice of Add() args to AddMany().
type ItemStats ¶
type ItemStats struct { State string Reserves uint32 Timeouts uint32 Releases uint32 Buries uint32 Kicks uint32 Age time.Duration Remaining time.Duration Priority uint8 Delay time.Duration TTR time.Duration }
ItemStats holds information about the Item's state. The 'state' property can have one of the values 'delay', 'ready', 'run', 'bury', 'removed' or 'dependent'. Remaining is the time remaining in the current sub-queue. This will be a duration of zero for all but the delay and run states. In the delay state it tells you how long before it can be reserved, and in the run state it tells you how long before it will be released automatically.
type Queue ¶
type Queue struct { Name string // contains filtered or unexported fields }
Queue is a synchronized map of items that can shift to different sub-queues, automatically depending on their delay or ttr expiring, or manually by calling certain methods.
func (*Queue) Add ¶
func (queue *Queue) Add(key string, data interface{}, priority uint8, delay time.Duration, ttr time.Duration, deps ...[]string) (item *Item, err error)
Add is a thread-safe way to add new items to the queue. After delay they will switch to the ready sub-queue from where they can be Reserve()d. Once reserved, they have ttr to Remove() the item, otherwise it gets released back to the ready sub-queue. The priority determines which item will be next to be Reserve()d, with priority 255 (the max) items coming before lower priority ones (with 0 being the lowest). Items with the same priority number are Reserve()d on a fifo basis. The final argument to Add() is an optional slice of item ids on which this item depends: this item will first enter the dependency sub-queue and only transfer to the ready sub-queue when items with these ids do not exist in the queue. Add() returns an item, which may have already existed (in which case, nothing was actually added or changed).
func (*Queue) AddMany ¶
AddMany is like Add(), except that you supply a slice of *ItemDef, and it returns the number that were actually added and the number of items that were not added because they were duplicates of items already in the queue. If an error occurs, nothing will have been added.
func (*Queue) AllItems ¶
AllItems returns the items in the queue. NB: You should NOT do anything to these items - use for read-only purposes.
func (*Queue) Bury ¶
Bury is a thread-safe way to switch an item in the run sub-queue to the bury sub-queue, for when the item can't be dealt with ever, at least until the user takes some action and changes something.
func (*Queue) Destroy ¶
Destroy shuts down a queue, destroying any contents. You can't do anything useful with it after that.
func (*Queue) GetRunningData ¶
func (queue *Queue) GetRunningData() (data []interface{})
GetRunningData gets all the item.Data of items currently in the run sub- queue.
func (*Queue) HasDependents ¶ added in v0.2.0
HasDependents tells you if the item with the given key has any other items depending upon it. You'd want to check this before Remove()ing this item if you're removing it because it was undesired as opposed to complete, as Remove() always triggers dependent items to become ready.
func (*Queue) Kick ¶
Kick is a thread-safe way to switch an item in the bury sub-queue to the ready sub-queue, for when a previously buried item can now be handled.
func (*Queue) Release ¶
Release is a thread-safe way to switch an item in the run sub-queue to the delay sub-queue, for when the item should be dealt with later, not now.
func (*Queue) Reserve ¶
Reserve is a thread-safe way to get the highest priority (or for those with equal priority, the oldest (by time since the item was first Add()ed) item in the queue, switching it from the ready sub-queue to the run sub-queue, and in so doing starting its ttr countdown. You need to Remove() the item when you're done with it. If you're still doing something and ttr is approaching, Touch() it, otherwise it will be assumed you died and the item will be released back to the ready sub-queue automatically, to be handled by someone else that gets it from a Reserve() call. If you know you can't handle it right now, but someone else might be able to later, you can manually call Release(), which moves it to the delay sub-queue.
func (*Queue) ReserveFiltered ¶
func (queue *Queue) ReserveFiltered(filter ReserveFilter) (item *Item, err error)
ReserveFiltered is like Reserve(), except you provide a callback function that will receive the next (highest priority || oldest) item's Data. If you don't want that one your callback returns false, and then it will be called again with the following item's Data, and so on until your say you want it by returning true. That will be the item that gets moved from the ready to the run queue and returned to you. If you don't want any you'll get an error as if the ready queue was empty.
func (*Queue) SetChangedCallback ¶
func (queue *Queue) SetChangedCallback(callback changedCallback)
SetChangedCallback sets a callback that will be called when items move from one sub-queue to another. The callback receives the name of the moved-from sub-queue ('new' in the case of entering the queue for the first time), the name of the moved-to sub-queue ('removed' in the case of the item being removed from the queue), and a slice of item.Data of everything that moved in this way. The callback will be initiated in a go routine.
func (*Queue) SetReadyAddedCallback ¶
func (queue *Queue) SetReadyAddedCallback(callback readyAddedCallback)
SetReadyAddedCallback sets a callback that will be called when new items have been added to the ready sub-queue. The callback will receive the name of the queue, and a slice of the Data properties of every item currently in the ready sub-queue. The callback will be initiated in a go routine.
func (*Queue) Stats ¶
Stats returns information about the number of items in the queue and each sub-queue.
func (*Queue) Touch ¶
Touch is a thread-safe way to extend the amount of time a Reserve()d item is allowed to run.
func (*Queue) TriggerReadyAddedCallback ¶
func (queue *Queue) TriggerReadyAddedCallback()
TriggerReadyAddedCallback allows you to manually trigger your readyAddedCallback at times when no new items have been added to the ready queue.
func (*Queue) Update ¶
func (queue *Queue) Update(key string, data interface{}, priority uint8, delay time.Duration, ttr time.Duration, deps ...[]string) (err error)
Update is a thread-safe way to change the data, priority, delay, ttr or dependencies of an item. You must supply all of these as per Add() - just supply the old values of those you are not changing (except for dependencies, which remain optional). The old values can be found by getting the item with Get() (giving you item.Key, item.Data and item.Dependencies()), and then calling item.Stats() to get stats.Priority, stats.Delay and stats.TTR.
type ReserveFilter ¶
type ReserveFilter func(data interface{}) bool
ReserveFilter is a callback for use when calling ReserveFiltered(). It will receive an item's Data property and should return false if that is not desired, true if it is.