line

package
v0.0.0-...-5d6a5f9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package line is a pipeline framework inspired by unix pipes and stream processing.

A pipeline is comprised of a producer, 0 or more transformers, and a consumer. The default producer is line-by-line reading of STDIN. The default consumer is a noop.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrMapArgWrongShape = fmt.Errorf("a func of shape func([context,] <in>) (<out>,error) is required as the arg")

ErrMapArgWrongShape is the error returned when the func shape isn't correct.

View Source
var (
	// ErrNoErrsWaitGroup is the error for the case where the user has
	// customized the errs channel but hasn't provided a sync.WaitGroup to
	// wait on while that channel is drained.
	// NOTE(review): the SetErrs signature documented on this page takes only
	// the errs channel, so it is unclear where a WaitGroup would be supplied —
	// confirm whether this error is still reachable.
	ErrNoErrsWaitGroup = fmt.Errorf("No sync.WaitGroup passed for errs channel draining")
)

Functions

func Consumer

func Consumer(in <-chan interface{}, errs chan<- error)

Consumer is the default consumer for the line.

func Noop

func Noop(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

Noop is the transform noop or passthrough

func NoopC

func NoopC(in <-chan interface{}, errs chan<- error)

NoopC is the noop consumer

func Stdin

func Stdin(out chan<- interface{}, errs chan<- error)

Stdin reads stdin and sends each line into the pipeline as a message.

func Stdout

func Stdout(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

Stdout prints out the message to standard out.

func StdoutC

func StdoutC(in <-chan interface{}, errs chan<- error)

StdoutC prints out the message to standard out.

Types

type Acker

type Acker interface {
	Ack()
}

Acker is something that can be "Ack"ed.

type Cfunc

type Cfunc func(<-chan interface{}, chan<- error)

Cfunc is the function signature for a consumer func.

type InlineTfunc

type InlineTfunc func(interface{}) (interface{}, error)

InlineTfunc is the function signature for a transformer func. The idea is to make writing an anonymous func right inline with the definition of the pipeline easier. It also provides a way to boil the essence of most transformers down to a pure single-input, single-output func. That makes testing a lot easier as well.

type InlineTfuncContext

type InlineTfuncContext func(context.Context, interface{}) (interface{}, error)

InlineTfuncContext is InlineTfunc with Context support

type Line

type Line struct {
	// contains filtered or unexported fields
}

Line is the order of the steps in the pipe to make a pipeline.

func (*Line) Add

func (l *Line) Add(f ...Tfunc) Pipeline

Add will add a transformer to the pipeline.

func (*Line) AddContext

func (l *Line) AddContext(f ...TfuncContext) Pipeline

AddContext is like Add but with a context.Context

func (*Line) Embed

func (l *Line) Embed(parentIn <-chan interface{}, parentOut chan<- interface{}, parentErrs chan<- error)

Embed runs the whole pipeline as a transformer of a parent pipeline.

func (*Line) Filter

func (l *Line) Filter(fn interface{}) Pipeline

Filter is syntactic sugar around the Filter transformer

func (*Line) ForEach

func (l *Line) ForEach(fn interface{}) Pipeline

ForEach is syntactic sugar around the ForEach transformer

func (*Line) Map

func (l *Line) Map(fn interface{}) Pipeline

Map is syntactic sugar around the Map transformer

func (*Line) Run

func (l *Line) Run() error

Run runs the whole pipeline.

func (*Line) RunContext

func (l *Line) RunContext(ctx context.Context) error

RunContext runs the whole pipeline with context.Context.

func (*Line) SetC

func (l *Line) SetC(f Cfunc) Pipeline

SetC will add the consumer to the pipeline.

func (*Line) SetErrs

func (l *Line) SetErrs(errs chan<- error) Pipeline

SetErrs will set the errs channel to the pipeline. This can be used to hijack the errors behavior.

func (*Line) SetP

func (l *Line) SetP(f Pfunc) Pipeline

SetP will add the producer to the pipeline.

func (*Line) SetPContext

func (l *Line) SetPContext(f PfuncContext) Pipeline

SetPContext will add the context aware producer to the pipeline. This will override the Pfunc set with SetP.

type Pfunc

type Pfunc func(chan<- interface{}, chan<- error)

Pfunc is the function signature for a producer func.

type PfuncContext

type PfuncContext func(context.Context, chan<- interface{}, chan<- error)

PfuncContext is the function signature for a context-aware producer func.

type Pipeline

type Pipeline interface {
	// SetP sets the producer of the pipeline.
	SetP(Pfunc) Pipeline
	// SetPContext sets a context-aware producer. In the Line implementation
	// this overrides a Pfunc set with SetP.
	SetPContext(PfuncContext) Pipeline
	// Add adds one or more transformers to the pipeline.
	Add(...Tfunc) Pipeline
	// AddContext is like Add, but for transformers that take a context.Context.
	AddContext(...TfuncContext) Pipeline
	// Filter, ForEach and Map are syntactic sugar for adding the transformer
	// of the same name, built from the given func.
	Filter(interface{}) Pipeline
	ForEach(interface{}) Pipeline
	Map(interface{}) Pipeline
	// SetC sets the consumer of the pipeline.
	SetC(Cfunc) Pipeline
	// SetErrs sets the errs channel of the pipeline, which lets the caller
	// take over how errors are handled.
	SetErrs(chan<- error) Pipeline
	// Run runs the whole pipeline.
	Run() error
	// RunContext runs the whole pipeline with a context.Context.
	RunContext(context.Context) error
	// Embed runs the whole pipeline as a transformer of a parent pipeline.
	Embed(<-chan interface{}, chan<- interface{}, chan<- error) // act as a Tfunc
}

Pipeline defines what it takes to be a pipeline. This means you could write your own implementation of a pipeline (say a distributed one) and still be able to use all of the producers, consumers, and transformers that match these interfaces.

Example (Add)
package main

import (
	"bytes"
	"fmt"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// start from a fresh pipeline and give it a producer that emits one buffer
	p := l.New()
	p = p.SetP(func(sink chan<- interface{}, _ chan<- error) {
		sink <- bytes.NewBufferString("foo")
	})

	// register the transformers one Add call at a time
	p = p.Add(l.Inline(func(msg interface{}) (interface{}, error) {
		s := msg.(fmt.Stringer).String()
		return s + " bar", nil
	}))
	p = p.Add(l.Stdout)

	// kick everything off, beginning with the producer
	p.Run()
}
Output:

foo bar
Example (Add_combined)
package main

import (
	"bytes"
	"fmt"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// fresh pipeline with a producer that emits a single buffer
	p := l.New()
	p = p.SetP(func(sink chan<- interface{}, _ chan<- error) {
		sink <- bytes.NewBufferString("foo")
	})

	// both transformers are registered through a single variadic Add call
	p = p.Add(
		l.Inline(func(msg interface{}) (interface{}, error) {
			s := msg.(fmt.Stringer).String()
			return s + " bar", nil
		}),
		l.Stdout,
	)

	// kick everything off, beginning with the producer
	p.Run()
}
Output:

foo bar
Example (Embed)
package main

import (
	l "github.com/Reisender/pipe/line"
)

func main() {
	// the child pipeline: a lone Stdout transformer that prints what it receives
	child := l.New().Add(l.Stdout)

	// the parent pipeline produces a single string message ...
	parent := l.New().SetP(func(sink chan<- interface{}, _ chan<- error) {
		sink <- "foo from sub"
	})

	// ... and passes it through the child, whose Embed method is used
	// exactly like any other Tfunc
	parent = parent.Add(child.Embed)
	parent.Run()
}
Output:

foo from sub
Example (New_w_chan)
package main

import (
	"bytes"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// pre-load a buffered channel with the only message, then close it
	// so that nothing further can be sent on it
	src := make(chan interface{}, 1)
	src <- bytes.NewBufferString("foo")
	close(src)

	// hand the channel to New as the pipeline's input and print each message
	p := l.New(src)
	p = p.Add(l.Stdout)
	p.Run()
}
Output:

foo
Example (SetC)
package main

import (
	"bytes"
	"fmt"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// producer: emit one buffer holding "foo"
	p := l.New()
	p = p.SetP(func(sink chan<- interface{}, _ chan<- error) {
		sink <- bytes.NewBufferString("foo")
	})

	// custom consumer: print every message until the in channel is closed
	p = p.SetC(func(src <-chan interface{}, _ chan<- error) {
		for {
			msg, open := <-src
			if !open {
				return
			}
			fmt.Println(msg)
		}
	})

	p.Run()
}
Output:

foo
Example (SetErrs)
package main

import (
	"fmt"
	"sync"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// create your own errs channel
	// this example closes it itself once Run() has returned
	// (see close(errs) near the end of main)
	errs := make(chan error)

	// make a sync.WaitGroup to give us time to drain errs
	// before main returns
	var errsDrained sync.WaitGroup
	errsDrained.Add(1)

	// start reading from the errs channel
	// with your custom error channel reader
	go func() {
		defer errsDrained.Done() // indicate we are done draining the errs

		// drain the errs; the loop ends when errs is closed
		for e := range errs {
			fmt.Println(e)
		}
	}()

	// setup a pipeline with custom error channel handling
	l.New().
		// note: the errs parameter below shadows the outer errs variable
		SetP(func(out chan<- interface{}, errs chan<- error) {
			errs <- fmt.Errorf("foo error")
		}).
		SetErrs(errs).
		Run()

	// closing errs lets the draining goroutine's range loop finish;
	// then wait for it so every error is printed before main exits
	close(errs)
	errsDrained.Wait()
}
Output:

foo error
Example (SetP)
package main

import (
	"bytes"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// the producer's only job here is to emit one buffer holding "foo"
	produce := func(sink chan<- interface{}, _ chan<- error) {
		sink <- bytes.NewBufferString("foo")
	}

	// wire producer -> Stdout transformer, then run
	p := l.New()
	p = p.SetP(produce)
	p = p.Add(l.Stdout)
	p.Run()
}
Output:

foo

func New

func New(in ...<-chan interface{}) Pipeline

New creates a new pipeline from the built-in line package.

type Tfunc

type Tfunc func(<-chan interface{}, chan<- interface{}, chan<- error)

Tfunc is the function signature for a transformer func.

func I

func I(it InlineTfunc) Tfunc

I is a convenience wrapper around Inline

func Inline

func Inline(it InlineTfunc) Tfunc

Inline wraps an InlineTfunc and returns a Tfunc. Most of the time, transformers will just range over the in channel and do stuff inside of the range and then send any errors off to the error channel. This func does that for you so you can just write a simpler transformer func. The parameter is the incoming message. The resulting interface{} is the outgoing message to be sent downstream. If nil is returned, no message will be sent downstream. If an error is returned, it will be sent down the error channel.

Example
package main

import (
	"fmt"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// producer: emit the single string message "foo"
	p := l.New()
	p = p.SetP(func(sink chan<- interface{}, _ chan<- error) {
		sink <- "foo"
	})

	// Inline wraps the plain func as a Tfunc; Stdout then prints its result
	decorate := func(msg interface{}) (interface{}, error) {
		return fmt.Sprintf("inline func says: %s", msg), nil
	}
	p = p.Add(l.Inline(decorate), l.Stdout)

	p.Run()
}
Output:

inline func says: foo

func Many

func Many(t Tfunc, concurrency int) Tfunc

Many wraps a transformer to run it in multiple goroutines.

Example
package main

import (
	"fmt"
	"sync/atomic"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// counts how many copies of the transformer were started
	spinupCnt := uint32(0)

	// no producer is set here; per the package overview the default
	// producer reads STDIN line by line
	l.New().
		Add(
			l.Many(func(in <-chan interface{}, out chan<- interface{}, errs chan<- error) {
				// there are two of these running concurrently
				// (the concurrency argument below is 2), hence the atomic add
				atomic.AddUint32(&spinupCnt, 1)
				for m := range in {
					out <- m // passthrough
				}
			}, 2),
		).
		Run()

	// read after Run() has returned; the documented output is 2
	fmt.Println(spinupCnt)
}
Output:

2

type TfuncContext

type TfuncContext func(context.Context, <-chan interface{}, chan<- interface{}, chan<- error)

TfuncContext is a Tfunc but one that takes a context.Context

func Filter

func Filter(fn interface{}) TfuncContext

Filter is a wrapper to Map for code readability

func ForEach

func ForEach(fn interface{}) TfuncContext

ForEach is a wrapper to Map for code readability

func InlineContext

func InlineContext(it InlineTfuncContext) TfuncContext

InlineContext wraps an InlineTfuncContext and returns a TfuncContext.

Example
package main

import (
	"context"
	"fmt"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// a dedicated key type for the value stored in the context
	type ctxKey string
	var key ctxKey = "somekey"
	// the context handed to the pipeline is cancellable and carries key -> "Go"
	outsideCtx, cancel := context.WithCancel(context.Background())
	outsideCtx = context.WithValue(outsideCtx, key, "Go")

	l.New().
		SetP(func(out chan<- interface{}, errs chan<- error) {
			out <- "foo"
			out <- "last"
			// per the documented output, this message is never printed:
			// the context is canceled while "last" is being handled
			out <- "never"
		}).
		AddContext(
			l.InlineContext(func(ctx context.Context, m interface{}) (interface{}, error) {
				if m.(string) == "last" {
					// the InlineContext will stop if the context is Done()
					cancel() // cancel the context
				}
				// ctx.Value(key) reads back the "Go" value set on outsideCtx
				return fmt.Sprintf(`inline func with context "%s" says: %s`, ctx.Value(key), m), nil
			}),
		).
		Add(l.Stdout).
		RunContext(outsideCtx) // be sure to run with context

}
Output:

inline func with context "Go" says: foo
inline func with context "Go" says: last

func ManyContext

func ManyContext(t TfuncContext, concurrency int) TfuncContext

ManyContext wraps a context-aware transformer to run it in multiple goroutines.

Example
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	l "github.com/Reisender/pipe/line"
)

func main() {
	// a dedicated key type for the value stored in the context
	type ctxKey string
	var multiplyBy ctxKey = "multi"
	// the context carries the multiplier (10) that each transformer copy reads
	outsideCtx := context.WithValue(context.Background(), multiplyBy, uint32(10))

	// accumulates 1*multiplier for every copy of the transformer that starts
	spinupCnt := uint32(0)

	// no producer is set here; per the package overview the default
	// producer reads STDIN line by line
	l.New().
		AddContext(
			l.ManyContext(func(ctx context.Context, in <-chan interface{}, out chan<- interface{}, errs chan<- error) {
				// pull the multiplier out of the context passed to RunContext
				multiBy := ctx.Value(multiplyBy).(uint32)
				// there are two of these running concurrently
				// (the concurrency argument below is 2), hence the atomic add
				atomic.AddUint32(&spinupCnt, 1*multiBy)
				for m := range in {
					out <- m // passthrough
				}
			}, 2),
		).
		RunContext(outsideCtx)

	// read after RunContext has returned; the documented output is 20 (2 copies * 10)
	fmt.Println(spinupCnt)
}
Output:

20

func Map

func Map(fn interface{}) TfuncContext

Map is the same as ForEach except that it also sends the resulting value on as the new value for this message. If a nil value is returned, no message will be passed along. The passed func needs to be of the shape

func([context,] <in>) (<out>, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL