Documentation ¶
Index ¶
- Constants
- func RecoverClosedChannel()
- func SETMAXPROCS()
- func SuggestBufferSize(max uint) uint
- func SuggestFileBufferSize(workers uint) uint
- func SuggestMaximumNumberOfWorkers(max uint) uint
- func SuggestMinimumBufferSize(min uint) uint
- func SuggestNumberOfWorkers() uint
- type DoneInterface
- type LockInterface
- type TickWorkInterface
- type TimeoutWorkInterface
- type WaitGroupInterface
- type WaitInterface
- type Work
- func (work *Work) Do(worker func())
- func (work *Work) Feed(feeder func()) WorkInterface
- func (work *Work) Go(worker func()) WorkInterface
- func (work *Work) Initialise(aworkers uint)
- func (work *Work) Run(worker func())
- func (work *Work) Start(worker func()) WorkInterface
- func (work *Work) SuggestBufferSize(_ uint) uint
- func (work *Work) SuggestFileBufferSize() uint
- func (work *Work) SuggestMinimumBufferSize(min uint) uint
- func (work *Work) Tick(duration time.Duration, tick func()) (cancel chan bool)
- func (work *Work) Timeout(duration time.Duration, cancel func()) WorkInterface
- func (work *Work) Wait()
- func (work *Work) Workers() uint
- type WorkHelpInterface
- type WorkInt
- type WorkInterface
- func OpenFileDoWork(helper *iotool.FileHelper, path string, worker func(*iotool.NamedBuffer)) (work WorkInterface)
- func OpenFilesDoWork(helper *iotool.FileHelper, paths <-chan string, ...) (work WorkInterface)
- func OpenFilesFromListDoWork(helper *iotool.FileHelper, worker func(*iotool.NamedBuffer), paths ...string) (work WorkInterface)
- func ReadFilesFilteredSequential(helper *iotool.FileHelper, paths []string, ...) (work WorkInterface)
- func ReadFilesSequential(helper *iotool.FileHelper, paths []string, worker func(*iotool.NamedBuffer)) (work WorkInterface)
- type WorkString
- type WorkUint
- type WorkUint64
Constants ¶
const (
	// use N workers per cpu
	NUMBER_OF_WORKERS_MULTIPLIER = 2
	// use N buffers per worker
	BUFFER_SIZE_MULTIPLIER = 4
	// use N buffers per worker when working with files
	// this is useful, if fadvise is used
	FILE_BUFFER_SIZE_MULTIPLIER = 8
)
Variables ¶
This section is empty.
Functions ¶
func RecoverClosedChannel ¶
func RecoverClosedChannel()
recover the panic 'send on closed channel' and ignore it; re-raise any other panic.
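The behaviour can be sketched standalone; this re-implements the idea rather than calling the package, and the helper names (recoverClosedChannel, trySend) are illustrative:

```go
package main

import (
	"fmt"
	"strings"
)

// recoverClosedChannel mimics the documented behaviour: swallow only the
// 'send on closed channel' runtime panic and re-raise anything else.
func recoverClosedChannel() {
	if r := recover(); r != nil {
		if err, ok := r.(error); ok && strings.Contains(err.Error(), "send on closed channel") {
			return // the receiver has shut down; ignore
		}
		panic(r) // otherwise panic some more
	}
}

// trySend attempts to send on ch and returns normally even if ch was closed.
func trySend(ch chan int, v int) {
	defer recoverClosedChannel()
	ch <- v
}

func main() {
	ch := make(chan int, 1)
	trySend(ch, 1)
	close(ch)
	trySend(ch, 2) // would normally panic; here the panic is swallowed
	fmt.Println(<-ch) // 1
}
```

This is useful when many feeders race against a consumer that closes the channel to signal shutdown.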
func SETMAXPROCS ¶
func SETMAXPROCS()
func SuggestBufferSize ¶
usually it's a good idea to have 4 times more buffers than workers; please state a max
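The heuristic behind the suggestion functions can be sketched as follows; the function and constant names here are stand-ins for the package's, and the capping behaviour is an assumption based on the "please state a max" hint:

```go
package main

import (
	"fmt"
	"runtime"
)

const (
	numberOfWorkersMultiplier = 2 // workers per cpu
	bufferSizeMultiplier      = 4 // buffers per worker
)

// suggestBufferSize sketches the heuristic: four buffers per worker,
// capped at the caller-supplied max (a hypothetical stand-in for the
// package's SuggestBufferSize).
func suggestBufferSize(workers, max uint) uint {
	size := workers * bufferSizeMultiplier
	if max > 0 && size > max {
		return max
	}
	return size
}

func main() {
	workers := uint(runtime.NumCPU()) * numberOfWorkersMultiplier
	fmt.Println(suggestBufferSize(workers, 64))
}
```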
func SuggestFileBufferSize ¶
suggest a buffer size for file paths
func SuggestMaximumNumberOfWorkers ¶
suggest a number of workers, at most max.
func SuggestMinimumBufferSize ¶
suggest a minimum buffer size; please state a minimum
Types ¶
type DoneInterface ¶
type DoneInterface interface {
Done()
}
type TickWorkInterface ¶
type TimeoutWorkInterface ¶
type TimeoutWorkInterface interface {
	// after duration, call cancel function
	Timeout(duration time.Duration, cancel func()) WorkInterface
}
type WaitGroupInterface ¶
type WaitGroupInterface interface {
	WaitInterface
	DoneInterface
	Add(delta int)
}
type Work ¶
Work has locks and can utilize wait groups. Locks are a feature for the work done in the goroutines; internally, locking is only used so that Wait() is thread safe. Wait groups are used with Start() and its wrapper Do() to ensure all work is done. With Run() wait groups are not used, and a call to Wait() will return immediately. This is deliberately not enforced, to make it possible to use Start() and Run() at the same time for different purposes.

See work_test.go for examples.
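The Feed/Do pattern described above can be sketched with plain channels and a sync.WaitGroup; the pool type and its methods here are illustrative stand-ins, not the package's internals:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a minimal stand-in for Work: Feed starts a feeder goroutine,
// Do starts N workers and waits for all of them via a wait group.
type pool struct {
	workers uint
	wg      sync.WaitGroup
}

func (p *pool) Feed(feeder func()) *pool {
	go feeder()
	return p
}

func (p *pool) Do(worker func()) {
	for i := uint(0); i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			worker()
		}()
	}
	p.wg.Wait() // Do blocks until every worker returns
}

// sum feeds values onto a channel and has the workers add them up.
func sum(workers uint, values []int) int64 {
	ch := make(chan int, len(values))
	var total int64
	p := &pool{workers: workers}
	p.Feed(func() {
		for _, v := range values {
			ch <- v
		}
		close(ch) // closing the channel lets the workers finish
	})
	p.Do(func() {
		for v := range ch {
			atomic.AddInt64(&total, int64(v))
		}
	})
	return total
}

func main() {
	fmt.Println(sum(4, []int{1, 2, 3, 4, 5})) // 15
}
```

Note how feeding is started before the workers, as the Feed() docs below recommend.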
func NewWork ¶
func NewWork() (work *Work)
New Work; determines the number of workers from the number of CPUs or amaxworkers, whichever is smaller
func NewWorkFinallyManual ¶
New Work Finally with manual setting of the number of workers
func (*Work) Do ¶
func (work *Work) Do(worker func())
do some work; this is equivalent to calling Start() and then Wait(). calls the finally func afterwards. guarantees all work is done. blocks until all workers finish. should not be called with the go statement.
func (*Work) Feed ¶
func (work *Work) Feed(feeder func()) WorkInterface
feed the workers; yummy. loop over something and push each value onto a channel. it is good practice to first start feeding and then start the workers via Do(), Start() or Run().
func (*Work) Run ¶
func (work *Work) Run(worker func())
start workers-1 workers in separate goroutines and run one worker in this goroutine. intended for (web)servers.
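The Run() semantics can be sketched standalone; run is an illustrative stand-in, and the external WaitGroup in sumOfSquares is added precisely because, as documented, Run-style workers are not waited on:

```go
package main

import (
	"fmt"
	"sync"
)

// run sketches the Run() semantics: launch workers-1 goroutines and run the
// last worker in the calling goroutine, so run blocks only on that worker.
// run itself gives no completion guarantee for the extra goroutines.
func run(workers uint, worker func()) {
	for i := uint(1); i < workers; i++ {
		go worker()
	}
	worker() // the caller becomes the last worker
}

// sumOfSquares feeds 1..n to the workers and sums the squares.
func sumOfSquares(workers uint, n int) int {
	jobs := make(chan int)
	results := make(chan int, n)
	go func() { // feeder
		for i := 1; i <= n; i++ {
			jobs <- i
		}
		close(jobs)
	}()
	var wg sync.WaitGroup
	wg.Add(int(workers))
	run(workers, func() {
		defer wg.Done()
		for j := range jobs {
			results <- j * j
		}
	})
	wg.Wait() // our own guarantee, since run provides none
	close(results)
	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares(2, 3)) // 14
}
```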
func (*Work) Start ¶
func (work *Work) Start(worker func()) WorkInterface
start the workers; a worker should read from a channel and do its work based on what it receives. does not call finally, as it does not record when or whether a goroutine finishes. guarantees all work is done only if work.Wait() is called. does not block.
func (*Work) SuggestBufferSize ¶
usually it's a good idea to have 4 times more buffers than workers; please state a max
func (*Work) SuggestFileBufferSize ¶
suggest a buffer size for file paths
func (*Work) SuggestMinimumBufferSize ¶
suggest a minimum buffer size; please state a minimum
func (*Work) Tick ¶
feed the workers by ticks; the tick function must not loop indefinitely, but return as quickly as possible. returns a channel that must be closed, or have a boolean value sent to it, when no longer needed; otherwise the ticker will run forever.
func (*Work) Timeout ¶
func (work *Work) Timeout(duration time.Duration, cancel func()) WorkInterface
cancel the work after the duration. cancelling should preferably be done by closing a channel; alternatively, use a second channel of type bool to send a cancel signal.
type WorkHelpInterface ¶
type WorkInt ¶
func NewIntsFeeder ¶
type WorkInterface ¶
type WorkInterface interface {
	LockInterface
	// wrapper around sync.WaitGroup.Wait(). must be thread safe (for example with sync.Once).
	// only works if Start() or its wrapper Do() are used.
	WaitInterface
	// used to enable other packages to implement a custom worker
	Initialise(aworkers uint)
	// start workers - 1 in separate goroutines and run one
	// worker in this goroutine. the worker should block till the work is done.
	// it is not guaranteed that all work will be processed nor that the last
	// worker is really the last finishing (which can mean, that workers still
	// processing work can be interrupted and shut down by program termination).
	// intended for webservers, where the workers run in endless loops.
	Run(worker func())
	// wrapper around Start() which calls Wait(), blocking until all workers finish, and then returns
	Do(worker func())
	// Start() N workers
	Start(worker func()) WorkInterface
	// start one feeding goroutine. can be used multiple times
	Feed(feeder func()) WorkInterface
	// suggest a buffer size, best according to number of CPUs
	// FIXME remove max uint argument
	SuggestBufferSize(max uint) uint
	// suggest a buffer size for filesystem i/o
	SuggestFileBufferSize() uint
}
func OpenFileDoWork ¶
func OpenFileDoWork(helper *iotool.FileHelper, path string, worker func(*iotool.NamedBuffer)) (work WorkInterface)
Open File and Do Work
func OpenFilesDoWork ¶
func OpenFilesDoWork(helper *iotool.FileHelper, paths <-chan string, worker func(*iotool.NamedBuffer)) (work WorkInterface)
Open N Files and Do Work
func OpenFilesFromListDoWork ¶
func OpenFilesFromListDoWork(helper *iotool.FileHelper, worker func(*iotool.NamedBuffer), paths ...string) (work WorkInterface)
Open N Files and Do Work
func ReadFilesFilteredSequential ¶
func ReadFilesFilteredSequential(helper *iotool.FileHelper, paths []string, filter func(reader io.Reader) io.Reader, worker func(*iotool.NamedBuffer)) (work WorkInterface)
read files sequentially into a buffer. uses the 'will need' file advice (fadvise) and thus relies on kernel page caching. this is good for being a team player, but may hurt performance, especially in synthetic tests.
func ReadFilesSequential ¶
func ReadFilesSequential(helper *iotool.FileHelper, paths []string, worker func(*iotool.NamedBuffer)) (work WorkInterface)
read files sequentially into a buffer
type WorkString ¶
func NewStringsFeeder ¶
func NewStringsFeeder(list ...string) (work *WorkString)
type WorkUint ¶
func NewUintsFeeder ¶
type WorkUint64 ¶
func NewUints64Feeder ¶
func NewUints64Feeder(list ...uint64) (work *WorkUint64)