queue

package module
v1.0.0
Published: Nov 13, 2024 License: BSD-3-Clause Imports: 7 Imported by: 0

README

This is a dead simple queuing system based on MongoDB. It is primarily built upon MongoDB's change streams, which makes an event-based system possible instead of a polling approach.

MongoDB change streams are only available if you have configured a replica set; as a fallback, this package supports polling too.

The motivation was to build an easy-to-integrate queuing system without sophisticated features, without external dependencies, and with direct integration into your application.

Install

go get github.com/mbretter/go-mongodb-queue

import

import queue "github.com/mbretter/go-mongodb-queue"

Features

There are not that many: tasks are retried until a maximum number of tries has been reached, and tasks have a default timeout of 5 minutes, which is enforced when the selfcare function is run.

Arbitrary data can be stored along with each task.

Each task belongs to a topic. When publishing to a topic, the handler of this topic gets the first unprocessed task.

You can use either the event-based Subscribe function or the GetNext function, which is needed for polling. In both cases it is totally safe to have multiple consumers running on the same topic.
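
The snippets below assume roughly the following imports; the exact set is a sketch based on the Install section and the v1 mongo-driver (the bson package is only needed for the payload-decoding sketch further down):

import (
    "context"
    "errors"
    "fmt"
    "log"
    "sync"
    "time"

    queue "github.com/mbretter/go-mongodb-queue"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)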

ctx := context.TODO()
// connect to the mongo database using the mongo-driver
// mongodbUri contains the uri to your mongodb instance
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongodbUri))
if err != nil {
    log.Fatal(err)
}
defer client.Disconnect(ctx)

// get database and collection
collection := client.Database("mydb").Collection("queue")

// make the queue-db
queueDb := queue.NewStdDb(collection, ctx)

// make the queue 
qu := queue.NewQueue(queueDb)

Publish

You can publish to any topic; the topic acts like a filter for your tasks, and the payload can be any arbitrary data.

type Payload struct {
    Name string `bson:"name"`
    Desc string `bson:"desc"`
    Num  int    `bson:"num"`
}

payload := Payload{
    Name: "Arnold Schwarzenegger",
    Desc: "I'll be back",
    Num:  73,
}

task, err := qu.Publish("some.topic", &payload)
if err != nil {
    log.Fatal(err)
}
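
The maximum number of tries can be set per task via the optional PublishOptions argument (documented below); a minimal sketch:

// allow up to 5 tries for this task instead of the default of 3
opts := queue.NewPublishOptions().SetMaxTries(5)

task, err := qu.Publish("some.topic", &payload, opts)
if err != nil {
    log.Fatal(err)
}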

Subscribe

Any handler/application can subscribe to a certain topic; the callback function receives a copy of the task.

After processing the task you have to Ack it, or mark it as Err.

Here is a small snippet which demonstrates the usage of Subscribe with goroutines.

// define your worker function
workerFunc := func(qu *queue.Queue, task queue.Task) {
    fmt.Println("worker", task)
    // after processing the task you have to acknowledge it
    _ = qu.Ack(task.Id.Hex())
}

var wg sync.WaitGroup
// subscribe and pass the worker function
err := qu.Subscribe("some.topic", func(t queue.Task) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        workerFunc(qu, t)
    }()
})

if err != nil {
    log.Fatal(err)
}

wg.Wait()

On startup, the Subscribe function checks for unprocessed tasks that were scheduled before the subscription started, because existing tasks are not covered by the MongoDB change stream.
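
Inside the worker you usually want the payload back as a typed struct. Since Task.Payload is declared as any, a task read back from MongoDB carries the payload as a BSON document; one possible way to convert it (a sketch, not part of the package API) is a bson round-trip:

// re-encode the generic payload and decode it into the typed struct
raw, err := bson.Marshal(task.Payload)
if err != nil {
    log.Fatal(err)
}

var payload Payload
if err := bson.Unmarshal(raw, &payload); err != nil {
    log.Fatal(err)
}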

Ack/Err

After processing a task you have to acknowledge that you have processed it by using Ack. In case of an error you can use the Err function to mark the task as failed.

err := qu.Ack(task.Id.Hex())
if err != nil {
    log.Fatal(err)
}

qu.Err(task.Id.Hex(), errors.New("something went wrong"))

Polling

You have to loop over GetNext; GetNext returns a nil task if no unprocessed task was found for the topic. It is safe to use GetNext for the same topic from different processes; there will be no race conditions, because MongoDB's atomic FindOneAndUpdate operation is used.

for {
    task, err := qu.GetNext("some.topic")
    if err != nil {
        log.Fatal(err)
    }
    
    if task == nil {
        time.Sleep(time.Millisecond * 100)
    } else {
        // process the task
        _ = qu.Ack(task.Id.Hex())
    }
}

Reschedule

If a task had an error and you want to process it again, you can use Reschedule.

newTask, err := qu.Reschedule(task)

When rescheduling, the original task remains untouched; a new task with the same payload is created, and the new task's initial tries value is increased accordingly.
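
A typical flow could look like the sketch below, assuming you only want to retry while the maximum number of tries has not been reached; processingErr stands for whatever error your worker produced:

// mark the task as failed first
if err := qu.Err(task.Id.Hex(), processingErr); err != nil {
    log.Fatal(err)
}

// retry only as long as the maximum number of tries has not been reached
if task.Tries < task.MaxTries {
    newTask, err := qu.Reschedule(task)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("rescheduled as", newTask.Id.Hex())
}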

Selfcare

The selfcare function re-schedules long-running tasks, which can happen if the application was unable to acknowledge a task, and it sets tasks to the error state if the maximum number of tries has been exceeded.

The selfcare function can be run per topic; if no topic is given, selfcare runs over all topics. As the second argument, the timeout for long-running tasks can be given; if no timeout is given, it defaults to 5 minutes.
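
Selfcare is invoked by your application; one possible approach is to run it periodically in its own goroutine, for example with a ticker (the interval here is an arbitrary choice):

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for range ticker.C {
    // an empty topic runs selfcare over all topics,
    // a zero timeout falls back to the default of 5 minutes
    if err := qu.Selfcare("", 0); err != nil {
        log.Println("selfcare:", err)
    }
}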

CreateIndexes

On first use, you have to call this function, or create the indexes manually on the queue collection. Two indexes are created: one on topic and state, and a TTL index which removes completed tasks after one hour.
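
This is typically called once during application setup, for example right after constructing the queue:

if err := qu.CreateIndexes(); err != nil {
    log.Fatal(err)
}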

mongodb docker-compose

This is a config sample for bringing up a MongoDB replica set for testing. You have to use fixed IP addresses for your containers, otherwise you will get connection problems, because the member addresses are sent back to the client when connecting.

services:
  mongo1:
    hostname: mongo1
    image: mongo:8
    ports:
      - 27027:27017
    restart: always
    volumes:
      - /data/mongodb-rs1:/data/db
    command: mongod --replSet rs0
    networks:
      customnetwork:
        ipv4_address: 172.20.0.3
  mongo2:
    hostname: mongo2
    image: mongo:8
    ports:
      - 27028:27017
    restart: always
    volumes:
      - /data/mongodb-rs2:/data/db
    command: mongod --replSet rs0
    networks:
      customnetwork:
        ipv4_address: 172.20.0.4
  mongo3:
    hostname: mongo3
    image: mongo:8
    ports:
      - 27029:27017
    restart: always
    volumes:
      - /data/mongodb-rs3:/data/db
    command: mongod --replSet rs0
    networks:
      customnetwork:
        ipv4_address: 172.20.0.5

networks:
  customnetwork:
    ipam:
      config:
        - subnet: 172.20.0.0/16

After bringing up the containers for the first time, you have to initialize the replica set:

docker compose exec mongo1 mongosh
> rs.initiate({ _id: "rs0", members: [ { _id: 0, host: "172.20.0.3:27017" }, { _id: 1, host: "172.20.0.4:27017" }, { _id: 2, host: "172.20.0.5:27017" }] })
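
With this setup, the mongodbUri used in the snippets above could look roughly like the sketch below; it assumes the client can reach the docker network addresses directly, and the replicaSet parameter must match the name passed to rs.initiate:

mongodbUri := "mongodb://172.20.0.3:27017,172.20.0.4:27017,172.20.0.5:27017/?replicaSet=rs0"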

Documentation

Index

Constants

const (
	StatePending   = "pending"
	StateRunning   = "running"
	StateCompleted = "completed"
	StateError     = "error"
)
const (
	DefaultTimeout  = time.Minute * 5
	DefaultMaxTries = 3
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Callback

type Callback func(t Task)

type ChangeStreamInterface

type ChangeStreamInterface interface {
	Next(ctx context.Context) bool
	Decode(v interface{}) error
	Close(ctx context.Context) error
}

type CollectionInterface

type CollectionInterface interface {
	InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
	FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
	UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
	UpdateMany(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
	Watch(ctx context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
	Indexes() mongo.IndexView
}

type DbInterface

type DbInterface interface {
	InsertOne(document interface{}) (primitive.ObjectID, error)
	FindOneAndUpdate(filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
	UpdateOne(filter interface{}, update interface{}) error
	UpdateMany(filter interface{}, update interface{}) error
	Watch(pipeline interface{}) (ChangeStreamInterface, error)
	CreateIndexes(index []mongo.IndexModel) error
	Context() context.Context
}

type Meta

type Meta struct {
	Created    time.Time  `bson:"created"`
	Dispatched *time.Time `bson:"dispatched"`
	Completed  *time.Time `bson:"completed"`
}

type PublishOptions

type PublishOptions struct {
	MaxTries uint
	Tries    int
}

func NewPublishOptions

func NewPublishOptions() *PublishOptions

NewPublishOptions creates a new PublishOptions with default settings.

func (*PublishOptions) SetMaxTries

func (p *PublishOptions) SetMaxTries(maxTries uint) *PublishOptions

SetMaxTries sets the maximum number of retry attempts for publishing. Returns the updated PublishOptions instance.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(db DbInterface) *Queue

NewQueue initializes a new Queue instance with the provided DbInterface.

func (*Queue) Ack

func (q *Queue) Ack(id string) error

Ack acknowledges a task completion by its ID, updating its state to "completed" and setting the completion timestamp.

func (*Queue) CreateIndexes

func (q *Queue) CreateIndexes() error

CreateIndexes creates MongoDB indexes for the task collection to improve query performance and manage TTL for completed tasks.

func (*Queue) Err

func (q *Queue) Err(id string, err error) error

Err updates the state of a task to "error" by its ID, setting the completion time and storing the error message.

func (*Queue) GetNext

func (q *Queue) GetNext(topic string) (*Task, error)

GetNext retrieves the next item from the queue for the given topic, marks it as running, and increments its tries count.

func (*Queue) GetNextById

func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error)

GetNextById retrieves the next pending task by its ID, transitions it to the running state, and increments its tries count.

func (*Queue) Publish

func (q *Queue) Publish(topic string, payload any, opts ...*PublishOptions) (*Task, error)

Publish inserts a new task into the queue with the given topic, payload, and maxTries. If maxTries is zero, it defaults to DefaultMaxTries.

func (*Queue) Reschedule

func (q *Queue) Reschedule(task *Task) (*Task, error)

Reschedule republishes a task to the queue, retaining its topic, payload, tries, and maxTries settings.

func (*Queue) Selfcare

func (q *Queue) Selfcare(topic string, timeout time.Duration) error

Selfcare re-schedules long-running tasks and sets tasks exceeding max tries to error state. It updates tasks in an ongoing state that haven't been acknowledged within a specific timeout period. If timeout is zero, the default timeout value is used. Optionally, tasks can be filtered by topic.

func (*Queue) Subscribe

func (q *Queue) Subscribe(topic string, cb Callback) error

Subscribe listens for new tasks on a given topic and calls the provided callback when a new task is available. It processes unprocessed tasks scheduled before starting the watch and continuously monitors for new tasks.

type StdDb

type StdDb struct {
	// contains filtered or unexported fields
}

func NewStdDb

func NewStdDb(collection CollectionInterface, ctx context.Context) *StdDb

func (*StdDb) Context

func (d *StdDb) Context() context.Context

func (*StdDb) CreateIndexes

func (d *StdDb) CreateIndexes(indexes []mongo.IndexModel) error

func (*StdDb) FindOneAndUpdate

func (d *StdDb) FindOneAndUpdate(filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult

func (*StdDb) InsertOne

func (d *StdDb) InsertOne(document interface{}) (primitive.ObjectID, error)

func (*StdDb) UpdateMany

func (d *StdDb) UpdateMany(filter interface{}, update interface{}) error

func (*StdDb) UpdateOne

func (d *StdDb) UpdateOne(filter interface{}, update interface{}) error

func (*StdDb) Watch

func (d *StdDb) Watch(pipeline interface{}) (ChangeStreamInterface, error)

type Task

type Task struct {
	Id       primitive.ObjectID `bson:"_id,omitempty"`
	Topic    string             `bson:"topic"`
	Payload  any                `bson:"payload"`
	Tries    uint               `bson:"tries"`
	MaxTries uint               `bson:"maxtries"`
	State    string             `bson:"state"`
	Message  string             `bson:"message"`
	Meta     Meta
}

Directories

Path Synopsis
cmd
