ctxerrgroup

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2020 License: MIT Imports: 3 Imported by: 0

README

Go Report CardPkgGoDev

ctxerrgroup

Groups of goroutines that understand context.Context and error handling.

Benefits

  • Async error handling simplified.
  • Familiar methods.
    • Done method mimics context.Context's.
    • Wait method mimics sync.WaitGroup's.
  • Flat and simple.
    • Only exported struct is ctxerrgroup.Group.
  • MIT License.
  • No dependencies outside of the packages included with the Golang compiler.
  • Small code base.
    • Two source files with less than 350 lines of code including lots of comments.
  • Test coverage is greater than 90%.
  • The group and its workers will all be cleaned up with group.Kill(). (All work sent to the group should exit as well, if it respects its own context.)

Full example

This example will use a worker group to HTTP GET https://golang.org 16 times and print the status codes with a logger.

package main

import (
	"bytes"
	"context"
	"log"
	"net/http"
	"os"
	"time"

	"gitlab.com/MicahParks/ctxerrgroup"
)

func main() {

	// Create an error handler that logs all errors.
	var errorHandler ctxerrgroup.ErrorHandler
	errorHandler = func(group ctxerrgroup.Group, err error) {
		log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
	}

	// Create a worker group with 8 workers and a buffer that can queue work functions. Do not handle errors in a
	// separate goroutine.
	group := ctxerrgroup.New(4, 8, false, errorHandler)

	// Create some variables to inherit through a closure.
	httpClient := &http.Client{}
	u := "https://golang.org"
	logger := log.New(os.Stdout, "status codes: ", 0)

	// Create the worker function.
	var work ctxerrgroup.Work
	work = func(ctx context.Context) (err error) {

		// Create the HTTP request.
		var req *http.Request
		if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader([]byte{})); err != nil {
			return err
		}

		// Do the HTTP request.
		var resp *http.Response
		if resp, err = httpClient.Do(req); err != nil {
			return err
		}

		// Log the status code.
		logger.Println(resp.StatusCode)

		return nil
	}

	// Do the work 16 times.
	for i := 0; i < 16; i++ {

		// Create a context for the work.
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)

		// Send the work to the group.
		group.AddWorkItem(ctx, cancel, work)
	}

	// Wait for the group to finish.
	group.Wait()
}

Usage

Basic Workflow

Create an error handler

The very first step to using a worker group is creating an error handler. The worker group is expecting all work to match the ctxerrgroup.Work function signature: type Work func(ctx context.Context) (err error).

Error handlers have the function signature of type ErrorHandler func(group Group, err error) where the first argument is the ctxerrgroup.Group that the error handler is handling errors for and the second argument is the most recent error reported from a worker.

The example error handler below logs all errors with the build in logger.

// Create an error handler that logs all errors.
var errorHandler ctxerrgroup.ErrorHandler
errorHandler = func(group ctxerrgroup.Group, err error) {
	log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
}
Create a worker group

After the error handler has been created, the worker group can be created.

// Create a worker group with 8 workers and a buffer that can queue work functions. Do not handle errors in a
// separate goroutine.
group := ctxerrgroup.New(4, 8, false, errorHandler)

The first argument is the number of workers. The number of workers is the maximum number of goroutines that can be working on work at any one time. If the number of workers is 0, the group will be useless.

The second argument is the buffer size. The buffer size is the quantity of ctxerrgroup.Work functions that are queued to be sent to the group. If the buffer size is zero, all calls to group.AddWorkItem will block until a worker is free to accept the given work (unless the async argument is set to true).

The third argument indicates whether error should be handled asynchronously. If set to true a new goroutine will be used to handle every error outgoing from the worker group. A value of false will cut down on the amount of goroutines used, but block future workers with errors until the error handler returns.

The fourth argument is the error handler created in the previous step.

Create work

Work sent to the worker group must match the ctxerrgroup.Work function signature: type Work func(workCtx context.Context) (err error) and is expected to respect its given context, ctx. If the context is not respected and the worker group is killed, the goroutine performing the work will leak.

Here is an example of some work that respects its context:

// Create the worker function.
var work ctxerrgroup.Work
work = func(ctx context.Context) (err error) {

	// Create the HTTP request.
	var req *http.Request
	if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader([]byte{})); err != nil {
		return err
	}

	// Do the HTTP request.
	var resp *http.Response
	if resp, err = httpClient.Do(req); err != nil {
		return err
	}

	// Log the status code.
	logger.Println(resp.StatusCode)

	return nil
}

Here is an example of some work that does the same thing without respecting its own context:

// Create the worker function.
var work ctxerrgroup.Work
work = func(ctx context.Context) (err error) {

	// Create the HTTP request.
	var req *http.Request
	if req, err = http.NewRequest(http.MethodGet, u, bytes.NewReader([]byte{})); err != nil {
		return err
	}

	// Do the HTTP request.
	var resp *http.Response
	if resp, err = httpClient.Do(req); err != nil {
		return err
	}

	// Log the status code.
	logger.Println(resp.StatusCode)

	return nil
}

Since most functions do not match the ctxerrgroup.Work signature, function closures are typically the desired way to create worker functions.

// Create some variables to inherit through a closure.
httpClient := &http.Client{}
u := "https://golang.org"
logger := log.New(os.Stdout, "status codes: ", 0)

// Create the worker function.
var work ctxerrgroup.Work
work = func(ctx context.Context) (err error) {

	// Create the HTTP request.
	var req *http.Request
	if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader([]byte{})); err != nil {
		return err
	}

	// Do the HTTP request.
	var resp *http.Response
	if resp, err = httpClient.Do(req); err != nil {
		return err
	}

	// Log the status code.
	logger.Println(resp.StatusCode)

	return nil
}
Create a context

Work and context have a 1:1 relationship when used in a worker group. Each context's cancel function is called on work completion. Never send work with a context with a context that has been used before.

Here is an example of what not to do:

// This context will cause up to 4 jobs to fail.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

// Do the work 16 times.
for i := 0; i < 16; i++ {

	// Send the work to the group.
	group.AddWorkItem(ctx, cancel, work)
}

Instead, create the context in the loop and do not shadow a context variable from out of scope.

// Do the work 16 times.
for i := 0; i < 16; i++ {

	// Create a context for the work.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)

	// Send the work to the group.
	group.AddWorkItem(ctx, cancel, false, work)
}
Send work

// Wait for the group to finish.
group.Wait()

The first and second arguments are the unique context and cancellation functions for the given work.

The third argument indicates if the work should be sent asynchronously. If true, the SendWork method will not block when sending work to the worker group. This effectively creates a buffer of infinity work functions ready to be sent to worker goroutines. If set to true, an additional goroutine will be created to send the work to a worker in the group. If false the SendWork method will block if the group's buffer is full and no worker is ready for more work. (The buffer was created when the group was created.)

The fourth argument is the work function created in a previous step.

Let the work finish

group.Wait()

or

<-group.Done()

Both statements will block until the worker group has completed all given work. The Done method is idea for select statements.

Clean up the worker group

group.Kill()

Killing the worker groups isn't required, but if the worker group is no longer being used it's best to tell all its goroutines to return to reclaim their resources. If the program's main function is about to end, group.Kill() will be accomplished regardless.

Test Coverage

Testing coverage for this repository is currently greater than 90%. Depending on how Go runtime schedules things, certain paths may or may not be executed. This is based on a sample of 1000 tests with coverage and race detection. All tests pass without any detected race conditions.

If you are interested in test coverage, please view the cmd/coverage.go tool and the cmd/profiles directory. The directory has log output from the tool and testing coverage profiles for multiple samples of the 1000 tests.

In my experience, test coverage that counts the number of lines executed can be a misleading number. While I believe the current written tests adequately cover the code base, if there are test cases that are not covered by the current testing files, please feel free to add an issue requesting the test case or create an MR that adds tests.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrCantDo indicates that there was a failure to send the function to work on to a worker before the context
	// expired.
	ErrCantDo = errors.New("failed to send work item to a worker before the context expired")
)

Functions

This section is empty.

Types

type ErrorHandler

type ErrorHandler func(group Group, err error)

ErrorHandler is a function that receives an error and handles it.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group is the way to control a group of worker goroutines that understand context.Context and error handling.

func New

func New(workers uint, workItemBuffer uint, asyncErrors bool, errorHandler ErrorHandler) Group

New creates a new Group.

func (Group) AddWorkItem

func (g Group) AddWorkItem(ctx context.Context, cancel context.CancelFunc, work Work)

AddWorkItem takes in context information and a Work function and gives it to a worker. This can block if all workers are busy and the work item buffer is full. This function will block if no workers are ready. Call with the go keyword to launch it in another goroutine to guarantee no blocking.

func (Group) Dead

func (g Group) Dead() bool

Dead determines if the group is dead.

func (Group) Death

func (g Group) Death() <-chan struct{}

Death returns a channel that will close when the Group has died.

func (Group) Done

func (g Group) Done() <-chan struct{}

Done mimics the functionality of the context.Context Done method. It returns a channel that will close when all given work has been completed or when the group dies.

func (Group) Kill

func (g Group) Kill()

Kill tells all the worker goroutines and work items to end.

func (Group) Wait

func (g Group) Wait()

Wait mimics the functionality of the sync.WaitGroup Wait method. It returns when all given work has been completed or when the group dies.

type Work

type Work func(workCtx context.Context) (err error)

Work is a function that utilizes the given context properly and returns an error.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL