pipeline

package
v1.1.0
Published: Jun 12, 2020 License: BSD-3-Clause Imports: 8 Imported by: 11

Documentation

Overview

Package pipeline provides means to construct and execute parallel pipelines.

A Pipeline feeds batches of data through several functions that can be specified to be executed in encounter order, in arbitrary sequential order, or in parallel. Ordered, sequential, or parallel stages can arbitrarily alternate.

A Pipeline consists of a Source object and several Node objects.

Source objects that are supported by this implementation are arrays, slices, strings, channels, and bufio.Scanner objects, but other kinds of Source objects can be added by user programs.

Node objects can be specified to receive batches from the input source either sequentially in encounter order, which is always the same order in which they were originally encountered at the source; sequentially, but in arbitrary order; or in parallel. Ordered nodes always receive batches in encounter order even if they are preceded by arbitrary sequential, or even parallel nodes.
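
The ordering guarantee for ordered nodes can be illustrated without the package itself. The following is a minimal sketch, assuming a reorder buffer keyed by sequence number; it is not taken from this package's implementation, and the names batch and processOrdered are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// batch pairs a data batch with its sequence number (its encounter order).
type batch struct {
	seqNo int
	data  []int
}

// processOrdered squares every element of every batch in parallel, then hands
// the results to emit strictly in encounter order.
func processOrdered(batches [][]int, emit func(seqNo int, data []int)) {
	results := make(chan batch, len(batches))
	var wg sync.WaitGroup
	for seqNo, data := range batches {
		wg.Add(1)
		go func(seqNo int, data []int) { // parallel stage
			defer wg.Done()
			out := make([]int, len(data))
			for i, v := range data {
				out[i] = v * v
			}
			results <- batch{seqNo, out}
		}(seqNo, data)
	}
	wg.Wait()
	close(results)

	// Ordered stage: buffer out-of-order batches until their turn comes.
	pending := make(map[int][]int)
	next := 0
	for b := range results {
		pending[b.seqNo] = b.data
		for data, ok := pending[next]; ok; data, ok = pending[next] {
			emit(next, data)
			delete(pending, next)
			next++
		}
	}
}

func main() {
	processOrdered([][]int{{1, 2}, {3}, {4, 5}}, func(seqNo int, data []int) {
		fmt.Println(seqNo, data)
	})
}
```

Whatever order the goroutines finish in, emit sees sequence numbers 0, 1, 2 in that order.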

Node objects consist of filters, which are pairs of receiver and finalizer functions. Each batch is passed to each receiver function, which can transform and modify the batch for the next receiver function in the pipeline. Each finalizer function is called once when all batches have been passed through all receiver functions.

Pipelines do not have an explicit representation for sinks. Instead, filters can use side effects to generate results.

Pipelines also support cancelation by way of the context package of Go's standard library.

An application of pipelines can be found in the elPrep tool at https://github.com/exascience/elprep, specifically in https://github.com/ExaScience/elprep/blob/master/sam/filter-pipeline.go

Example (WordCount)
package main

import (
	"bufio"
	"fmt"
	"io"
	"runtime"
	"strings"

	"github.com/exascience/pargo/pipeline"
	"github.com/exascience/pargo/sort"
	"github.com/exascience/pargo/sync"
)

type Word string

func (w Word) Hash() (hash uint64) {
	// DJBX33A
	hash = 5381
	for _, b := range w {
		hash = ((hash << 5) + hash) + uint64(b)
	}
	return
}

func WordCount(r io.Reader) *sync.Map {
	result := sync.NewMap(16 * runtime.GOMAXPROCS(0))
	scanner := pipeline.NewScanner(r)
	scanner.Split(bufio.ScanWords)
	var p pipeline.Pipeline
	p.Source(scanner)
	p.Add(
		pipeline.Par(pipeline.Receive(
			func(_ int, data interface{}) interface{} {
				var uniqueWords []string
				for _, s := range data.([]string) {
					newValue, _ := result.Modify(Word(s), func(value interface{}, ok bool) (newValue interface{}, store bool) {
						if ok {
							newValue = value.(int) + 1
						} else {
							newValue = 1
						}
						store = true
						return
					})
					if newValue.(int) == 1 {
						uniqueWords = append(uniqueWords, s)
					}
				}
				return uniqueWords
			},
		)),
		pipeline.Ord(pipeline.ReceiveAndFinalize(
			func(_ int, data interface{}) interface{} {
				// print unique words as encountered first at the source
				for _, s := range data.([]string) {
					fmt.Print(s, " ")
				}
				return data
			},
			func() { fmt.Println(".") },
		)),
	)
	p.Run()
	return result
}

func main() {
	r := strings.NewReader("The big black bug bit the big black bear but the big black bear bit the big black bug back")
	counts := WordCount(r)
	words := make(sort.StringSlice, 0)
	counts.Range(func(key, _ interface{}) bool {
		words = append(words, string(key.(Word)))
		return true
	})
	sort.Sort(words)
	for _, word := range words {
		count, _ := counts.Load(Word(word))
		fmt.Println(word, count.(int))
	}

}
Output:

The big black bug bit the bear but back .
The 1
back 1
bear 2
big 4
bit 2
black 4
bug 2
but 1
the 3

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComposeFilters

func ComposeFilters(pipeline *Pipeline, kind NodeKind, dataSize *int, filters []Filter) (receivers []Receiver, finalizers []Finalizer)

ComposeFilters takes a number of filters, calls them with the given pipeline, kind, and dataSize parameters in order, and appends the returned receivers and finalizers (except for nil values) to the result slices.

ComposeFilters is used in Node implementations. User programs typically do not call ComposeFilters.

func Identity added in v1.1.0

func Identity(_ *Pipeline, _ NodeKind, _ *int) (_ Receiver, _ Finalizer)

Identity is a filter that passes data batches through unmodified. This filter will be optimized away in a pipeline, so it does not hurt to add it.

Types

type BytesScanner

type BytesScanner struct {
	*bufio.Scanner
	// contains filtered or unexported fields
}

BytesScanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches slices of bytes.

func NewBytesScanner

func NewBytesScanner(r io.Reader) *BytesScanner

NewBytesScanner returns a new BytesScanner to read from r. The split function defaults to bufio.ScanLines.

func (*BytesScanner) Data

func (src *BytesScanner) Data() interface{}

Data implements the method of the Source interface.

func (*BytesScanner) Fetch

func (src *BytesScanner) Fetch(n int) (fetched int)

Fetch implements the method of the Source interface.

func (*BytesScanner) Prepare

func (src *BytesScanner) Prepare(_ context.Context) (size int)

Prepare implements the method of the Source interface.

type Filter

type Filter func(pipeline *Pipeline, kind NodeKind, dataSize *int) (Receiver, Finalizer)

A Filter is a function that returns a Receiver and a Finalizer to be added to a node. It receives a pipeline, the kind of node it will be added to, and the expected total data size that the receiver will be asked to process.

The dataSize parameter is either positive, in which case it indicates the expected total size of all batches that will eventually be passed to this filter's receiver, or it is negative, in which case the expected size is either unknown or too difficult to determine. The dataSize parameter is a pointer whose contents can be modified by the filter, for example if this filter increases or decreases the total size for subsequent filters, if this filter can change dataSize from an unknown to a known value, or, conversely, if it must change dataSize from a known to an unknown value.

Either the receiver or the finalizer or both can be nil, in which case they will not be added to the current node.
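
As an illustration of the dataSize contract, here is a minimal sketch of a filter that changes the expected size. The type declarations are local stand-ins that mirror the documented signatures so that the example compiles on its own, and Duplicate is a hypothetical filter, not part of this package.

```go
package main

import "fmt"

// Local stand-ins that mirror the documented signatures; they are not the
// package's own declarations.
type (
	Pipeline  struct{}
	NodeKind  int
	Receiver  func(seqNo int, data interface{}) (filteredData interface{})
	Finalizer func()
	Filter    func(pipeline *Pipeline, kind NodeKind, dataSize *int) (Receiver, Finalizer)
)

// Duplicate is a hypothetical filter that emits every element twice, so it
// doubles a known dataSize and leaves an unknown (negative) one untouched.
func Duplicate(_ *Pipeline, _ NodeKind, dataSize *int) (Receiver, Finalizer) {
	if *dataSize > 0 {
		*dataSize *= 2
	}
	receive := func(_ int, data interface{}) interface{} {
		in := data.([]int)
		out := make([]int, 0, 2*len(in))
		for _, v := range in {
			out = append(out, v, v)
		}
		return out
	}
	return receive, nil // nil finalizer: nothing to do at the end
}

func main() {
	var f Filter = Duplicate
	size := 3
	receive, finalize := f(&Pipeline{}, 0, &size)
	fmt.Println(size, receive(0, []int{1, 2, 3}), finalize == nil)
	// prints: 6 [1 1 2 2 3 3] true
}
```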

func Count

func Count(result *int) Filter

Count creates a filter that sets the result pointer to the total size of all data batches it sees.

func Every

func Every(result *bool, cancelWhenKnown bool, predicate Predicate) Filter

Every creates a filter that sets the result pointer to true if the given predicate returns true for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.

func Finalize

func Finalize(finalize Finalizer) Filter

Finalize creates a filter that returns a nil receiver and the given finalizer.

func NotAny

func NotAny(result *bool, cancelWhenKnown bool, predicate Predicate) Filter

NotAny creates a filter that sets the result pointer to true if the given predicate returns false for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns true on a data batch.

func NotEvery

func NotEvery(result *bool, cancelWhenKnown bool, predicate Predicate) Filter

NotEvery creates a filter that sets the result pointer to true if the given predicate returns false for at least one of the data batches it is passed. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.

func Receive

func Receive(receive Receiver) Filter

Receive creates a Filter that returns the given receiver and a nil finalizer.

func ReceiveAndFinalize

func ReceiveAndFinalize(receive Receiver, finalize Finalizer) Filter

ReceiveAndFinalize creates a filter that returns the given receiver and finalizer.

func Slice

func Slice(result interface{}) Filter

Slice creates a filter that appends all the data batches it sees to the result. The result must represent a settable slice, for example by using the address operator & on a given slice.
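
The requirement that the result be settable comes from reflection. The sketch below shows the general technique with the standard reflect package; appendBatch is a hypothetical helper, and whether Slice is implemented this way is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// appendBatch appends the slice batch to the slice that result points to.
// It panics if result is not a pointer to a slice of a matching type.
func appendBatch(result interface{}, batch interface{}) {
	dst := reflect.ValueOf(result).Elem() // settable because result is a pointer
	dst.Set(reflect.AppendSlice(dst, reflect.ValueOf(batch)))
}

func main() {
	var words []string
	appendBatch(&words, []string{"a", "b"})
	appendBatch(&words, []string{"c"})
	fmt.Println(words) // prints: [a b c]
}
```

Passing words instead of &words would make Elem panic, which is why the address operator is needed.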

func Some

func Some(result *bool, cancelWhenKnown bool, predicate Predicate) Filter

Some creates a filter that sets the result pointer to true if the given predicate returns true for at least one of the data batches it is passed. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns true on a data batch.

type Finalizer

type Finalizer func()

A Finalizer is called once after the corresponding receiver has been called for all data batches in the current pipeline.

type Func added in v1.1.0

type Func struct {
	// contains filtered or unexported fields
}

Func is a generic source that generates data batches by repeatedly calling a function.

func NewFunc added in v1.1.0

func NewFunc(size int, fetch func(size int) (data interface{}, fetched int, err error)) *Func

NewFunc returns a new Func to generate data batches by repeatedly calling fetch.

The size parameter informs the pipeline what the total expected size of all data batches is. Pass -1 if the total size is unknown or difficult to determine.

The fetch function returns a data batch of the requested size. It returns the size of the data batch that it was actually able to fetch. It returns 0 if there is no more data to be fetched from the source; the pipeline will then make no further attempts to fetch more elements.

The fetch function can also return an error if necessary.
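
A fetch function with the documented signature might look like the following minimal sketch. The sliceFetcher helper is hypothetical and only builds the function; passing the result to NewFunc is left out so that the example compiles without the package.

```go
package main

import "fmt"

// sliceFetcher returns a function with the signature that NewFunc documents
// for its fetch parameter. The helper itself is hypothetical.
func sliceFetcher(all []string) func(size int) (data interface{}, fetched int, err error) {
	pos := 0
	return func(size int) (interface{}, int, error) {
		end := pos + size
		if end > len(all) {
			end = len(all)
		}
		batch := all[pos:end]
		pos = end
		return batch, len(batch), nil // fetched == 0 signals exhaustion
	}
}

func main() {
	fetch := sliceFetcher([]string{"a", "b", "c"})
	for {
		data, n, _ := fetch(2)
		if n == 0 {
			break
		}
		fmt.Println(n, data)
	}
}
```

The loop in main mimics what the text describes: batches are requested until fetched is 0.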

func (*Func) Data added in v1.1.0

func (f *Func) Data() interface{}

Data implements the method of the Source interface.

func (*Func) Err added in v1.1.0

func (f *Func) Err() error

Err implements the method of the Source interface.

func (*Func) Fetch added in v1.1.0

func (f *Func) Fetch(size int) (fetched int)

Fetch implements the method of the Source interface.

func (*Func) Prepare added in v1.1.0

func (f *Func) Prepare(_ context.Context) int

Prepare implements the method of the Source interface.

type Node

type Node interface {

	// TryMerge tries to merge node with the current node by appending its
	// filters to the filters of the current node, which succeeds if both nodes
	// are either sequential or parallel. The return value merged indicates
	// whether merging succeeded.
	TryMerge(node Node) (merged bool)

	// Begin informs this node that the pipeline is going to start to feed
	// batches of data to this node. The pipeline, the index of this node among
	// all the nodes in the pipeline, and the expected total size of all batches
	// combined are passed as parameters.
	//
	// The dataSize parameter is either positive, in which case it indicates the
	// expected total size of all batches that will eventually be passed to this
	// node's Feed method, or it is negative, in which case the expected size is
	// either unknown or too difficult to determine. The dataSize parameter is a
	// pointer whose contents can be modified by Begin, for example if this node
	// increases or decreases the total size for subsequent nodes, if this
	// node can change dataSize from an unknown to a known value, or,
	// conversely, if it must change it from a known to an unknown value.
	//
	// A node may decide that, based on the given information, it will actually
	// not need to see any of the batches that are normally going to be passed
	// to it. In that case, it can return false as a result, and its Feed and
	// End methods will not be called. Otherwise, it should return true
	// by default.
	Begin(p *Pipeline, index int, dataSize *int) (keep bool)

	// Feed is called for each batch of data. The pipeline, the index of this
	// node among all the nodes in the pipeline (which may be different from the
	// index number seen by Begin), the sequence number of the batch (according
	// to the encounter order), and the actual batch of data are passed as
	// parameters.
	//
	// The data parameter contains the batch of data, which is usually a slice
	// of a particular type. After the data has been processed by all filters of
	// this node, the node must call p.FeedForward with exactly the same index
	// and sequence numbers, but a potentially modified batch of data.
	// FeedForward must be called even when the data batch is or becomes empty,
	// to ensure that all sequence numbers are seen by subsequent nodes.
	Feed(p *Pipeline, index int, seqNo int, data interface{})

	// End is called after all batches have been passed to Feed. This allows the
	// node to release resources and call the finalizers of its filters.
	End()
}

A Node object represents a sequence of filters which are together executed either in encounter order, in arbitrary sequential order, or in parallel.

The methods of this interface are typically not called by user programs, but rather implemented by specific node types and called by pipelines. Ordered, sequential, and parallel nodes are also implemented in this package, so that user programs are typically not concerned with Node methods at all.

func Limit

func Limit(limit int, cancelWhenReached bool) Node

Limit creates an ordered node with a filter that caps the total size of all data batches it passes to the next filter in the pipeline to the given limit. If cancelWhenReached is true, this filter cancels the pipeline as soon as the limit is reached. If limit is negative, all data is passed through unmodified.
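
The capping behavior can be modeled as a running count over batches that arrive in encounter order. This is a minimal sketch of that idea with a hypothetical limiter type; it is not the package's code and leaves out cancelation.

```go
package main

import "fmt"

// limiter is a hypothetical helper that truncates batches once limit elements
// have passed through; a negative limit disables the cap.
type limiter struct {
	limit, seen int
}

// pass returns the part of batch that still fits under the limit and reports
// whether the limit has now been reached.
func (l *limiter) pass(batch []int) (out []int, reached bool) {
	if l.limit < 0 {
		return batch, false
	}
	if room := l.limit - l.seen; len(batch) > room {
		batch = batch[:room]
	}
	l.seen += len(batch)
	return batch, l.seen >= l.limit
}

func main() {
	l := &limiter{limit: 5}
	for _, b := range [][]int{{1, 2, 3}, {4, 5, 6}, {7}} {
		out, reached := l.pass(b)
		fmt.Println(out, reached)
	}
}
```

Because the count depends on which batches came first, such a filter only gives a well-defined result when batches arrive in encounter order, which is consistent with Limit being an ordered node.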

func LimitedPar

func LimitedPar(limit int, filters ...Filter) Node

LimitedPar creates a parallel node with the given filters. The node uses at most limit goroutines at the same time. If limit is 0, a reasonable default is used instead. Even if limit is 0, the node is still limited. For unlimited nodes, use Par instead.
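
Bounding the number of concurrently running goroutines is commonly done with a buffered channel used as a counting semaphore. The sketch below shows that general technique; runLimited is hypothetical, assumes limit is at least 1, and is not claimed to be how LimitedPar works internally.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited calls work for every index in [0, n) using at most limit
// goroutines at the same time, and returns the highest concurrency observed.
func runLimited(n, limit int, work func(i int)) int32 {
	var (
		wg           sync.WaitGroup
		active, peak int32
		sem          = make(chan struct{}, limit) // counting semaphore
	)
	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while limit goroutines are running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			cur := atomic.AddInt32(&active, 1)
			for { // record a new peak if cur exceeds the old one
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			work(i)
			atomic.AddInt32(&active, -1)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	out := make([]int, 8)
	peak := runLimited(len(out), 3, func(i int) { out[i] = i * i })
	fmt.Println(out, peak <= 3) // prints: [0 1 4 9 16 25 36 49] true
}
```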

func NewNode

func NewNode(kind NodeKind, filters ...Filter) Node

NewNode creates a node of the given kind, with the given filters.

It is often more convenient to use one of the Ord, Seq, or Par functions.

func Ord

func Ord(filters ...Filter) Node

Ord creates an ordered node with the given filters.

func Par

func Par(filters ...Filter) Node

Par creates a parallel node with the given filters.

func Seq

func Seq(filters ...Filter) Node

Seq creates a sequential node with the given filters.

func Skip

func Skip(n int) Node

Skip creates an ordered node with a filter that skips the first n elements from the data batches it passes to the next filter in the pipeline. If n is negative, no data is passed through, and the error value of the pipeline is set to a non-nil value.
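
Skipping across batch boundaries can be modeled with a counter of elements still to drop. This is a minimal sketch with a hypothetical skipper type; it assumes n is not negative and does not model the error case described above.

```go
package main

import "fmt"

// skipper is a hypothetical helper that drops the first remaining elements
// of a stream of batches that arrive in encounter order.
type skipper struct{ remaining int }

// pass returns the part of batch that lies beyond the elements to skip.
func (s *skipper) pass(batch []int) []int {
	if s.remaining >= len(batch) {
		s.remaining -= len(batch)
		return batch[:0] // the whole batch is skipped
	}
	batch = batch[s.remaining:]
	s.remaining = 0
	return batch
}

func main() {
	s := &skipper{remaining: 4}
	for _, b := range [][]int{{1, 2, 3}, {4, 5, 6}, {7}} {
		fmt.Println(s.pass(b))
	}
}
```

An empty batch is still returned rather than dropped, in line with the rule that every sequence number must be seen by subsequent nodes.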

func StrictOrd

func StrictOrd(filters ...Filter) Node

StrictOrd creates an ordered node with the given filters.

type NodeKind

type NodeKind int

A NodeKind represents the different kinds of nodes.

const (
	// Ordered nodes receive batches in encounter order.
	Ordered NodeKind = iota

	// Sequential nodes receive batches in arbitrary sequential order.
	Sequential

	// Parallel nodes receive batches in parallel.
	Parallel
)

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

A Pipeline is a parallel pipeline that can feed batches of data fetched from a source through several nodes that are ordered, sequential, or parallel.

The zero Pipeline is valid and empty.

A Pipeline must not be copied after first use.

func (*Pipeline) Add

func (p *Pipeline) Add(nodes ...Node)

Add appends nodes to the end of this pipeline.

func (*Pipeline) Cancel

func (p *Pipeline) Cancel()

Cancel calls the cancel function of this pipeline's context.

func (*Pipeline) Context

func (p *Pipeline) Context() context.Context

Context returns this pipeline's context.

func (*Pipeline) Err

func (p *Pipeline) Err() (err error)

Err returns the current error value for this pipeline, which may be nil if no error has occurred so far.

Err and SetErr are safe to be concurrently invoked.

func (*Pipeline) FeedForward

func (p *Pipeline) FeedForward(index int, seqNo int, data interface{})

FeedForward must be called in the Feed method of a node to forward a potentially modified data batch to the next node in the current pipeline.

FeedForward is used in Node implementations. User programs typically do not call FeedForward.

FeedForward must be called with the pipeline received as a parameter by Feed, and must pass the same index and seqNo received by Feed. The data parameter can be either a modified or an unmodified data batch. FeedForward must always be called, even if the data batch is unmodified, and even if the data batch is or becomes empty.

func (*Pipeline) NofBatches

func (p *Pipeline) NofBatches(n int) (nofBatches int)

NofBatches sets or gets the number of batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is known or can be determined easily.

NofBatches can be called safely by user programs before Run or RunWithContext is called.

If user programs do not call NofBatches, or call it with a value < 1, then the pipeline will choose a reasonable default value that takes runtime.GOMAXPROCS(0) into account.

If the expected total size for this pipeline's data source is unknown, or is difficult to determine, use SetVariableBatchSize to influence batch sizes.

func (*Pipeline) Run

func (p *Pipeline) Run()

Run initiates pipeline execution by calling RunWithContext(context.WithCancel(context.Background())), and ensures that the cancel function is called at least once when the pipeline is done.

Run should only be called after a data source has been set using the Source method, and one or more Node objects have been added to the pipeline using the Add method. NofBatches can be called before Run to deal with load imbalance, but this is not necessary since Run chooses a reasonable default value.

Run prepares the data source, tells each node that batches are going to be sent to them by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.

func (*Pipeline) RunWithContext

func (p *Pipeline) RunWithContext(ctx context.Context, cancel context.CancelFunc)

RunWithContext initiates pipeline execution.

It expects a context and a cancel function as parameters, for example from context.WithCancel(context.Background()). It does not ensure that the cancel function is called at least once, so this must be ensured by the function calling RunWithContext.
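
The caller's obligation to call cancel is usually met with defer. The following is a minimal sketch using only the standard library; runWithTimeout, waitForCancel, and finishAtOnce are hypothetical names, and the run parameter stands in for a pipeline run.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout creates a context that expires after ms milliseconds, passes
// it to run, and guarantees that cancel is called before returning.
func runWithTimeout(ms int, run func(ctx context.Context, cancel context.CancelFunc)) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel() // the caller's obligation, per the text above
	run(ctx, cancel)
	return ctx.Err()
}

// waitForCancel and finishAtOnce are hypothetical stand-ins for a pipeline run.
func waitForCancel(ctx context.Context, _ context.CancelFunc) { <-ctx.Done() }
func finishAtOnce(_ context.Context, _ context.CancelFunc)    {}

func main() {
	fmt.Println(runWithTimeout(10, waitForCancel)) // context deadline exceeded
	fmt.Println(runWithTimeout(1000, finishAtOnce)) // <nil>
}
```

Since the run parameter has the same function type as the documented RunWithContext method, a method value such as p.RunWithContext should be usable in its place.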

RunWithContext should only be called after a data source has been set using the Source method, and one or more Node objects have been added to the pipeline using the Add method. NofBatches can be called before RunWithContext to deal with load imbalance, but this is not necessary since RunWithContext chooses a reasonable default value.

RunWithContext prepares the data source, tells each node that batches are going to be sent to them by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.

func (*Pipeline) SetErr

func (p *Pipeline) SetErr(err error) bool

SetErr attempts to set a new error value for this pipeline, unless it already has a non-nil error value. If the attempt is successful, SetErr also cancels the pipeline, and returns true. If the attempt is not successful, SetErr returns false.

SetErr and Err are safe to be concurrently invoked, for example from the different goroutines executing filters of parallel nodes in this pipeline.
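
The first-error-wins behavior described for SetErr can be sketched with a mutex and a cancel function. The errState type and its helpers below are hypothetical stand-ins written for illustration, not the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errFirst, errSecond = errors.New("first"), errors.New("second")

// errState is a hypothetical stand-in for the error handling described above.
type errState struct {
	mu     sync.Mutex
	err    error
	cancel context.CancelFunc
}

// newErrState returns a state together with the context it will cancel.
func newErrState() (*errState, context.Context) {
	ctx, cancel := context.WithCancel(context.Background())
	return &errState{cancel: cancel}, ctx
}

// SetErr stores err only if no error is stored yet, and cancels on success.
func (s *errState) SetErr(err error) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.err != nil {
		return false
	}
	s.err = err
	s.cancel()
	return true
}

// Err returns the stored error, or nil if none has been set.
func (s *errState) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

func main() {
	s, ctx := newErrState()
	fmt.Println(s.SetErr(errFirst), s.SetErr(errSecond)) // true false
	fmt.Println(s.Err(), ctx.Err())                      // first context canceled
}
```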

func (*Pipeline) SetVariableBatchSize

func (p *Pipeline) SetVariableBatchSize(batchInc, maxBatchSize int)

SetVariableBatchSize sets the batch size(s) for the batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is unknown or difficult to determine.

SetVariableBatchSize can be called safely by user programs before Run or RunWithContext is called.

If user programs do not call SetVariableBatchSize, or pass a value < 1 for either parameter, then the pipeline will choose a reasonable default value for that parameter.

The pipeline will start with batchInc as a batch size, and increase the batch size for every subsequent batch by batchInc to accommodate data sources of different total sizes. The batch size will never be larger than maxBatchSize, though.
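
The growth rule can be written out as a short calculation. This is a minimal model of the description above, with a hypothetical batchSizes helper; it does not reproduce the package's defaults or its internal bookkeeping.

```go
package main

import "fmt"

// batchSizes lists the first n batch sizes under the documented growth rule:
// start at batchInc, grow by batchInc per batch, never exceed maxBatchSize.
func batchSizes(batchInc, maxBatchSize, n int) []int {
	sizes := make([]int, 0, n)
	size := batchInc
	for i := 0; i < n; i++ {
		if size > maxBatchSize {
			size = maxBatchSize
		}
		sizes = append(sizes, size)
		size += batchInc
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes(100, 350, 5)) // prints: [100 200 300 350 350]
}
```

Small sources are split into small batches quickly, while large sources soon reach the maximum batch size.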

If the expected total size for this pipeline's data source is known, or can be determined easily, use NofBatches to influence the batch size.

func (*Pipeline) Source

func (p *Pipeline) Source(source interface{})

Source sets the data source for this pipeline.

If source does not implement the Source interface, the pipeline uses reflection to create a proper source for arrays, slices, strings, or channels.

It is safe to call Source multiple times before Run or RunWithContext is called, in which case only the last call to Source is effective.

type Predicate

type Predicate func(data interface{}) bool

A Predicate is a function that is passed a data batch and returns a boolean value.

In most cases, it will convert the data parameter to a specific slice type with a type assertion and check a predicate on each element of the slice.
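
A typical predicate might look like the following minimal sketch. The Predicate type is redeclared locally so that the example compiles on its own, and allEven is a hypothetical predicate over []int batches.

```go
package main

import "fmt"

// Predicate mirrors the documented type; it is redeclared locally so the
// sketch compiles on its own.
type Predicate func(data interface{}) bool

// allEven is a hypothetical predicate: it reports whether every element of an
// []int batch is even, and rejects batches of any other type.
var allEven Predicate = func(data interface{}) bool {
	batch, ok := data.([]int)
	if !ok {
		return false
	}
	for _, v := range batch {
		if v%2 != 0 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allEven([]int{2, 4, 6}), allEven([]int{2, 3})) // prints: true false
}
```

The two-value form of the type assertion avoids a panic if a batch of an unexpected type reaches the predicate.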

type Receiver

type Receiver func(seqNo int, data interface{}) (filteredData interface{})

A Receiver is called for every data batch, and returns a potentially modified data batch. The seqNo parameter indicates the order in which the data batch was encountered at the current pipeline's data source.

type Scanner

type Scanner struct {
	*bufio.Scanner
	// contains filtered or unexported fields
}

Scanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches strings.

func NewScanner

func NewScanner(r io.Reader) *Scanner

NewScanner returns a new Scanner to read from r. The split function defaults to bufio.ScanLines.

func (*Scanner) Data

func (src *Scanner) Data() interface{}

Data implements the method of the Source interface.

func (*Scanner) Fetch

func (src *Scanner) Fetch(n int) (fetched int)

Fetch implements the method of the Source interface.

func (*Scanner) Prepare

func (src *Scanner) Prepare(_ context.Context) (size int)

Prepare implements the method of the Source interface.

type SingletonChan added in v1.1.0

type SingletonChan struct {
	// contains filtered or unexported fields
}

SingletonChan is similar to a regular chan source, except it only accepts and passes through single elements instead of creating slices of elements from the input channel.

func NewSingletonChan added in v1.1.0

func NewSingletonChan(channel interface{}) *SingletonChan

NewSingletonChan returns a new SingletonChan to read from the given channel.

func (*SingletonChan) Data added in v1.1.0

func (src *SingletonChan) Data() interface{}

Data implements the method of the Source interface.

func (*SingletonChan) Err added in v1.1.0

func (src *SingletonChan) Err() error

Err implements the method of the Source interface.

func (*SingletonChan) Fetch added in v1.1.0

func (src *SingletonChan) Fetch(n int) (fetched int)

Fetch implements the method of the Source interface.

func (*SingletonChan) Prepare added in v1.1.0

func (src *SingletonChan) Prepare(ctx context.Context) (size int)

Prepare implements the method of the Source interface.

type Source

type Source interface {
	// Err returns an error value or nil
	Err() error

	// Prepare receives a pipeline context and informs the pipeline what the
	// total expected size of all data batches is. The return value is -1 if the
	// total size is unknown or difficult to determine.
	Prepare(ctx context.Context) (size int)

	// Fetch gets a data batch of the requested size from the source. It returns
	// the size of the data batch that it was actually able to fetch. It returns
	// 0 if there is no more data to be fetched from the source; the pipeline
	// will then make no further attempts to fetch more elements.
	Fetch(size int) (fetched int)

	// Data returns the last fetched data batch.
	Data() interface{}
}

A Source represents an object that can generate data batches for pipelines.
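
A user-defined source might be written as in the following minimal sketch. The interface is copied from the declaration above so that the example compiles without the package, and intRange is a hypothetical source that yields consecutive integers.

```go
package main

import (
	"context"
	"fmt"
)

// Source is copied from the documented interface so the sketch is
// self-contained.
type Source interface {
	Err() error
	Prepare(ctx context.Context) (size int)
	Fetch(size int) (fetched int)
	Data() interface{}
}

// intRange is a hypothetical source that yields the integers [next, end).
type intRange struct {
	next, end int
	batch     []int
}

// Err always returns nil because generating integers cannot fail.
func (r *intRange) Err() error { return nil }

// Prepare reports the total size, which is known up front for a range.
func (r *intRange) Prepare(_ context.Context) int { return r.end - r.next }

// Fetch builds the next batch of at most size elements and returns its length.
func (r *intRange) Fetch(size int) int {
	r.batch = nil
	for ; size > 0 && r.next < r.end; size-- {
		r.batch = append(r.batch, r.next)
		r.next++
	}
	return len(r.batch)
}

// Data returns the batch built by the last call to Fetch.
func (r *intRange) Data() interface{} { return r.batch }

func main() {
	var src Source = &intRange{next: 0, end: 5}
	fmt.Println("size:", src.Prepare(context.Background()))
	for src.Fetch(2) > 0 {
		fmt.Println(src.Data())
	}
}
```

The loop in main follows the protocol in the interface comments: Prepare once, then Fetch and Data until Fetch returns 0.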
