red

v0.1.3
Published: Apr 24, 2022 License: GPL-3.0 Imports: 13 Imported by: 3

README

go-red

Distributed job scheduling for Go.

Installation

go-red may be installed using the go get command:

go get github.com/pghq/go-red

Usage

import "github.com/pghq/go-red"

To create a new queue and enqueue a message:

queue := red.New("redis://user:pass@example.com?queue=messages")
if err := queue.Enqueue(context.TODO(), "key", "value"); err != nil {
    panic(err)
}
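
The connection string above carries the queue name as a query parameter. The following is a minimal sketch, using only the standard library, of how such a URL breaks down. The helper queueName is hypothetical, and that go-red fills Red.Name from the queue parameter is an assumption rather than something the documentation confirms.

```go
package main

import (
	"fmt"
	"net/url"
)

// queueName extracts the "queue" query parameter from a Redis URL.
// Hypothetical helper: whether go-red derives Red.Name this way is an assumption.
func queueName(redisURL string) (string, error) {
	u, err := url.Parse(redisURL)
	if err != nil {
		return "", err
	}
	return u.Query().Get("queue"), nil
}

func main() {
	name, err := queueName("redis://user:pass@example.com?queue=messages")
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // prints "messages"
}
```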

Documentation

Constants

const (
	// DefaultSchedulerInterval is the default period between checking tasks to schedule
	DefaultSchedulerInterval = time.Millisecond

	// DefaultEnqueueTimeout is the default time allowed for queue write ops
	DefaultEnqueueTimeout = 100 * time.Millisecond

	// DefaultDequeueTimeout is the default time allowed for queue read ops
	DefaultDequeueTimeout = 100 * time.Millisecond
)
const (
	// DefaultInstances is the default number of simultaneous workers
	DefaultInstances = 1

	// DefaultWorkerInterval is the default period between running batches of jobs
	DefaultWorkerInterval = time.Millisecond
)
const (
	// BatchSize is the total number of errors and messages consumed before dropping occurs
	BatchSize = 1024
)
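
The BatchSize comment describes a bounded buffer that drops items once it is full. One common way to get that behavior in Go is a buffered channel with a non-blocking send. The sketch below illustrates the idea only; boundedBuffer and offer are hypothetical names, and go-red's internal implementation may differ.

```go
package main

import "fmt"

// boundedBuffer is a hypothetical stand-in for an in-memory batch buffer.
// It is not a type exported by go-red.
type boundedBuffer struct {
	items chan string
}

func newBoundedBuffer(size int) *boundedBuffer {
	return &boundedBuffer{items: make(chan string, size)}
}

// offer stores v if there is room and reports whether it was kept.
// When the buffer is full, the value is dropped instead of blocking.
func (b *boundedBuffer) offer(v string) bool {
	select {
	case b.items <- v:
		return true
	default:
		return false
	}
}

func main() {
	// go-red's BatchSize is 1024; a size of 2 keeps the demonstration short.
	buf := newBoundedBuffer(2)
	for _, v := range []string{"a", "b", "c"} {
		fmt.Println(v, buf.offer(v))
	}
}
```

With a capacity of 2, the first two values are kept and the third is dropped.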

Variables

var (
	// ErrNoMessages is an error returned when a request is made but no messages are available
	ErrNoMessages = trail.NewErrorBadRequest("no messages currently available")
)

Functions

This section is empty.

Types

type Log added in v0.0.10

type Log struct {
	// contains filtered or unexported fields
}

Log is a logger instance with quiet-mode support.

func (Log) Debugf added in v0.1.0

func (l Log) Debugf(format string, args ...interface{})

Debugf logs a formatted value at the debug level.

func (Log) Infof added in v0.1.0

func (l Log) Infof(format string, args ...interface{})

Infof logs a formatted value at the info level.

type Message

type Message struct {
	Id    string `json:"id"`
	Value []byte `json:"value"`
	// contains filtered or unexported fields
}

Message is a single instance of a message within the queue.

func (*Message) Ack

func (m *Message) Ack(ctx context.Context) error

Ack notifies upstream of successful message handling

func (*Message) Decode

func (m *Message) Decode(v interface{}) error

Decode fills the supplied interface with the underlying message value
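
As a rough illustration of Decode, the sketch below mirrors the exported fields of Message in a local type and unmarshals the raw value into a caller-supplied struct. It assumes values are JSON-encoded, which the documentation does not state. The types message and email and the method decode are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message mirrors the exported fields of red.Message for illustration only.
type message struct {
	Id    string `json:"id"`
	Value []byte `json:"value"`
}

// decode unmarshals the raw value into v.
// Assumption: values are JSON-encoded; the docs do not state the encoding.
func (m *message) decode(v interface{}) error {
	return json.Unmarshal(m.Value, v)
}

// email is a hypothetical payload type.
type email struct {
	To string `json:"to"`
}

func main() {
	m := &message{Id: "key", Value: []byte(`{"to":"user@example.com"}`)}
	var e email
	if err := m.decode(&e); err != nil {
		panic(err)
	}
	fmt.Println(e.To) // prints "user@example.com"
}
```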

func (*Message) Reject

func (m *Message) Reject(ctx context.Context) error

Reject notifies upstream of unsuccessful message handling

type Red

type Red struct {
	Name string
	URL  *url.URL
	// contains filtered or unexported fields
}

Red is an instance of the exclusive message queue.

func New added in v0.0.17

func New(redisURL string) *Red

New creates a named instance of the exclusive queue

func (*Red) Dequeue

func (r *Red) Dequeue(ctx context.Context) (*Message, error)

Dequeue removes a message from the queue.

func (*Red) Enqueue

func (r *Red) Enqueue(ctx context.Context, key, value interface{}) error

Enqueue adds a message to the queue.

func (Red) Error

func (r Red) Error() error

Error checks if any errors have occurred

func (Red) Lock added in v0.0.17

func (r Red) Lock(name string, opts ...redsync.Option) *redsync.Mutex

Lock attempts to obtain a named lock for read/write

func (Red) Once added in v0.0.18

func (r Red) Once(key string)

Once schedules a task to be done once

func (Red) RLock added in v0.0.17

func (r Red) RLock(name string, opts ...redsync.Option) *redsync.Mutex

RLock attempts to obtain a named lock for reading

func (Red) Repeat added in v0.0.18

func (r Red) Repeat(key, recurrence string) error

Repeat schedules a task to be done at least once

func (Red) StartScheduling added in v0.0.17

func (r Red) StartScheduling(handler func(task *Task), schedulers ...func())

StartScheduling starts scheduling tasks.

func (Red) StopScheduling added in v0.0.17

func (r Red) StopScheduling()

StopScheduling stops scheduling tasks.

func (Red) Wait added in v0.0.25

func (r Red) Wait()

Wait waits for the next task to be handled.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is an instance of a persistent background scheduler

func NewScheduler

func NewScheduler(queue *Red) *Scheduler

NewScheduler creates a scheduler instance.

func (*Scheduler) Add

func (s *Scheduler) Add(tasks ...*Task) *Scheduler

Add adds a task to be scheduled.

func (*Scheduler) DequeueTimeout

func (s *Scheduler) DequeueTimeout(timeout time.Duration) *Scheduler

DequeueTimeout sets the maximum time to wait when removing items from the queue.

func (*Scheduler) EnqueueTimeout

func (s *Scheduler) EnqueueTimeout(timeout time.Duration) *Scheduler

EnqueueTimeout sets the maximum time to wait when adding items to the queue.

func (*Scheduler) Every

func (s *Scheduler) Every(interval time.Duration) *Scheduler

Every sets the interval between checks for new jobs to schedule.

func (*Scheduler) Handle added in v0.0.17

func (s *Scheduler) Handle(fn func(task *Task))

Handle registers fn to handle tasks when they are scheduled.

func (*Scheduler) NewTask added in v0.0.36

func (s *Scheduler) NewTask(id string) *Task

NewTask creates a new instance of a task to be scheduled.

func (*Scheduler) Start

func (s *Scheduler) Start()

Start begins scheduling tasks.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the scheduler and waits for background jobs to finish.

type Task

type Task struct {
	Id       string       `json:"id"`
	Schedule TaskSchedule `json:"schedule"`
	*redsync.Mutex
}

Task is an instance of a thing to be scheduled.

func (*Task) CanSchedule

func (t *Task) CanSchedule(now time.Time) bool

CanSchedule determines if the task can be scheduled at given time.

func (*Task) IsComplete

func (t *Task) IsComplete() bool

IsComplete checks if the task can no longer be scheduled.

func (*Task) MarkScheduled

func (t *Task) MarkScheduled(at time.Time) *Task

MarkScheduled marks the task as scheduled.

func (*Task) Occurrences

func (t *Task) Occurrences() int

Occurrences gets the number of times the task has been scheduled.

func (*Task) SetRecurrence

func (t *Task) SetRecurrence(rfc string) error

SetRecurrence sets a new recurrence rule based on RFC 5545.
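
For reference, an RFC 5545 recurrence rule is a semicolon-separated list of NAME=value parts in which FREQ is required, for example "FREQ=DAILY;INTERVAL=2;COUNT=3". The sketch below splits such a rule with the standard library to show its shape. parseRule is a hypothetical helper, and which rule parts or prefixes SetRecurrence accepts is not stated in the documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// parseRule splits an RFC 5545 recurrence rule such as
// "FREQ=DAILY;INTERVAL=2;COUNT=3" into its NAME=value parts.
// Hypothetical helper; it does not validate the values themselves.
func parseRule(rule string) (map[string]string, error) {
	parts := make(map[string]string)
	for _, field := range strings.Split(rule, ";") {
		kv := strings.SplitN(field, "=", 2)
		if len(kv) != 2 || kv[0] == "" || kv[1] == "" {
			return nil, fmt.Errorf("malformed rule part %q", field)
		}
		parts[strings.ToUpper(kv[0])] = kv[1]
	}
	// RFC 5545 requires every recurrence rule to carry a FREQ part.
	if _, ok := parts["FREQ"]; !ok {
		return nil, fmt.Errorf("rule %q has no FREQ part", rule)
	}
	return parts, nil
}

func main() {
	parts, err := parseRule("FREQ=DAILY;INTERVAL=2;COUNT=3")
	if err != nil {
		panic(err)
	}
	fmt.Println(parts["FREQ"], parts["INTERVAL"], parts["COUNT"])
}
```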

type TaskSchedule

type TaskSchedule struct {
	Recurrence string    `json:"recurrence"`
	Count      int       `json:"count"`
	UpdatedAt  time.Time `json:"updatedAt"`
	sync.RWMutex
}

TaskSchedule is the schedule for when the task is to occur.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is an instance of a background worker.

func NewQWorker added in v0.0.17

func NewQWorker(name string, jobs ...func()) *Worker

NewQWorker creates a new worker instance with quiet mode enabled.

func NewWorker

func NewWorker(name string, jobs ...func()) *Worker

NewWorker creates a new worker instance.

func (*Worker) AddJobs added in v0.0.17

func (w *Worker) AddJobs(jobs ...func())

AddJobs adds jobs to the worker

func (*Worker) Concurrent

func (w *Worker) Concurrent(instances int) *Worker

Concurrent sets the number of simultaneous instances to process tasks.

func (*Worker) Every

func (w *Worker) Every(interval time.Duration) *Worker

Every sets the time between processing tasks.

func (*Worker) Start

func (w *Worker) Start()

Start begins processing tasks.

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker and waits for all instances to terminate.
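
The Worker methods above (Concurrent, Every, Start, Stop) describe an interval-driven loop that runs batches of jobs on several instances. The sketch below shows one way such a loop can be built from a ticker and a wait group. It is a standard-library illustration under that reading of the docs, not go-red's implementation; runBatches and countJobRuns are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBatches runs job on `instances` goroutines once per tick, for `batches` ticks.
// Hypothetical helper, not part of go-red.
func runBatches(interval time.Duration, instances, batches int, job func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for i := 0; i < batches; i++ {
		<-ticker.C
		var wg sync.WaitGroup
		for n := 0; n < instances; n++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				job()
			}()
		}
		wg.Wait() // finish the batch before waiting for the next tick
	}
}

// countJobRuns reports how many times a counting job ran in total.
func countJobRuns(instances, batches int) int64 {
	var calls int64
	runBatches(time.Millisecond, instances, batches, func() {
		atomic.AddInt64(&calls, 1)
	})
	return atomic.LoadInt64(&calls)
}

func main() {
	// 3 instances over 4 batches run the job 12 times.
	fmt.Println(countJobRuns(3, 4))
}
```

Waiting for each batch to finish before the next tick keeps at most `instances` jobs in flight, at the cost of delaying a batch when a job runs longer than the interval.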
