dque

v0.0.0-...-2af3097
Published: Feb 26, 2024 License: MIT Imports: 14 Imported by: 0

README

dque - a fast embedded durable queue for Go

dque is:

  • persistent -- survives program restarts
  • scalable -- not limited by your RAM, but by your disk space
  • FIFO -- First In First Out
  • embedded -- compiled into your Golang program
  • synchronized -- safe for concurrent usage
  • fast or safe, you choose -- turbo mode lets the OS decide when to write to disk
  • has a liberal license -- allows any use, commercial or personal

I love tools that do one thing well. Hopefully this fits that category.

I am indebted to Gabor Cselle who, years ago, inspired me with an example of an in-memory persistent queue written in Java. I was intrigued by the simplicity of his approach, which became the foundation of the "segment" part of this queue: it holds the head and the tail of the queue in memory and stores the segments in between as files on disk.

performance

There are two performance modes: safe and turbo

safe mode
  • safe mode is the default
  • forces an fsync to disk every time you enqueue or dequeue an item.
  • while this is the safest way to use dque with little risk of data loss, it is also the slowest.
turbo mode
  • can be enabled/disabled with a call to DQue.TurboOn() or DQue.TurboOff()
  • lets the OS batch up your changes to disk, which makes it a lot faster.
  • also allows you to flush changes to disk at opportune times. See DQue.TurboSync()
  • comes with a risk that a power failure could lose changes. By turning on Turbo mode you accept that risk.
  • run the benchmark to see the difference on your hardware.
  • there is a todo item to force flush changes to disk after a configurable amount of time to limit risk.
implementation
  • The queue is held in segments of a configurable size.
  • The queue is protected against re-opening from other processes.
  • Each in-memory segment corresponds with a file on disk. Think of the segment files as a bit like rolling log files. The oldest segment files are eventually deleted, not based on time, but whenever their items have all been dequeued.
  • Segment files are only appended to until they fill up, at which point a new segment is created. They are never modified (other than being appended to and deleted when each of their items has been dequeued).
  • If there is more than one segment, new items are enqueued to the last segment while dequeued items are taken from the first segment.
  • Because the encoding/gob package is used to store the struct to disk:
    • Only structs can be stored in the queue.
    • Only one type of struct can be stored in each queue.
    • Only public fields in a struct will be stored.
    • A function is required that returns a pointer to a new struct of the type stored in the queue. This function is used when loading segments into memory from disk. I'd love to find a way to avoid this function.
  • Queue segment implementation:
    • For nice visuals, see Gabor Cselle's documentation here. Note that Gabor's implementation kept the entire queue in memory as well as disk. dque keeps only the head and tail segments in memory.
    • Enqueueing an item adds it both to the end of the last segment file and to the in-memory item slice for that segment.
    • When a segment reaches its maximum size a new segment is created.
    • Dequeueing an item removes it from the beginning of the in-memory slice and appends a 4-byte "delete" marker to the end of the segment file. This allows the item to be left in the file until the number of delete markers matches the number of items, at which point the entire file is deleted.
    • When a segment is reconstituted from disk, each "delete" marker found in the file causes a removal of the first element of the in-memory slice.
    • When each item in the segment has been dequeued, the segment file is deleted and the next segment is loaded into memory.
example

See the full example code here

Or a shortened version here:

package dque_test

import (
    "log"

    "github.com/joncrlsn/dque"
)

// Item is what we'll be storing in the queue.  It can be any struct
// as long as the fields you want stored are public.
type Item struct {
    Name string
    Id   int
}

// ItemBuilder creates a new item and returns a pointer to it.
// This is used when we load a segment of the queue from disk.
func ItemBuilder() interface{} {
    return &Item{}
}

func main() {
    ExampleDQue_main()
}

// ExampleDQue_main shows how the queue works
func ExampleDQue_main() {
    qName := "item-queue"
    qDir := "/tmp"
    segmentSize := 50

    // Create a new queue with segment size of 50
    q, err := dque.New(qName, qDir, segmentSize, ItemBuilder)
    ...

    // Add an item to the queue
    err = q.Enqueue(&Item{"Joe", 1})
    ...

    // Properly close a queue
    q.Close()

    // You can reconstitute the queue from disk at any time
    q, err = dque.Open(qName, qDir, segmentSize, ItemBuilder)
    ...

    // Peek at the next item in the queue
    var iface interface{}
    if iface, err = q.Peek(); err != nil {
        if err != dque.ErrEmpty {
            log.Fatal("Error peeking at item ", err)
        }
    }

    // Dequeue the next item in the queue
    if iface, err = q.Dequeue(); err != nil {
        if err != dque.ErrEmpty {
            log.Fatal("Error dequeuing item ", err)
        }
    }

    // Dequeue the next item in the queue and block until one is available
    if iface, err = q.DequeueBlock(); err != nil {
        log.Fatal("Error dequeuing item ", err)
    }

    // Assert type of the response to an Item pointer so we can work with it
    item, ok := iface.(*Item)
    if !ok {
        log.Fatal("Dequeued object is not an Item pointer")
    }

    doSomething(item)
}

func doSomething(item *Item) {
    log.Println("Dequeued", item)
}
todo? (feel free to submit pull requests)
  • add option to enable turbo with a timeout that would ensure you would never lose more than n seconds of changes.
  • add Lock() and Unlock() methods so you can peek at the first item and then conditionally dequeue it without worrying that another goroutine has grabbed it out from under you. The use case is when you don't want to actually remove it from the queue until you know you were able to successfully handle it.
  • store the segment size in a config file inside the queue. Then it only needs to be specified on dque.New(...)
alternative tools
  • CurlyQ is a bit heavier (requires Redis) but has more background processing features.

Documentation

Overview

Package dque is a fast embedded durable queue for Go

Constants

This section is empty.

Variables

var (

	// ErrEmpty is returned when attempting to dequeue from an empty queue.
	ErrEmpty = errors.New("dque is empty")
)
var ErrQueueClosed = errors.New("queue is closed")

ErrQueueClosed is the error returned when a queue is closed.

Functions

This section is empty.

Types

type DQue

type DQue struct {
	Name    string
	DirPath string
	// contains filtered or unexported fields
}

DQue is the in-memory representation of a queue on disk. You must never have two *active* DQue instances pointing at the same path on disk. It is acceptable to reconstitute a new instance from disk, but make sure the old instance is never enqueued to (or dequeued from) again.

Example

ExampleDQue shows how the queue works

package main

//
// Example usage
// Run with: go test -v example_test.go
//

import (
	"fmt"
	"log"

	"github.com/VoIPGRID/dque"
)

// Item is what we'll be storing in the queue.  It can be any struct
// as long as the fields you want stored are public.
type Item struct {
	Name string
	Id   int
}

// ItemBuilder creates a new item and returns a pointer to it.
// This is used when we load a segment of the queue from disk.
func ItemBuilder() interface{} {
	return &Item{}
}

// ExampleDQue shows how the queue works
func main() {
	qName := "item-queue"
	qDir := "/tmp"
	segmentSize := 50

	// Create a new queue (or open an existing one) with a segment size of 50
	q, err := dque.NewOrOpen(qName, qDir, segmentSize, ItemBuilder)
	if err != nil {
		log.Fatal("Error creating new dque ", err)
	}

	// Add an item to the queue
	if err := q.Enqueue(&Item{"Joe", 1}); err != nil {
		log.Fatal("Error enqueueing item ", err)
	}
	log.Println("Size should be 1:", q.Size())

	// Properly close a queue
	q.Close()

	// You can reconstitute the queue from disk at any time
	q, err = dque.Open(qName, qDir, segmentSize, ItemBuilder)
	if err != nil {
		log.Fatal("Error opening existing dque ", err)
	}

	// Peek at the next item in the queue
	var iface interface{}
	if iface, err = q.Peek(); err != nil {
		if err != dque.ErrEmpty {
			log.Fatal("Error peeking at item", err)
		}
	}
	log.Println("Peeked at:", iface)

	// Dequeue the next item in the queue
	if iface, err = q.Dequeue(); err != nil && err != dque.ErrEmpty {
		log.Fatal("Error dequeuing item:", err)
	}
	log.Println("Dequeued an interface:", iface)
	log.Println("Size should be zero:", q.Size())

	go func() {
		err := q.Enqueue(&Item{"Joe", 1})
		log.Println("Enqueued from goroutine", err == nil)
	}()

	// Dequeue the next item in the queue and block until one is available
	if iface, err = q.DequeueBlock(); err != nil {
		log.Fatal("Error dequeuing item ", err)
	}

	// Assert type of the response to an Item pointer so we can work with it
	item, ok := iface.(*Item)
	if !ok {
		log.Fatal("Dequeued object is not an Item pointer")
	}

	doSomething(item)
}

func doSomething(item *Item) {
	fmt.Println("Dequeued:", item)
}
Output:

Dequeued: &{Joe 1}

func New

func New(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error)

New creates a new durable queue

func NewOrOpen

func NewOrOpen(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error)

NewOrOpen either creates a new queue or opens an existing durable queue.

func Open

func Open(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error)

Open opens an existing durable queue.

func (*DQue) Close

func (q *DQue) Close() error

Close releases the lock on the queue rendering it unusable for further usage by this instance. Close will return an error if it has already been called.

func (*DQue) Dequeue

func (q *DQue) Dequeue() (interface{}, error)

Dequeue removes and returns the first item in the queue. When the queue is empty, nil and dque.ErrEmpty are returned.

func (*DQue) DequeueBlock

func (q *DQue) DequeueBlock() (interface{}, error)

DequeueBlock behaves similarly to Dequeue, but blocks until an item is available.
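
For readers curious how a blocking dequeue can be built, here is a minimal in-memory sketch using sync.Cond. It is offered as one common technique, not as a description of dque's internals; `blockingQueue` and `newBlockingQueue` are hypothetical names, and there is no persistence here.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a hypothetical in-memory FIFO whose DequeueBlock
// waits until an item is available.
type blockingQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []interface{}
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Enqueue appends an item and wakes one waiting consumer.
func (q *blockingQueue) Enqueue(v interface{}) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// DequeueBlock waits until the queue is non-empty, then removes and
// returns the first item.
func (q *blockingQueue) DequeueBlock() interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Re-check the condition after every wake-up.
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := newBlockingQueue()
	go q.Enqueue("hello")
	fmt.Println(q.DequeueBlock()) // prints "hello"
}
```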

func (*DQue) Enqueue

func (q *DQue) Enqueue(obj interface{}) error

Enqueue adds an item to the end of the queue

func (*DQue) Peek

func (q *DQue) Peek() (interface{}, error)

Peek returns the first item in the queue without dequeueing it. When the queue is empty, nil and dque.ErrEmpty are returned. Do not rely on this method when multiple goroutines dequeue concurrently: another goroutine may remove the item between your Peek and your Dequeue.

func (*DQue) PeekBlock

func (q *DQue) PeekBlock() (interface{}, error)

PeekBlock behaves similarly to Peek, but blocks until an item is available.

func (*DQue) SegmentNumbers

func (q *DQue) SegmentNumbers() (int, int)

SegmentNumbers returns the numbers of both the first and last segments. There is likely no use for this information other than testing.

func (*DQue) Size

func (q *DQue) Size() int

Size holds the queue lock while calculating, so the result is accurate unless you have changed the itemsPerSegment value since the queue was last empty. In that case it could be wildly inaccurate.

func (*DQue) SizeUnsafe

func (q *DQue) SizeUnsafe() int

SizeUnsafe returns the approximate number of items in the queue. Use Size() if having the exact size is important to your use-case.

The return value could be wildly inaccurate if the itemsPerSegment value has changed since the queue was last empty. Also, because this method is not synchronized, the size may change after entering this method.
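
The itemsPerSegment caveat is easier to see with numbers. The sketch below assumes the size is estimated from the segment numbers and the configured itemsPerSegment, counting only the items held in the first and last segments; that is an assumption made for illustration, and `estimateSize` is a hypothetical function, not dque's code.

```go
package main

import "fmt"

// estimateSize assumes every segment strictly between the first and
// the last is full, i.e. holds itemsPerSegment items.
func estimateSize(firstSeg, lastSeg, firstLen, lastLen, itemsPerSegment int) int {
	if firstSeg == lastSeg {
		return firstLen
	}
	between := lastSeg - firstSeg - 1
	return firstLen + between*itemsPerSegment + lastLen
}

func main() {
	// Segments 3 through 7 exist: 10 items remain in the first, 20 are
	// in the last, and the three in between were written 50 per segment.
	fmt.Println(estimateSize(3, 7, 10, 20, 50)) // 180

	// Reopening the same queue with itemsPerSegment = 100 skews the estimate.
	fmt.Println(estimateSize(3, 7, 10, 20, 100)) // 330
}
```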

func (*DQue) Turbo

func (q *DQue) Turbo() bool

Turbo returns true if the turbo flag is on. Having turbo on speeds things up significantly.

func (*DQue) TurboOff

func (q *DQue) TurboOff() error

TurboOff re-enables the "safety" mode that syncs every file change to disk as it happens. If turbo is already off, an error is returned.

func (*DQue) TurboOn

func (q *DQue) TurboOn() error

TurboOn allows the filesystem to decide when to sync file changes to disk. Throughput is greatly increased by turning turbo on; however, there is some risk of losing data if a power loss occurs. If turbo is already on, an error is returned.

func (*DQue) TurboSync

func (q *DQue) TurboSync() error

TurboSync allows you to fsync changes to disk, but only if turbo is on. If turbo is off, an error is returned.

type ErrCorruptedSegment

type ErrCorruptedSegment struct {
	Path string
	Err  error
}

ErrCorruptedSegment is returned when a segment file cannot be opened due to inconsistent formatting. Recovery may be possible by clearing or deleting the file, then reloading using dque.New().

func (ErrCorruptedSegment) Error

func (e ErrCorruptedSegment) Error() string

Error returns a string describing ErrCorruptedSegment

func (ErrCorruptedSegment) Unwrap

func (e ErrCorruptedSegment) Unwrap() error

Unwrap returns the wrapped error

type ErrUnableToDecode

type ErrUnableToDecode struct {
	Path string
	Err  error
}

ErrUnableToDecode is returned when an object cannot be decoded.

func (ErrUnableToDecode) Error

func (e ErrUnableToDecode) Error() string

Error returns a string describing ErrUnableToDecode error

func (ErrUnableToDecode) Unwrap

func (e ErrUnableToDecode) Unwrap() error

Unwrap returns the wrapped error
