ctxerrpool

package module
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2021 License: Apache-2.0 Imports: 3 Imported by: 0

README

Go Report Card PkgGoDev

ctxerrpool

Create a pool of a given number of worker goroutines to behave as a worker pool. The pool will do work that is aware of context.Context and error handling.

Full example

This example will use a worker pool to HTTP GET https://golang.org 16 times and print the status codes with a logger.

package main

import (
	"bytes"
	"context"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/MicahParks/ctxerrpool"
)

func main() {

	// Create an error handler that logs all errors.
	var errorHandler ctxerrpool.ErrorHandler
	errorHandler = func(pool ctxerrpool.Pool, err error) {
		log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
	}

	// Create a worker pool with 4 workers.
	pool := ctxerrpool.New(4, errorHandler)

	// Create some variables to inherit through a closure.
	httpClient := &http.Client{}
	u := "https://golang.org"
	logger := log.New(os.Stdout, "status codes: ", 0)

	// Create the worker function.
	var work ctxerrpool.Work
	work = func(ctx context.Context) (err error) {

		// Create the HTTP request.
		var req *http.Request
		if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
			return err
		}

		// Do the HTTP request.
		var resp *http.Response
		if resp, err = httpClient.Do(req); err != nil {
			return err
		}

		// Log the status code.
		logger.Println(resp.StatusCode)

		return nil
	}

	// Do the work 16 times.
	for i := 0; i < 16; i++ {

		// Create a context for the work.
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		// Send the work to the pool.
		pool.AddWorkItem(ctx, work)
	}

	// Wait for the pool to finish.
	pool.Wait()
}

Terminology

Term Description
worker A goroutine dedicated to completing work items.
worker pool A number of workers who all consume work items from the same set and report errors via a common handler.
worker function A function matching a specific signature that can be run by a worker.
work item A worker function plus a unique context.Context and context.CancelFunc pair that will be run once by a worker.

Differences between golang.org/x/sync/errgroup

This package github.com/MicahParks/ctxerrpool and golang.org/x/sync/errgroup are similar, but serve different use cases.

The current overview for errgroup is:

Package errgroup provides synchronization, error propagation, and Context cancelation for pools of goroutines working on subtasks of a common task.

In terms of Context, this is to say that each group is associated to one context. Another key point is: all tasks (work items) for a group are subtasks of a common task.

In contrast, ctxerrpool makes work items and contexts have a many-to-one relationship. worker functions do not need to be subtasks of a common task as one of the primary features of the pool is to behave as a worker pool. I find worker pools helpful in performing work asynchronously without worrying about creating too many goroutines.

Purpose one to one with context.Context errors
errpool subtasks of a common task errpool.Pool Propagation
ctxerrpool goroutines as worker pools work item Handler function

Benefits

  • Async error handling simplified.
  • Familiar methods.
    • Done method mimics context.Context's.
    • Wait method mimics sync.WaitGroup's.
  • Flat and simple.
    • Only exported struct is ctxerrpool.Pool.
  • Apache 2.0 License.
  • No dependencies outside of the packages included with the Golang compiler.
  • Small code base.
    • Three source files with less than 350 lines of code including lots of comments.
  • Test coverage is greater than 90%.
  • The pool and its workers will all be cleaned up with pool.Kill(). (All work sent to the pool should exit as well, if it respects its own context.)

Usage

Basic Workflow

Create an error handler

The first step to using a worker pool is creating an error handler. The worker pool is expecting all worker functions to match the ctxerrpool.Work function signature: type Work func(ctx context.Context) (err error).

Error handlers have the function signature of type ErrorHandler func(pool Pool, err error) where the first argument is the ctxerrpool.Pool that the error handler is handling errors for and the second argument is the current error reported from a worker.

The example error handler below logs all errors with the build in logger.

// Create an error handler that logs all errors.
var errorHandler ctxerrpool.ErrorHandler
errorHandler = func(pool ctxerrpool.Pool, err error) {
	log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
}
Create a worker pool

After the error handler has been created, the worker pool can be created.

// Create a worker pool with 4 workers.
pool := ctxerrpool.New(4, errorHandler)

The first argument is the number of workers. The number of workers is the maximum number of goroutines that can be working on a work item at any one time. If the number of workers is 0, the worker pool will be useless.

The second argument is the error handler created in the previous step. All errors will be sent to the error handler asynchronously (in a separate goroutine).

Create worker functions

worker functions sent to the worker pool must match the ctxerrpool.Work function signature: type Work func(workCtx context.Context) (err error) and is expected to respect its given context, workCtx. If the context is not respected and the worker pool is killed, the goroutine performing the work will leak.

Here is an example of a worker function that respects its context:

// Create the worker function.
var work ctxerrpool.Work
work = func(ctx context.Context) (err error) {

	// Create the HTTP request.
	var req *http.Request
	if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
		return err
	}

	// Do the HTTP request.
	var resp *http.Response
	if resp, err = httpClient.Do(req); err != nil {
		return err
	}

	// Log the status code.
	logger.Println(resp.StatusCode)

	return nil
}

Here is an example of a worker function that does the same thing without respecting its own context:

// Create the worker function.
var work ctxerrpool.Work
work = func(ctx context.Context) (err error) {

	// Create the HTTP request.
	var req *http.Request
	if req, err = http.NewRequest(http.MethodGet, u, bytes.NewReader(nil)); err != nil {
		return err
	}

	// Do the HTTP request.
	var resp *http.Response
	if resp, err = httpClient.Do(req); err != nil {
		return err
	}

	// Log the status code.
	logger.Println(resp.StatusCode)

	return nil
}

Since most functions do not match the ctxerrpool.Work signature, function closures are typically a convenient way to create worker functions.

// Create some variables to inherit through a closure.
httpClient := &http.Client{}
u := "https://golang.org"
logger := log.New(os.Stdout, "status codes: ", 0)

// Create the worker function.
var work ctxerrpool.Work
work = func(workCtx context.Context) (err error) {

	// Create the HTTP request.
	var req *http.Request
	if req, err = http.NewRequestWithContext(workCtx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
		return err
	}

	// Do the HTTP request.
	var resp *http.Response
	if resp, err = httpClient.Do(req); err != nil {
		return err
	}

	// Log the status code.
	logger.Println(resp.StatusCode)

	return nil
}
Create a context

work items and contexts have a many-to-one relationship. Individual work items can share contexts and others can act have unique contexts. This is helpful when a timeout for a work item is supposed to start right before the work starts.

Here is an example of multiple work items sharing a context:

// All these work items must be completed within 1 second.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Do the work 16 times.
for i := 0; i < 16; i++ {

	// Send the work to the pool.
	pool.AddWorkItem(ctx, work)
}

Here is an example of every work item being able to have 1 second of run time.

// Do the work 16 times.
for i := 0; i < 16; i++ {

	// Let the work go for a max of 1 second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Send the work to the pool.
	pool.AddWorkItem(ctx, work)
}
Adding work items

Adding, in its simplest form. Was addressed above on these lines.

// Send the work to the pool.
pool.AddWorkItem(ctx, work)

Any time the AddWorkItem method is called, a new work item will be taken and performed by the worker pool.

Remember that worker functions can also be created via function closures. This allows access to variables that are needed but do not match the function signature: ctxerrpool.Work.

The first and second arguments are the unique context and cancellation functions for the given work item.

The third argument is the work function created in a previous step.

Let all work items finish

pool.Wait()

or

<-pool.Done()

Both statements will block until the worker pool has completed all given work items. The Done method is idea for select statements.

Clean up the worker pool

pool.Kill()

Killing the worker pools isn't required, but if the worker pool is no longer being used it's best to tell all its goroutines to return to reclaim their resources. If the program's main function is about to end, pool.Kill() will be accomplished regardless.

A worker pool can be killed before all work items finish. Outstanding work items' context.CancelFuncs will be called.

Test Coverage

Testing coverage for this repository is currently greater than 90%. Depending on how Go runtime schedules things, certain paths may or may not be executed. This is based on a sample of 1000 tests with coverage and race detection. All tests pass without any detected race conditions.

If you are interested in test coverage, please view the cmd/coverage.go tool and the cmd/profiles directory. The directory has log output from the tool and testing coverage profiles for multiple samples of the 1000 tests.

In my experience, test coverage that counts the number of lines executed can be a misleading number. While I believe the current written tests adequately cover the code base, if there are test cases that are not covered by the current testing files, please feel free to add an issue requesting the test case or create a PR that adds tests.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrCantDo indicates that there was a failure to send the function to work on to a worker before the context
	// expired.
	ErrCantDo = errors.New("failed to send work item to a worker before the context expired")
)

Functions

This section is empty.

Types

type ErrorHandler

type ErrorHandler func(pool Pool, err error)

ErrorHandler is a function that receives an error and handles it.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is the way to control a pool of worker goroutines that understand context.Context and error handling.

func New

func New(workers uint, errorHandler ErrorHandler) Pool

New creates a new Pool.

func (Pool) AddWorkItem

func (g Pool) AddWorkItem(ctx context.Context, work Work)

AddWorkItem takes in context information and a Work function and gives it to a worker. This can block if all workers are busy and the work item buffer is full. This function will block if no workers are ready. Call with the go keyword to launch it in another goroutine to guarantee no blocking.

func (Pool) Dead

func (g Pool) Dead() bool

Dead determines if the pool is dead.

func (Pool) Death

func (g Pool) Death() <-chan struct{}

Death returns a channel that will close when the Pool has died.

func (Pool) Done

func (g Pool) Done() <-chan struct{}

Done mimics the functionality of the context.Context Done method. It returns a channel that will close when all given work has been completed or when the pool dies.

func (Pool) Kill

func (g Pool) Kill()

Kill tells all the worker goroutines and work items to end.

func (Pool) Wait

func (g Pool) Wait()

Wait mimics the functionality of the sync.WaitGroup Wait method. It returns when all given work has been completed or when the pool dies.

type Work

type Work func(workCtx context.Context) (err error)

Work is a function that utilizes the given context properly and returns an error.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL