Documentation ¶
Overview ¶
Package pipeline provides means to construct and execute parallel pipelines.
A Pipeline feeds batches of data through several functions that can be specified to be executed in encounter order, in arbitrary sequential order, or in parallel. Ordered, sequential, or parallel stages can arbitrarily alternate.
A Pipeline consists of a Source object and several Node objects.
Source objects that are supported by this implementation are arrays, slices, strings, channels, and bufio.Scanner objects, but other kinds of Source objects can be added by user programs.
Node objects can be specified to receive batches from the input source either sequentially in encounter order, which is always the same order in which they were originally encountered at the source; sequentially, but in arbitrary order; or in parallel. Ordered nodes always receive batches in encounter order even if they are preceded by arbitrary sequential, or even parallel nodes.
Node objects consist of filters, which are pairs of receiver and finalizer functions. Each batch is passed to each receiver function, which can transform and modify the batch for the next receiver function in the pipeline. Each finalizer function is called once when all batches have been passed through all receiver functions.
Pipelines do not have an explicit representation for sinks. Instead, filters can use side effects to generate results.
Pipelines also support cancelation by way of the context package of Go's standard library.
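For example, a minimal sketch of early termination (assuming that a slice source delivers its batches as []int subslices; the search target 42 is arbitrary). Because cancelWhenKnown is true, the Some filter cancels the pipeline as soon as a match is found, so no further batches are fetched:

package main

import (
	"fmt"

	"github.com/exascience/pargo/pipeline"
)

func main() {
	numbers := make([]int, 1000000)
	for i := range numbers {
		numbers[i] = i
	}
	var p pipeline.Pipeline
	p.Source(numbers)
	var found bool
	p.Add(pipeline.Seq(pipeline.Some(&found, true, func(data interface{}) bool {
		for _, n := range data.([]int) {
			if n == 42 { // arbitrary search target
				return true
			}
		}
		return false
	})))
	p.Run()
	fmt.Println(found)
}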
An application of pipelines can be found in the elPrep tool at https://github.com/exascience/elprep - specifically in https://github.com/ExaScience/elprep/blob/master/sam/filter-pipeline.go
Example (WordCount) ¶
package main

import (
	"bufio"
	"fmt"
	"io"
	"runtime"
	"strings"

	"github.com/exascience/pargo/pipeline"
	"github.com/exascience/pargo/sort"
	"github.com/exascience/pargo/sync"
)

type Word string

func (w Word) Hash() (hash uint64) {
	// DJBX33A
	hash = 5381
	for _, b := range w {
		hash = ((hash << 5) + hash) + uint64(b)
	}
	return
}

func WordCount(r io.Reader) *sync.Map {
	result := sync.NewMap(16 * runtime.GOMAXPROCS(0))
	scanner := pipeline.NewScanner(r)
	scanner.Split(bufio.ScanWords)
	var p pipeline.Pipeline
	p.Source(scanner)
	p.Add(
		pipeline.Par(pipeline.Receive(
			func(_ int, data interface{}) interface{} {
				var uniqueWords []string
				for _, s := range data.([]string) {
					newValue, _ := result.Modify(Word(s), func(value interface{}, ok bool) (newValue interface{}, store bool) {
						if ok {
							newValue = value.(int) + 1
						} else {
							newValue = 1
						}
						store = true
						return
					})
					if newValue.(int) == 1 {
						uniqueWords = append(uniqueWords, s)
					}
				}
				return uniqueWords
			},
		)),
		pipeline.Ord(pipeline.ReceiveAndFinalize(
			func(_ int, data interface{}) interface{} {
				// print unique words as encountered first at the source
				for _, s := range data.([]string) {
					fmt.Print(s, " ")
				}
				return data
			},
			func() { fmt.Println(".") },
		)),
	)
	p.Run()
	return result
}

func main() {
	r := strings.NewReader("The big black bug bit the big black bear but the big black bear bit the big black bug back")
	counts := WordCount(r)
	words := make(sort.StringSlice, 0)
	counts.Range(func(key, _ interface{}) bool {
		words = append(words, string(key.(Word)))
		return true
	})
	sort.Sort(words)
	for _, word := range words {
		count, _ := counts.Load(Word(word))
		fmt.Println(word, count.(int))
	}
}
Output:

The big black bug bit the bear but back .
The 1
back 1
bear 2
big 4
bit 2
black 4
bug 2
but 1
the 3
Index ¶
- func ComposeFilters(pipeline *Pipeline, kind NodeKind, dataSize *int, filters []Filter) (receivers []Receiver, finalizers []Finalizer)
- func Identity(_ *Pipeline, _ NodeKind, _ *int) (_ Receiver, _ Finalizer)
- type BytesScanner
- type Filter
- func Count(result *int) Filter
- func Every(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
- func Finalize(finalize Finalizer) Filter
- func NotAny(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
- func NotEvery(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
- func Receive(receive Receiver) Filter
- func ReceiveAndFinalize(receive Receiver, finalize Finalizer) Filter
- func Slice(result interface{}) Filter
- func Some(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
- type Finalizer
- type Func
- type Node
- func Limit(limit int, cancelWhenReached bool) Node
- func LimitedPar(limit int, filters ...Filter) Node
- func NewNode(kind NodeKind, filters ...Filter) Node
- func Ord(filters ...Filter) Node
- func Par(filters ...Filter) Node
- func Seq(filters ...Filter) Node
- func Skip(n int) Node
- func StrictOrd(filters ...Filter) Node
- type NodeKind
- type Pipeline
- func (p *Pipeline) Add(nodes ...Node)
- func (p *Pipeline) Cancel()
- func (p *Pipeline) Context() context.Context
- func (p *Pipeline) Err() (err error)
- func (p *Pipeline) FeedForward(index int, seqNo int, data interface{})
- func (p *Pipeline) NofBatches(n int) (nofBatches int)
- func (p *Pipeline) Run()
- func (p *Pipeline) RunWithContext(ctx context.Context, cancel context.CancelFunc)
- func (p *Pipeline) SetErr(err error) bool
- func (p *Pipeline) SetVariableBatchSize(batchInc, maxBatchSize int)
- func (p *Pipeline) Source(source interface{})
- type Predicate
- type Receiver
- type Scanner
- type SingletonChan
- type Source
Examples ¶
Package (WordCount)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComposeFilters ¶
func ComposeFilters(pipeline *Pipeline, kind NodeKind, dataSize *int, filters []Filter) (receivers []Receiver, finalizers []Finalizer)
ComposeFilters takes a number of filters, calls them with the given pipeline, kind, and dataSize parameters in order, and appends the returned receivers and finalizers (except for nil values) to the result slices.
ComposeFilters is used in Node implementations. User programs typically do not call ComposeFilters.
Types ¶
type BytesScanner ¶
BytesScanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches slices of bytes.
func NewBytesScanner ¶
func NewBytesScanner(r io.Reader) *BytesScanner
NewBytesScanner returns a new BytesScanner to read from r. The split function defaults to bufio.ScanLines.
func (*BytesScanner) Data ¶
func (src *BytesScanner) Data() interface{}
Data implements the method of the Source interface.
func (*BytesScanner) Fetch ¶
func (src *BytesScanner) Fetch(n int) (fetched int)
Fetch implements the method of the Source interface.
type Filter ¶
A Filter is a function that returns a Receiver and a Finalizer to be added to a node. It receives a pipeline, the kind of node it will be added to, and the expected total data size that the receiver will be asked to process.
The dataSize parameter is either positive, in which case it indicates the expected total size of all batches that will eventually be passed to this filter's receiver, or it is negative, in which case the expected size is either unknown or too difficult to determine. The dataSize parameter is a pointer whose contents can be modified by the filter, for example if this filter increases or decreases the total size for subsequent filters, if it can change dataSize from an unknown to a known value, or if, conversely, it must change dataSize from a known to an unknown value.
Either the receiver or the finalizer or both can be nil, in which case they will not be added to the current node.
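As an illustration, here is a hedged sketch of a user-defined filter; the name Dedup and its deduplication logic are hypothetical. The filter marks the total data size as unknown, because it may shrink batches, and returns a nil finalizer:

// Dedup creates a filter that removes strings that have already been
// seen in earlier batches. The unsynchronized seen map makes this
// filter suitable only for sequential or ordered nodes, not parallel
// ones.
func Dedup() pipeline.Filter {
	return func(_ *pipeline.Pipeline, _ pipeline.NodeKind, dataSize *int) (pipeline.Receiver, pipeline.Finalizer) {
		*dataSize = -1 // batches may shrink, so the total size is no longer known
		seen := make(map[string]bool)
		receiver := func(_ int, data interface{}) interface{} {
			in := data.([]string)
			out := in[:0]
			for _, s := range in {
				if !seen[s] {
					seen[s] = true
					out = append(out, s)
				}
			}
			return out
		}
		return receiver, nil // a nil finalizer is simply not added to the node
	}
}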
func Count ¶
func Count(result *int) Filter
Count creates a filter that sets the result pointer to the total size of all data batches it sees.
func Every ¶
func Every(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
Every creates a filter that sets the result pointer to true if the given predicate returns true for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.
func Finalize ¶
func Finalize(finalize Finalizer) Filter
Finalize creates a filter that returns the given finalizer.
func NotAny ¶
func NotAny(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
NotAny creates a filter that sets the result pointer to true if the given predicate returns false for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns true on a data batch.
func NotEvery ¶
func NotEvery(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
NotEvery creates a filter that sets the result pointer to true if the given predicate returns false for at least one of the data batches it is passed. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.
func Receive ¶
func Receive(receive Receiver) Filter
Receive creates a filter that returns the given receiver.
func ReceiveAndFinalize ¶
func ReceiveAndFinalize(receive Receiver, finalize Finalizer) Filter
ReceiveAndFinalize creates a filter that returns the given receiver and finalizer.
func Slice ¶
func Slice(result interface{}) Filter
Slice creates a filter that appends all the data batches it sees to the result. The result must represent a settable slice, for example by using the address operator & on a given slice.
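For example, a sketch that collects every word longer than three characters into a slice while also counting them (assuming imports of bufio, io, and the pipeline package; the threshold is arbitrary):

func longWords(r io.Reader) (words []string, total int) {
	scanner := pipeline.NewScanner(r)
	scanner.Split(bufio.ScanWords)
	var p pipeline.Pipeline
	p.Source(scanner)
	p.Add(
		pipeline.Par(pipeline.Receive(func(_ int, data interface{}) interface{} {
			var long []string
			for _, s := range data.([]string) {
				if len(s) > 3 {
					long = append(long, s)
				}
			}
			return long
		})),
		pipeline.Ord(
			pipeline.Count(&total), // total size of the filtered batches
			pipeline.Slice(&words), // append each filtered batch to words
		),
	)
	p.Run()
	return
}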
func Some ¶
func Some(result *bool, cancelWhenKnown bool, predicate Predicate) Filter
Some creates a filter that sets the result pointer to true if the given predicate returns true for at least one of the data batches it is passed. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns true on a data batch.
type Finalizer ¶
type Finalizer func()
A Finalizer is called once after the corresponding receiver has been called for all data batches in the current pipeline.
type Func ¶ added in v1.1.0
type Func struct {
// contains filtered or unexported fields
}
Func is a generic source that generates data batches by repeatedly calling a function.
func NewFunc ¶ added in v1.1.0
NewFunc returns a new Func to generate data batches by repeatedly calling fetch.
The size parameter informs the pipeline what the total expected size of all data batches is. Pass -1 if the total size is unknown or difficult to determine.
The fetch function returns a data batch of the requested size. It returns the size of the data batch that it was actually able to fetch. It returns 0 if there is no more data to be fetched from the source; the pipeline will then make no further attempts to fetch more elements.
The fetch function can also return an error if necessary.
func (*Func) Data ¶ added in v1.1.0
func (f *Func) Data() interface{}
Data implements the method of the Source interface.
type Node ¶
type Node interface {
	// TryMerge tries to merge node with the current node by appending its
	// filters to the filters of the current node, which succeeds if both nodes
	// are either sequential or parallel. The return value merged indicates
	// whether merging succeeded.
	TryMerge(node Node) (merged bool)

	// Begin informs this node that the pipeline is going to start to feed
	// batches of data to this node. The pipeline, the index of this node among
	// all the nodes in the pipeline, and the expected total size of all batches
	// combined are passed as parameters.
	//
	// The dataSize parameter is either positive, in which case it indicates the
	// expected total size of all batches that will eventually be passed to this
	// node's Feed method, or it is negative, in which case the expected size is
	// either unknown or too difficult to determine. The dataSize parameter is a
	// pointer whose contents can be modified by Begin, for example if this node
	// increases or decreases the total size for subsequent nodes, if it can
	// change dataSize from an unknown to a known value, or if, conversely, it
	// must change dataSize from a known to an unknown value.
	//
	// A node may decide that, based on the given information, it will actually
	// not need to see any of the batches that are normally going to be passed
	// to it. In that case, it can return false as a result, and its Feed and
	// End methods will not be called anymore. Otherwise, it should return true
	// by default.
	Begin(p *Pipeline, index int, dataSize *int) (keep bool)

	// Feed is called for each batch of data. The pipeline, the index of this
	// node among all the nodes in the pipeline (which may be different from the
	// index number seen by Begin), the sequence number of the batch (according
	// to the encounter order), and the actual batch of data are passed as
	// parameters.
	//
	// The data parameter contains the batch of data, which is usually a slice
	// of a particular type. After the data has been processed by all filters of
	// this node, the node must call p.FeedForward with exactly the same index
	// and sequence numbers, but a potentially modified batch of data.
	// FeedForward must be called even when the data batch is or becomes empty,
	// to ensure that all sequence numbers are seen by subsequent nodes.
	Feed(p *Pipeline, index int, seqNo int, data interface{})

	// End is called after all batches have been passed to Feed. This allows the
	// node to release resources and call the finalizers of its filters.
	End()
}
A Node object represents a sequence of filters which are together executed either in encounter order, in arbitrary sequential order, or in parallel.
The methods of this interface are typically not called by user programs, but rather implemented by specific node types and called by pipelines. Ordered, sequential, and parallel nodes are also implemented in this package, so that user programs are typically not concerned with Node methods at all.
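As an illustration, a minimal sketch of a user-defined node that forwards every batch unchanged (the type name passThrough is hypothetical; real nodes would typically also compose their filters with ComposeFilters):

// passThrough is a minimal Node implementation that forwards every
// batch unchanged to the next node in the pipeline.
type passThrough struct{}

// TryMerge never merges, keeping this node separate from its neighbors.
func (n *passThrough) TryMerge(other pipeline.Node) (merged bool) {
	return false
}

// Begin returns true so that the node sees all batches, and leaves the
// expected data size untouched because it never modifies batches.
func (n *passThrough) Begin(p *pipeline.Pipeline, index int, dataSize *int) (keep bool) {
	return true
}

// Feed forwards the batch with the same index and sequence number, as
// required, even if the batch is empty.
func (n *passThrough) Feed(p *pipeline.Pipeline, index int, seqNo int, data interface{}) {
	p.FeedForward(index, seqNo, data)
}

// End has no resources to release and no finalizers to call.
func (n *passThrough) End() {}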
func Limit ¶
func Limit(limit int, cancelWhenReached bool) Node
Limit creates an ordered node with a filter that caps the total size of all data batches it passes to the next filter in the pipeline to the given limit. If cancelWhenReached is true, this filter cancels the pipeline as soon as the limit is reached. If limit is negative, all data is passed through unmodified.
func LimitedPar ¶
func LimitedPar(limit int, filters ...Filter) Node
LimitedPar creates a parallel node with the given filters. The node uses at most limit goroutines at the same time. If limit is 0, a reasonable default is used instead; the node remains limited even then. For an unlimited parallel node, use Par instead.
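For instance, a sketch that caps a CPU-bound stage at four goroutines, with strings.ToUpper standing in for more expensive per-element work (assuming imports of strings and the pipeline package):

var out []string
var p pipeline.Pipeline
p.Source([]string{"alpha", "beta", "gamma", "delta"})
p.Add(
	pipeline.LimitedPar(4, pipeline.Receive(
		func(_ int, data interface{}) interface{} {
			batch := data.([]string)
			for i, s := range batch {
				batch[i] = strings.ToUpper(s) // stand-in for heavy work
			}
			return batch
		},
	)),
	pipeline.Ord(pipeline.Slice(&out)), // collect results in encounter order
)
p.Run()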
func NewNode ¶
func NewNode(kind NodeKind, filters ...Filter) Node
NewNode creates a node of the given kind, with the given filters.
It is often more convenient to use one of the Ord, Seq, or Par functions.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
A Pipeline is a parallel pipeline that can feed batches of data fetched from a source through several nodes that are ordered, sequential, or parallel.
The zero Pipeline is valid and empty.
A Pipeline must not be copied after first use.
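A minimal sketch of the typical lifecycle, starting from the zero Pipeline (the integer source and the summing filter are arbitrary):

package main

import (
	"fmt"

	"github.com/exascience/pargo/pipeline"
)

func main() {
	var p pipeline.Pipeline // the zero Pipeline is valid and empty
	p.Source([]int{1, 2, 3, 4, 5})
	var sum int
	p.Add(pipeline.Seq(pipeline.Receive(
		func(_ int, data interface{}) interface{} {
			for _, n := range data.([]int) {
				sum += n // safe: receivers of a Seq node run sequentially
			}
			return data
		},
	)))
	p.Run()
	if err := p.Err(); err != nil {
		fmt.Println("pipeline failed:", err)
		return
	}
	fmt.Println(sum) // 15
}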
func (*Pipeline) Cancel ¶
func (p *Pipeline) Cancel()
Cancel calls the cancel function of this pipeline's context.
func (*Pipeline) Err ¶
func (p *Pipeline) Err() (err error)
Err returns the current error value for this pipeline, which is nil if no error has occurred so far.
Err and SetErr are safe to invoke concurrently.
func (*Pipeline) FeedForward ¶
func (p *Pipeline) FeedForward(index int, seqNo int, data interface{})
FeedForward must be called in the Feed method of a node to forward a potentially modified data batch to the next node in the current pipeline.
FeedForward is used in Node implementations. User programs typically do not call FeedForward.
FeedForward must be called with the pipeline received as a parameter by Feed, and must pass the same index and seqNo received by Feed. The data parameter can be either a modified or an unmodified data batch. FeedForward must always be called, even if the data batch is unmodified, and even if the data batch is or becomes empty.
func (*Pipeline) NofBatches ¶
func (p *Pipeline) NofBatches(n int) (nofBatches int)
NofBatches sets or gets the number of batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is known or can be determined easily.
NofBatches can be called safely by user programs before Run or RunWithContext is called.
If user programs do not call NofBatches, or call it with a value < 1, then the pipeline will choose a reasonable default value that takes runtime.GOMAXPROCS(0) into account.
If the expected total size for this pipeline's data source is unknown, or is difficult to determine, use SetVariableBatchSize to influence batch sizes.
func (*Pipeline) Run ¶
func (p *Pipeline) Run()
Run initiates pipeline execution by calling RunWithContext(context.WithCancel(context.Background())), and ensures that the cancel function is called at least once when the pipeline is done.
Run should only be called after a data source has been set using the Source method, and one or more Node objects have been added to the pipeline using the Add method. NofBatches can be called before Run to deal with load imbalance, but this is not necessary since Run chooses a reasonable default value.
Run prepares the data source, tells each node that batches are going to be sent to them by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.
func (*Pipeline) RunWithContext ¶
func (p *Pipeline) RunWithContext(ctx context.Context, cancel context.CancelFunc)
RunWithContext initiates pipeline execution.
It expects a context and a cancel function as parameters, for example from context.WithCancel(context.Background()). It does not ensure that the cancel function is called at least once, so this must be ensured by the function calling RunWithContext.
RunWithContext should only be called after a data source has been set using the Source method, and one or more Node objects have been added to the pipeline using the Add method. NofBatches can be called before RunWithContext to deal with load imbalance, but this is not necessary since RunWithContext chooses a reasonable default value.
RunWithContext prepares the data source, tells each node that batches are going to be sent to them by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.
func (*Pipeline) SetErr ¶
func (p *Pipeline) SetErr(err error) bool
SetErr attempts to set a new error value for this pipeline, unless it already has a non-nil error value. If the attempt is successful, SetErr also cancels the pipeline and returns true. If the attempt is not successful, SetErr returns false.
SetErr and Err are safe to invoke concurrently, for example from the different goroutines executing filters of parallel nodes in this pipeline.
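For example, a sketch in which a parallel filter aborts the whole pipeline on malformed input (assuming imports of fmt, strconv, and the pipeline package; the parsing task is arbitrary):

var nums []int
var p pipeline.Pipeline
p.Source([]string{"1", "2", "oops", "4"})
p.Add(
	pipeline.Par(pipeline.Receive(
		func(_ int, data interface{}) interface{} {
			batch := data.([]string)
			out := make([]int, 0, len(batch))
			for _, s := range batch {
				n, err := strconv.Atoi(s)
				if err != nil {
					p.SetErr(err) // records the first error and cancels the pipeline
					return out
				}
				out = append(out, n)
			}
			return out
		},
	)),
	pipeline.Ord(pipeline.Slice(&nums)),
)
p.Run()
if err := p.Err(); err != nil {
	fmt.Println(err) // the first error any goroutine reported
}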
func (*Pipeline) SetVariableBatchSize ¶
func (p *Pipeline) SetVariableBatchSize(batchInc, maxBatchSize int)
SetVariableBatchSize sets the batch size(s) for the batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is unknown or difficult to determine.
SetVariableBatchSize can be called safely by user programs before Run or RunWithContext is called.
If user programs do not call SetVariableBatchSize, or pass a value < 1 for either of the two parameters, then the pipeline will choose a reasonable default value for that parameter.
The pipeline starts with batchInc as the batch size and increases the batch size by batchInc for every subsequent batch, to accommodate data sources of different total sizes. The batch size never grows larger than maxBatchSize, though.
If the expected total size for this pipeline's data source is known, or can be determined easily, use NofBatches to influence the batch size.
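For instance, a sketch for a channel source, whose total size is unknown (the batch sizes 256 and 4096 are arbitrary; assuming an import of the pipeline package):

func collect(ch chan string) []string {
	var p pipeline.Pipeline
	p.Source(ch) // a channel source has an unknown total size
	p.SetVariableBatchSize(256, 4096) // batches grow from 256 up to 4096 elements
	var out []string
	p.Add(pipeline.Ord(pipeline.Slice(&out)))
	p.Run()
	return out
}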
func (*Pipeline) Source ¶
func (p *Pipeline) Source(source interface{})
Source sets the data source for this pipeline.
If source does not implement the Source interface, the pipeline uses reflection to create a proper source for arrays, slices, strings, or channels.
It is safe to call Source multiple times before Run or RunWithContext is called, in which case only the last call to Source is effective.
type Predicate ¶
type Predicate func(data interface{}) bool
A Predicate is a function that is passed a data batch and returns a boolean value.
In most cases, it will cast the data parameter to a specific slice type and check a predicate on each element of the slice.
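For example, a sketch of a predicate over string batches, assuming a source that yields []string batches (the eight-byte threshold is arbitrary):

// allShort reports whether every string in the batch is at most eight
// bytes long.
var allShort pipeline.Predicate = func(data interface{}) bool {
	for _, s := range data.([]string) {
		if len(s) > 8 {
			return false
		}
	}
	return true
}

Such a predicate can then be passed to filters like Every, Some, NotAny, or NotEvery.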
type Receiver ¶
type Receiver func(seqNo int, data interface{}) (filteredData interface{})
A Receiver is called for every data batch, and returns a potentially modified data batch. The seqNo parameter indicates the order in which the data batch was encountered at the current pipeline's data source.
type Scanner ¶
Scanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches strings.
func NewScanner ¶
func NewScanner(r io.Reader) *Scanner
NewScanner returns a new Scanner to read from r. The split function defaults to bufio.ScanLines.
func (*Scanner) Data ¶
func (src *Scanner) Data() interface{}
Data implements the method of the Source interface.
type SingletonChan ¶ added in v1.1.0
type SingletonChan struct {
// contains filtered or unexported fields
}
SingletonChan is similar to a regular chan source, except it only accepts and passes through single elements instead of creating slices of elements from the input channel.
func NewSingletonChan ¶ added in v1.1.0
func NewSingletonChan(channel interface{}) *SingletonChan
NewSingletonChan returns a new SingletonChan to read from the given channel.
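A sketch of the difference from a plain channel source, assuming that each data batch is then the element itself rather than a slice (the string payloads are arbitrary; assuming imports of fmt and the pipeline package):

ch := make(chan string)
go func() {
	ch <- "hello"
	ch <- "world"
	close(ch)
}()
var p pipeline.Pipeline
p.Source(pipeline.NewSingletonChan(ch))
p.Add(pipeline.Ord(pipeline.Receive(
	func(_ int, data interface{}) interface{} {
		fmt.Println(data.(string)) // a single element, not a []string
		return data
	},
)))
p.Run()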
func (*SingletonChan) Data ¶ added in v1.1.0
func (src *SingletonChan) Data() interface{}
Data implements the method of the Source interface.
func (*SingletonChan) Err ¶ added in v1.1.0
func (src *SingletonChan) Err() error
Err implements the method of the Source interface.
func (*SingletonChan) Fetch ¶ added in v1.1.0
func (src *SingletonChan) Fetch(n int) (fetched int)
Fetch implements the method of the Source interface.
type Source ¶
type Source interface {
	// Err returns an error value or nil.
	Err() error

	// Prepare receives a pipeline context and informs the pipeline what the
	// total expected size of all data batches is. The return value is -1 if the
	// total size is unknown or difficult to determine.
	Prepare(ctx context.Context) (size int)

	// Fetch gets a data batch of the requested size from the source. It returns
	// the size of the data batch that it was actually able to fetch. It returns
	// 0 if there is no more data to be fetched from the source; the pipeline
	// will then make no further attempts to fetch more elements.
	Fetch(size int) (fetched int)

	// Data returns the last fetched data batch.
	Data() interface{}
}
A Source represents an object that can generate data batches for pipelines.
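As an illustration, a minimal sketch of a user-defined source that produces the integers 0 through n-1 (the type name intRange is hypothetical; assuming imports of context and the pipeline package):

// intRange is a Source that produces the ints 0, 1, ..., n-1.
type intRange struct {
	n, next int
	batch   []int
}

// Err never reports an error.
func (s *intRange) Err() error { return nil }

// Prepare reports the total size, which is known up front.
func (s *intRange) Prepare(ctx context.Context) (size int) {
	return s.n
}

// Fetch fills the next batch with up to size consecutive ints and
// returns how many it actually produced; 0 signals the end of the data.
func (s *intRange) Fetch(size int) (fetched int) {
	if rest := s.n - s.next; size > rest {
		size = rest
	}
	s.batch = s.batch[:0]
	for i := 0; i < size; i++ {
		s.batch = append(s.batch, s.next)
		s.next++
	}
	return size
}

// Data returns the batch most recently filled by Fetch.
func (s *intRange) Data() interface{} { return s.batch }

A pipeline would then consume it with p.Source(&intRange{n: 100}).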