workgroup

v0.4.1
Published: May 19, 2022 License: MIT Imports: 4 Imported by: 1

README

Work Group

This module (github.com/tschaub/workgroup) provides a group for executing tasks with limited concurrency.

Installation

Requires Go >= 1.18

go get github.com/tschaub/workgroup

Use

See the reference documentation for example usage.

Documentation

Constants

This section is empty.

Variables

var ErrEmptyQueue = errors.New("empty queue")

ErrEmptyQueue is returned when the queue is empty.

Functions

func NewDefaultQueue added in v0.4.1

func NewDefaultQueue[T any]() *defaultQueue[T]

NewDefaultQueue returns an in-memory queue. Workgroups created without a queue will get one of these.

Types

type Options added in v0.3.0

type Options[T any] struct {
	Context context.Context
	Limit   int
	Queue   Queue[T]
	Work    WorkFunc[T]
}

Options for constructing a new worker. Only the Work function is required. No more than the Limit number of tasks will be executed concurrently.

By default, the limit is 1 and an in-memory queue is used. You can provide your own queue that implements the Queue interface.
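To make the meaning of Limit concrete, here is a minimal, self-contained sketch of one common way to cap concurrency in Go: a buffered channel used as a semaphore. It is an illustration only and is not taken from this module's source; runLimited and its parameters are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited is a hypothetical helper, not part of the workgroup API.
// It calls work once per task with at most limit calls in flight and
// returns the highest number of concurrent calls it observed.
// limit must be at least 1, otherwise the first send blocks forever.
func runLimited(limit int, tasks []string, work func(string)) int {
	sem := make(chan struct{}, limit) // one slot per allowed concurrent task
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)

	for _, task := range tasks {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(task string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			work(task)

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}

	wg.Wait()
	return peak
}

func main() {
	tasks := []string{"a", "b", "c", "d", "e", "f"}
	peak := runLimited(2, tasks, func(string) {
		time.Sleep(10 * time.Millisecond)
	})
	fmt.Println("peak concurrency:", peak) // never more than 2
}
```

With a limit of 1, as in the documented default, the tasks in this sketch run strictly one at a time.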

type Queue added in v0.3.0

type Queue[T any] interface {
	// Add task data to the queue.
	Add(ctx context.Context, data T) error

	// Next task data in the queue.  Returns ErrEmptyQueue if the queue is empty.
	Next(ctx context.Context) (data T, err error)

	// HasNext returns true if the queue has more data.
	HasNext(ctx context.Context) bool
}

Queue holds task data to be executed.

By default, the workgroup uses an in-memory queue. You can provide your own queue that implements this interface.
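As an illustration of providing your own queue, the following sketch implements the three documented methods with a mutex-guarded, last-in, first-out stack. So that it compiles on its own, it declares local copies of Queue and ErrEmptyQueue; in real code you would use workgroup.Queue and workgroup.ErrEmptyQueue instead. The names stackQueue, newStackQueue, and drain are hypothetical, and the sketch assumes the queue may be called from several goroutines.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins that mirror the documented declarations.
// In real code, use the ones exported by the workgroup package.
var ErrEmptyQueue = errors.New("empty queue")

type Queue[T any] interface {
	Add(ctx context.Context, data T) error
	Next(ctx context.Context) (data T, err error)
	HasNext(ctx context.Context) bool
}

// stackQueue is a hypothetical last-in, first-out queue.
type stackQueue[T any] struct {
	mu    sync.Mutex
	items []T
}

// Compile-time check that stackQueue satisfies the interface.
var _ Queue[string] = (*stackQueue[string])(nil)

// newStackQueue returns a queue seeded with a copy of items.
func newStackQueue[T any](items ...T) *stackQueue[T] {
	return &stackQueue[T]{items: append([]T(nil), items...)}
}

func (q *stackQueue[T]) Add(_ context.Context, data T) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, data)
	return nil
}

func (q *stackQueue[T]) Next(_ context.Context) (T, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	var zero T
	if len(q.items) == 0 {
		return zero, ErrEmptyQueue
	}
	last := len(q.items) - 1
	data := q.items[last]
	q.items[last] = zero // drop the reference so it can be collected
	q.items = q.items[:last]
	return data, nil
}

func (q *stackQueue[T]) HasNext(_ context.Context) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items) > 0
}

// drain calls Next until the queue returns an error.
// ErrEmptyQueue is the expected way out; this sketch stops on any error.
func drain[T any](q Queue[T]) []T {
	ctx := context.Background()
	var out []T
	for {
		data, err := q.Next(ctx)
		if err != nil {
			return out
		}
		out = append(out, data)
	}
}

func main() {
	ctx := context.Background()
	q := newStackQueue[string]()
	for _, s := range []string{"a", "b", "c"} {
		if err := q.Add(ctx, s); err != nil {
			fmt.Println("add failed:", err)
		}
	}
	fmt.Println(drain[string](q)) // prints [c b a]
}
```

A value like this would be passed as the Queue field of Options. A queue backed by a database or message broker would follow the same shape, using ctx for cancellation and returning real errors from Add and Next.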

type WorkFunc

type WorkFunc[T any] func(*Worker[T], T) error

WorkFunc is called by the worker with task data. The work function receives a pointer to the worker so that it can add additional task data.
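The idea of a work function that adds more work can be sketched without the library. The version below is a simplified, hypothetical stand-in built on sync.WaitGroup, with no queue and no concurrency limit; it is not how this module is implemented. It shows only why a wait can still finish correctly while running tasks keep adding tasks. The names growingGroup and countSuffixes are made up for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// growingGroup is a hypothetical stand-in, not the library's Worker.
type growingGroup struct {
	wg   sync.WaitGroup
	work func(g *growingGroup, data string)
}

// Add registers the task before starting it. A task that calls Add has
// not yet finished, so the counter cannot drop to zero in between and
// Wait cannot return early.
func (g *growingGroup) Add(data string) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		g.work(g, data)
	}()
}

// Wait blocks until every task, including tasks added by other tasks,
// has finished.
func (g *growingGroup) Wait() {
	g.wg.Wait()
}

// countSuffixes runs one task per non-empty suffix of s and returns
// how many tasks did work.
func countSuffixes(s string) int {
	var mu sync.Mutex
	n := 0

	g := &growingGroup{}
	g.work = func(g *growingGroup, data string) {
		if len(data) == 0 {
			return
		}
		mu.Lock()
		n++
		mu.Unlock()
		g.Add(data[1:]) // spawn more work from inside the task
	}

	g.Add(s)
	g.Wait()
	return n
}

func main() {
	fmt.Println(countSuffixes("abcdef")) // prints 6
}
```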

type Worker

type Worker[T any] struct {
	// contains filtered or unexported fields
}

Worker executes a group of tasks that may grow over time.

Example
package main

import (
	"fmt"
	"time"

	"github.com/tschaub/workgroup"
)

func main() {
	worker := workgroup.New(&workgroup.Options[string]{
		Work: func(w *workgroup.Worker[string], data string) error {
			if len(data) == 0 {
				return nil
			}

			// do some work
			fmt.Printf("working on %s...\n", data)
			time.Sleep(10 * time.Millisecond)

			// spawn more work
			err := w.Add(data[1:])
			if err != nil {
				fmt.Printf("unexpected error: %s\n", err)
			}

			return nil
		},
	})

	err := worker.Add("abcdef")
	if err != nil {
		fmt.Printf("unexpected error: %s\n", err)
	}

	err = worker.Wait()
	if err != nil {
		fmt.Printf("unexpected error: %s\n", err)
	}

}
Output:

working on abcdef...
working on bcdef...
working on cdef...
working on def...
working on ef...
working on f...
Example (Context)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tschaub/workgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	worker := workgroup.New(&workgroup.Options[string]{
		Context: ctx,
		Work: func(w *workgroup.Worker[string], data string) error {
			if len(data) == 3 {
				cancel()
				return nil
			}

			// do some work
			fmt.Printf("working on %s...\n", data)
			time.Sleep(10 * time.Millisecond)

			// spawn more work
			err := w.Add(data[1:])
			if err != nil {
				fmt.Printf("unexpected error: %s\n", err)
			}

			return nil
		},
	})

	err := worker.Add("abcdef")
	if err != nil {
		fmt.Printf("unexpected error: %s\n", err)
	}

	err = worker.Wait()
	if err != nil {
		fmt.Printf("unexpected error: %s\n", err)
	}

}
Output:

working on abcdef...
working on bcdef...
working on cdef...

func New added in v0.3.0

func New[T any](opts *Options[T]) *Worker[T]

New creates a new worker.

func (*Worker[T]) Add

func (w *Worker[T]) Add(data T) error

Add queues up additional task data.

func (*Worker[T]) Context

func (w *Worker[T]) Context() context.Context

Context returns the group context while tasks are being executed.

func (*Worker[T]) Wait

func (w *Worker[T]) Wait() error

Wait blocks until all tasks have been executed.
