Documentation ¶
Overview ¶
Package to apply a function over an array or stream of data.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Lazily ¶
func Lazily(f Evaluator, lookahead int, reaper <-chan struct{}, init ...interface{}) func() interface{}
Lazily is function to generate a lazy evaluator.
Lazy functions are terminated by closing the reaper channel. nil should be passed as a reaper for perpetual lazy functions.
Example ¶
package main import ( "github.com/biogo/biogo/concurrent" "fmt" ) func main() { sentence := "A sentence to be slowly ROT'ed." ROT13 := func(b byte) byte { c := b & ('a' - 'A') i := b&^c - 'A' if i < 26 { return (i+13)%26 + 'A' | c } return b } mutator := concurrent.Lazily( func(state ...interface{}) (interface{}, concurrent.State) { b, c := []byte(state[0].(string)), state[1].(int) b[c] = ROT13(b[c]) ms := string(b) return state[0], concurrent.State{ms, (c + 1) % len(ms)} }, 0, // No lookahead nil, // Perpetual evaluator sentence, 0, // Initial state ) var r string for i := 0; i < len(sentence)*2; i++ { r = mutator().(string) if i%10 == 0 { fmt.Println(r) } } fmt.Println(r) }
Output: A sentence to be slowly ROT'ed. N fragrapr to be slowly ROT'ed. N fragrapr gb or fybwly ROT'ed. N fragrapr gb or fybjyl EBG'rq. A sentencr gb or fybjyl EBG'rq. A sentence to be slbjyl EBG'rq. A sentence to be slowly ROT'eq. A sentence to be slowly ROT'ed.
func Map ¶
Map routines to iterate a function over an array, potentially splitting the array slice into chunks so that each chunk is processed concurrently. When using concurrent processing the Chunk size is either the nearest even division of the total array over the chosen concurrent processing goroutines or a specified maximum chunk size, whichever is smaller. Reducing chunk size can reduce the impact of divergence in time for processing chunks, but may add to overhead.
Example ¶
package main import ( "github.com/biogo/biogo/concurrent" "fmt" ) type CountConsumer []int func (c CountConsumer) Slice(i, j int) concurrent.Mapper { return c[i:j] } func (c CountConsumer) Len() int { return len(c) } func (c CountConsumer) Operation() (r interface{}, err error) { var sum int for i, v := range c { sum += v c[i] = 0 } return sum, nil } func main() { c := CountConsumer{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} fmt.Println(c) for c.Len() > 1 { result, err := concurrent.Map(c, 1, 2) if err != nil { fmt.Println(err) } else { fmt.Println(result) c = c[:0] for _, r := range result { c = append(c, r.(int)) } } } }
Output: [1 2 3 4 5 6 7 8 9 10] [3 7 11 15 19] [10 26 19] [36 19] [55]
Types ¶
type Concurrent ¶
type Concurrent interface { Process(...interface{}) Result() (interface{}, error) }
The Concurrent interface represents a processor that allows adding jobs and retrieving results
type Evaluator ¶
type Evaluator func(...interface{}) (interface{}, State)
Evaluator is a function for lazy evaluation.
type Operator ¶
type Operator interface {
Operation() (interface{}, error)
}
Interface is a type that performs an operation on itself, returning any error.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
The Processor type manages a number of concurrent Processes.
func NewProcessor ¶
Return a new Processor to operate the function f over the number of threads specified taking input from queue and placing the result in buffer. Threads is limited by GOMAXPROCS, if threads is greater GOMAXPROCS or less than 1 then threads is set to GOMAXPROCS.
type Promise ¶
type Promise struct {
// contains filtered or unexported fields
}
Implementation of a promise multiple goroutine synchronisation and communication system based on the approach used in Alice. Promises will safely allow multiple promisers to interact with multiple promisees.
New or non-error Broken Promises can be Fulfilled or Failed. Fulfilled or Failed Promises can be Broken and any state of Promise can be Recovered if specified at creation.
Promises can be mutable or not, recoverable or not and may relay internal error states to other listeners. Mutable promises may have their value state changed with subsequence Fulfill calls. Recoverable promises may be recovered after a Fail call. Promises created with relay set to true will relay an error generated by attempting to fulfill an immutable fulfilled promise.
func PromiseMap ¶
A future Map function - synchronisation is via a Promise.
Example ¶
package main import ( "github.com/biogo/biogo/concurrent" "fmt" "time" ) type SlowCounter []int func (c SlowCounter) Slice(i, j int) concurrent.Mapper { return c[i:j] } func (c SlowCounter) Len() int { return len(c) } func (c SlowCounter) Operation() (r interface{}, err error) { var sum int for _, v := range c { sum += v time.Sleep(1e8) } return sum, nil } func main() { c := SlowCounter{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} p := concurrent.PromiseMap(c, 1, 2) fmt.Println("Waiting...") request1 := <-p.Wait() if request1.Err != nil { fmt.Println(request1.Err) } else { fmt.Println(request1.Value) } request2 := <-p.Wait() if request2.Err != nil { fmt.Println(request2.Err) } else { fmt.Println(request2.Value) } }
Output: Waiting... [3 7 11 15 19] [3 7 11 15 19]
func (*Promise) Break ¶
func (p *Promise) Break()
Break an already fulfilled or failed promise, blocking all listeners.