parallel

package
v0.0.0-...-6244057 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2018 License: AGPL-3.0 Imports: 6 Imported by: 16

Documentation

Index

Constants

View Source
const (
	// use N workers per cpu
	NUMBER_OF_WORKERS_MULTIPLIER = 2

	// use N buffers per worker
	BUFFER_SIZE_MULTIPLIER = 4

	// use N buffers per worker when working with files
	// this is useful, if fadvise is used
	FILE_BUFFER_SIZE_MULTIPLIER = 8
)

Variables

This section is empty.

Functions

func RecoverClosedChannel

func RecoverClosedChannel()

recover the panic 'send on closed channel' and ignore it. otherwise panic some more.

func SETMAXPROCS

func SETMAXPROCS()

func SuggestBufferSize

func SuggestBufferSize(max uint) uint

usually it's a good idea to have 4 * more buffers then workers; please state a max

func SuggestFileBufferSize

func SuggestFileBufferSize(workers uint) uint

suggest a size for filepaths to buffer

func SuggestMaximumNumberOfWorkers

func SuggestMaximumNumberOfWorkers(max uint) uint

suggest a number of workers.

func SuggestMinimumBufferSize

func SuggestMinimumBufferSize(min uint) uint

suggest a minimum buffer size; please state a minimum

func SuggestNumberOfWorkers

func SuggestNumberOfWorkers() uint

suggest a number of workers.

Types

type DoneInterface

type DoneInterface interface {
	Done()
}

type LockInterface

type LockInterface interface {
	Lock()
	Unlock()
}

backup copy

type TickWorkInterface

type TickWorkInterface interface {
	// use time.Tick to execute a function periodically
	Tick(duration time.Duration, tick func()) chan bool
}

type TimeoutWorkInterface

type TimeoutWorkInterface interface {
	// after duration, call cancel function
	Timeout(duration time.Duration, cancel func()) WorkInterface
}

type WaitGroupInterface

type WaitGroupInterface interface {
	WaitInterface
	DoneInterface
	Add(delta int)
}

type WaitInterface

type WaitInterface interface {
	Wait()
}

wait for something

type Work

type Work struct {
	sync.Mutex
	// contains filtered or unexported fields
}

* Work has locks and can utilize wait groups. * Locks are a feature for the working in the goroutines. Internally * locking is only used for Wait() to be thread safe. * Wait groups are used with Start() and it's wrapper Do() to ensure * all work is done. With Run() wait groups are not used and a call * to Wait() will return immidiatly. This however is not enforced to * make it possible to use Start() and Run() at the same time for * different purposes. * * See work_test.go for examples.

func NewWork

func NewWork() (work *Work)

New York; determine number of workers by number of CPU or amaxworkers, whichever smaller

func NewWorkFinally

func NewWorkFinally(afinally func()) (work *Work)

New York Finally

func NewWorkFinallyManual

func NewWorkFinallyManual(aworkers uint, afinally func()) (work *Work)

New York Finally with manual setting of workers

func NewWorkManual

func NewWorkManual(aworkers uint) (work *Work)

manually set number of workers

func (*Work) Do

func (work *Work) Do(worker func())

do some work; this is equivalent to calling Start() and then Wait(). calls finally func afterwards. guarantees all work is done. it blocks until all workers finish. should not be called with the go command.

func (*Work) Feed

func (work *Work) Feed(feeder func()) WorkInterface

feed workers; yummy; loop over something and push each value on a channel. it is good practise to first start feeding and then to open the workers via Do(), Start() or Run().

func (*Work) Go

func (work *Work) Go(worker func()) WorkInterface

go one worker with waitgroup

func (*Work) Initialise

func (work *Work) Initialise(aworkers uint)

i n i t i a l i z e

func (*Work) Run

func (work *Work) Run(worker func())

start workers - 1 in separate goroutines and run one in this goroutine. intended for (web)servers

func (*Work) Start

func (work *Work) Start(worker func()) WorkInterface

start workers; a worker should read from a channel and from that do work does not call finally as it doesn't record when or if a goroutine finishes. can guarantee all work is done, if work.Wait is called. does not block.

func (*Work) SuggestBufferSize

func (work *Work) SuggestBufferSize(_ uint) uint

usually it's a good idea to have 4 * more buffers then workers; please state a max

func (*Work) SuggestFileBufferSize

func (work *Work) SuggestFileBufferSize() uint

suggest a size for filepaths to buffer

func (*Work) SuggestMinimumBufferSize

func (work *Work) SuggestMinimumBufferSize(min uint) uint

suggest a minimum buffer size; please state a minimum

func (*Work) Tick

func (work *Work) Tick(duration time.Duration, tick func()) (cancel chan bool)

feed workers by ticks; the tick function must not loop indefinitly, but return as quickly as possible. returns a channel that must be closed or a boolean value send to, when not needed anymore otherwise the tick will run forever.

func (*Work) Timeout

func (work *Work) Timeout(duration time.Duration, cancel func()) WorkInterface

cancel a work. this should be preferably done by closing a channel. alternativly use a second, bool typed, channel to send a cancel signal.

func (*Work) Wait

func (work *Work) Wait()

a wrapper around the internally used *sync.WaitGroup.Wait(); thread safe

func (*Work) Workers

func (work *Work) Workers() uint

return number of configured workers

type WorkHelpInterface

type WorkHelpInterface interface {
	// suggest a buffer size, best according to number of CPUs
	SuggestBufferSize() uint

	// suggest a buffer size for filesystem i/o
	SuggestFileBufferSize() uint
}

type WorkInt

type WorkInt struct {
	Work

	Talk chan int
}

func NewIntsFeeder

func NewIntsFeeder(list ...int) (work *WorkInt)

type WorkInterface

type WorkInterface interface {
	LockInterface

	// wrapper around sync.WaitGroup.Wait(). must be thread safe (for example with sync.Once).
	// only works if Start() or it's wrapper Do() are used.
	WaitInterface

	// used to enable other packages to implement a custom worker
	Initialise(aworkers uint)

	// start workers - 1 in separate goroutines and run one
	// worker in this goroutine. the worker should block till the work is done.
	// it is not guaranteed that all work will be processed nor that the last
	// worker is really the last finishing (which can mean, that workers still
	// processing work can be interrupted and shut down by program termination).
	// intended for webservers, where the workers run in endless loops.
	Run(worker func())

	// wrapper around Start() which Waits() until all workers finish and then returns
	Do(worker func())

	// Start() N workers
	Start(worker func()) WorkInterface

	// start one feeding goroutine. can be used multiple times
	Feed(feeder func()) WorkInterface

	// suggest a buffer size, best according to number of CPUs
	// FIXME remove max uint argument
	SuggestBufferSize(max uint) uint

	// suggest a buffer size for filesystem i/o
	SuggestFileBufferSize() uint
}

func OpenFileDoWork

func OpenFileDoWork(helper *iotool.FileHelper, path string, worker func(*iotool.NamedBuffer)) (work WorkInterface)

Open File and Do Work

func OpenFilesDoWork

func OpenFilesDoWork(helper *iotool.FileHelper, paths <-chan string, worker func(*iotool.NamedBuffer)) (work WorkInterface)

Open N Files and Do Work

func OpenFilesFromListDoWork

func OpenFilesFromListDoWork(helper *iotool.FileHelper, worker func(*iotool.NamedBuffer), paths ...string) (work WorkInterface)

Open N Files and Do Work

func ReadFilesFilteredSequential

func ReadFilesFilteredSequential(helper *iotool.FileHelper, paths []string,
	filter func(reader io.Reader) io.Reader,
	worker func(*iotool.NamedBuffer)) (work WorkInterface)

read files sequential to a buffer. uses file advice will need and thus relies on kernel page caching. this is good for being a team player, but may hurt performance especially in syntethic tests.

func ReadFilesSequential

func ReadFilesSequential(helper *iotool.FileHelper, paths []string, worker func(*iotool.NamedBuffer)) (work WorkInterface)

read files sequential to a buffer

type WorkString

type WorkString struct {
	Work

	Talk chan string
}

func NewStringsFeeder

func NewStringsFeeder(list ...string) (work *WorkString)

type WorkUint

type WorkUint struct {
	Work

	Talk chan uint
}

func NewUintsFeeder

func NewUintsFeeder(list ...uint) (work *WorkUint)

type WorkUint64

type WorkUint64 struct {
	Work

	Talk chan uint64
}

func NewUints64Feeder

func NewUints64Feeder(list ...uint64) (work *WorkUint64)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL