neoq

Version: v0.8.0
Published: Apr 15, 2023 License: AGPL-3.0 Imports: 5 Imported by: 1

README

Neoq

Background job processing for Go

Installation

go get github.com/acaloiaro/neoq

About

Neoq is a queue-agnostic background job framework for Go.

Neoq job handlers are the same whether the queue is in-memory for development and testing, or Postgres, Redis, or a custom queue for production. Queue infrastructure can therefore change without changing application code.

Developing/testing or don't need a durable queue? Use the in-memory queue.

Running an application in production? Use Postgres.

Have higher throughput demands in production? Use Redis.

Neoq does not aim to be the fastest background job processor. It aims to be fast, reliable, and demand a minimal infrastructure footprint.

What it does

  • Multiple Backends: In-memory, Postgres, Redis, or user-supplied custom backends.
  • Retries: Jobs may be retried a configurable number of times with exponential backoff and jitter to prevent thundering herds.
  • Job Uniqueness: Jobs are fingerprinted based on their payload and status to prevent duplication (a job with the same payload as an existing job is not queued again)
  • Job Timeouts: Queue handlers can be configured with per-job timeouts with millisecond accuracy
  • Periodic Jobs: Jobs can be scheduled periodically using standard cron syntax
  • Future Jobs: Jobs can be scheduled for future or immediate execution
  • Concurrency: Concurrency is configurable for every queue

Getting Started

Getting started is as simple as declaring queue handlers and adding jobs.

Additional documentation can be found in the wiki: https://github.com/acaloiaro/neoq/wiki

Error handling in this section is excluded for simplicity.

Add queue handlers

Queue handlers listen for Jobs on queues. Jobs may consist of any payload that is JSON-serializable.
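
As a quick illustration of what "JSON-serializable" means in practice, the sketch below round-trips a payload through encoding/json. The helper name `roundTrip` is hypothetical, and whether a given neoq backend serializes payloads exactly this way is an assumption. The type behavior shown is standard encoding/json behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// roundTrip encodes a payload to JSON and decodes it again, which is
// roughly what happens to a payload stored in a durable queue.
// (Hypothetical helper, not part of neoq.)
func roundTrip(payload map[string]interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("payload is not JSON-serializable: %w", err)
	}
	var decoded map[string]interface{}
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}

func main() {
	out, err := roundTrip(map[string]interface{}{"message": "hello world", "attempt": 3})
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	// Numbers decode as float64 when the target is interface{}.
	fmt.Printf("%T %v\n", out["attempt"], out["attempt"]) // float64 3

	// Channels, functions, and similar values cannot be encoded.
	_, err = roundTrip(map[string]interface{}{"ch": make(chan int)})
	fmt.Println(err != nil) // true
}
```

A handler that reads numeric payload fields should therefore expect float64 after a JSON round trip, not int.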

Queue handlers are plain Go functions that accept a context.Context parameter and return an error.

Example: Add a listener on the hello_world queue using the default in-memory backend

ctx := context.Background()
nq, _ := neoq.New(ctx)
nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

Enqueue jobs

Enqueuing jobs adds jobs to the specified queue to be processed asynchronously.

Example: Add a "Hello World" job to the hello_world queue using the default in-memory backend.

ctx := context.Background()
nq, _ := neoq.New(ctx)
nq.Enqueue(ctx, &jobs.Job{
  Queue: "hello_world",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})
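
Job uniqueness, described under "What it does", affects enqueuing: a job whose payload matches an existing job is not queued again. The sketch below shows one way such a fingerprint could be derived from a job's status and payload. The function name, the choice of SHA-256, and the status strings are assumptions for illustration; neoq's actual fingerprint may be computed differently.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// fingerprint hashes a job's status together with its JSON-encoded
// payload. encoding/json writes map keys in sorted order, so payloads
// with equal contents always produce the same bytes.
// (Hypothetical helper, not neoq's implementation.)
func fingerprint(status string, payload map[string]interface{}) (string, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("encoding payload: %w", err)
	}
	h := sha256.New()
	h.Write([]byte(status))
	h.Write([]byte{0}) // separator between status and payload
	h.Write(raw)
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	payload := map[string]interface{}{"message": "hello world"}

	a, _ := fingerprint("new", payload)
	b, _ := fingerprint("new", map[string]interface{}{"message": "hello world"})
	c, _ := fingerprint("processed", payload)

	fmt.Println(a == b) // true: same status and payload
	fmt.Println(a == c) // false: the status differs
}
```

Including the status in the hash means an identical payload can be enqueued again once the earlier job has moved to a different status.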

Redis

Example: Process jobs on the "hello_world" queue and add a job to it using the Redis backend

ctx := context.Background()
nq, _ := neoq.New(ctx,
  neoq.WithBackend(redis.Backend),
  redis.WithAddr("localhost:6379"),
  redis.WithPassword(""),
)

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

nq.Enqueue(ctx, &jobs.Job{
  Queue: "hello_world",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})

Postgres

Example: Process jobs on the "hello_world" queue and add a job to it using the Postgres backend

ctx := context.Background()
nq, _ := neoq.New(ctx,
  neoq.WithBackend(postgres.Backend),
  postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"),
)

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

nq.Enqueue(ctx, &jobs.Job{
  Queue: "hello_world",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})

Example Code

Additional example integration code can be found at https://github.com/acaloiaro/neoq/tree/main/examples

Status

This project is currently in alpha. Future releases may change the API. It currently leaks some resources. It can handle unimportant workloads.

Documentation

Overview

Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a _minimal infrastructure footprint_.

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(ctx context.Context, opts ...config.Option) (b types.Backend, err error)

New creates a new backend instance for job processing.

By default, neoq initializes memory.Backend if New() is called without a backend configuration option.

Use neoq.WithBackend to initialize different backends.

For available configuration options see config.Option.

Example
ctx := context.Background()
nq, err := New(ctx, WithBackend(memory.Backend))
if err != nil {
	fmt.Println("initializing a new Neoq with no params should not return an error:", err)
	return
}
defer nq.Shutdown(ctx)

fmt.Println("neoq initialized with default memory backend")
Output:

neoq initialized with default memory backend
Example (Postgres)
ctx := context.Background()
var pgURL string
var ok bool
if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok {
	fmt.Println("Please set TEST_DATABASE_URL environment variable")
	return
}

nq, err := New(ctx, WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL))
if err != nil {
	fmt.Println("neoq's postgres backend failed to initialize:", err)
	return
}
defer nq.Shutdown(ctx)

fmt.Println("neoq initialized with postgres backend")
Output:

neoq initialized with postgres backend

func WithBackend

func WithBackend(initializer config.BackendInitializer) config.Option

WithBackend configures neoq to initialize a specific backend for job processing.

Neoq provides config.BackendInitializer values that may be used with WithBackend, including memory.Backend, postgres.Backend, and redis.Backend.

Example
ctx := context.Background()
nq, err := New(ctx, WithBackend(memory.Backend))
if err != nil {
	fmt.Println("initializing a new Neoq with no params should not return an error:", err)
	return
}
defer nq.Shutdown(ctx)

fmt.Println("neoq initialized with memory backend")
Output:

neoq initialized with memory backend
Example (Postgres)
ctx := context.Background()
var pgURL string
var ok bool
if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok {
	fmt.Println("Please set TEST_DATABASE_URL environment variable")
	return
}

nq, err := New(ctx, WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL))
if err != nil {
	fmt.Println("neoq's postgres backend failed to initialize:", err)
	return
}
defer nq.Shutdown(ctx)

fmt.Println("neoq initialized with postgres backend")
Output:

neoq initialized with postgres backend

func WithJobCheckInterval

func WithJobCheckInterval(interval time.Duration) config.Option

WithJobCheckInterval configures the interval at which neoq checks for future jobs.

Types

This section is empty.

Directories

Path      Synopsis
backends  Package backends provides concrete implementations of pkg/github.com/acaloiaro/neoq/types.Backend
examples
