jobqueue

v0.0.0-...-8c89544
Published: Aug 18, 2024 License: MIT Imports: 17 Imported by: 0

README

go-jobqueue

A lightweight, durable, and embedded job queue for Go applications. Powered by BadgerDB.

Warning: this package is a work in progress; use at your own risk.

Features

  • Portable alternative to full-fledged message brokers (e.g. RabbitMQ).
  • Built on top of BadgerDB for durability.
  • Automatic job processing with support for multiple concurrent workers.
  • Strong type safety using generics.

Installation

go get github.com/argus-labs/go-jobqueue

Documentation

Constants

This section is empty.

Variables

var ErrJobNotFound = errors.New("job not found")

ErrJobNotFound is returned by ReadJob or UpdateJob if the job is not found.
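A sentinel error like this is usually checked with errors.Is rather than by comparing strings. The sketch below is self-contained and does not import the package: errJobNotFound mirrors the documented variable, while readJob and isNotFound are hypothetical helpers over an in-memory map. Whether the library wraps the error before returning it is not stated here; errors.Is matches in both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// errJobNotFound mirrors the documented ErrJobNotFound sentinel.
var errJobNotFound = errors.New("job not found")

// readJob is a hypothetical lookup over an in-memory map. It stands in
// for a database read and wraps the sentinel with extra context.
func readJob(store map[uint64]string, id uint64) (string, error) {
	payload, ok := store[id]
	if !ok {
		return "", fmt.Errorf("read job %d: %w", id, errJobNotFound)
	}
	return payload, nil
}

// isNotFound reports whether err is, or wraps, the sentinel.
func isNotFound(err error) bool {
	return errors.Is(err, errJobNotFound)
}

func main() {
	store := map[uint64]string{1: "hello"}
	if _, err := readJob(store, 2); isNotFound(err) {
		fmt.Println("missing:", err) // prints "missing: read job 2: job not found"
	}
}
```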

Functions

This section is empty.

Types

type JobContext

type JobContext interface {
	JobID() uint64
	JobCreatedAt() time.Time
}

JobContext provides context for a job and is passed to the job handler (the handler argument of New) when the job is processed.
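Because JobContext is a small interface, handler logic can be exercised with a stub. The sketch below copies the interface from this page so that it compiles on its own; stubContext, sampleContext, and describe are hypothetical names that are not part of go-jobqueue.

```go
package main

import (
	"fmt"
	"time"
)

// JobContext is copied from the documentation so the sketch is self-contained.
type JobContext interface {
	JobID() uint64
	JobCreatedAt() time.Time
}

// stubContext is a hypothetical test double with fixed values.
type stubContext struct {
	id        uint64
	createdAt time.Time
}

func (s stubContext) JobID() uint64           { return s.id }
func (s stubContext) JobCreatedAt() time.Time { return s.createdAt }

// sampleContext returns a stub with a fixed ID and creation time.
func sampleContext() stubContext {
	return stubContext{id: 7, createdAt: time.Date(2024, 8, 18, 0, 0, 0, 0, time.UTC)}
}

// describe is a hypothetical handler body that only reads from the context.
func describe(ctx JobContext, payload string) string {
	return fmt.Sprintf("job %d (created %s): %s",
		ctx.JobID(), ctx.JobCreatedAt().Format(time.RFC3339), payload)
}

func main() {
	// prints "job 7 (created 2024-08-18T00:00:00Z): send-email"
	fmt.Println(describe(sampleContext(), "send-email"))
}
```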

type JobQueue

type JobQueue[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](name string, workers int, handler func(JobContext, T) error, opts ...Option[T],
) (*JobQueue[T], error)

New creates a new JobQueue with the specified name, number of worker goroutines, and job handler; the database and other settings are configured through opts. It initializes the job queue, starts the worker goroutines, and returns the JobQueue instance and an error, if any.
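To illustrate what "starts the worker goroutines" can look like, here is a minimal worker-pool sketch built only on the standard library. It is not the package's implementation: item, jobCtx, runWorkers, and processAll are hypothetical, and only the handler signature func(JobContext, T) error is taken from this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// JobContext is copied from the documentation so the sketch is self-contained.
type JobContext interface {
	JobID() uint64
	JobCreatedAt() time.Time
}

// jobCtx is a hypothetical JobContext implementation.
type jobCtx struct {
	id      uint64
	created time.Time
}

func (c jobCtx) JobID() uint64           { return c.id }
func (c jobCtx) JobCreatedAt() time.Time { return c.created }

// item pairs a payload with its context (hypothetical helper type).
type item[T any] struct {
	ctx     JobContext
	payload T
}

// runWorkers starts the given number of goroutines, each draining jobs and
// calling handler. It blocks until jobs is closed and every worker has
// finished, then returns how many handler calls succeeded.
func runWorkers[T any](workers int, jobs <-chan item[T], handler func(JobContext, T) error) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		succeeded int
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range jobs {
				if err := handler(it.ctx, it.payload); err == nil {
					mu.Lock()
					succeeded++
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	return succeeded
}

// processAll queues every payload on a buffered channel, closes it, and
// runs the pool over it. Job IDs start at 1.
func processAll[T any](workers int, payloads []T, handler func(JobContext, T) error) int {
	jobs := make(chan item[T], len(payloads))
	for i, p := range payloads {
		jobs <- item[T]{ctx: jobCtx{id: uint64(i + 1), created: time.Now()}, payload: p}
	}
	close(jobs)
	return runWorkers[T](workers, jobs, handler)
}

func main() {
	n := processAll(3, []string{"a", "b", "c", "d"}, func(ctx JobContext, s string) error {
		fmt.Printf("job %d: %s\n", ctx.JobID(), s) // order varies between runs
		return nil
	})
	fmt.Println("succeeded:", n)
}
```

A durable queue would additionally persist each job before handing it to a worker and delete it after the handler succeeds; that part is omitted here.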

func (*JobQueue[T]) Enqueue

func (jq *JobQueue[T]) Enqueue(payload T) (uint64, error)

func (*JobQueue[T]) Stop

func (jq *JobQueue[T]) Stop() error

type JobQueueDb

type JobQueueDb[T any] interface {
	Open(path string, queueName string) error
	Close() error
	GetNextJobId() (uint64, error)
	FetchJobs(count int) ([]*job[T], error)
	ReadJob(jobID uint64) (*job[T], error)
	AddJob(job *job[T]) (uint64, error) // returns the job ID
	DeleteJob(jobID uint64) error
}

func NewJobQueueDbBadger

func NewJobQueueDbBadger[T any](inMemory bool) JobQueueDb[T]

func NewJobQueueDbMongo

func NewJobQueueDbMongo[T any](ctx context.Context) JobQueueDb[T]

NewJobQueueDbMongo creates a new JobQueueDbMongo instance.

type JobQueueDbBadger

type JobQueueDbBadger[T any] struct {
	// contains filtered or unexported fields
}

func (*JobQueueDbBadger[T]) AddJob

func (jqdb *JobQueueDbBadger[T]) AddJob(job *job[T]) (uint64, error)

func (*JobQueueDbBadger[T]) Close

func (jqdb *JobQueueDbBadger[T]) Close() error

func (*JobQueueDbBadger[T]) DeleteJob

func (jqdb *JobQueueDbBadger[T]) DeleteJob(jobID uint64) error

func (*JobQueueDbBadger[T]) FetchJobs

func (jqdb *JobQueueDbBadger[T]) FetchJobs(count int) ([]*job[T], error)

func (*JobQueueDbBadger[T]) GetNextJobId

func (jqdb *JobQueueDbBadger[T]) GetNextJobId() (uint64, error)

func (*JobQueueDbBadger[T]) Open

func (jqdb *JobQueueDbBadger[T]) Open(path string, queueName string) error

func (*JobQueueDbBadger[T]) ReadJob

func (jqdb *JobQueueDbBadger[T]) ReadJob(jobID uint64) (*job[T], error)

type JobQueueDbMongo

type JobQueueDbMongo[T any] struct {
	// contains filtered or unexported fields
}

JobQueueDbMongo is the MongoDB implementation of the JobQueueDb interface.

func (*JobQueueDbMongo[T]) AddJob

func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error)

AddJob implements JobQueueDb.AddJob for MongoDB. It returns the job ID.

func (*JobQueueDbMongo[T]) Close

func (jqdb *JobQueueDbMongo[T]) Close() error

Close closes the MongoDB database.

func (*JobQueueDbMongo[T]) DeleteJob

func (jqdb *JobQueueDbMongo[T]) DeleteJob(jobID uint64) error

DeleteJob implements JobQueueDb.DeleteJob for MongoDB.

func (*JobQueueDbMongo[T]) FetchJobs

func (jqdb *JobQueueDbMongo[T]) FetchJobs(count int) ([]*job[T], error)

FetchJobs implements JobQueueDb.FetchJobs for MongoDB.

func (*JobQueueDbMongo[T]) GetNextJobId

func (jqdb *JobQueueDbMongo[T]) GetNextJobId() (uint64, error)

GetNextJobId implements JobQueueDb.GetNextJobId for MongoDB.

func (*JobQueueDbMongo[T]) Open

func (jqdb *JobQueueDbMongo[T]) Open(path string, queueName string) error

Open opens the MongoDB database.

func (*JobQueueDbMongo[T]) ReadJob

func (jqdb *JobQueueDbMongo[T]) ReadJob(jobID uint64) (*job[T], error)

ReadJob implements JobQueueDb.ReadJob for MongoDB.

type JobStatus

type JobStatus string
const (
	JobStatusPending   JobStatus = "pending"
	JobStatusCompleted JobStatus = "complete"
)

type Option

type Option[T any] func(*JobQueue[T])

func WithBadgerDB

func WithBadgerDB[T any](path string) Option[T]

WithBadgerDB sets the JobQueue to use BadgerDB instead of MongoDB. If WithInMemDB or WithBadgerDB was previously called, the queue warns and ignores this option.

func WithFetchInterval

func WithFetchInterval[T any](interval time.Duration) Option[T]

WithFetchInterval sets the interval at which the job queue fetches jobs from BadgerDB.

func WithInMemDB

func WithInMemDB[T any]() Option[T]

WithInMemDB uses an in-memory BadgerDB instead of a persistent one. Useful for testing, but provides no durability guarantees. If WithMongoDB was previously called, the queue warns and ignores this option.

func WithJobBufferSize

func WithJobBufferSize[T any](size int) Option[T]

WithJobBufferSize sets the size of the job channel.

func WithJobsPerFetch

func WithJobsPerFetch[T any](count int) Option[T]

WithJobsPerFetch sets how many jobs are retrieved from the DB in a single fetch operation.

func WithMongoDB

func WithMongoDB[T any](uri string) Option[T]

WithMongoDB sets the JobQueue to use MongoDB instead of BadgerDB. If WithInMemDB was previously called, the queue warns and ignores this option.
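The options above follow the functional-options pattern, with the first backend choice taking precedence over later ones. The sketch below shows one way such a rule can be written. Every name in it (queue, option, setBackend, newQueue, the lowercase with... helpers) and the default values are assumptions for illustration, not the package's code.

```go
package main

import (
	"fmt"
	"time"
)

// queue is a hypothetical stand-in for JobQueue[T]; its fields are assumptions.
type queue[T any] struct {
	backend       string // "" until a backend option is applied
	fetchInterval time.Duration
	jobsPerFetch  int
	warnings      []string // collected instead of logged, to keep the sketch testable
}

// option mirrors the shape of Option[T]: a function that mutates the queue.
type option[T any] func(*queue[T])

func withFetchInterval[T any](d time.Duration) option[T] {
	return func(q *queue[T]) { q.fetchInterval = d }
}

func withJobsPerFetch[T any](n int) option[T] {
	return func(q *queue[T]) { q.jobsPerFetch = n }
}

// setBackend keeps the first backend that was chosen and records a warning
// for any later attempt to change it.
func setBackend[T any](q *queue[T], name string) {
	if q.backend != "" {
		q.warnings = append(q.warnings,
			fmt.Sprintf("ignoring %s: backend already set to %s", name, q.backend))
		return
	}
	q.backend = name
}

func withInMemDB[T any]() option[T] {
	return func(q *queue[T]) { setBackend(q, "inmem") }
}

// withMongoDB ignores uri in this sketch; a real option would store it.
func withMongoDB[T any](uri string) option[T] {
	return func(q *queue[T]) { setBackend(q, "mongo") }
}

// newQueue applies the options in order over assumed defaults.
func newQueue[T any](opts ...option[T]) *queue[T] {
	q := &queue[T]{fetchInterval: 100 * time.Millisecond, jobsPerFetch: 10}
	for _, opt := range opts {
		opt(q)
	}
	return q
}

func main() {
	q := newQueue(
		withInMemDB[string](),
		withMongoDB[string]("mongodb://localhost:27017"), // ignored with a warning
		withJobsPerFetch[string](50),
		withFetchInterval[string](time.Second),
	)
	fmt.Println(q.backend, q.jobsPerFetch, len(q.warnings)) // prints "inmem 50 1"
}
```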

type TimeStat

type TimeStat struct {
	TotalTime time.Duration
	MinTime   time.Duration
	MaxTime   time.Duration
	Count     int64
}

func (*TimeStat) AvgTime

func (ts *TimeStat) AvgTime() time.Duration

func (*TimeStat) RecordTime

func (ts *TimeStat) RecordTime(duration time.Duration)

func (*TimeStat) Reset

func (ts *TimeStat) Reset()

func (*TimeStat) String

func (ts *TimeStat) String() string
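The TimeStat methods are undocumented on this page. As a rough illustration of how an accumulator with these fields could behave, the sketch below mirrors the four exported fields in a hypothetical timeStat type. The method bodies, including the zero-count guard in avgTime, are assumptions and may differ from the package.

```go
package main

import (
	"fmt"
	"time"
)

// timeStat mirrors the exported fields of TimeStat; the methods are assumptions.
type timeStat struct {
	TotalTime time.Duration
	MinTime   time.Duration
	MaxTime   time.Duration
	Count     int64
}

// recordTime folds one measurement into the running totals.
func (ts *timeStat) recordTime(d time.Duration) {
	if ts.Count == 0 || d < ts.MinTime {
		ts.MinTime = d
	}
	if d > ts.MaxTime {
		ts.MaxTime = d
	}
	ts.TotalTime += d
	ts.Count++
}

// avgTime returns the mean duration, or 0 when nothing has been recorded.
func (ts *timeStat) avgTime() time.Duration {
	if ts.Count == 0 {
		return 0
	}
	return ts.TotalTime / time.Duration(ts.Count)
}

// reset clears all accumulated values.
func (ts *timeStat) reset() { *ts = timeStat{} }

func main() {
	var ts timeStat
	for _, d := range []time.Duration{10 * time.Millisecond, 30 * time.Millisecond, 20 * time.Millisecond} {
		ts.recordTime(d)
	}
	fmt.Println(ts.avgTime(), ts.MinTime, ts.MaxTime, ts.Count) // prints "20ms 10ms 30ms 3"
	ts.reset()
	fmt.Println(ts.avgTime()) // prints "0s"
}
```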
