Documentation ¶
Overview ¶
Package functional provides functional programming constructs.
Index ¶
- Variables
- func EmitAll(s Stream, e Emitter) (opened bool)
- func MultiConsume(s Stream, ptr interface{}, copier Copier, consumers ...Consumer) (errors []error)
- func WaitForClose(e Emitter)
- type CompositeMapper
- type Consumer
- type ConsumerFunc
- type Copier
- type Creater
- type Emitter
- type Filterer
- type Mapper
- type Rows
- type Stream
- func Concat(s ...Stream) Stream
- func Count() Stream
- func CountFrom(start, step int) Stream
- func Cycle(f func() Stream) Stream
- func Deferred(f func() Stream) Stream
- func DropWhile(f Filterer, s Stream) Stream
- func Filter(f Filterer, s Stream) Stream
- func Flatten(s Stream) Stream
- func Map(f Mapper, s Stream, ptr interface{}) Stream
- func Merge(creater Creater, copier Copier, before func(lhs, rhs interface{}) bool, ...) Stream
- func NewGenerator(f func(e Emitter) error) Stream
- func NewStreamFromPtrs(aSlice interface{}, c Copier) Stream
- func NewStreamFromStreamFunc(f func() Stream) Stream
- func NewStreamFromValues(aSlice interface{}, c Copier) Stream
- func NilStream() Stream
- func NoCloseStream(s Stream) Stream
- func ReadLines(r io.Reader) Stream
- func ReadLinesAndClose(r io.ReadCloser) Stream
- func ReadRows(r Rows) Stream
- func Slice(s Stream, start int, end int) Stream
- func TakeWhile(f Filterer, s Stream) Stream
- type Tuple
Variables ¶
var (
	// Done indicates that the end of a Stream has been reached
	Done = errors.New("functional: End of Stream reached.")
	// Filters return Skipped to indicate that the current value should be
	// skipped.
	Skipped = errors.New("functional: Value skipped.")
)
Functions ¶
func EmitAll ¶
EmitAll emits all of Stream s to Emitter e. If the Stream for e becomes closed, EmitAll returns false. Otherwise EmitAll returns true.
func MultiConsume ¶
MultiConsume consumes the values of s, a Stream of T, sending those T values to each Consumer in consumers. MultiConsume consumes values from s until no Consumer in consumers is accepting values. ptr is a *T that receives the values from s. copier is a Copier of T used to copy T values to the Streams sent to each Consumer in consumers. Passing nil for copier means use simple assignment. MultiConsume returns all the errors from the individual Consume methods. The order of the returned errors matches the order of the consumers.
func WaitForClose ¶
func WaitForClose(e Emitter)
Use WaitForClose in emitting functions (see the description of NewGenerator). WaitForClose yields execution to the caller until the caller calls Close on the associated Stream, returning Done each time the caller calls Next in the meantime. An emitting function calls WaitForClose when it is done emitting values, but before it does any final cleanup.
Types ¶
type CompositeMapper ¶
type CompositeMapper struct {
// contains filtered or unexported fields
}
CompositeMapper represents Mappers composed together, e.g. f(g(x)). Programs using CompositeMapper should typically store and pass them as values, not pointers. A CompositeMapper can be used by multiple goroutines simultaneously if its underlying Mappers can be used by multiple goroutines simultaneously. The zero value for CompositeMapper is a Mapper that maps nothing (the Map method always returns Skipped).
func Compose ¶
func Compose(f Mapper, g Mapper, c Creater) CompositeMapper
Compose composes two Mappers together into one, e.g. f(g(x)). If g maps type T values to type U values, and f maps type U values to type V values, then Compose returns a CompositeMapper mapping T values to V values. c is a Creater of U. Each time Map is called on the returned CompositeMapper, it invokes c to create a U value to receive the intermediate result from g.
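The composition described above can be sketched without the package. This is a minimal local illustration, not the package's implementation: mapFunc, compose, double, toString, and newInt are hypothetical names, and mapFunc stands in for the Map method of a Mapper.

```go
package main

import (
	"fmt"
	"strconv"
)

// mapFunc is a hypothetical function form of Mapper.Map.
type mapFunc func(srcPtr, destPtr interface{}) error

// compose returns a mapFunc computing f(g(x)). create plays the role of a
// Creater of U: it supplies a fresh *U for the intermediate result on every
// call, so the returned function shares no state between calls.
func compose(f, g mapFunc, create func() interface{}) mapFunc {
	return func(srcPtr, destPtr interface{}) error {
		mid := create()
		if err := g(srcPtr, mid); err != nil {
			return err
		}
		return f(mid, destPtr)
	}
}

// double maps an int to an int (T to U).
func double(srcPtr, destPtr interface{}) error {
	*destPtr.(*int) = 2 * *srcPtr.(*int)
	return nil
}

// toString maps an int to a string (U to V).
func toString(srcPtr, destPtr interface{}) error {
	*destPtr.(*string) = strconv.Itoa(*srcPtr.(*int))
	return nil
}

// newInt acts as a Creater of int.
func newInt() interface{} { return new(int) }

func main() {
	m := compose(toString, double, newInt)
	x := 21
	var s string
	err := m(&x, &s)
	fmt.Println(s, err) // prints: 42 <nil>
}
```

Allocating the intermediate value per call is what allows concurrent use. Reusing a single intermediate pointer avoids the allocation but makes the composed function unsafe to share across goroutines.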
func (CompositeMapper) Fast ¶
func (c CompositeMapper) Fast() Mapper
Fast returns a quicker version of this CompositeMapper that cannot be used by multiple goroutines simultaneously, as if it had been built with FastCompose.
func (CompositeMapper) Map ¶
func (c CompositeMapper) Map(srcPtr interface{}, destPtr interface{}) error
type Consumer ¶
A Consumer of T consumes the T values from a Stream of T.
func CompositeConsumer ¶
CompositeConsumer returns a Consumer that sends values it consumes to each one of consumers. The returned Consumer's Consume method reports an error if the Consume method in any of consumers reports an error. ptr is a *T where T values being consumed are temporarily held; copier knows how to copy the values of type T being consumed (can be nil if simple assignment should be used). If caller passes a slice for consumers, no copy is made of it.
func FilterConsumer ¶
FilterConsumer creates a new Consumer whose Consume method applies f to the Stream before passing it on to c.
func MapConsumer ¶
MapConsumer creates a new Consumer whose Consume method applies m to the Stream before passing it on to c. c consumes U values; m maps T values to U values; ptr is a *T that temporarily holds T values. This function returns a Consumer of T values.
func ModifyConsumer ¶
ModifyConsumer returns a new Consumer that applies f to its Stream and then gives the resulting Stream to c. If c is a Consumer of T and f takes a Stream of U and returns a Stream of T, then ModifyConsumer returns a Consumer of U. The Consume method of the returned Consumer will close the Stream that f returns but not the original Stream. It does this by wrapping the original Stream with NoCloseStream.
func NilConsumer ¶
func NilConsumer() Consumer
NilConsumer returns a consumer that consumes no values.
type ConsumerFunc ¶
ConsumerFunc is an adapter that allows ordinary functions to be used as Consumers.
func (ConsumerFunc) Consume ¶
func (f ConsumerFunc) Consume(s Stream) error
type Copier ¶
type Copier func(src, dest interface{})
Copier of T copies the value at src to the value at dest. src and dest are of type *T and both point to pre-initialized T.
type Creater ¶
type Creater func() interface{}
Creater of T creates a new, pre-initialized, T and returns a pointer to it.
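As an illustration of why a Copier can matter, the sketch below defines a Copier and a Creater for a struct that contains a slice, where plain assignment would leave source and destination sharing memory. The two type declarations repeat the ones shown above so the sketch compiles on its own; record, copyRecord, and newRecord are hypothetical names.

```go
package main

import "fmt"

// Local copies of the documented function types.
type Copier func(src, dest interface{})
type Creater func() interface{}

// record is a hypothetical T whose Tags field makes shallow copies risky.
type record struct {
	Name string
	Tags []string
}

// copyRecord is a Copier of record. It copies Tags element by element into
// dest's own backing array so that dest does not alias src.
func copyRecord(src, dest interface{}) {
	s, d := src.(*record), dest.(*record)
	d.Name = s.Name
	d.Tags = append(d.Tags[:0], s.Tags...)
}

// newRecord is a Creater of record: it returns a pointer to a new,
// pre-initialized value.
func newRecord() interface{} { return &record{} }

func main() {
	var cp Copier = copyRecord
	var cr Creater = newRecord

	src := &record{Name: "a", Tags: []string{"x", "y"}}
	dst := cr().(*record)
	cp(src, dst)

	src.Tags[0] = "changed" // does not affect dst
	fmt.Println(dst.Name, dst.Tags) // prints: a [x y]
}
```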
type Emitter ¶
type Emitter interface {
	// EmitPtr returns the pointer supplied to Next of associated Stream.
	// If Close is called on associated Stream, EmitPtr returns nil and false.
	EmitPtr() (ptr interface{}, streamOpened bool)

	// Return causes Next of associated Stream to return err. Return yields
	// execution to the caller of Next blocking until it calls Next again or
	// Close. Finally, Return returns the pointer passed to Next or nil and
	// false if caller called Close.
	Return(err error) (ptr interface{}, streamOpened bool)
}
Emitter allows a function to emit values to an associated Stream.
type Filterer ¶
type Filterer interface {
	// Filter returns nil if value ptr points to should be included or Skipped
	// if value should be skipped. ptr must be a *T.
	Filter(ptr interface{}) error
}
Filterer of T filters values in a Stream of T.
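The contract above (nil to include, Skipped to leave out, anything else is a real error) can be sketched locally. This is an illustration under assumptions, not the package's code: errSkipped stands in for Skipped, the interface is repeated so the sketch compiles alone, and evenOnly and keep are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipped is a local stand-in for the package's Skipped sentinel.
var errSkipped = errors.New("value skipped")

// Filterer repeats the documented interface.
type Filterer interface {
	Filter(ptr interface{}) error
}

// evenOnly is a hypothetical Filterer of int that includes even values.
type evenOnly struct{}

func (evenOnly) Filter(ptr interface{}) error {
	if *ptr.(*int)%2 != 0 {
		return errSkipped
	}
	return nil
}

// keep applies f to each value and returns the values that were included.
// Any error other than errSkipped stops the scan and is returned.
func keep(f Filterer, values []int) ([]int, error) {
	var out []int
	for i := range values {
		switch err := f.Filter(&values[i]); err {
		case nil:
			out = append(out, values[i])
		case errSkipped:
			// Leave this value out and continue.
		default:
			return out, err
		}
	}
	return out, nil
}

func main() {
	kept, err := keep(evenOnly{}, []int{1, 2, 3, 4, 5, 6})
	fmt.Println(kept, err) // prints: [2 4 6] <nil>
}
```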
func All ¶
All returns a Filterer that returns nil if all of the fs return nil. Otherwise it returns the first error encountered.
func Any ¶
Any returns a Filterer that returns Skipped if all of the fs return Skipped. Otherwise it returns nil or the first error not equal to Skipped.
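One plausible reading of the All and Any rules is sketched below with plain functions. The names filterFunc, allOf, anyOf, positive, and even are hypothetical, errSkipped stands in for Skipped, and the short-circuit order is an assumption rather than something the text above specifies.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipped is a local stand-in for the package's Skipped sentinel.
var errSkipped = errors.New("value skipped")

// filterFunc is a hypothetical function form of Filterer.Filter.
type filterFunc func(ptr interface{}) error

// allOf returns nil only if every f returns nil; otherwise it returns the
// first error encountered, which may be errSkipped.
func allOf(fs ...filterFunc) filterFunc {
	return func(ptr interface{}) error {
		for _, f := range fs {
			if err := f(ptr); err != nil {
				return err
			}
		}
		return nil
	}
}

// anyOf returns errSkipped only if every f returns errSkipped; otherwise it
// returns the first result that is not errSkipped (nil or a real error).
func anyOf(fs ...filterFunc) filterFunc {
	return func(ptr interface{}) error {
		for _, f := range fs {
			if err := f(ptr); err != errSkipped {
				return err
			}
		}
		return errSkipped
	}
}

func positive(ptr interface{}) error {
	if *ptr.(*int) > 0 {
		return nil
	}
	return errSkipped
}

func even(ptr interface{}) error {
	if *ptr.(*int)%2 == 0 {
		return nil
	}
	return errSkipped
}

func main() {
	for _, v := range []int{-2, 3, 4, -5} {
		v := v
		both := allOf(positive, even)(&v) == nil
		either := anyOf(positive, even)(&v) == nil
		fmt.Println(v, both, either)
	}
}
```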
func NewFilterer ¶
NewFilterer returns a new Filterer of T. f takes a *T and returns nil if the T value it points to should be included, or Skipped if it should not be included. f can return other errors too.
type Mapper ¶
type Mapper interface {
	// Map does the mapping storing the mapped value at destPtr.
	// If Mapper returns Skipped, then no mapped value is stored at destPtr.
	// Map may return other errors. srcPtr is a *T; destPtr is a *U
	Map(srcPtr interface{}, destPtr interface{}) error
}
Mapper maps a type T value to a type U value in a Stream.
func FastCompose ¶
FastCompose works like Compose except that it uses a *U value instead of a Creater of U to link f and g. ptr is the *U value. Intermediate results from g are stored at ptr. Unlike Compose, the Mapper that FastCompose returns cannot be used by multiple goroutines simultaneously, since what ptr points to changes with each call to Map.
type Rows ¶
type Rows interface {
	// Next advances to the next row. Next returns false if there is no next row.
	// Every call to Scan, even the first one, must be preceded by a call to Next.
	Next() bool

	// Reads the values out of the current row. args are pointer types.
	Scan(args ...interface{}) error
}
Rows represents rows in a database table. Most database APIs already have a type that implements this interface.
type Stream ¶
type Stream interface {
	// Next emits the next value in this Stream of T.
	// If Next returns nil, the next value is stored at ptr.
	// If Next returns Done, then the end of the Stream has been reached,
	// and the value ptr points to is unspecified. ptr must be a *T.
	// Once Next returns Done, it should continue to return Done.
	Next(ptr interface{}) error

	// Caller calls Close when it is finished with this Stream.
	// The result of calling Next after Close is unspecified.
	io.Closer
}
Stream is a sequence of emitted values. Each call to Next() emits the next value in the stream. A Stream that emits values of type T is a Stream of T.
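To make the Next and Close contract concrete, here is a minimal sketch of a type that follows it, together with the usual read loop. It is a local illustration only: errDone stands in for Done, and intStream and drain are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone is a local stand-in for the package's Done sentinel.
var errDone = errors.New("end of stream")

// intStream is a hypothetical Stream of int backed by a slice.
type intStream struct {
	values []int
	pos    int
}

// Next stores the next value at ptr, which must be a *int. Once the slice
// is exhausted, Next keeps returning errDone.
func (s *intStream) Next(ptr interface{}) error {
	if s.pos >= len(s.values) {
		return errDone
	}
	*ptr.(*int) = s.values[s.pos]
	s.pos++
	return nil
}

// Close satisfies io.Closer; there is nothing to release here.
func (s *intStream) Close() error { return nil }

// drain reads s until Next reports an error. It returns the values seen
// together with that error.
func drain(s *intStream) ([]int, error) {
	var out []int
	var v int
	err := s.Next(&v)
	for ; err == nil; err = s.Next(&v) {
		out = append(out, v)
	}
	return out, err
}

func main() {
	s := &intStream{values: []int{3, 1, 4}}
	defer s.Close()
	got, err := drain(s)
	fmt.Println(got, err == errDone) // prints: [3 1 4] true
}
```

The caller owns the storage that ptr points to, so reading a long stream does not allocate a new value per element.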
func Concat ¶
Concat concatenates multiple Streams into one. If x = (x1, x2, ...) and y = (y1, y2, ...) then Concat(x, y) = (x1, x2, ..., y1, y2, ...). Calling Close on returned Stream closes all underlying streams. If caller passes a slice to Concat, no copy is made of it.
func Count ¶
func Count() Stream
Count returns an infinite Stream of int which emits all values beginning at 0. Calling Close on returned Stream is a no-op.
func CountFrom ¶
CountFrom returns an infinite Stream of int emitting values beginning at start and increasing by step. Calling Close on returned Stream is a no-op.
func DropWhile ¶
DropWhile returns a Stream that emits the values in s starting at the first value where the Filter method of f returns Skipped. The returned Stream's Next method reports any errors that the Filter method of f returns until it returns Skipped. f is a Filterer of T; s is a Stream of T. Calling Close on returned Stream closes s.
func Filter ¶
Filter filters values from s, returning a new Stream of T. The returned Stream's Next method reports any errors besides Skipped that the Filter method of f returns. f is a Filterer of T; s is a Stream of T. Calling Close on returned Stream closes s.
func Flatten ¶
Flatten converts a Stream of Stream of T into a Stream of T. The returned Stream automatically closes each emitted Stream from s propagating any error from closing through Next. Calling Close on returned Stream closes s and the last emitted Stream from s currently being read.
func Map ¶
Map applies f, which maps a type T value to a type U value, to a Stream of T producing a new Stream of U. If s is (x1, x2, x3, ...), Map returns the Stream (f(x1), f(x2), f(x3), ...). If f returns Skipped for a T value, then the corresponding U value is left out of the returned stream. ptr is a *T providing storage for emitted values from s. If f is a CompositeMapper, Fast() is called on it automatically. Calling Close on returned Stream closes s.
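The skipping behaviour described above can be sketched locally with slices in place of Streams. This is not the package's Map: errSkipped stands in for Skipped, the Mapper interface is repeated so the sketch compiles alone, and atoiMapper and mapAll are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// errSkipped is a local stand-in for the package's Skipped sentinel.
var errSkipped = errors.New("value skipped")

// Mapper repeats the documented interface.
type Mapper interface {
	Map(srcPtr interface{}, destPtr interface{}) error
}

// atoiMapper is a hypothetical Mapper from string to int. Strings that are
// not valid integers are skipped rather than reported as errors.
type atoiMapper struct{}

func (atoiMapper) Map(srcPtr interface{}, destPtr interface{}) error {
	n, err := strconv.Atoi(*srcPtr.(*string))
	if err != nil {
		return errSkipped
	}
	*destPtr.(*int) = n
	return nil
}

// mapAll applies m to each input and collects the mapped values, leaving
// out any input for which m returns errSkipped.
func mapAll(m Mapper, in []string) ([]int, error) {
	var out []int
	for i := range in {
		var dest int
		err := m.Map(&in[i], &dest)
		if err == errSkipped {
			continue
		}
		if err != nil {
			return out, err
		}
		out = append(out, dest)
	}
	return out, nil
}

func main() {
	got, err := mapAll(atoiMapper{}, []string{"7", "x", "35"})
	fmt.Println(got, err) // prints: [7 35] <nil>
}
```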
func Merge ¶
func Merge(creater Creater, copier Copier, before func(lhs, rhs interface{}) bool, streams ...Stream) Stream
Merge merges multiple streams that emit their elements in order into a single stream that emits all the elements in order. Calling Close on the returned Stream closes all underlying streams. If the caller passes a slice to Merge, no copy is made of it. Each Stream in streams must emit elements in order according to before, and the resulting Stream emits all elements in order according to before. before returns true if the T element at lhs comes before the T element at rhs. lhs and rhs are *T.
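The role of before can be illustrated with a k-way merge over slices. This sketch assumes nothing about how the package implements Merge; mergeSorted and ascending are hypothetical names, and a linear scan for the smallest head is used for brevity where a heap would scale better.

```go
package main

import "fmt"

// mergeSorted merges inputs, each already ordered according to before, into
// one slice ordered according to before. before receives *int arguments,
// mirroring the *T convention described above.
func mergeSorted(before func(lhs, rhs interface{}) bool, inputs ...[]int) []int {
	pos := make([]int, len(inputs)) // next unread index in each input
	var out []int
	for {
		best := -1
		for i, in := range inputs {
			if pos[i] >= len(in) {
				continue // this input is exhausted
			}
			if best == -1 || before(&in[pos[i]], &inputs[best][pos[best]]) {
				best = i
			}
		}
		if best == -1 {
			return out // every input is exhausted
		}
		out = append(out, inputs[best][pos[best]])
		pos[best]++
	}
}

// ascending orders ints from smallest to largest.
func ascending(lhs, rhs interface{}) bool {
	return *lhs.(*int) < *rhs.(*int)
}

func main() {
	merged := mergeSorted(ascending, []int{1, 4, 9}, []int{2, 3, 10}, []int{5})
	fmt.Println(merged) // prints: [1 2 3 4 5 9 10]
}
```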
func NewGenerator ¶
NewGenerator creates a Stream that emits the values from emitting function f. First, f emits values by calling EmitPtr and Return on the Emitter passed to it. When f is through emitting values, or when EmitPtr or Return returns false for streamOpened, f calls WaitForClose(), performs any necessary cleanup, and finally returns the error that Close() on the associated Stream will return. It is very important that f calls WaitForClose() before performing cleanup, to ensure that the cleanup is done after Close() is called on the associated Stream. The caller must call Close() on the returned Stream, or else the goroutine operating the Stream will never exit. Note that execution of f begins the first time the caller calls Next() or Close() on the associated Stream.
Example ¶
package main

import (
	"fmt"
	"github.com/keep94/gofunctional3/functional"
	"time"
)

// DateCount represents a count of some occurrence by date
type DateCount struct {
	Date  time.Time
	Count int64
}

// YearCount represents a count of some occurrence by year
type YearCount struct {
	Year  int
	Count int64
}

// ByYear takes a Stream of DateCount instances and returns a Stream of
// YearCount instances by totaling the counts in the DateCount instances
// by year. The DateCount instances must be ordered by Date. Caller must Close
// the returned Stream. Calling Close on returned Stream also closes s.
func ByYear(s functional.Stream) functional.Stream {
	return functional.NewGenerator(func(e functional.Emitter) error {
		var ptr interface{}
		var opened bool
		var incoming DateCount

		// As soon as caller calls Close() we quit without pulling unnecessary
		// values from underlying stream.
		if ptr, opened = e.EmitPtr(); !opened {
			return s.Close()
		}
		var err error

		// We get the first date while propagating any errors to the caller.
		// Note that we stay in this loop until either we get a first date or
		// caller calls Close()
		for err = s.Next(&incoming); err != nil; err = s.Next(&incoming) {
			// Propagate error and yield execution to caller. Then get the next
			// pointer caller passes to Next().
			if ptr, opened = e.Return(err); !opened {
				return s.Close()
			}
		}
		currentYear := incoming.Date.Year()

		// Running total for current year
		sum := incoming.Count

		// When Done marker is reached, we have to emit the count of the final year.
		for err = s.Next(&incoming); err != functional.Done; err = s.Next(&incoming) {
			// Propagate any errors to caller.
			if err != nil {
				if ptr, opened = e.Return(err); !opened {
					return s.Close()
				}
				continue
			}
			year := incoming.Date.Year()

			// If year changed, emit the count for the current year.
			// Then change currentYear and make sum be the running total for
			// that year.
			if year != currentYear {
				*ptr.(*YearCount) = YearCount{Year: currentYear, Count: sum}
				if ptr, opened = e.Return(nil); !opened {
					return s.Close()
				}
				sum = incoming.Count
				currentYear = year
			} else {
				sum += incoming.Count
			}
		}

		// Emit the final year.
		*ptr.(*YearCount) = YearCount{Year: currentYear, Count: sum}

		// Note that we return nil, not functional.Done, since we are emitting the
		// count for the final year. The call to WaitForClose() takes
		// care of returning functional.Done to the caller until the caller calls
		// Close()
		e.Return(nil)
		functional.WaitForClose(e)
		return s.Close()
	})
}

func YMD(year, month, day int) time.Time {
	return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
}

func main() {
	s := functional.NewStreamFromPtrs(
		[]*DateCount{
			{YMD(2013, 5, 24), 13},
			{YMD(2013, 4, 1), 5},
			{YMD(2013, 1, 1), 8},
			{YMD(2012, 12, 31), 24},
			{YMD(2012, 5, 26), 10}},
		nil)
	s = ByYear(s)
	defer s.Close()
	var yc YearCount
	for err := s.Next(&yc); err == nil; err = s.Next(&yc) {
		fmt.Printf("%d: %d\n", yc.Year, yc.Count)
	}
}
Output:
2013: 26
2012: 34
func NewStreamFromPtrs ¶
NewStreamFromPtrs converts a []*T into a Stream of T. aSlice is a []*T. c is a Copier of T. If c is nil, regular assignment is used. Calling Close on returned Stream is a no-op.
func NewStreamFromStreamFunc ¶
NewStreamFromStreamFunc creates a Stream of Streams by repeatedly calling f. Calling Close on returned Stream is a no-op.
func NewStreamFromValues ¶
NewStreamFromValues converts a []T into a Stream of T. aSlice is a []T. c is a Copier of T. If c is nil, regular assignment is used. Calling Close on returned Stream is a no-op.
func NilStream ¶
func NilStream() Stream
NilStream returns a Stream that emits no values. Calling Close on returned Stream is a no-op.
func NoCloseStream ¶
NoCloseStream returns a Stream just like s but with a Close method that does nothing. This function is useful for preventing a stream from closing its underlying stream.
func ReadLines ¶
ReadLines returns the lines of text in r, separated by either "\n" or "\r\n", as a Stream of string. The emitted strings do not contain the end-of-line characters. Calling Close on the returned Stream does nothing.
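The same line-splitting rule is what the standard library's bufio.Scanner applies by default, so the behaviour can be sketched with the standard library alone. Whether the package uses bufio.Scanner internally is not stated here; lines and linesOfString are hypothetical helpers.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// lines reads r to the end and returns its lines without end-of-line
// characters. bufio.Scanner's default split function treats both "\n" and
// "\r\n" as line terminators.
func lines(r io.Reader) ([]string, error) {
	var out []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		out = append(out, sc.Text())
	}
	return out, sc.Err()
}

// linesOfString is a convenience wrapper for in-memory text.
func linesOfString(text string) ([]string, error) {
	return lines(strings.NewReader(text))
}

func main() {
	got, err := linesOfString("alpha\nbeta\r\ngamma")
	fmt.Printf("%q %v\n", got, err) // prints: ["alpha" "beta" "gamma"] <nil>
}
```

Note that bufio.Scanner rejects lines longer than its buffer limit (64 KiB by default), a constraint of this sketch and not necessarily of ReadLines.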
func ReadLinesAndClose ¶
func ReadLinesAndClose(r io.ReadCloser) Stream
ReadLinesAndClose works just like ReadLines except that calling Close on returned Stream closes r.
func ReadRows ¶
ReadRows returns the rows in a database table as a Stream of Tuple. Calling Close on returned stream does nothing.
func Slice ¶
Slice returns a Stream that will emit elements in s starting at index start and continuing to but not including index end. Indexes are 0 based. If end is negative, it means go to the end of s. Calling Close on returned Stream closes s.
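The index convention above, including the negative end, can be sketched over a slice. sliceOf is a hypothetical helper that follows the description; it is not the package's implementation.

```go
package main

import "fmt"

// sliceOf returns the elements of values at 0-based indexes in [start, end).
// A negative end means continue to the end of values.
func sliceOf(values []int, start, end int) []int {
	var out []int
	for i, v := range values {
		if end >= 0 && i >= end {
			break
		}
		if i >= start {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	values := []int{10, 11, 12, 13, 14}
	fmt.Println(sliceOf(values, 1, 3))  // prints: [11 12]
	fmt.Println(sliceOf(values, 3, -1)) // prints: [13 14]
}
```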
func TakeWhile ¶
TakeWhile returns a Stream that emits the values in s until the Filter method of f returns Skipped. The returned Stream's Next method reports any errors besides Skipped that the Filter method of f returns. Calling Close on returned Stream closes s. f is a Filterer of T; s is a Stream of T.
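TakeWhile and DropWhile split a sequence at the same point: the first value for which the filter returns Skipped. The sketch below shows that split over a slice. It is an illustration only: errSkipped stands in for Skipped, and small and splitAtFirstSkip are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipped is a local stand-in for the package's Skipped sentinel.
var errSkipped = errors.New("value skipped")

// small is a hypothetical filter function that includes ints below 10.
func small(ptr interface{}) error {
	if *ptr.(*int) < 10 {
		return nil
	}
	return errSkipped
}

// splitAtFirstSkip returns the prefix a TakeWhile-style stream would emit
// and the suffix a DropWhile-style stream would emit. Any error other than
// errSkipped stops the scan and is returned.
func splitAtFirstSkip(f func(ptr interface{}) error, values []int) ([]int, []int, error) {
	for i := range values {
		switch ferr := f(&values[i]); ferr {
		case nil:
			// Still in the leading run of included values.
		case errSkipped:
			return values[:i], values[i:], nil
		default:
			return values[:i], nil, ferr
		}
	}
	return values, nil, nil
}

func main() {
	taken, rest, err := splitAtFirstSkip(small, []int{1, 5, 12, 3, 20})
	fmt.Println(taken, rest, err) // prints: [1 5] [12 3 20] <nil>
}
```

The value 3 ends up in the second part even though it passes the filter, because only the first Skipped result decides where the split happens.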