Documentation ¶
Overview ¶
Package line is a pipeline framework inspired by unix pipes and stream processing.
A pipeline is composed of a producer, 0 or more transformers, and a consumer. The default producer is line-by-line reading of STDIN. The default consumer is a noop.
Index ¶
- Variables
- func Consumer(in <-chan interface{}, errs chan<- error)
- func Noop(in <-chan interface{}, out chan<- interface{}, errs chan<- error)
- func NoopC(in <-chan interface{}, errs chan<- error)
- func Stdin(out chan<- interface{}, errs chan<- error)
- func Stdout(in <-chan interface{}, out chan<- interface{}, errs chan<- error)
- func StdoutC(in <-chan interface{}, errs chan<- error)
- type Acker
- type Cfunc
- type InlineTfunc
- type InlineTfuncContext
- type Line
- func (l *Line) Add(f ...Tfunc) Pipeline
- func (l *Line) AddContext(f ...TfuncContext) Pipeline
- func (l *Line) Embed(parentIn <-chan interface{}, parentOut chan<- interface{}, ...)
- func (l *Line) Filter(fn interface{}) Pipeline
- func (l *Line) ForEach(fn interface{}) Pipeline
- func (l *Line) Map(fn interface{}) Pipeline
- func (l *Line) Run() error
- func (l *Line) RunContext(ctx context.Context) error
- func (l *Line) SetC(f Cfunc) Pipeline
- func (l *Line) SetErrs(errs chan<- error) Pipeline
- func (l *Line) SetP(f Pfunc) Pipeline
- func (l *Line) SetPContext(f PfuncContext) Pipeline
- type Pfunc
- type PfuncContext
- type Pipeline
- type Tfunc
- type TfuncContext
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrMapArgWrongShape = fmt.Errorf("a func of shape func([context,] <in>) (<out>,error) is required as the arg")
ErrMapArgWrongShape is the error returned when the func shape isn't correct.
var (
	// ErrNoErrsWaitGroup represents when the user has customized the errs channel but hasn't provided a waitgroup
	ErrNoErrsWaitGroup = fmt.Errorf("No sync.WaitGroup passed for errs channel draining")
)
Functions ¶
func Consumer ¶
func Consumer(in <-chan interface{}, errs chan<- error)
Consumer is the default consumer for the line.
func Noop ¶
func Noop(in <-chan interface{}, out chan<- interface{}, errs chan<- error)
Noop is the transform noop or passthrough
func Stdin ¶
func Stdin(out chan<- interface{}, errs chan<- error)
Stdin reads stdin and sends each line into the pipeline as a message.
Types ¶
type Cfunc ¶
type Cfunc func(<-chan interface{}, chan<- error)
Cfunc is the function signature for a consumer func.
type InlineTfunc ¶
type InlineTfunc func(interface{}) (interface{}, error)
InlineTfunc is the function signature for a transformer func. The idea is to make writing an anonymous func right inline with the definition of the pipeline easier. It also provides a way to boil the essence of most transformers down to a pure single input single output func. That makes testing a lot easier as well.
type InlineTfuncContext ¶
InlineTfuncContext is InlineTfunc with Context support
type Line ¶
type Line struct {
// contains filtered or unexported fields
}
Line is the order of the steps in the pipe to make a pipeline.
func (*Line) AddContext ¶
func (l *Line) AddContext(f ...TfuncContext) Pipeline
AddContext is like Add but with a context.Context
func (*Line) Embed ¶
func (l *Line) Embed(parentIn <-chan interface{}, parentOut chan<- interface{}, parentErrs chan<- error)
Embed runs the whole pipeline as a transformer of a parent pipeline.
func (*Line) RunContext ¶
RunContext runs the whole pipeline with context.Context.
func (*Line) SetErrs ¶
SetErrs will set the errs channel to the pipeline. This can be used to hijack the errors behavior.
func (*Line) SetPContext ¶
func (l *Line) SetPContext(f PfuncContext) Pipeline
SetPContext will add the context aware producer to the pipeline. This will override the Pfunc set with SetP.
type Pfunc ¶
type Pfunc func(chan<- interface{}, chan<- error)
Pfunc is the function signature for a producer func.
type PfuncContext ¶
PfuncContext is the function signature for a context-aware producer func.
type Pipeline ¶
type Pipeline interface { SetP(Pfunc) Pipeline SetPContext(PfuncContext) Pipeline Add(...Tfunc) Pipeline AddContext(...TfuncContext) Pipeline Filter(interface{}) Pipeline ForEach(interface{}) Pipeline Map(interface{}) Pipeline SetC(Cfunc) Pipeline SetErrs(chan<- error) Pipeline Run() error RunContext(context.Context) error Embed(<-chan interface{}, chan<- interface{}, chan<- error) // act as a Tfunc }
Pipeline defines what it takes to be a pipeline. This means you could write your own implementation of a pipeline (say a distributed one) and still be able to use all of the producers, consumers, and transformers that match these interfaces.
Example (Add) ¶
package main import ( "bytes" "fmt" l "github.com/Reisender/pipe/line" ) func main() { l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { out <- bytes.NewBufferString("foo") }). // one-off add calls Add(l.Inline(func(m interface{}) (interface{}, error) { return m.(fmt.Stringer).String() + " bar", nil })). Add(l.Stdout). // execute the pipeline starting with the producer Run() }
Output: foo bar
Example (Add_combined) ¶
package main import ( "bytes" "fmt" l "github.com/Reisender/pipe/line" ) func main() { l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { out <- bytes.NewBufferString("foo") }). // combined add call Add( l.Inline(func(m interface{}) (interface{}, error) { return m.(fmt.Stringer).String() + " bar", nil }), l.Stdout, ). // execute the pipeline starting with the producer Run() }
Output: foo bar
Example (Embed) ¶
package main import ( l "github.com/Reisender/pipe/line" ) func main() { // define a sub-pipeline to be embedded // this one just prints messages out to stdout subPipeline := l.New().Add(l.Stdout) // setup and run the main pipeline l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { out <- "foo from sub" }). Add( subPipeline.Embed, // embed it just like any other Tfunc ).Run() }
Output: foo from sub
Example (New_w_chan) ¶
package main import ( "bytes" l "github.com/Reisender/pipe/line" ) func main() { out := make(chan interface{}, 1) out <- bytes.NewBufferString("foo") close(out) l.New(out). Add(l.Stdout). Run() }
Output: foo
Example (SetC) ¶
package main import ( "bytes" "fmt" l "github.com/Reisender/pipe/line" ) func main() { l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { out <- bytes.NewBufferString("foo") }). SetC(func(in <-chan interface{}, errs chan<- error) { for msg := range in { fmt.Println(msg) } }). Run() }
Output: foo
Example (SetErrs) ¶
package main import ( "fmt" "sync" l "github.com/Reisender/pipe/line" ) func main() { // create your own errs channel // Run() will close it for you errs := make(chan error) // make a sync.WaitGroup to give us time to drain errs // before Run() returns var errsDrained sync.WaitGroup errsDrained.Add(1) // start reading from the errs channel // with your custom error channel reader go func() { defer errsDrained.Done() // indicate we are done draining the errs // drain the errs for e := range errs { fmt.Println(e) } }() // setup a pipeline with custom error channel handling l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { errs <- fmt.Errorf("foo error") }). SetErrs(errs). Run() close(errs) errsDrained.Wait() }
Output: foo error
Example (SetP) ¶
package main import ( "bytes" l "github.com/Reisender/pipe/line" ) func main() { l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { out <- bytes.NewBufferString("foo") }). Add(l.Stdout). Run() }
Output: foo
type Tfunc ¶
type Tfunc func(<-chan interface{}, chan<- interface{}, chan<- error)
Tfunc is the function signature for a transformer func.
func Inline ¶
func Inline(it InlineTfunc) Tfunc
Inline wraps an InlineTfunc and returns a Tfunc. Most of the time, transformers will just range over the in channel and do stuff inside of the range and then send any errors off to the error channel. This func does that for you so you can just write a simpler transformer func. The parameter is the incoming message. The resulting interface{} is the outgoing message to be sent downstream. If nil is returned, no message will be sent downstream. If an error is returned, it will be sent down the error channel.
Example ¶
package main import ( "fmt" l "github.com/Reisender/pipe/line" ) func main() { l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { out <- "foo" }). Add( l.Inline(func(m interface{}) (interface{}, error) { return fmt.Sprintf("inline func says: %s", m), nil }), l.Stdout, ).Run() }
Output: inline func says: foo
func Many ¶
Many wraps a transformer to run it in multiple goroutines.
Example ¶
package main import ( "fmt" "sync/atomic" l "github.com/Reisender/pipe/line" ) func main() { spinupCnt := uint32(0) l.New(). Add( l.Many(func(in <-chan interface{}, out chan<- interface{}, errs chan<- error) { // there are two of these running concurrently atomic.AddUint32(&spinupCnt, 1) for m := range in { out <- m // passthrough } }, 2), ). Run() fmt.Println(spinupCnt) }
Output: 2
type TfuncContext ¶
TfuncContext is a Tfunc but one that takes a context.Context
func Filter ¶
func Filter(fn interface{}) TfuncContext
Filter is a wrapper to Map for code readability
func ForEach ¶
func ForEach(fn interface{}) TfuncContext
ForEach is a wrapper to Map for code readability
func InlineContext ¶
func InlineContext(it InlineTfuncContext) TfuncContext
InlineContext wraps an InlineTfuncContext and returns a TfuncContext.
Example ¶
package main import ( "context" "fmt" l "github.com/Reisender/pipe/line" ) func main() { type ctxKey string var key ctxKey = "somekey" outsideCtx, cancel := context.WithCancel(context.Background()) outsideCtx = context.WithValue(outsideCtx, key, "Go") l.New(). SetP(func(out chan<- interface{}, errs chan<- error) { out <- "foo" out <- "last" out <- "never" }). AddContext( l.InlineContext(func(ctx context.Context, m interface{}) (interface{}, error) { if m.(string) == "last" { // the InlineContext will stop if the context is Done() cancel() // cancel the context } return fmt.Sprintf(`inline func with context "%s" says: %s`, ctx.Value(key), m), nil }), ). Add(l.Stdout). RunContext(outsideCtx) // be sure to run with context }
Output: inline func with context "Go" says: foo inline func with context "Go" says: last
func ManyContext ¶
func ManyContext(t TfuncContext, concurrency int) TfuncContext
ManyContext wraps a transformer to run it in multiple goroutines.
Example ¶
package main import ( "context" "fmt" "sync/atomic" l "github.com/Reisender/pipe/line" ) func main() { type ctxKey string var multiplyBy ctxKey = "multi" outsideCtx := context.WithValue(context.Background(), multiplyBy, uint32(10)) spinupCnt := uint32(0) l.New(). AddContext( l.ManyContext(func(ctx context.Context, in <-chan interface{}, out chan<- interface{}, errs chan<- error) { multiBy := ctx.Value(multiplyBy).(uint32) // there are two of these running concurrently atomic.AddUint32(&spinupCnt, 1*multiBy) for m := range in { out <- m // passthrough } }, 2), ). RunContext(outsideCtx) fmt.Println(spinupCnt) }
Output: 20
func Map ¶
func Map(fn interface{}) TfuncContext
Map is the same as ForEach except that it also sends the resulting value on as the new value for this message. If a nil value is returned, no message will be passed along. The passed func needs to be of the shape
func(<in>) (<out>, error)