functional

Package functional (module version v0.0.2)
This package is not in the latest version of its module.

Published: May 4, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

README

go-functional: High-Performance functional primitives for Go

go-functional is a library that provides functional constructs for Go programs.

Overview

go-functional is a high-performance library that provides functional constructs for Go programs. It allows developers to easily construct highly parallel data processing pipelines without worrying about concurrency or the intricacies of data transfer between stages.

Key Features

  • Filter, Map, and Reduce functions for each processing stage
  • Procedural and object-oriented interfaces
  • Batch and stream processing modes
  • Sequential and parallel processing
  • Configurable parameters for fine-grained control
  • Context support for cancellation and resource management

Installation

go get github.com/jake-scott/go-functional

Usage

Basic Example

package main

import (
    "context"
    "fmt"
    "strings"

    "github.com/jake-scott/go-functional"
)

type Person struct {
    Name string
    Age  int
}

func main() {
    people := []Person{
        {"Alice", 30},
        {"Bob", 25},
        {"Charlie", 40},
    }

    // Keep people aged 30 and over
    over30 := functional.NewSliceStage(people).
        Filter(func(p Person) (bool, error) { return p.Age >= 30, nil })

    // Map names to uppercase; the element type changes, so the procedural Map is used
    names := functional.Map(over30, func(p Person) (string, error) { return strings.ToUpper(p.Name), nil })

    // Iterate over the results using a single iterator
    ctx := context.Background()
    iter := names.Iterator()
    for iter.Next(ctx) {
        fmt.Println(iter.Get())
    }
    // Output:
    // ALICE
    // CHARLIE
}

Advanced Features

  • Batch vs. streaming processing
  • Sequential vs. parallel processing
  • Inherited options
  • Context cancellation

Concepts

Pipelines are constructed by stitching multiple processing stages together. The output of each stage becomes the input for the next stage.

Each stage reads its input using an Iterator. Iterator is an interface, allowing the developer to customize the way the initial stage consumes items. go-functional supplies Iterator implementations that read from:

  • simple slices
  • channels
  • bufio.Scanner and similar implementations of the scanner.Scanner interface (from the iter/scanner package)

The way stages process items can be affected by configuration supplied to the first stage when it is constructed, or to per-stage filter/map/reduce calls. Configuration can be passed between stages by marking it as inheritable. The following processing parameters can be affected by configuration:

  • Processing type (batch or streaming)
  • Maximum degree of parallelism
  • Order preservation (for batch stages)

Additionally, stages can be provided with a size hint, which can help reduce allocation overhead in batch stages reading an input of unknown size, and a context to cancel background goroutines created by streaming and parallel stages.

Processing functions

All processing functions are passed a function object f which is executed for each input element e. The processing functions produce outputs based on the result of the function object execution f(e).

Filter

The Filter processor outputs the subset of input elements where f(e) == true.

Map

The Map processor outputs one element for every input element. The output elements have the value of f(e) and may be of a different type than the input elements.

Reduce

The Reduce processor outputs a single value produced by running a reduce function over every element. The reduce function receives the current accumulator and an element, and returns the updated accumulator, which is then passed to the call for the next element.

Basic usage

Initializing the pipeline

The pipeline must be primed with an initial source of elements. go-functional provides three helper functions to create the first stage from common sources (any other Iterator implementation can be used with NewStage[T]()):

  • NewSliceStage[T]() creates an initial pipeline stage that iterates over the supplied slice of elements of type T.
  • NewChannelStage[T]() creates an initial pipeline stage that reads elements of type T from the supplied channel.
  • NewScannerStage() creates an initial pipeline stage that reads string tokens from a bufio.Scanner or another implementation of the scanner.Scanner interface. Scanner stages can be used to read elements from an io.Reader.

For example:

type person struct {
    name string
    age  int
}
people := []person{
    {"Bob", 39},
    {"Alice", 25},
    {"Dave", 89},
    {"Mo", 20},
}

pipeline := functional.NewSliceStage(people)

Processing items

The processing functions can be used in an object-oriented or procedural manner. The object-oriented interface allows for chaining of multiple processing stages, for example:

over25 := func(p person) (bool, error) {
    return p.age > 25, nil
}

personCapitalize := func(p person) (person, error) {
    p.name = strings.ToUpper(p.name)
    return p, nil
}

results := functional.NewSliceStage(people).
    Filter(over25).
    Map(personCapitalize)

The object-oriented interface cannot be used when a map function returns elements of a new type. In that case, the procedural interface must be used at least for that stage:

getName := func(p person) (string, error) {
    return p.name, nil
}

results1 := functional.NewSliceStage(people).
    Filter(over25)

results := functional.Map(results1, getName)

Retrieving results

The caller reads results from the Iterator of the last pipeline stage:

iter := results.Iterator()
ctx := context.Background()
for iter.Next(ctx) {
    fmt.Printf("P: %+v\n", iter.Get())
}

// output:
// P: Bob
// P: Dave

Advanced usage

Batch vs streaming

By default, pipeline stages process their input in batch mode. This means that each stage must complete before the next can proceed. Each stage stores the results in a new slice that is passed to the next stage when the original stage finishes. This approach works well when the number of elements is low enough to process in memory, and when the processing functions are fast.

When processing functions are slow (e.g. they use a lot of CPU or depend on external resources like the network), or when the number of items is very large, stages can be configured to stream results to the next stage over a channel, as results are produced.

For example:

// DNS lookups are slow...
func toHostname(addr netip.Addr) (string, error) {
    hn := addr.String()

    // Fall back to the textual address if the reverse lookup fails
    names, err := net.LookupAddr(hn)
    if err == nil && len(names) > 0 {
        hn = names[0]
    }

    return hn, nil
}

ch := make(chan netip.Addr)

// generateIps (not shown) writes the IP addresses to resolve to ch,
// and should close ch when done so that the pipeline can finish
go generateIps(ch)

ctx := context.Background()
stage := functional.NewChannelStage(ch)
results := functional.Map(stage, toHostname, functional.ProcessingType(functional.StreamingStage))

// read results as they are produced by Map()
iter := results.Iterator()
for iter.Next(ctx) {
    fmt.Println(iter.Get())
}

Sequential vs parallel processing

By default, pipeline stages process input elements one at a time, irrespective of whether they are in batch or streaming mode. This approach works when the processing function can keep up with the pace of input elements (in the case of streaming) or when the function itself is fast.

When a processing function is slow, it can become a bottleneck for the entire pipeline. To address this, the slow stage can be run in parallel to increase performance.

For example:

results := functional.Map(stage, toHostname,
    functional.ProcessingType(functional.StreamingStage),
    functional.Parallelism(10))

Note that the requested degree of parallelism is used in conjunction with the size hint for a stage. When an underlying iterator cannot supply its own size hint (e.g., channels or scanners), a stage uses a user-supplied hint or otherwise defaults to 100 (which can be changed by updating the value of the DefaultSizeHint variable).

The maximum degree of parallelism used by a stage is the minimum of the size hint and the value of the Parallelism() option. Hence, it may be necessary to supply both the SizeHint and Parallelism options, especially when aiming for a degree of parallelism exceeding 100.

Larger degrees of parallelism are suitable for non-CPU bound stages like network access, whereas smaller degrees of parallelism are more suited for CPU-bound stages.

Inherited options

Whenever options are passed to an initial stage or a processing function, the InheritOptions option can also be supplied to enable the inheritance of all the provided options by future stages. These inherited options can then be overridden per processing function. For example:

join := func(acc string, p person) (string, error) {
    if acc == "" {
        return p.name, nil
    }
    return acc + ", " + p.name, nil
}

// The stage elements are persons but the accumulator is a string,
// so the procedural Reduce must be used
capitalized := functional.NewSliceStage(people,
    functional.InheritOptions(true),
    functional.WithTracing(true)).
    Filter(over25).
    Map(personCapitalize)

results := functional.Reduce(capitalized, "", join)

fmt.Println(results) // Output: BOB, DAVE

Non-inherited options passed to the initial stage constructor do not have any effect.
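Inheritance fits naturally with Go's functional-options pattern, which StageOption (a func(*stageOptions)) follows. The sketch below models the rule with hypothetical names (stageConfig, option, inherit, withParallelism, withTracing, newStageConfig), since the library's stageOptions type is unexported: a stage starts from its parent's options only if the parent marked them inheritable, then applies its own options on top.

```go
package main

import "fmt"

// stageConfig is a hypothetical stand-in for the library's unexported stageOptions.
type stageConfig struct {
	inherit     bool
	parallelism uint
	tracing     bool
}

// option has the same shape as StageOption: a function that mutates the config.
type option func(*stageConfig)

func inherit(b bool) option         { return func(c *stageConfig) { c.inherit = b } }
func withParallelism(n uint) option { return func(c *stageConfig) { c.parallelism = n } }
func withTracing(b bool) option     { return func(c *stageConfig) { c.tracing = b } }

// newStageConfig copies the parent's options only when the parent marked them
// inheritable; per-stage options are applied last so they override them.
func newStageConfig(parent *stageConfig, opts ...option) stageConfig {
	var cfg stageConfig
	if parent != nil && parent.inherit {
		cfg = *parent
	}
	for _, o := range opts {
		o(&cfg)
	}
	return cfg
}

func main() {
	first := newStageConfig(nil, inherit(true), withTracing(true))
	second := newStageConfig(&first, withParallelism(4)) // keeps tracing, adds parallelism
	third := newStageConfig(&second, inherit(false), withTracing(false))
	fourth := newStageConfig(&third) // inheritance was switched off: defaults again
	fmt.Println(second.tracing, second.parallelism, fourth.tracing, fourth.parallelism)
	// true 4 false 0
}
```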

Contexts

A context can be passed to any stage using the WithContext option. This allows the pipeline to be canceled when the context is canceled or expires. The iterator Next() method also accepts a context to enable the interruption of blocking reads (e.g., on channels or scanners). The processing functions pass the stage context to the iterator methods, and the caller must also pass a context to these methods when extracting the results from the pipeline.

It is recommended to use a context for pipelines or stages that involve parallel or streaming stages to avoid goroutine leaks.

Error handling

There are two sources of errors that can occur during processing:

  • The source iterator (e.g. an error reading from a network)
  • User supplied filter/map/reduce functions

go-functional runs an error handler callback whenever one of these conditions is encountered. The error handler can indicate that the pipeline processing should continue or be aborted by returning true (continue) or false (abort). The default error handler does nothing and indicates that processing should continue.

A custom error handler can be passed as an option to any stage, e.g.:

func logErrorHandler(ec functional.ErrorContext, err error) bool {
	log.Printf("Error in %v: %s\n", ec, err)
	return false    // abort processing
}

stage := functional.NewSliceStage(people, functional.WithErrorHandler(logErrorHandler))

It is possible to have the error handler stash the error for retrieval after the pipeline returns control to the caller.

API Reference

For detailed information on functions and types, please refer to the API documentation.

License

This project is licensed under the Apache v2 License - see the LICENSE file for details.

Documentation

Overview

Package functional provides highly performant functional primitives for Go. It supports streaming and parallel execution of stages of a processing pipeline.

Constants

This section is empty.

Variables

var DefaultSizeHint uint = 100

DefaultSizeHint is used by batch processing functions for initial allocations when the underlying iterator cannot provide size information and a stage-specific size hint has not been provided.

var DefaultTracer = func(format string, v ...any) {
	fmt.Fprintf(os.Stderr, "<TRACE> "+format+"\n", v...)
}

DefaultTracer is the global default trace function. It prints messages to stderr. DefaultTracer can be replaced by another tracing function to affect all stages.

Functions

func Reduce

func Reduce[T, A any](s *Stage[T], initial A, r ReduceFunc[T, A], opts ...StageOption) A

Reduce is the non-OO version of Stage.Reduce(). It must be used when the accumulator of the reduce function is of a different type to the input elements, because Go methods cannot declare their own type parameters.

func SliceFromIterator

func SliceFromIterator[T any](a []T, t T) ([]T, error)

SliceFromIterator is a convenience reduce function (a ReduceFunc[T, []T]) that, when passed to Reduce, collects the elements of a pipeline stage's iterator into a slice.
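Because SliceFromIterator has the ReduceFunc[T, []T] shape, reducing with it yields a slice of every element. The sketch below mirrors that shape with a hypothetical appendReducer and a local sequential reduce so that it runs without the module; with go-functional the corresponding call would presumably look like functional.Reduce(stage, []T{}, functional.SliceFromIterator[T]).

```go
package main

import "fmt"

// appendReducer has the same shape as SliceFromIterator: ReduceFunc[T, []T].
func appendReducer[T any](acc []T, t T) ([]T, error) {
	return append(acc, t), nil
}

// reduce is a minimal sequential reduce over a slice, for illustration only.
func reduce[T, A any](in []T, initial A, r func(A, T) (A, error)) (A, error) {
	acc := initial
	for _, e := range in {
		var err error
		if acc, err = r(acc, e); err != nil {
			return acc, err
		}
	}
	return acc, nil
}

func main() {
	collected, _ := reduce([]string{"a", "b", "c"}, []string{}, appendReducer[string])
	fmt.Println(collected) // [a b c]
}
```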

Types

type ErrorContext

type ErrorContext int

ErrorContext provides error handler callbacks with a hint about where in processing the error occurred.

const (
	// ErrorContextItertator hints that the error occurred reading an iterator
	ErrorContextItertator ErrorContext = iota

	// ErrorContextFilterFunction means the error occurred in a filter func
	ErrorContextFilterFunction

	// ErrorContextMapFunction means the error occurred in a map func
	ErrorContextMapFunction

	// ErrorContextReduceFunction means the error occurred in a reduce func
	ErrorContextReduceFunction

	// ErrorContextOther is the hint when the phase of processing in which
	// the error occurred is unknown
	ErrorContextOther
)

type ErrorHandler

type ErrorHandler func(where ErrorContext, err error) bool

Functions complying with the ErrorHandler prototype can be used to process errors that occur in the pipeline's processing functions. The default handler ignores the error. A custom handler can be provided using the WithErrorHandler option.

Parameters:

  • where describes the context in which the error occurred
  • err is the error to be handled

The function should return true if processing should continue regardless, or false to stop processing.

type FilterFunc

type FilterFunc[T any] func(T) (bool, error)

FilterFunc is a generic function type that takes a single element and returns true if it is to be included or false if the element is to be excluded from the result set.

If an error is returned, it is passed to the stage's error handler function which may elect to continue or abort processing.

Example:

func findEvenInts(i int) (bool, error) {
    return i%2 == 0, nil
}

type Iterator

type Iterator[T any] interface {
	// Next traverses the iterator to the next element
	// Returns true if the iterator advanced, or false if there are no more
	// elements or if an error occurred (see Error() below)
	Next(ctx context.Context) bool

	// Get returns the current value referred to by the iterator
	Get() T

	// Error returns a non-nil value if an error occurred processing Next()
	Error() error
}

Iterator is a generic interface for one-directional traversal through a collection or stream of items.

type MapFunc

type MapFunc[T any, M any] func(T) (M, error)

MapFunc is a generic function that takes a single element and returns a single transformed element.

Example:

func ipAddresses(host string) ([]net.IP, error) {
    return net.LookupIP(host)
}

type ReduceFunc

type ReduceFunc[T any, A any] func(A, T) (A, error)

ReduceFunc is a generic function that takes an accumulator value of type A and an element value of type T, and returns a new accumulator value.

Example:

func add(a, i int) (int, error) {
    return a + i, nil
}

type Size

type Size[T any] interface {
	Size() uint
}

Size is an interface that can be implemented by an iterator that knows the number of elements in the collection when it is initialized.
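Together with the SizeHint option and DefaultSizeHint, this gives a stage three possible sources for its size estimate. The sketch below encodes one reading of the documented precedence (the iterator's own Size, then a user hint, then the default). chooseSizeHint, sliceSource, chanSource and defaultSizeHint are hypothetical, and Size is redeclared without the type parameter the library's version carries.

```go
package main

import "fmt"

// Size mirrors the go-functional interface, minus its type parameter.
type Size interface {
	Size() uint
}

// defaultSizeHint plays the role of functional.DefaultSizeHint.
var defaultSizeHint uint = 100

// sliceSource knows its length up front, so it implements Size.
type sliceSource[T any] struct{ items []T }

func (s sliceSource[T]) Size() uint { return uint(len(s.items)) }

// chanSource cannot know how many items will arrive, so it does not.
type chanSource[T any] struct{ ch chan T }

// chooseSizeHint prefers the iterator's own size, then a user-supplied hint
// (0 meaning "not set"), and finally the package-wide default.
func chooseSizeHint(it any, userHint uint) uint {
	if s, ok := it.(Size); ok {
		return s.Size()
	}
	if userHint > 0 {
		return userHint
	}
	return defaultSizeHint
}

func main() {
	fmt.Println(chooseSizeHint(sliceSource[int]{items: []int{1, 2, 3}}, 0)) // 3
	fmt.Println(chooseSizeHint(chanSource[int]{}, 500))                      // 500
	fmt.Println(chooseSizeHint(chanSource[int]{}, 0))                        // 100
}
```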

type Stage

type Stage[T any] struct {
	// contains filtered or unexported fields
}

Stage represents one processing phase of a larger pipeline. The processing methods of a stage read input elements using the underlying Iterator and return a new Stage whose iterator reads the elements produced by this stage.

func Filter

func Filter[T any](s *Stage[T], f FilterFunc[T], opts ...StageOption) *Stage[T]

Filter is the non-OO version of Stage.Filter().

func Map

func Map[T, M any](s *Stage[T], m MapFunc[T, M], opts ...StageOption) *Stage[M]

Map is the non-OO version of Stage.Map(). It must be used when the map function returns items of a different type than the input elements, because Go methods cannot declare their own type parameters.

func NewChannelStage

func NewChannelStage[T any](ch chan T, opts ...StageOption) *Stage[T]

NewChannelStage instantiates a pipeline stage using a channel iterator backed by the provided channel.

func NewScannerStage

func NewScannerStage(s scanner.Scanner, opts ...StageOption) *Stage[string]

NewScannerStage instantiates a pipeline stage using a scanner iterator, backed by the provided scanner.

func NewSliceStage

func NewSliceStage[T any](s []T, opts ...StageOption) *Stage[T]

NewSliceStage instantiates a pipeline stage using a slice iterator backed by the provided slice.

func NewStage

func NewStage[T any](i Iterator[T], opts ...StageOption) *Stage[T]

NewStage instantiates a pipeline stage from an Iterator and an optional set of processing options.

func (*Stage[T]) Filter

func (s *Stage[T]) Filter(f FilterFunc[T], opts ...StageOption) *Stage[T]

Filter processes this stage's input elements by calling f for each element and returns a new stage that will process all the elements where f(e) is true.

If this stage is configured to process in batch, Filter returns after all the input elements have been processed; those elements are passed to the next stage as a slice.

If this stage is configured to stream, Filter returns immediately after launching a goroutine to process the elements in the background. The next stage reads from a channel that the processing goroutine writes its results to as they are processed.

func (*Stage[T]) Iterator

func (s *Stage[T]) Iterator() Iterator[T]

Iterator returns the underlying iterator for a stage. It is most useful as a mechanism for retrieving the result from the last stage of a pipeline by the caller of the pipeline.

func (*Stage[T]) Map

func (s *Stage[T]) Map(m MapFunc[T, T], opts ...StageOption) *Stage[T]

Map processes the stage's input elements by calling m for each element, returning a new stage containing the same number of elements, mapped to new values of the same type.

If the map function returns values of a different type to the input values, the non-OO version of Map() must be used instead.

If the stage is configured to process in batch, Map returns after all the input elements have been processed; those elements are passed to the next stage as a slice.

If the stage is configured to stream, Map returns immediately after launching goroutines to process the elements in the background. The returned stage reads from a channel that the processing goroutines write their results to as they are processed.

func (*Stage[T]) Reduce

func (s *Stage[T]) Reduce(initial T, r ReduceFunc[T, T], opts ...StageOption) T

Reduce processes the stage's input elements to a single element of the same type, by calling r for every element and passing an accumulator value that each invocation of r can update by returning a value.

If the Reduce function returns a value of a different type to the input values, the non-OO version of Reduce() must be used instead.

Reduce always runs sequentially in batch mode.

type StageOption

type StageOption func(g *stageOptions)

StageOption values provide a mechanism to customize how the processing functions of a stage operate.

func InheritOptions

func InheritOptions(inherit bool) StageOption

InheritOptions causes this stage's options to be inherited by the next stage. The next stage can override these inherited options. Further inheritance can be disabled by passing this option with a false value.

The default is no inheritance.

func Parallelism

func Parallelism(max uint) StageOption

The Parallelism option defines the maximum concurrency of the stage.

If not specified, the default is to process elements serially.

func PreserveOrder

func PreserveOrder(preserve bool) StageOption

PreserveOrder causes concurrent batch stages to retain the order of processed elements. This is always the case with serial stages and is not possible for concurrent streaming stages. Maintaining the order of elements for concurrent batch stages incurs a performance penalty.

The default is to not maintain order.
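One common way to preserve order in a concurrent batch stage is to remember each element's input index and write its result into that slot of a pre-sized slice, handing the slice on only after every worker has finished. This is a sketch of that general technique with a hypothetical orderedParallelMap, not a description of how PreserveOrder is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedParallelMap applies f using up to `workers` goroutines, but each
// result is stored at its input's index, so output order matches input order.
func orderedParallelMap[T, M any](in []T, workers int, f func(T) M) []M {
	if workers < 1 {
		workers = 1
	}
	out := make([]M, len(in))
	indexes := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				out[i] = f(in[i]) // each slot is written by exactly one worker
			}
		}()
	}
	for i := range in {
		indexes <- i
	}
	close(indexes)
	wg.Wait() // the batch is only complete once every slot has been filled
	return out
}

func main() {
	words := []string{"one", "two", "three", "four"}
	fmt.Println(orderedParallelMap(words, 3, func(s string) int { return len(s) })) // [3 3 5 4]
}
```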

func ProcessingType

func ProcessingType(t StageType) StageOption

The ProcessingType option configures whether the stage operates in batch or streaming mode. If not specified, stages default to processing in batch mode.

func SizeHint

func SizeHint(hint uint) StageOption

The SizeHint option provides the stage processor functions with a guideline regarding the number of elements there are to process. This is primarily used with iterators that cannot provide the information themselves.

If not specified and the iterator cannot provide the information, the default value DefaultSizeHint is used.

func WithContext

func WithContext(ctx context.Context) StageOption

WithContext attaches the provided context to the stage.

func WithErrorHandler

func WithErrorHandler(handler ErrorHandler) StageOption

WithErrorHandler installs a custom error handler which will be called from the processing functions when the filter/map/reduce function or an iterator emits an error.

The handler should return true to continue processing or false to abort.

The handler can stash the error for use in the pipeline's caller.

func WithTraceFunc

func WithTraceFunc(f TraceFunc) StageOption

WithTraceFunc sets the trace function for the stage. Use WithTracing to enable/disable tracing.

func WithTracing

func WithTracing(enable bool) StageOption

WithTracing enables tracing for the stage. If a custom trace function has not been set using WithTraceFunc, trace messages are printed to stderr.

type StageType

type StageType int

StageType describes the behaviour of a pipeline stage.

const (
	// Batch stages collect the results of processing all of the
	// input items before passing control to the next stage
	BatchStage StageType = iota

	// Streaming stages pass the results of processed input items to the
	// next pipeline stage as a stream while processing other elements continues.
	StreamingStage
)

func (StageType) String

func (t StageType) String() string

type TraceFunc

type TraceFunc func(format string, v ...any)

TraceFunc defines the function prototype of a tracing function. Per-stage functions can be configured using WithTraceFunc.

Directories

Path Synopsis
examples
iter
channel
Package channel implements an iterator that reads a data stream from the supplied channel.
scanner
Package scanner implements a stream tokenizer iterator.
slice
Package slice implements an iterator that traverses uni-directionally over a generic slice of elements.
