queue

package
v0.0.0-...-988a430
Published: Dec 26, 2017 License: MIT Imports: 4 Imported by: 0

README

Queue

A queue is a mechanism for storing data temporarily and making it available for asynchronous consumption by other parts of the system.

Characteristics

This implementation works only in memory, and nothing is persisted automatically if the memory limit is reached. The safest way to use the queue is to persist the data in a database using a transaction and, in the same way, to update that database in a transaction whenever a queue item is processed. If an error occurs, roll back the change in the database.

Basic Operation

Put

Places an item in the queue.

Example
q, err := New()
if err != nil {
    fmt.Println(err.Error())
    return
}

b := []byte{'a', 'b', 'c'}
q.Put(b)
Reserve

Reserve picks up an item from the queue and makes it available for processing. The system has 30 seconds, or the time set in MaxReserveTime, to process the item; this time can be extended with the Renew function. If the item is not removed or released before the time runs out, the system automatically returns it to the queue.

Example
q, err := New()
if err != nil {
    fmt.Println(err.Error())
    return
}

b := []byte{'a', 'b', 'c'}
q.Put(b)

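var r interface{}
_, r, err = q.Reserve()
if err != nil {
    fmt.Println(err.Error())
    return
}

// Reserve returns the value as interface{}; assert it back to []byte.
x := r.([]byte)

fmt.Println(string(x))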
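
Because Reserve returns the value as interface{}, the plain assertion r.([]byte) panics if the item holds any other type. The following is a minimal sketch of the comma-ok form of the assertion, which reports a mismatch as an error instead. The helper name asBytes is hypothetical and not part of this package, and the values in main only stand in for what Reserve might return.

```go
package main

import "fmt"

// asBytes converts a value received as interface{} back to []byte.
// The comma-ok assertion reports a type mismatch instead of panicking.
// (Hypothetical helper, not part of the queue package.)
func asBytes(v interface{}) ([]byte, error) {
	b, ok := v.([]byte)
	if !ok {
		return nil, fmt.Errorf("unexpected item type %T", v)
	}
	return b, nil
}

func main() {
	// Stand-ins for values that Reserve might hand back.
	for _, v := range []interface{}{[]byte("abc"), 42} {
		b, err := asBytes(v)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		fmt.Println(string(b))
	}
}
```

A type switch is a reasonable alternative when a queue is expected to carry several item types.
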
Renew

The Renew function resets the timer of the reserved item so that the system has more time to process the data. In long-running processes, call Renew periodically.

Example
q, err := New()
if err != nil {
    fmt.Println(err.Error())
    return
}

b := []byte{'a', 'b', 'c'}
q.Put(b)

var r interface{}
var hash string
hash, r, err = q.Reserve()
if err != nil {
    fmt.Println(err.Error())
    return
}

...
	
err = q.Renew(hash)
if err != nil {
    fmt.Println(err.Error())
    return
}
Release

The Release function frees the reserved item by returning it to the queue, leaving it available to be used by another process. Use it when the current instance cannot process the reserved item.

Example
q, err := New()
if err != nil {
    fmt.Println(err.Error())
    return
}

b := []byte{'a', 'b', 'c'}
q.Put(b)

var r interface{}
var hash string
hash, r, err = q.Reserve()
if err != nil {
    fmt.Println(err.Error())
    return
}

...
	
err = q.Release(hash)
if err != nil {
    fmt.Println(err.Error())
    return
}
Remove

The Remove function removes the reserved item from the queue. Use it once the current instance has processed the data successfully and the item is no longer needed.

Example
q, err := New()
if err != nil {
    fmt.Println(err.Error())
    return
}

b := []byte{'a', 'b', 'c'}
q.Put(b)

var r interface{}
var hash string
hash, r, err = q.Reserve()
if err != nil {
    fmt.Println(err.Error())
    return
}

...
	
err = q.Remove(hash)
if err != nil {
    fmt.Println(err.Error())
    return
}
Count

The Count function returns how many items are still in the queue.

Example
q, err := New()
if err != nil {
    fmt.Println(err.Error())
    return
}

b := []byte{'a', 'b', 'c'}
q.Put(b)

fmt.Printf("Count: %d\n", q.Count())

Documentation

Constants

This section is empty.

Variables

var ErrorHashNotFound = errors.New("Hash not found")
var ErrorItemNotReserved = errors.New("Item not reserved")
var ErrorNoItemsAvailable = errors.New("No items available")

Functions

This section is empty.

Types

type Data

type Data struct {
	MaxReserveTime float64         `json:"max_reserve_time"`
	ItemList       map[string]Item `json:"item"`
	// contains filtered or unexported fields
}

Data contains all data in the current queue

func New

func New() (q *Data, err error)

New creates a new queue.

func (*Data) Count

func (q *Data) Count() int

Count items in the queue

func (*Data) Put

func (q *Data) Put(v interface{})

Put data in the queue

func (*Data) Release

func (q *Data) Release(hash string) (err error)

Release reserved item

func (*Data) Remove

func (q *Data) Remove(hash string) (err error)

Remove deletes an item from the queue. The item must be reserved.

func (*Data) Renew

func (q *Data) Renew(hash string) (err error)

Renew the reservation of an item in the queue

func (*Data) Reserve

func (q *Data) Reserve() (hash string, value interface{}, err error)

Reserve searches for the next available item in the queue. If the item is not removed and its reservation is not renewed in time, the item returns to the queue automatically.

type Item

type Item struct {
	ReservedAt time.Time   `json:"reserved_at"`
	Value      interface{} `json:"value"`
}

Item struct is the basic queue item
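
The JSON tags on Item determine how an item would look if it were serialized. The sketch below mirrors the documented struct locally, so that it compiles without the package, and marshals one item with encoding/json. Whether the package itself serializes items is not stated in the documentation; encodeItem is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Item mirrors the documented struct so the sketch compiles on its own.
type Item struct {
	ReservedAt time.Time   `json:"reserved_at"`
	Value      interface{} `json:"value"`
}

// encodeItem marshals an item using the struct's JSON tags.
// (Hypothetical helper, not part of the queue package.)
func encodeItem(it Item) (string, error) {
	b, err := json.Marshal(it)
	return string(b), err
}

func main() {
	s, err := encodeItem(Item{Value: []byte("abc")})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(s) // {"reserved_at":"0001-01-01T00:00:00Z","value":"YWJj"}
}
```

Note that encoding/json writes a []byte value as a base64 string, and decoding that JSON back into an interface{} field yields a string rather than a []byte, so a round trip does not preserve the original type.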
