goworker

Version: v0.0.0-...-c5702c6
Published: Jun 7, 2021 | License: MIT | Imports: 3 | Imported by: 0

README

goworker

Based on http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

Example:
package main

import (
	"fmt"
	"github.com/magicalbanana/goworker"
	"log"
	"os"
	"os/signal"
	"syscall"
)

type Job struct {
	SomeJobData string
}

// compile-time check that *Job implements goworker.JobPerformer
var _ goworker.JobPerformer = (*Job)(nil)

// Perform is required by the goworker.JobPerformer interface.
func (j *Job) Perform() {
	log.Println("Worker performs job:", j.SomeJobData)
}

func main() {

	// set up a channel for OS signals so that Ctrl+C, kill, and similar signals can be handled
	sChan := make(chan os.Signal, 1)
	signal.Notify(sChan,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)

	// create a dispatcher with 10 workers (goroutines) and a jobs queue channel of size 20
	d := goworker.NewDispatcher(10, 20)

	go func() {
		// catch quit signal
		s := <-sChan
		log.Println("os.Signal", s, "received, finishing application...")
		// stop dispatcher
		d.Stop()
	}()

	go func() {
		for i := 1; i < 100; i++ {
			j := &Job{
				SomeJobData: fmt.Sprintf("job number %d", i),
			}

			// add job to dispatcher
			d.AddJob(j)
		}
	}()

	// start dispatcher
	d.Run()
}

To retrieve the jobs that were not performed before d.Stop(), call d.GetUnperformedJobs():

// get the slice of unperformed jobs
unperformedJobs := d.GetUnperformedJobs()

To discard the unperformed jobs after d.Stop(), call d.CleanUnperformedJobs():

d.CleanUnperformedJobs()

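Warning! Method Start() clears all unperformed jobs! (The index below lists Run(), not Start(), on Dispatcher, so the method name may differ in this version.)

To restart the dispatcher without losing those jobs, save them first and add them again afterwards: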

// stop and get jobs
d.Stop()
unperformedJobs := d.GetUnperformedJobs()

// restart
d.Start()

// add unperformed jobs again
for _, job := range unperformedJobs {
	d.AddJob(job)
}
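
The save-and-requeue idea behind this trick can be tried without the library. The following is a minimal sketch, assuming the pending jobs sit in a buffered channel; the drain helper and the printJob type are hypothetical and are not part of goworker, and JobPerformer is redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

// JobPerformer mirrors the interface documented for this package.
type JobPerformer interface {
	Perform()
}

// printJob is a hypothetical job type used only for this illustration.
type printJob struct{ name string }

func (p printJob) Perform() { fmt.Println("performing", p.name) }

// drain removes every job currently buffered in queue without blocking
// and returns them in FIFO order. It assumes nothing else is reading
// from the queue at the same time (for example, workers are stopped).
func drain(queue chan JobPerformer) []JobPerformer {
	var pending []JobPerformer
	for {
		select {
		case j := <-queue:
			pending = append(pending, j)
		default:
			return pending
		}
	}
}

func main() {
	queue := make(chan JobPerformer, 4)
	queue <- printJob{"a"}
	queue <- printJob{"b"}

	// Save the jobs that were never performed.
	pending := drain(queue)
	fmt.Println("pending:", len(pending), "left in queue:", len(queue))

	// Add the saved jobs again, as the trick above does with AddJob.
	for _, j := range pending {
		queue <- j
	}
	fmt.Println("queued again:", len(queue))
}
```

The important part is the order: the pending jobs are copied out before anything can clear the queue, and only then are they added back.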

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// A pool of workers channels that are registered with the goworker
	Verbose    bool
	WorkerPool chan chan JobPerformer
	Workers    chan *Worker
	Logger     Logger
	// contains filtered or unexported fields
}

Dispatcher starts workers and routes jobs to them
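
The WorkerPool field has type chan chan JobPerformer, which suggests the two-tier channel pattern from the article this package is based on: each idle worker registers its own job channel in the pool, and the dispatcher hands a job to whichever channel it receives next. The sketch below illustrates that pattern only; runPool, counter, and the internal names are assumptions made for this example and do not reflect goworker's actual implementation. It assumes at least one worker.

```go
package main

import (
	"fmt"
	"sync"
)

// JobPerformer mirrors the interface documented for this package.
type JobPerformer interface {
	Perform()
}

// counter is a hypothetical job that counts how often it was performed.
type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) Perform() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// runPool starts n workers (n must be > 0), routes every job through a
// pool of per-worker channels, and returns once all jobs have been
// performed and all workers have exited.
func runPool(n int, jobs []JobPerformer) {
	pool := make(chan chan JobPerformer, n) // idle workers register here
	quit := make(chan struct{})
	var workers, done sync.WaitGroup
	done.Add(len(jobs))

	for i := 0; i < n; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			own := make(chan JobPerformer) // this worker's private job channel
			for {
				// Announce availability, unless asked to quit.
				select {
				case pool <- own:
				case <-quit:
					return
				}
				// Wait for the dispatcher to hand over a job.
				select {
				case j := <-own:
					j.Perform()
					done.Done()
				case <-quit:
					return
				}
			}
		}()
	}

	// Dispatcher loop: take the next idle worker's channel and send it a job.
	for _, j := range jobs {
		ch := <-pool
		ch <- j
	}

	done.Wait()  // every job has been performed
	close(quit)  // tell the workers to stop
	workers.Wait()
}

func main() {
	c := &counter{}
	jobs := make([]JobPerformer, 10)
	for i := range jobs {
		jobs[i] = c
	}
	runPool(3, jobs)
	fmt.Println("performed:", c.n) // prints "performed: 10"
}
```

Because a worker only appears in the pool while it is idle, the dispatcher never queues work behind a busy worker.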

func NewDispatcher

func NewDispatcher(maxWorkers int, jobsQueueSize uint) *Dispatcher

NewDispatcher constructs a new Dispatcher

func (*Dispatcher) AddJob

func (d *Dispatcher) AddJob(job JobPerformer)

AddJob adds a new job to the dispatcher

func (*Dispatcher) CleanUnperformedJobs

func (d *Dispatcher) CleanUnperformedJobs()

CleanUnperformedJobs removes all unperformed jobs

func (*Dispatcher) CountJobs

func (d *Dispatcher) CountJobs() int

CountJobs counts the number of jobs in the queue

func (*Dispatcher) GetUnperformedJobs

func (d *Dispatcher) GetUnperformedJobs() []JobPerformer

GetUnperformedJobs returns a slice of the jobs that had not been performed when Stop() was executed

func (*Dispatcher) Log

func (d *Dispatcher) Log(msg string, fields ...zapcore.Field)

Log ...

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run dispatcher

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop dispatcher

type JobPerformer

type JobPerformer interface {
	Perform()
}

JobPerformer is the interface that jobs must implement
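
Since JobPerformer has a single method with no arguments, any type with a Perform() method satisfies it, including a function type. The following sketch shows one way to submit plain closures as jobs; PerformFunc is a hypothetical adapter written for this example and is not provided by goworker, and JobPerformer is redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// JobPerformer mirrors the interface documented for this package.
type JobPerformer interface {
	Perform()
}

// PerformFunc is a hypothetical adapter (not part of goworker) that lets
// an ordinary func() be used wherever a JobPerformer is expected.
type PerformFunc func()

// Perform calls the underlying function.
func (f PerformFunc) Perform() { f() }

// Compile-time check that PerformFunc implements JobPerformer.
var _ JobPerformer = PerformFunc(nil)

func main() {
	var j JobPerformer = PerformFunc(func() {
		fmt.Println("hello from a closure job")
	})
	j.Perform() // prints "hello from a closure job"
}
```

With the real package, a value like this could presumably be passed to AddJob in place of a struct-based job.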

type Logger

type Logger interface {
	Info(string, ...zapcore.Field)
}

Logger ...

type Worker

type Worker struct {
	WorkerPool chan chan JobPerformer
	JobChannel chan JobPerformer
	// contains filtered or unexported fields
}

Worker represents the worker that executes the job

func NewWorker

func NewWorker(num int, workerPool chan chan JobPerformer) *Worker

NewWorker constructs new Worker

func (*Worker) Start

func (w *Worker) Start(wg *sync.WaitGroup)

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.
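
A run loop that listens on both a job channel and a quit channel can be sketched as follows. The worker type, its field names, and runOnce are assumptions made for this example; the real Worker has unexported fields that are not shown in this documentation, and whether its Start calls wg.Add itself is not stated. The sketch also shows why a value receiver on the stop method still works: the copied struct shares the same underlying channel.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for the package's Worker type.
type worker struct {
	jobs chan func()
	quit chan struct{}
}

func newWorker() *worker {
	return &worker{jobs: make(chan func()), quit: make(chan struct{})}
}

// start launches the run loop in a goroutine. The loop performs jobs
// until the quit channel is closed, then marks itself done on wg.
func (w *worker) start(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case job := <-w.jobs:
				job()
			case <-w.quit:
				return
			}
		}
	}()
}

// stop uses a value receiver: the copy of the struct still refers to
// the same quit channel, so closing it reaches the running loop.
func (w worker) stop() { close(w.quit) }

// runOnce starts one worker, sends it n jobs, stops it, waits for the
// loop to exit, and returns how many jobs were performed.
func runOnce(n int) int {
	var wg sync.WaitGroup
	w := newWorker()
	w.start(&wg)

	performed := 0
	for i := 0; i < n; i++ {
		w.jobs <- func() { performed++ } // only the worker goroutine runs this
	}

	w.stop()
	wg.Wait() // after this, reading performed is safe
	return performed
}

func main() {
	fmt.Println("performed:", runOnce(3)) // prints "performed: 3"
}
```

Passing the WaitGroup into the start method lets the caller block until every worker loop has actually exited, not merely been asked to stop.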
