gopq

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2024 License: MIT Imports: 8 Imported by: 0

README

gopq: Go Persistent-Queue

A lightweight, sqlite-backed persistent queue implementation in Go.

Go GoDoc

gopq is a lightweight, persistent queue implementation in Go, using SQLite as the underlying storage mechanism. It provides various queue types to suit different use cases, including simple queues, acknowledged queues, and unique item queues.

Features

  • Persistent storage using SQLite
  • Unique/Non-Unique Queues
  • Acknowledged/Non-Acknowledged Queues
  • Blocking and non-blocking dequeue operations
  • Context support for cancellation and timeouts
  • Thread-safe operations
  • Easy and Composable Dead Letter Queues

Installation

To use gopq in your Go project, run:

go get github.com/mattdeak/gopq

Make sure you have SQLite installed on your system.

Getting Started

Here's a minimal example to get you started with gopq:

package main

import (
    "fmt"
    "github.com/mattdeak/gopq"
)

func main() {
    // Create a new simple queue
    queue, err := gopq.NewSimpleQueue("myqueue.db")
    if err != nil {
        panic(err)
    }
    defer queue.Close()

    // Enqueue an item
    err = queue.Enqueue([]byte("Hello, gopq!"))
    if err != nil {
        panic(err)
    }

    // Dequeue an item
    msg, err := queue.Dequeue()
    if err != nil {
        panic(err)
    }

    fmt.Println(string(msg.Item)) // Output: Hello, gopq!
}

Usage

Here are some examples of how to use different queue types:

Basic Queue

A simple FIFO queue.

import "github.com/mattdeak/gopq"

// Create a new simple queue
queue, err := gopq.NewSimpleQueue("queue.db")
if err != nil {
    // Handle error
}

// Enqueue an item
err = queue.Enqueue([]byte("Hello, World!"))
if err != nil {
    // Handle error
}

// Dequeue an item (blocks until a message is available)
msg, err := queue.Dequeue()
if err != nil {
    // Handle error
}

fmt.Println(string(msg.Item))
Acknowledged Queue

When an item is dequeued from an acknowledged queue, an ack deadline is set internally. If the ack deadline passes without an ack being received, the item becomes available at the front of the queue.

import "github.com/mattdeak/gopq"
// Create a new acknowledged queue with custom options
queue, err := gopq.NewAckQueue("ack_queue.db", gopq.AckOpts{
    AckTimeout: 30 * time.Second,
    MaxRetries: 3,
    RetryBackoff: 5 * time.Second,
})
if err != nil {
// Handle error
}
// Enqueue an item
err = queue.Enqueue([]byte("Process me"))
if err != nil {
// Handle error
}
// Dequeue an item
msg, err := queue.Dequeue()
if err != nil {
// Handle error
}
// Process the item...
// Acknowledge the item if processing was successful
err = queue.Ack(msg.ID)
if err != nil {
// Handle error
}
// Or, if processing failed, negative acknowledge the item
err = queue.Nack(msg.ID)
if err != nil {
// Handle error
}
Unique Queue

A unique queue ensures that every item in the queue is unique. Once an item is successfully dequeued, it can be enqueued again immediately.

import "github.com/mattdeak/gopq"

// Create a new unique queue
queue, err := gopq.NewUniqueQueue("unique_queue.db")
if err != nil {
    // Handle error
}

// Enqueue items (duplicates will be ignored)
queue.Enqueue([]byte("unique_item_1"))
queue.Enqueue([]byte("unique_item_1")) // This will be ignored
queue.Enqueue([]byte("unique_item_2"))

// Dequeue items
msg1, _ := queue.Dequeue()
msg2, _ := queue.Dequeue()
msg3, err := queue.TryDequeue() // This will return an error (queue is empty)

Queue Methods

All queue types (SimpleQueue, AckQueue, UniqueQueue, UniqueAckQueue) provide the following methods for enqueueing and dequeueing:

Enqueue Methods
  • Enqueue(item []byte) error: Adds an item to the queue. Blocks if necessary.
  • EnqueueCtx(ctx context.Context, item []byte) error: Adds an item to the queue with context support.
  • TryEnqueue(item []byte) error: Attempts to add an item to the queue immediately, non-blocking.
  • TryEnqueueCtx(ctx context.Context, item []byte) error: Attempts to add an item to the queue immediately with context support.
Dequeue Methods
  • Dequeue() (Msg, error): Removes and returns an item from the queue. Blocks if the queue is empty.
  • DequeueCtx(ctx context.Context) (Msg, error): Removes and returns an item with context support.
  • TryDequeue() (Msg, error): Attempts to remove and return an item immediately, non-blocking.
  • TryDequeueCtx(ctx context.Context) (Msg, error): Attempts to remove and return an item immediately with context support.

All methods return a Msg struct containing the item and its ID, along with an error if the operation fails. For AckableQueue, the dequeue methods also update the acknowledgement deadline for the dequeued item.

Method Patterns
  • Methods without Ctx use a background context internally.
  • Try methods are non-blocking and return immediately.
  • Methods with Ctx allow for cancellation and timeouts using the provided context.
Additional Methods for AckableQueue
  • Ack(id int64) error: Acknowledges successful processing of an item.
  • Nack(id int64) error: Indicates failed processing, potentially requeueing the item.

Queue Types

  1. SimpleQueue: Basic FIFO queue with no additional features. Ideal for simple task queues or message passing where order matters but acknowledgment isn't necessary.
  2. AckQueue: Queue with acknowledgment support. Items must be acknowledged after processing, or they will be requeued after a timeout. Suitable for ensuring task completion in distributed systems or when processing reliability is crucial.
  3. UniqueQueue: Queue that only allows unique items. Duplicate enqueue attempts are silently ignored. Useful for de-duplication scenarios or when you need to ensure only one instance of a task is queued.
  4. UniqueAckQueue: Combination of UniqueQueue and AckQueue. Ensures unique items with acknowledgment support. Ideal for scenarios requiring both de-duplication and reliable processing.

Advanced Features

In-Memory Queue

If you don't require persistence, you can use an in-memory queue by calling the constructor with an empty string.

q := gopq.NewSimpleQueue("") // Now uses an in-memory database

This can be useful for testing.

Dead Letter Queues and Failure Callbacks

GoPQ now supports dead letter queues through a more flexible callback system. Instead of directly specifying a dead letter queue, you can register failure callbacks that are called when a message fails to acknowledge after all retries have been exhausted.

To set up a dead letter queue:

  1. Create an AcknowledgeableQueue for your main queue.
  2. Create another queue to serve as your dead letter queue.
  3. Use the RegisterDeadLetterQueue method to set up the dead letter functionality.

Example:

mainQueue := NewAckQueue(...)
deadLetterQueue := NewSimpleQueue(...)
mainQueue.RegisterDeadLetterQueue(deadLetterQueue)

Dead letter queues can be any queue type or anything supporting the Enqueuer interface. You can use multiple queues with different settings, and even chain dead letter queues if needed. This method is a convenient shorthand for registering a failure callback that enqueues failed messages to the specified dead letter queue.

Custom Failure Callbacks

For more complex scenarios, you can register custom failure callbacks:


mainQueue.RegisterOnFailureCallback(func(msg Msg) error {
// Custom logic for handling failed messages
log.Printf("Message failed: %v", msg)
    return nil
})

See the examples directory for more.

Configurable Retry Mechanism

AckQueue and UniqueAckQueue support configurable retry mechanisms:

queue, := gopq.NewAckQueue("queue.db", gopq.AckOpts{
    AckTimeout: 1 * time.Minute, // Any message that takes longer than 1 minute to ack will be requeued.
    MaxRetries: 5 // 0 for no retries, -1 for infinite retries
    RetryBackoff: 10 * time.Second, // Sets a new ack deadline to be the max of (current deadline, now + retry backoff)
})

Examples

There are several, more detailed examples demonstrating various features of gopq. These examples are located in the examples directory at the root of the project. To run an example, navigate to its directory and use go run main.go. For instance:

Available examples:

  • Simple Queue Usage: examples/simple_queue_example
  • Acknowledged Queue: examples/ack_queue_example
  • Unique Queue: examples/unique_queue_example
  • Dead Letter Queue: examples/dead_letter_queue_example
  • Multiple Tiered Dead Letter Queues: examples/tiered_dlq_example
  • Concurrent Queue Usage: examples/concurrent_queue_example

Each example demonstrates different features and use cases of gopq. We encourage you to explore these examples to better understand how to use the library in your projects.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change. Ensure to update tests as appropriate.

License

This project is licensed under the MIT License.

Future Work
  • More Queue Configurability (LIFO, Priority Queues, etc.)
  • Batch Enqueue/Dequeue operations
  • Various Efficiency Improvements

Documentation

Index

Constants

View Source
const (
	InfiniteRetries = -1
)

AckOpts represents the queue-level settings for how acknowledgement of messages is handled.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckOpts

type AckOpts struct {
	AckTimeout   time.Duration
	MaxRetries   int
	RetryBackoff time.Duration

	// Has a default behaviour of dropping the message
	BehaviourOnFailure func(msg Msg) error
	FailureCallbacks   []func(msg Msg) error
}

type AckableQueue

type AckableQueue interface {
	Queuer

	// Ack acknowledges that an item has been successfully processed.
	// It takes the ID of the message to acknowledge.
	// Returns an error if the operation fails or the message doesn't exist.
	Ack(id int64) error

	// Nack indicates that an item processing has failed and should be requeued.
	// It takes the ID of the message to negative acknowledge.
	// Returns an error if the operation fails or the message doesn't exist.
	Nack(id int64) error
}

AckableQueue extends the DQueue interface with acknowledgement capabilities. It allows for explicit acknowledgement or negative acknowledgement of processed items.

type AcknowledgeableQueue

type AcknowledgeableQueue struct {
	Queue
	AckOpts
	// contains filtered or unexported fields
}

func NewAckQueue

func NewAckQueue(filePath string, opts AckOpts) (*AcknowledgeableQueue, error)

NewAckQueue creates a new ack queue. If filePath is empty, the queue will be created in memory.

func NewUniqueAckQueue

func NewUniqueAckQueue(filePath string, opts AckOpts) (*AcknowledgeableQueue, error)

NewUniqueAckQueue creates a new unique ack queue.

func (*AcknowledgeableQueue) Ack

func (q *AcknowledgeableQueue) Ack(id int64) error

Ack acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails. It uses a background context internally.

func (*AcknowledgeableQueue) AckCtx added in v0.2.1

func (q *AcknowledgeableQueue) AckCtx(ctx context.Context, id int64) error

AckCtx acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails. If the db is locked, this will block until the db is unlocked.

func (*AcknowledgeableQueue) Dequeue

func (q *AcknowledgeableQueue) Dequeue() (Msg, error)

Dequeue removes and returns the next item from the queue. It blocks if the queue is empty until an item becomes available. It uses a background context internally.

func (*AcknowledgeableQueue) DequeueCtx

func (q *AcknowledgeableQueue) DequeueCtx(ctx context.Context) (Msg, error)

DequeueCtx removes and returns the next item from the queue. It blocks if the queue is empty until an item becomes available or the context is cancelled.

func (*AcknowledgeableQueue) ExpireAck

func (q *AcknowledgeableQueue) ExpireAck(id int64) error

ExpireAck expires the acknowledgement deadline for an item, which requeues it to the front of the queue. It takes the ID of the message to expire the acknowledgement deadline for. Returns an error if the operation fails or the message doesn't exist.

func (*AcknowledgeableQueue) Len

func (q *AcknowledgeableQueue) Len() (int, error)

Len returns the number of items in the queue. It returns the count and any error encountered during the operation.

func (*AcknowledgeableQueue) Nack

func (q *AcknowledgeableQueue) Nack(id int64) error

Nack indicates that an item processing has failed and should be requeued. It takes the ID of the message to negative acknowledge. Returns an error if the operation fails or the message doesn't exist. It uses a background context internally.

func (*AcknowledgeableQueue) NackCtx added in v0.2.1

func (q *AcknowledgeableQueue) NackCtx(ctx context.Context, id int64) error

NackCtx indicates that an item processing has failed and should be requeued. It takes the ID of the message to negative acknowledge and returns an error if the operation fails. If the db is locked, this will block until the db is unlocked.

func (*AcknowledgeableQueue) RegisterBehaviourOnFailure

func (q *AcknowledgeableQueue) RegisterBehaviourOnFailure(fn func(msg Msg) error)

SetBehaviourOnFailure sets the behaviour on failure for the queue. This occurs if a message receives more Nacks than the max retries. It takes a function that takes a message and returns an error. You can manually requeue it, put it in a different queue, or do whatever else. The default behaviour is to drop the message.

func (*AcknowledgeableQueue) RegisterDeadLetterQueue

func (q *AcknowledgeableQueue) RegisterDeadLetterQueue(dlq Enqueuer)

RegisterDeadLetterQueue sets the dead letter queue for this AcknowledgeableQueue. This is shorthand for RegisterFailureCallback -> dlq.Enqueue.

func (*AcknowledgeableQueue) RegisterOnFailureCallback

func (q *AcknowledgeableQueue) RegisterOnFailureCallback(fn func(msg Msg) error)

RegisterOnFailureCallback adds a callback to the queue that is called when a message fails to acknowledge.

func (*AcknowledgeableQueue) TryAck added in v0.2.1

func (q *AcknowledgeableQueue) TryAck(id int64) error

Ack acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails.

func (*AcknowledgeableQueue) TryAckCtx added in v0.2.1

func (q *AcknowledgeableQueue) TryAckCtx(ctx context.Context, id int64) error

TryAckCtx acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails. This is non-blocking, and will return immediately.

func (*AcknowledgeableQueue) TryDequeue

func (q *AcknowledgeableQueue) TryDequeue() (Msg, error)

TryDequeue attempts to remove and return the next item from the queue. It returns immediately, even if the queue is empty. It uses a background context internally.

func (*AcknowledgeableQueue) TryDequeueCtx

func (q *AcknowledgeableQueue) TryDequeueCtx(ctx context.Context) (Msg, error)

TryDequeueCtx attempts to remove and return the next item from the queue. It returns immediately if an item is available, or waits until the context is cancelled.

func (*AcknowledgeableQueue) TryNack added in v0.2.1

func (q *AcknowledgeableQueue) TryNack(id int64) error

TryNack indicates that an item processing has failed and should be requeued. It takes the ID of the message to negative acknowledge. This is non-blocking, and will return immediately.

func (*AcknowledgeableQueue) TryNackCtx added in v0.2.1

func (q *AcknowledgeableQueue) TryNackCtx(ctx context.Context, id int64) error

TryNackCtx indicates that an item processing has failed and should be requeued. It takes the ID of the message to negative acknowledge. This is non-blocking, and will return immediately.

type Dequeuer

type Dequeuer interface {
	// Dequeue removes and returns the next item from the queue.
	// It blocks if the queue is empty until an item becomes available.
	// Returns an error if the operation fails.
	Dequeue() (Msg, error)
	// DequeueCtx removes and returns the next item from the queue.
	// It blocks if the queue is empty until an item becomes available or the context is cancelled.
	// Returns an error if the operation fails or the context is cancelled.
	DequeueCtx(ctx context.Context) (Msg, error)
	// TryDequeue attempts to remove and return the next item from the queue.
	// It returns immediately, even if the queue is empty.
	// Returns an error if the operation fails or the queue is empty.
	TryDequeue() (Msg, error)
	// TryDequeueCtx attempts to remove and return the next item from the queue.
	// It returns immediately if an item is available, or waits until the context is cancelled.
	// Returns an error if the operation fails, the queue is empty, or the context is cancelled.
	TryDequeueCtx(ctx context.Context) (Msg, error)
}

Dequeuer provides methods for dequeueing items from the queue.

type Enqueuer

type Enqueuer interface {
	// Enqueue adds an item to the queue.
	// It returns an error if the operation fails.
	Enqueue(item []byte) error
	EnqueueCtx(ctx context.Context, item []byte) error
	TryEnqueue(item []byte) error
	TryEnqueueCtx(ctx context.Context, item []byte) error
}

Enqueuer provides methods for enqueueing items to the queue.

type ErrDBLocked

type ErrDBLocked struct{}

func (*ErrDBLocked) Error

func (e *ErrDBLocked) Error() string

type ErrNoItemsWaiting

type ErrNoItemsWaiting struct{}

func (*ErrNoItemsWaiting) Error

func (e *ErrNoItemsWaiting) Error() string

type Msg

type Msg struct {
	// ID is a unique identifier for the message within the queue.
	ID int64

	// Item contains the actual message data.
	Item []byte
}

Msg represents a message in the queue. It contains the message ID and the actual data.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents the basic queue structure. It contains the database connection, queue name, and other necessary fields for queue operations.

func NewSimpleQueue

func NewSimpleQueue(filePath string) (*Queue, error)

NewSimpleQueue creates a new simple queue. If filePath is empty, the queue will be created in memory.

func NewUniqueQueue

func NewUniqueQueue(filePath string) (*Queue, error)

NewUniqueQueue creates a new unique queue.

func (*Queue) Close

func (q *Queue) Close() error

Close closes the database connection associated with the queue. It should be called when the queue is no longer needed to free up resources.

func (*Queue) Dequeue

func (q *Queue) Dequeue() (Msg, error)

Dequeue blocks until an item is available. Uses background context.

func (*Queue) DequeueCtx

func (q *Queue) DequeueCtx(ctx context.Context) (Msg, error)

Dequeue blocks until an item is available or the context is canceled. If the context is canceled, it returns an empty Msg and an error.

func (*Queue) Enqueue

func (q *Queue) Enqueue(item []byte) error

Enqueue adds an item to the queue. It returns an error if the operation fails.

func (*Queue) EnqueueCtx

func (q *Queue) EnqueueCtx(ctx context.Context, item []byte) error

EnqueueCtx adds an item to the queue. It returns an error if the operation fails or the context is cancelled.

func (*Queue) Len

func (q *Queue) Len() (int, error)

Len returns the number of items in the queue. It returns the count and any error encountered during the operation.

func (*Queue) TryDequeue

func (q *Queue) TryDequeue() (Msg, error)

TryDequeue attempts to remove and return the next item from the queue. It returns immediately, even if the queue is empty. It uses a background context internally.

func (*Queue) TryDequeueCtx

func (q *Queue) TryDequeueCtx(ctx context.Context) (Msg, error)

TryDequeueCtx attempts to remove and return the next item from the queue. This is non-blocking, and will return immediately.

func (*Queue) TryEnqueue

func (q *Queue) TryEnqueue(item []byte) error

TryEnqueue attempts to add an item to the queue. It returns immediately, even if the queue is empty. It uses a background context internally.

func (*Queue) TryEnqueueCtx

func (q *Queue) TryEnqueueCtx(ctx context.Context, item []byte) error

TryEnqueueCtx attempts to add an item to the queue. This is non-blocking, and will return immediately.

type Queuer

type Queuer interface {
	Enqueuer
	Dequeuer
	Close() error
}

Queue represents a durable queue interface. It provides methods for enqueueing and dequeueing items, with both blocking and non-blocking operations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL