wq

Version: v1.2.0
Published: Jan 18, 2023 License: MIT Imports: 7 Imported by: 0

README

wq

Simple workqueue library

Documentation

Constants

const (
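	// DefaultRetryAfter is the default delay before a failed task is retried
	DefaultRetryAfter = 30 * time.Second

	// DefaultQueueLength is the default length of the task queue
	DefaultQueueLength = 1000

	// DefaultMaxRetries is the default maximum number of retries for the scheduler (-1 means retry forever)
	DefaultMaxRetries = -1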
)

Variables

var (
	// ErrorDuplicate is returned for duplicated tasks
	ErrorDuplicate = errors.New("Dropped task as duplicate")
)
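
Because ErrorDuplicate is a sentinel error, callers can test for it with errors.Is. The sketch below is self-contained and does not import wq: the queue type, its add method, and the isDuplicate helper are hypothetical, and detecting duplicates by task ID is an assumption, since the documentation does not say which call returns the error or how duplicates are identified.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorDuplicate mirrors the sentinel error documented above.
var ErrorDuplicate = errors.New("Dropped task as duplicate")

// queue is a hypothetical stand-in for the scheduler's bookkeeping.
type queue struct {
	seen map[string]bool
}

// add rejects an ID that was already queued, wrapping the sentinel error.
// Keying duplicates on the ID is an assumption made for this sketch.
func (q *queue) add(id string) error {
	if q.seen[id] {
		return fmt.Errorf("task %q: %w", id, ErrorDuplicate)
	}
	q.seen[id] = true
	return nil
}

// isDuplicate reports whether err is, or wraps, ErrorDuplicate.
func isDuplicate(err error) bool {
	return errors.Is(err, ErrorDuplicate)
}

func main() {
	q := &queue{seen: map[string]bool{}}
	fmt.Println(isDuplicate(q.add("job-1"))) // false
	fmt.Println(isDuplicate(q.add("job-1"))) // true
}
```

Using errors.Is rather than == keeps the check working even if the error is wrapped with extra context on its way to the caller.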

Functions

This section is empty.

Types

type ProcessFunc

type ProcessFunc func(context.Context, TaskInterface) error
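
Since TaskInterface is declared as interface{}, a ProcessFunc would typically recover the concrete payload with a type assertion. The following is a minimal sketch with local copies of the two documented types; emailJob, sendEmail, and run are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented types so the sketch compiles on its own.
type TaskInterface interface{}
type ProcessFunc func(context.Context, TaskInterface) error

// emailJob is a hypothetical payload type.
type emailJob struct{ To string }

// sendEmail matches the ProcessFunc signature. It checks the context,
// then asserts the payload type before using it.
func sendEmail(ctx context.Context, obj TaskInterface) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	job, ok := obj.(emailJob)
	if !ok {
		return errors.New("unexpected payload type")
	}
	if job.To == "" {
		return errors.New("missing recipient")
	}
	return nil
}

// run invokes sendEmail through the ProcessFunc type with a background context.
func run(obj TaskInterface) error {
	var pf ProcessFunc = sendEmail
	return pf(context.Background(), obj)
}

func main() {
	fmt.Println(run(emailJob{To: "a@example.com"})) // <nil>
	fmt.Println(run(42))                            // unexpected payload type
}
```

Returning an error for an unexpected payload, instead of panicking on a failed assertion, lets the failure be recorded like any other task result.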

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(opts ...SchedulerOpt) *Scheduler

func (*Scheduler) GetWorker

func (s *Scheduler) GetWorker(key string) (*Worker, error)

func (*Scheduler) Name

func (s *Scheduler) Name() string

Name returns the scheduler name.

func (*Scheduler) ProcessQueue

func (s *Scheduler) ProcessQueue()

ProcessQueue triggers queue processing.

func (*Scheduler) QueueTask

func (s *Scheduler) QueueTask(t *Task) error

func (*Scheduler) Start

func (s *Scheduler) Start() error

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop terminates all running workers and stops the scheduler.

func (*Scheduler) WorkerCount

func (s *Scheduler) WorkerCount() int

type SchedulerOpt

type SchedulerOpt func(*Scheduler)

func WithMaxRetries

func WithMaxRetries(n int) SchedulerOpt

WithMaxRetries sets the number of retries for tasks. Use WithMaxRetries(0) to disable retries (the task will run only once); WithMaxRetries(1) means the task will be retried once. The default value is -1, meaning retry forever.
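
The retry rule described above can be written as a small predicate. This is a sketch of the documented semantics only, not the library's code: shouldRetry and its failures parameter (the number of failed runs so far) are hypothetical.

```go
package main

import "fmt"

// DefaultMaxRetries mirrors the documented constant: -1 means retry forever.
const DefaultMaxRetries = -1

// shouldRetry is a hypothetical helper. failures is how many times the task
// has failed so far; maxRetries follows the WithMaxRetries convention.
func shouldRetry(failures, maxRetries int) bool {
	if maxRetries < 0 {
		return true // negative limit: retry forever
	}
	return failures <= maxRetries
}

func main() {
	fmt.Println(shouldRetry(5, DefaultMaxRetries)) // true: unlimited retries
	fmt.Println(shouldRetry(1, 0))                 // false: run once, never retry
	fmt.Println(shouldRetry(1, 1))                 // true: first failure, one retry allowed
	fmt.Println(shouldRetry(2, 1))                 // false: the single retry is used up
}
```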

func WithMaxTasks

func WithMaxTasks(n int) SchedulerOpt

WithMaxTasks sets maxTasks for the scheduler.

func WithMaxWorkers

func WithMaxWorkers(n int) SchedulerOpt

WithMaxWorkers sets maxWorkers for the scheduler.

func WithName

func WithName(name string) SchedulerOpt

WithName sets the scheduler name.

func WithQueueLength

func WithQueueLength(n int) SchedulerOpt

WithQueueLength defines the queue length for tasks. The value must be greater than 10.

func WithQueueLoopSeconds

func WithQueueLoopSeconds(n float64) SchedulerOpt

WithQueueLoopSeconds sets how often the scheduler reads the waiting queue.

func WithRetryAfterSeconds

func WithRetryAfterSeconds(n float64) SchedulerOpt

WithRetryAfterSeconds sets how long the scheduler waits before retrying a failed task.

func WithVerbose

func WithVerbose() SchedulerOpt

WithVerbose enables verbose mode

func WithWorkerTaskBuffer

func WithWorkerTaskBuffer(n int) SchedulerOpt

WithWorkerTaskBuffer sets the buffer length for tasks.

func WithWorkerTerminationTimeoutSeconds

func WithWorkerTerminationTimeoutSeconds(n float64) SchedulerOpt

WithWorkerTerminationTimeoutSeconds defines how long the scheduler waits for a worker to terminate.
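
The SchedulerOpt functions above follow the functional options pattern: each option returns a closure that mutates the scheduler, and the constructor applies them in order on top of the defaults. The sketch below reproduces the pattern with lowercase, hypothetical stand-ins, because the real Scheduler fields are unexported and not shown here. The conversion from float seconds to time.Duration is likewise an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// scheduler is a hypothetical stand-in for Scheduler.
type scheduler struct {
	name       string
	maxRetries int
	retryAfter time.Duration
}

// schedulerOpt has the same shape as the documented SchedulerOpt.
type schedulerOpt func(*scheduler)

func withName(name string) schedulerOpt {
	return func(s *scheduler) { s.name = name }
}

func withMaxRetries(n int) schedulerOpt {
	return func(s *scheduler) { s.maxRetries = n }
}

// withRetryAfterSeconds converts fractional seconds to a time.Duration.
func withRetryAfterSeconds(n float64) schedulerOpt {
	return func(s *scheduler) {
		s.retryAfter = time.Duration(n * float64(time.Second))
	}
}

// newScheduler starts from the documented defaults, then applies each option.
func newScheduler(opts ...schedulerOpt) *scheduler {
	s := &scheduler{maxRetries: -1, retryAfter: 30 * time.Second}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newScheduler(withName("mailer"), withMaxRetries(3), withRetryAfterSeconds(1.5))
	fmt.Println(s.name, s.maxRetries, s.retryAfter) // mailer 3 1.5s
}
```

With this shape, a call that passes no options still yields a usable scheduler, and new settings can be added later without changing the constructor's signature.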

type Task

type Task struct {
	// Unique task identification
	ID string
	// Queue key, default will be used when empty
	Key string

	// Object which will be passed to worker
	Obj TaskInterface

	// Timing options
	Queued   time.Time
	Started  time.Time
	Finished time.Time

	// Task result
	Result     error
	RetryAfter time.Time
	// How long to wait before retrying
	RetryTimeout time.Duration

	// Function for processing this task
	// It will be executed by worker
	ProcessFunc ProcessFunc
	// contains filtered or unexported fields
}

func NewTask

func NewTask(key string, obj TaskInterface, pf ProcessFunc) *Task

func (*Task) GetFailures

func (t *Task) GetFailures() int

GetFailures returns number of task failures

func (*Task) GetResult

func (t *Task) GetResult() error

GetResult returns task result

func (*Task) GetRetryAfter

func (t *Task) GetRetryAfter() time.Time

GetRetryAfter returns the task's RetryAfter time.

func (*Task) Run

func (t *Task) Run(ctx context.Context)

func (*Task) SetFailures

func (t *Task) SetFailures(n int)

SetFailures sets the number of failures directly. It is useful when an external tool adds a task to the scheduler and that task has already failed before.

func (*Task) Validate

func (t *Task) Validate() error

Validate checks whether the task is correct.

func (*Task) WaitMore

func (t *Task) WaitMore() time.Duration

WaitMore indicates whether this task is still in its retry period.
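
The documentation does not state what the returned time.Duration means. One plausible reading, shown here purely as an assumption, is the time remaining until RetryAfter, with zero meaning the retry period is over. The waitMore and demo functions are hypothetical and take the current time as a parameter so the result is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// waitMore is a hypothetical reimplementation: it returns how much longer
// to wait before retrying, or 0 once retryAfter has passed.
func waitMore(retryAfter, now time.Time) time.Duration {
	if d := retryAfter.Sub(now); d > 0 {
		return d
	}
	return 0
}

// demo evaluates waitMore for a task still in its retry period and for one
// whose retry period has already ended.
func demo() (pending, elapsed time.Duration) {
	now := time.Date(2023, time.January, 18, 12, 0, 0, 0, time.UTC)
	pending = waitMore(now.Add(30*time.Second), now)
	elapsed = waitMore(now.Add(-5*time.Second), now)
	return pending, elapsed
}

func main() {
	fmt.Println(demo()) // 30s 0s
}
```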

type TaskInterface

type TaskInterface interface{}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(key string, rc chan *Task, resChan chan<- *Task, stopChan chan<- string) *Worker
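
The channel directions in this signature suggest that a worker receives tasks on rc, reports finished tasks on resChan, and announces its own key on stopChan when it exits. The documentation does not confirm this, so the sketch below is an assumption about the pattern rather than the library's implementation. The task type, startWorker, and runAll are hypothetical.

```go
package main

import "fmt"

// task is a hypothetical, trimmed-down stand-in for Task.
type task struct {
	ID     string
	Result error
	run    func() error
}

// startWorker consumes tasks from rc until it is closed, reports each
// finished task on resChan, then sends its key on stopChan.
func startWorker(key string, rc chan *task, resChan chan<- *task, stopChan chan<- string) {
	go func() {
		for t := range rc {
			t.Result = t.run()
			resChan <- t
		}
		stopChan <- key
	}()
}

// runAll feeds one task per ID to a single worker and returns the IDs in
// completion order together with the key the worker reported on exit.
func runAll(key string, ids []string) ([]string, string) {
	rc := make(chan *task, len(ids))
	resChan := make(chan *task, len(ids))
	stopChan := make(chan string, 1)
	startWorker(key, rc, resChan, stopChan)
	for _, id := range ids {
		rc <- &task{ID: id, run: func() error { return nil }}
	}
	close(rc) // no more tasks: the worker drains rc and exits
	stopped := <-stopChan
	close(resChan) // safe: the worker sent all results before stopping
	var done []string
	for t := range resChan {
		done = append(done, t.ID)
	}
	return done, stopped
}

func main() {
	fmt.Println(runAll("default", []string{"a", "b"})) // [a b] default
}
```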

func (*Worker) GetKey

func (w *Worker) GetKey() string

func (*Worker) LifeTime

func (w *Worker) LifeTime() time.Duration

func (*Worker) Start

func (w *Worker) Start()

Start starts the worker.

func (*Worker) Stop

func (w *Worker) Stop()

Stop waits for the current task to finish and then stops the worker.
