Documentation
Overview
Package pipeline provides a simple implementation of pipelines, as outlined in the Go blog post at https://blog.golang.org/pipelines
Example
	package main

	import (
		"fmt"

		"github.com/hyfather/pipeline"
	)

	func printStage(inObj interface{}) interface{} {
		fmt.Println(inObj)
		return inObj
	}

	func squareStage(inObj interface{}) interface{} {
		if v, ok := inObj.(int); ok {
			return v * v
		}
		return nil
	}

	var pipelineChan chan interface{}

	func main() {
		p := pipeline.New()
		p.AddStageWithFanOut(squareStage, 1)
		p.AddStage(printStage)

		pipelineChan = make(chan interface{}, 10)
		pipelineChan <- 2
		pipelineChan <- 3
		close(pipelineChan)

		<-p.Run(pipelineChan)
	}

Output:
4
9
Functions
func MergeChannels
func MergeChannels(inChans []chan interface{}) (outChan chan interface{})
MergeChannels merges a slice of channels into a single channel. This utility function can also be used on its own, outside of a pipeline.
Example
	package main

	import (
		"fmt"
		"github.com/hyfather/pipeline"
		"sort"
	)

	func main() {
		inChan1 := make(chan interface{}, 10)
		inChan2 := make(chan interface{}, 10)

		inChan1 <- 1
		inChan1 <- 3
		inChan1 <- 5
		inChan1 <- 7
		inChan1 <- 9
		close(inChan1)

		inChan2 <- 2
		inChan2 <- 4
		inChan2 <- 6
		inChan2 <- 8
		inChan2 <- 10
		close(inChan2)

		outChan := pipeline.MergeChannels([]chan interface{}{inChan1, inChan2})

		var ints []int
		for e := range outChan {
			ints = append(ints, e.(int))
		}
		sort.Ints(ints)
		fmt.Println(ints)
	}
Output: [1 2 3 4 5 6 7 8 9 10]
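To make the merge behavior more concrete, here is a minimal sketch of a fan-in merge built only on the standard library. It is not the package's actual implementation; the names mergeSketch and collectInts are hypothetical, and the unbuffered output channel is an assumption. It shows why the example above sorts its results: values arrive in whatever order the goroutines deliver them.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// mergeSketch is a hypothetical stand-in for a fan-in merge. It starts one
// goroutine per input channel and forwards every value to a single output.
func mergeSketch(inChans []chan interface{}) chan interface{} {
	outChan := make(chan interface{})
	var wg sync.WaitGroup
	wg.Add(len(inChans))
	for _, c := range inChans {
		go func(c <-chan interface{}) {
			defer wg.Done()
			for v := range c {
				outChan <- v
			}
		}(c)
	}
	// Close outChan only after every input channel has been drained.
	go func() {
		wg.Wait()
		close(outChan)
	}()
	return outChan
}

// collectInts drains a channel of ints and returns them in sorted order,
// because the arrival order of merged values is not deterministic.
func collectInts(c <-chan interface{}) []int {
	var ints []int
	for v := range c {
		ints = append(ints, v.(int))
	}
	sort.Ints(ints)
	return ints
}

func main() {
	a := make(chan interface{}, 2)
	b := make(chan interface{}, 2)
	a <- 1
	a <- 3
	close(a)
	b <- 2
	b <- 4
	close(b)
	fmt.Println(collectInts(mergeSketch([]chan interface{}{a, b}))) // [1 2 3 4]
}
```

The merged channel closes only once all inputs are closed, so a consumer ranging over it will block if any input channel is left open.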
Types
type Pipeline
type Pipeline []StageFn
Pipeline defines a pipeline to which processing "stages" can be added and configured to fan out. Pipelines are meant to be long-running, since they continuously process data as it comes in.
A pipeline can be simultaneously run multiple times with different input channels by invoking the Run() method multiple times. A running pipeline shouldn't be copied.
func (*Pipeline) AddRawStage
AddRawStage adds a StageFn to the pipeline as-is, without any further processing or parsing. This is meant for extensibility and customization.
func (*Pipeline) AddStage
AddStage is a convenience method for adding a stage with fanSize = 1. See AddStageWithFanOut for more information.
func (*Pipeline) AddStageWithFanOut
AddStageWithFanOut adds a parallel fan-out ProcessFn to the pipeline. The fanSize number indicates how many instances of this stage will read from the previous stage and process the data flowing through simultaneously to take advantage of parallel CPU scheduling.
Most pipelines will have multiple stages, and the order in which AddStage() and AddStageWithFanOut() are invoked matters: the first invocation defines the first stage, and so forth.
When fanSize > 1, separate goroutines read from the same inChan concurrently, so the order of objects leaving a fan-out stage can't be guaranteed.
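The fan-out behavior described here can be sketched as follows. This is an illustration under stated assumptions, not the package's code: fanOutStage and square are hypothetical names, fanSize is taken as a plain int, and ProcessFn and StageFn are local copies of the types documented on this page. Each of the fanSize goroutines reads from the same input channel, which is why ordering is lost.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Local copies of the function types documented by the package.
type ProcessFn func(inObj interface{}) (outObj interface{})
type StageFn func(inChan <-chan interface{}) (outChan chan interface{})

// fanOutStage is a hypothetical helper: it wraps fn in a stage that runs
// fanSize goroutines, all reading from the same input channel.
func fanOutStage(fn ProcessFn, fanSize int) StageFn {
	return func(inChan <-chan interface{}) chan interface{} {
		outChan := make(chan interface{})
		var wg sync.WaitGroup
		wg.Add(fanSize)
		for i := 0; i < fanSize; i++ {
			go func() {
				defer wg.Done()
				for obj := range inChan {
					outChan <- fn(obj)
				}
			}()
		}
		// Close the output once every worker has seen inChan close.
		go func() {
			wg.Wait()
			close(outChan)
		}()
		return outChan
	}
}

// square assumes its input is an int (it panics otherwise).
func square(inObj interface{}) interface{} {
	n := inObj.(int)
	return n * n
}

func main() {
	in := make(chan interface{}, 4)
	for _, n := range []int{1, 2, 3, 4} {
		in <- n
	}
	close(in)

	var out []int
	for v := range fanOutStage(square, 3)(in) {
		out = append(out, v.(int))
	}
	sort.Ints(out) // arrival order varies between runs, so sort before printing
	fmt.Println(out) // [1 4 9 16]
}
```

If downstream stages depend on input order, a fan size of 1 (as AddStage uses) avoids the reordering at the cost of parallelism.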
func (*Pipeline) Run
func (p *Pipeline) Run(inChan <-chan interface{}) (doneChan chan struct{})
Run starts the pipeline with all the stages that have been added. Run is not a blocking function and will return immediately with a doneChan. Consumers can wait on the doneChan for an indication of when the pipeline has completed processing.
The pipeline runs for as long as its `inChan` channel is open. Once `inChan` is closed, the pipeline stages complete sequentially from the first stage to the last. Once all stages are complete, the last outChan is drained and the doneChan is closed.
Run() can be invoked multiple times to start multiple instances of a pipeline that will typically process different incoming channels.
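The lifecycle described for Run can be sketched roughly as below. This is an assumption-laden illustration rather than the package's implementation: runSketch and mapStage are hypothetical names, and StageFn is a local copy of the type documented on this page. The sketch chains each stage's output into the next stage's input, drains the final output in a goroutine, and closes doneChan when that output closes.

```go
package main

import "fmt"

// Local copy of the stage type documented by the package.
type StageFn func(inChan <-chan interface{}) (outChan chan interface{})

// runSketch is a hypothetical stand-in for Run. It returns immediately and
// closes doneChan after the last stage's output has been fully drained.
func runSketch(stages []StageFn, inChan <-chan interface{}) chan struct{} {
	doneChan := make(chan struct{})
	cur := inChan
	for _, stage := range stages {
		cur = stage(cur) // each stage reads from the previous stage's output
	}
	go func() {
		for range cur {
			// Drain so the last stage never blocks on send.
		}
		close(doneChan)
	}()
	return doneChan
}

// mapStage is a hypothetical single-goroutine stage that applies fn to
// every object and closes its output when its input closes.
func mapStage(fn func(interface{}) interface{}) StageFn {
	return func(inChan <-chan interface{}) chan interface{} {
		outChan := make(chan interface{})
		go func() {
			defer close(outChan)
			for obj := range inChan {
				outChan <- fn(obj)
			}
		}()
		return outChan
	}
}

func main() {
	sum := 0 // written only by the last stage; read after doneChan closes
	double := func(o interface{}) interface{} { return o.(int) * 2 }
	addToSum := func(o interface{}) interface{} { sum += o.(int); return o }

	in := make(chan interface{}, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	<-runSketch([]StageFn{mapStage(double), mapStage(addToSum)}, in)
	fmt.Println(sum) // 12
}
```

Because the sketch never closes doneChan until the input channel is closed, a caller that forgets to close its input will wait on doneChan forever.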
type ProcessFn
type ProcessFn func(inObj interface{}) (outObj interface{})
ProcessFn is the primary function type that users of this package define and pass in to build a meaningful pipeline.
type StageFn
type StageFn func(inChan <-chan interface{}) (outChan chan interface{})
StageFn is a lower-level function type that chains together multiple stages using channels.
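As an illustration of what a hand-written StageFn might look like, the sketch below defines a filtering stage, something a one-in, one-out ProcessFn does not express directly. The name evenOnly is hypothetical and StageFn is a local copy of the type above. A function of this shape is what AddRawStage is documented to accept, though how the package wires it in is not shown here.

```go
package main

import "fmt"

// Local copy of the stage type documented by the package.
type StageFn func(inChan <-chan interface{}) (outChan chan interface{})

// evenOnly is a hypothetical raw stage. It forwards even ints and silently
// drops everything else, then closes its output when its input closes.
func evenOnly(inChan <-chan interface{}) chan interface{} {
	outChan := make(chan interface{})
	go func() {
		defer close(outChan)
		for obj := range inChan {
			if n, ok := obj.(int); ok && n%2 == 0 {
				outChan <- n
			}
		}
	}()
	return outChan
}

func main() {
	var stage StageFn = evenOnly // evenOnly satisfies the StageFn signature

	in := make(chan interface{}, 5)
	for n := 1; n <= 5; n++ {
		in <- n
	}
	close(in)

	for v := range stage(in) {
		fmt.Println(v) // prints 2, then 4
	}
}
```

A raw stage owns its output channel, so it is responsible for closing that channel once its input is exhausted; otherwise downstream stages never finish.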