workerpool

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2023 License: MIT Imports: 3 Imported by: 17

README

Worker Pool

The library implements worker pool pattern by channels. Now realized 2 kinds of worker pool: Pool and TimedPool. Pool is the generic realization of worker pool pattern when tasks are received from outside. TimedPool - is the worker pool implementation where tasks are received by dispatcher on ticker event.

Install

go get github.com/dipdup.net/workerpool

Examples

Usage of Pool

package main

import (
	"context"
	"log"
	"time"
)

func main() {
	pool := NewPool(worker, 2)

	ctx, cancel := context.WithCancel(context.Background())
	pool.Start(ctx)

	dispatcher(ctx, pool)

	time.Sleep(time.Minute)

	cancel()

	if err := pool.Close(); err != nil {
		log.Panic(err)
	}
}

func worker(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			time.Sleep(time.Second)
			log.Printf("hello, %s", name)
			return
		}
	}
}

func dispatcher(ctx context.Context, pool *Pool[string]) {
	for _, name := range []string{"John", "Mark", "Peter", "Mike"} {
		select {
		case <-ctx.Done():
			return
		default:
			pool.AddTask(name)
		}
	}
}

Usage of TimedPool

package main

import (
	"context"
	"log"
	"time"
)

func main() {
	pool := NewTimedPool(dispatcher, worker, nil, 2, 60*1000) // tasks will be received over 60 seconds

	ctx, cancel := context.WithCancel(context.Background())
	pool.Start(ctx)

	time.Sleep(time.Minute)

	cancel()

	if err := pool.Close(); err != nil {
		log.Panic(err)
	}
}

func worker(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			time.Sleep(time.Second)
			log.Printf("hello, %s", name)
			return
		}
	}
}

func dispatcher(ctx context.Context) ([]string, error) {
	return []string{"John", "Mark", "Peter", "Mike"}, nil
}

NewTimedPool receives as arguments 3 handlers: Dispatcher, Worker and ErrorHandler.

// Worker - worker handler. Calls on task arriving.
type Worker[Task any] func(ctx context.Context, task Task)

// Dispatcher - the function is called by timer for receiving tasks
type Dispatcher[Task any] func(ctx context.Context) ([]Task, error)

// ErrorHandler - the function is called when error occured in dispatcher
type ErrorHandler[Task any] func(ctx context.Context, err error)

Also it receives 2 integers: workers count and time between dispatcher calls in milliseconds.

Group

Group can be used for running processes with wait group. For example:

// this program runs 2 ticker functions and waits its executed.
func main() {
	group := NewGroup()
	group.Go(ticker3)
	group.Go(ticker4)
	group.Wait()
}

func ticker3() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	var count int

	for range ticker.C {
		count++
		log.Print("second")

		if count == 5 {
			return
		}
	}
}

func ticker4() {
	ticker := time.NewTicker(time.Second * 2)
	defer ticker.Stop()

	var count int

	for range ticker.C {
		count++
		log.Print("2 second")

		if count == 5 {
			return
		}
	}
}

With using context:

// the program runs two tickers and after 10 seconds cancel it using context cancellation
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	group := NewGroup()
	group.GoCtx(ctx, ticker1)
	group.GoCtx(ctx, ticker2)

	time.Sleep(10 * time.Second)
	cancel()

	group.Wait()
}

func ticker1(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			log.Print("second")
		}
	}
}

func ticker2(ctx context.Context) {
	ticker := time.NewTicker(time.Second * 2)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			log.Print("2 second")
		}
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher[Task any] func(ctx context.Context) ([]Task, error)

Dispatcher - the function is called by timer for receiving tasks

type ErrorHandler

type ErrorHandler[Task any] func(ctx context.Context, err error)

ErrorHandler - the function is called when error occured in dispatcher

type Group added in v0.0.4

type Group struct {
	// contains filtered or unexported fields
}

Runs functions with one wait group

func NewGroup added in v0.0.4

func NewGroup() Group

NewGroup - creates new Group

func (Group) Go added in v0.0.4

func (g Group) Go(f func())

Go - runs function with wait group

func (Group) GoCtx added in v0.0.4

func (g Group) GoCtx(ctx context.Context, f func(ctx context.Context))

GoCtx - runs function with wait group using context

func (Group) Wait added in v0.0.4

func (g Group) Wait()

Wait - waits until grouped functions end

type Pool

type Pool[Task any] struct {
	// contains filtered or unexported fields
}

Pool - worker pool entity

func NewPool

func NewPool[Task any](worker Worker[Task], workersCount int) *Pool[Task]

NewPool - creates new worker pool. `worker` - is a job. `workersCount` - count of workers executing jobs.

func (*Pool[Task]) AddTask

func (pool *Pool[Task]) AddTask(task Task)

AddTask - adds task to worker pool. Blocks if worker pool is full.

func (*Pool[Task]) Clear

func (pool *Pool[Task]) Clear()

Clear - clears queue

func (*Pool[Task]) Close

func (pool *Pool[Task]) Close() error

Close - wait returnning from workers and closing pool object

func (*Pool[Task]) QueueSize

func (pool *Pool[Task]) QueueSize() int

QueueSize - current count of tasks in pool

func (*Pool[Task]) Start

func (pool *Pool[Task]) Start(ctx context.Context)

Start - begin worker pool

func (*Pool[Task]) WorkersCount

func (pool *Pool[Task]) WorkersCount() int

WorkersCount - returns workers count in the pool

type TimedPool

type TimedPool[Task any] struct {
	*Pool[Task]
	// contains filtered or unexported fields
}

TimedPool - worker pool which receives task by executing `dispatcher` function by ticker.

func NewTimedPool

func NewTimedPool[Task any](
	dispatcher Dispatcher[Task],
	worker Worker[Task],
	errorHandler ErrorHandler[Task],
	workersCount int,
	periodReceivingTasks int,

) *TimedPool[Task]

NewTimedPool - creates new `TimedPool`. `dispatcher` - is a function returning tasks. If it's null `Start` function immidiately exits. `worker` - is a job executing by a task. `errorHandler` - callback to error handling in `dispatcher` executing. May be null. `workersCount` - count of workers executing jobs. `periodReceivingTasks` - ticker period in milliseconds when `dispatcher` will be called.

func (*TimedPool[Task]) Start

func (pool *TimedPool[Task]) Start(ctx context.Context)

Start - starts pool and dispatcher.

type Worker

type Worker[Task any] func(ctx context.Context, task Task)

Worker - worker handler. Calls on task arriving.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL