Documentation ¶
Overview ¶
Package qr is an in-process queue with disk-based overflow. Element order is not strictly preserved.
When everything is fine, elements flow over Qr.q, a simple channel connecting the producer(s) and the consumer(s). If that channel is full, elements are written to the Qr.planb channel instead. swapout() writes all elements from Qr.planb to disk, starting a new file every `timeout`. At the same time, swapin() deals with completed files: it opens the oldest file and writes its elements to Qr.q.
    ---> Enqueue()    ------   .q   ----->     merge() -> .out -> Dequeue() --->
             \                                  ^
           .planb                         .confluence
              \                              /
               \--> swapout()     swapin() --/
                       \             ^
                        \--> fs() --/
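The fast-path/overflow split described above can be sketched with a non-blocking channel send. This is only an illustration of the idea, not the package's actual code: the `overflowQueue` type, its constructor, and the size of the `planb` buffer are assumptions made for the example, and nothing here touches the disk.

```go
package main

import "fmt"

// overflowQueue is a hypothetical stand-in for the Qr.q / Qr.planb pair.
type overflowQueue struct {
	q     chan interface{} // fast path, bounded in-memory buffer
	planb chan interface{} // overflow path; in qr this is drained to disk
}

func newOverflowQueue(buffer int) *overflowQueue {
	return &overflowQueue{
		q:     make(chan interface{}, buffer),
		planb: make(chan interface{}, 1024), // arbitrary size for this sketch
	}
}

// enqueue tries the fast path first and falls back to planb when q is full.
// It returns the name of the path the element took.
func (o *overflowQueue) enqueue(e interface{}) string {
	select {
	case o.q <- e:
		return "q"
	default:
		o.planb <- e
		return "planb"
	}
}

func main() {
	o := newOverflowQueue(2)
	for _, e := range []string{"aap", "noot", "mies"} {
		fmt.Printf("%s -> %s\n", e, o.enqueue(e))
	}
}
```

With a buffer of 2, the first two elements take the fast path and the third overflows. Because overflowed elements rejoin the stream later, this also shows why element order is not strictly preserved.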
Gob is used to serialize entries; custom types should be registered using gob.Register().
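As a sketch of what registration looks like for a custom element type: the `Job` type and the `roundTrip` helper below are hypothetical and exist only for this example. It assumes elements are encoded as interface values, which is when gob requires the concrete type to be registered.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Job is a hypothetical custom element type.
type Job struct {
	ID   int
	Name string
}

// Register the concrete type once, before any value is encoded or decoded.
func init() {
	gob.Register(Job{})
}

// roundTrip encodes e as an interface value and decodes it again.
func roundTrip(e interface{}) (interface{}, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(&e); err != nil {
		return nil, err
	}
	var out interface{}
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	got, err := roundTrip(Job{ID: 1, Name: "aap"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", got) // prints "{ID:1 Name:aap}"
}
```

Basic types such as string and int are already known to gob, which is why the package example can enqueue plain strings without registering anything.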
Example ¶
package main

import (
	"fmt"

	"github.com/alicebob/qr"
)

func main() {
	q, err := qr.New(
		"/tmp/",
		"example",
		qr.OptionBuffer(100),
		qr.OptionTest("your datatype"),
	)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	go func() {
		for e := range q.Dequeue() {
			fmt.Printf("We got: %v\n", e)
		}
	}()

	// elsewhere:
	q.Enqueue("aap")
	q.Enqueue("noot")
}
Constants ¶
const (
	// DefaultTimeout can be changed with OptionTimeout.
	DefaultTimeout = 10 * time.Second
	// DefaultBuffer can be changed with OptionBuffer.
	DefaultBuffer = 1000
)
Variables ¶
var (
	// ErrInvalidPrefix is potentially returned by New.
	ErrInvalidPrefix = errors.New("invalid prefix")
)
Functions ¶
This section is empty.
Types ¶
type Option ¶
Option is an option to New(), which can change some settings.
func OptionBuffer ¶
OptionBuffer is an option for New(). It specifies the in-memory size of the queue. Smaller means the disk will be used sooner, larger means more memory.
func OptionLogger ¶
OptionLogger is an option for New(). It sets the logger; the default is log.Printf, but glog.Errorf would also work.
func OptionTest ¶
func OptionTest(t interface{}) Option
OptionTest is an option for New(). It tests that the given sample item can be serialized to disk and deserialized successfully. This verifies that disk access works, and that the type can be fully serialized and deserialized with gob. The option can be repeated.
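The kind of check described here can be approximated with the standard library alone. The sketch below is an assumption about what such a test might involve, not the package's implementation: `checkSample` and the temporary file name pattern are made up for the example.

```go
package main

import (
	"encoding/gob"
	"fmt"
	"io"
	"os"
	"reflect"
)

// checkSample is a hypothetical helper: it writes sample to a temporary file
// in dir with gob, reads it back, and compares the result with the original.
// An empty dir means the system default temporary directory.
func checkSample(dir string, sample interface{}) error {
	f, err := os.CreateTemp(dir, "sample-*.qr")
	if err != nil {
		return err // the directory is not writable
	}
	defer os.Remove(f.Name())
	defer f.Close()

	if err := gob.NewEncoder(f).Encode(&sample); err != nil {
		return err // the type cannot be serialized (for example, not registered)
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	var got interface{}
	if err := gob.NewDecoder(f).Decode(&got); err != nil {
		return err
	}
	if !reflect.DeepEqual(sample, got) {
		return fmt.Errorf("round trip changed the value: %v != %v", sample, got)
	}
	return nil
}

func main() {
	fmt.Println(checkSample("", "your datatype"))
	fmt.Println(checkSample("", make(chan int)) != nil)
}
```

Running a check like this at startup surfaces an unwritable directory or an unserializable type immediately, rather than later when the first element overflows to disk.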
func OptionTimeout ¶
OptionTimeout is an option for New(). It specifies the time after which a queue file is closed. Smaller means more files.
type Qr ¶
type Qr struct {
// contains filtered or unexported fields
}
Qr is a disk-based queue. Create one with New().
func New ¶
New starts a Queue which stores files in <dir>/<prefix>-.<timestamp>.qr
'prefix' must be a simple ASCII string.
func (*Qr) Close ¶
func (qr *Qr) Close()
Close shuts down all goroutines and closes the Dequeue() channel. It writes all in-flight entries to disk. Calling Enqueue() after Close will panic.
func (*Qr) Dequeue ¶
func (qr *Qr) Dequeue() <-chan interface{}
Dequeue is the channel where elements come out of the queue. It'll be closed on Close().
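Because the channel is closed on Close(), a consumer can range over it and stop cleanly. The sketch below uses a plain channel as a stand-in for q.Dequeue() and close() as a stand-in for q.Close(); the `drain` helper is hypothetical. It also waits for the consumer goroutine to finish before exiting.

```go
package main

import (
	"fmt"
	"sync"
)

// drain reads from out until it is closed and returns everything it saw.
func drain(out <-chan interface{}) []interface{} {
	var seen []interface{}
	for e := range out {
		seen = append(seen, e)
	}
	return seen
}

func main() {
	out := make(chan interface{}, 2) // stand-in for q.Dequeue()

	var wg sync.WaitGroup
	var got []interface{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		got = drain(out)
	}()

	out <- "aap"
	out <- "noot"
	close(out) // stand-in for q.Close()

	wg.Wait() // got is safe to read once the consumer has returned
	fmt.Println(got) // prints "[aap noot]"
}
```

With the real queue, entries still in flight when Close() is called are written to disk rather than delivered, so a consumer that must see every element should not rely on the close alone.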