queuelite

package module
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2024 License: MIT Imports: 6 Imported by: 0

README

QueueLite

QueueLite is a simple and persistent queue backed by SQLite. Wrote following the guide made by Silvain Kerkour.

Getting started

Install
go get github.com/JorgeLNJunior/queuelite
Enqueue
import "github.com/JorgeLNJunior/queuelite"

queue, err := queuelite.NewSQLiteQueue("queue.db")
if err != nil {
	return err
}
defer queue.Close()

job := queuelite.NewJob([]byte("{ \"key\": \"value\" }"))

if err = queue.Enqueue(context.Background(), job); err != nil {
	return err
}
Dequeue
job, err := queue.Dequeue(context.Background())
if err != nil {
	return err
}
Complete
if err := queue.Complete(context.Background(), job); err != nil {
  return err
}
Retry
if err := queue.Retry(context.Background(), job); err != nil {
  return err
}
Fail
if err := queue.Fail(context.Background(), job, "reason"); err != nil {
  return err
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var JobNotFoundErr = errors.New("job not found in the queue")

Functions

This section is empty.

Types

type Job

type Job struct {
	ID            int64          `json:"id"`
	State         JobState       `json:"state"`
	Data          []byte         `json:"data"`
	AddedAt       int64          `json:"added_at"`
	RetryCount    int            `json:"retry_count"`
	FailureReason sql.NullString `json:"failure_reason"`
}

Job represents a job in the queue.

func NewJob

func NewJob(data []byte) Job

NewJob returns a Job instance.

type JobCount

type JobCount struct {
	Pending   int `json:"pending"`
	Running   int `json:"running"`
	Retry     int `json:"retry"`
	Failed    int `json:"failed"`
	Completed int `json:"completed"`
	Total     int `json:"total"`
}

type JobState

type JobState string

JobState represents the state of a job in the queue.

const (
	JobStatePending   JobState = "pending"
	JobStateRunning   JobState = "running"
	JobStateRetry     JobState = "retry"
	JobStateFailed    JobState = "failed"
	JobStateCompleted JobState = "completed"
)

type ListOption added in v0.2.0

type ListOption interface {
	// contains filtered or unexported methods
}

func WithLimit added in v0.2.0

func WithLimit(limit int) ListOption

type SQLiteQueue

type SQLiteQueue struct {
	// contains filtered or unexported fields
}

func NewSQLiteQueue

func NewSQLiteQueue(db string) (*SQLiteQueue, error)

NewSQLiteQueue return an instance of SQLiteQueue.

func (*SQLiteQueue) BatchEnqueue

func (q *SQLiteQueue) BatchEnqueue(ctx context.Context, jobs []Job) error

BatchEnqueue adds a list of jobs to the queue at once. If inserting a task fails, the previous ones are rolled back and the error is returned.

func (*SQLiteQueue) Close

func (q *SQLiteQueue) Close() error

Close closes the queue and it's underhood database.

func (*SQLiteQueue) Complete

func (q *SQLiteQueue) Complete(ctx context.Context, jobID int64) error

Complete sets a Job in the queue with JobStateCompleted state. If the job is not in the queue returns JobNotFoundErr.

func (*SQLiteQueue) Count

func (q *SQLiteQueue) Count(ctx context.Context) (*JobCount, error)

Count returns how many jobs are in the queue.

func (*SQLiteQueue) Dequeue

func (q *SQLiteQueue) Dequeue(ctx context.Context) (*Job, error)

Dequeue returns the oldest job in the queue and set it's state to JobStateRunning.

func (*SQLiteQueue) Enqueue

func (q *SQLiteQueue) Enqueue(ctx context.Context, job Job) error

Enqueue adds a new job to the queue with JobStatePending state.

func (*SQLiteQueue) Fail

func (q *SQLiteQueue) Fail(ctx context.Context, jobID int64, reason string) error

Fail sets a job in JobStateFailed state. Returns JobNotFoundErr if the job is not in the queue.

func (*SQLiteQueue) GetJob added in v0.3.0

func (q *SQLiteQueue) GetJob(ctx context.Context, id int64) (*Job, error)

GetJob returns a job by its id. If the job is not in the queue returns JobNotFoundErr.

func (*SQLiteQueue) IsEmpty

func (q *SQLiteQueue) IsEmpty(ctx context.Context) (bool, error)

IsEmpty returns true if the queue has no jobs with the state JobStatePending otherwise returns false.

func (*SQLiteQueue) ListFailed added in v0.2.0

func (q *SQLiteQueue) ListFailed(ctx context.Context, opts ...ListOption) ([]Job, error)

ListFailed returns a list of Job with the JobStateFailed state.

func (*SQLiteQueue) ListPending added in v0.2.0

func (q *SQLiteQueue) ListPending(ctx context.Context, opts ...ListOption) ([]Job, error)

ListPending returns a list of Job with the JobStatePending state.

func (*SQLiteQueue) ListRetry added in v0.2.0

func (q *SQLiteQueue) ListRetry(ctx context.Context, opts ...ListOption) ([]Job, error)

ListRetry returns a list of Job with the JobStateRetry state.

func (*SQLiteQueue) ListRunning added in v0.2.0

func (q *SQLiteQueue) ListRunning(ctx context.Context, opts ...ListOption) ([]Job, error)

ListRunning returns a list of Job with the JobStateRunning state.

func (*SQLiteQueue) RemoveJob added in v0.3.0

func (q *SQLiteQueue) RemoveJob(ctx context.Context, id int64) error

RemoveJob removes a job from the queue. If the job is not in the queue return JobNotFoundErr.

func (*SQLiteQueue) Retry

func (q *SQLiteQueue) Retry(ctx context.Context, jobID int64) error

Retry re-adds a Job in the queue with JobStateRetry state. If the job is not in the queue returns JobNotFoundErr.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL