database

package module
v0.0.0-...-192d75f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2024 License: MIT Imports: 13 Imported by: 0

README

Overview

Go Reference Go codecov Go Report Card Mit License

Package database is a database-backed implementation of the queue interface.

Installation

go get -u -v github.com/gopi-frame/queue/driver/database

Import

import "github.com/gopi-frame/queue/driver/database"

Usage

package main

import (
    "fmt"
    "github.com/gopi-frame/queue/driver/database"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
    "time"
)

// CustomJob is the example job type: main registers it in the queue config
// and enqueues instances of it.
type CustomJob struct {
    // The embedded queue.Job carries the tag `json:"-"`, which keeps it out of
    // the JSON encoding of CustomJob.
    // NOTE(review): presumably the embedding supplies the remaining queue.Job
    // methods that CustomJob does not define itself — confirm against queue.Job.
    queue.Job `json:"-"`
}

// Handle performs the job's work.
//
// It returns nil when the work succeeded. The worker loop in main acks the job
// on a nil result; on a non-nil error it either releases the job back to the
// queue or calls Failed, depending on the attempt count.
func (c *CustomJob) Handle() error {
    // Do something

    // A function with a result type must end in a return statement; without
    // it the example does not compile.
    return nil
}

// Failed is the failure hook for the job. The worker loop in main calls it
// with the error returned by Handle when the job's attempt count is no longer
// below its maximum, i.e. when the job is not released for another try.
func (c *CustomJob) Failed(err error) {
    // Handle failed job
}

// main demonstrates the database queue end to end: it opens a SQLite database
// through gorm, builds a queue named "queue" for CustomJob, enqueues three
// jobs, prints the count, and then polls the queue forever.
//
// For every dequeued job the loop calls Handle and then:
//   - acks the job when Handle returned nil,
//   - releases it back to the queue while its attempts are below the maximum,
//   - otherwise hands the error to the job's Failed hook.
//
// When nothing can be dequeued the loop sleeps for 100ms before polling again.
func main() {
    db, err := gorm.Open(sqlite.Open("queue.db"))
    if err != nil {
        panic(err)
    }

    cfg := &database.Config{
        DB:   db,
        Name: "queue",
        Job:  new(CustomJob),
    }
    q := database.NewQueue(cfg)

    for i := 0; i < 3; i++ {
        q.Enqueue(new(CustomJob))
    }
    fmt.Println("count:", q.Count()) // Output: count: 3

    for {
        job, ok := q.Dequeue()
        if !ok {
            // Nothing available right now; back off briefly before polling again.
            time.Sleep(time.Millisecond * 100)
            continue
        }

        handleErr := job.Handle()
        switch {
        case handleErr == nil:
            q.Ack(job)
        case job.GetQueueable().GetAttempts() < job.GetMaxAttempts():
            q.Release(job)
        default:
            job.Failed(handleErr)
        }
    }
}

Documentation

Overview

Package database implements a queue driver that uses a database to store the queue.

Index

Constants

View Source
// Column names of the job table.
//
// NOTE(review): the Job struct also maps an "attempts" column, but no constant
// for it is listed here — confirm whether that omission is intentional.
const (
	ColumnID          = "id"
	ColumnQueue       = "queue"
	ColumnPayload     = "payload"
	ColumnReservedAt  = "reserved_at"
	ColumnAvailableAt = "available_at"
	ColumnCreatedAt   = "created_at"
	ColumnUpdatedAt   = "updated_at"
)

Column names for the job table.

View Source
// DefaultJobTable is the default table name for jobs; Driver.Open documents
// "jobs" as the default of its "table" option.
const DefaultJobTable = "jobs"

DefaultJobTable is the default Table name for jobs.

Variables

This section is empty.

Functions

func Open

func Open(options map[string]any) (queuecontract.Queue, error)

Open is a convenience function that calls Driver.Open.

Types

type Config

// Config is the configuration for the database queue. It can be built with
// NewConfig, adjusted with Apply and checked with Valid.
//
// NOTE(review): the required/default remarks below are taken from the option
// list documented on Driver.Open; presumably Valid enforces the same rules —
// confirm.
type Config struct {
	// DB is the gorm database handle (the "db" option, required).
	DB    *gorm.DB
	// Name is the queue name (the "name" option, required).
	Name  string
	// Table is the job table name (the "table" option, default "jobs").
	Table string
	// Job is the job type handled by the queue (the "job" option, required).
	Job   queue.Job
}

Config is the configuration for database queue.

func NewConfig

func NewConfig(db *gorm.DB, name string, job queue.Job) *Config

NewConfig returns a new config.

func (*Config) Apply

func (c *Config) Apply(opts ...Option) error

Apply applies options to config.

func (*Config) Valid

func (c *Config) Valid() error

Valid validates the config. It returns an error if the config is invalid.

type Driver

// Driver is the database queue driver. It carries no fields; its Open method
// builds a queue from an options map.
type Driver struct{}

Driver is the database queue driver

func (Driver) Open

func (d Driver) Open(options map[string]any) (queuecontract.Queue, error)

Open opens the database queue

Options:

  • db: *gorm.DB (required)
  • name: string (required)
  • table: string (default: jobs)
  • job: queue.Job (required)

type Job

// Job is the database record for a queued job; it wraps a queue.Job payload
// together with its bookkeeping columns. The gorm tags fix the column names
// and the json tags fix the JSON field names.
type Job struct {
	// ID is the primary key of the record.
	ID          uuid.UUID           `gorm:"column:id;primaryKey" json:"id"`
	// Queue is the name of the queue the job belongs to.
	Queue       string              `gorm:"column:queue" json:"queue"`
	// Payload is the wrapped job, stored through gorm's JSON serializer.
	Payload     queue.Job           `gorm:"column:payload;serializer:json" json:"payload"`
	// Attempts is how many times the job has been attempted.
	Attempts    int                 `gorm:"column:attempts" json:"attempts"`
	// ReservedAt is nullable.
	// NOTE(review): presumably set while a worker holds the job — confirm.
	ReservedAt  sql.Null[time.Time] `gorm:"column:reserved_at" json:"reserved_at"`
	// NOTE(review): presumably the earliest time the job may be dequeued — confirm.
	AvailableAt time.Time           `gorm:"column:available_at" json:"available_at"`
	// CreatedAt maps the created_at column.
	CreatedAt   time.Time           `gorm:"column:created_at" json:"created_at"`
	// UpdatedAt maps the updated_at column.
	UpdatedAt   time.Time           `gorm:"column:updated_at" json:"updated_at"`
}

Job is a database queue job; it wraps a [queue.Job].

func NewJob

func NewJob(queue string, payload queue.Job) *Job

NewJob creates a new database queue job

func (*Job) GetAttempts

func (d *Job) GetAttempts() int

GetAttempts returns how many times the job has been attempted

func (*Job) GetID

func (d *Job) GetID() string

GetID returns the job ID

func (*Job) GetPayload

func (d *Job) GetPayload() queue.Job

GetPayload returns the job payload

func (*Job) GetQueue

func (d *Job) GetQueue() string

GetQueue returns the job queue

type Option

// Option is a functional option for the database queue: it receives the
// Config to modify and may return an error. Config.Apply and NewQueue accept
// a list of options.
type Option func(cfg *Config) error

Option is the option for database queue.

func WithDB

func WithDB(db *gorm.DB) Option

WithDB sets the database.

func WithJob

func WithJob(job queue.Job) Option

WithJob sets the job type.

func WithName

func WithName(name string) Option

WithName sets the queue name.

func WithTable

func WithTable(table string) Option

WithTable sets the Table name.

type Queue

// Queue is a database-backed queue implementation. Its fields are not
// exported; construct one with NewQueue.
type Queue struct {
	// contains filtered or unexported fields
}

Queue is a database-backed queue implementation.

func NewQueue

func NewQueue(cfg *Config, opts ...Option) *Queue

NewQueue returns a new queue.

func (*Queue) Ack

func (q *Queue) Ack(job queue.Job)

Ack acknowledges a job.

func (*Queue) Count

func (q *Queue) Count() int64

Count returns the number of jobs in the queue.

func (*Queue) Dequeue

func (q *Queue) Dequeue() (queue.Job, bool)

Dequeue removes a job from the queue and returns it.

func (*Queue) Empty

func (q *Queue) Empty() bool

Empty returns true if queue is empty.

func (*Queue) Enqueue

func (q *Queue) Enqueue(job queue.Job) (queue.Job, bool)

Enqueue adds a job to the queue.

func (*Queue) Name

func (q *Queue) Name() string

Name returns the queue name.

func (*Queue) Release

func (q *Queue) Release(job queue.Job)

Release releases a job back to the queue.

func (*Queue) Remove

func (q *Queue) Remove(job queue.Job)

Remove removes a job from the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL