Documentation ¶
Overview ¶
Package pipelines exposes a generic implementation of asynchronous pipelines.
A pipeline is defined by:
* an input channel of some type: <-chan IN
* an output channel of some type: chan<- OUT
* an optional outgoing out-of-band notification bus channel: chan<- BUS
* a runner function: func(context.Context, <-chan IN, chan<- OUT, chan<- BUS) error
This implementation emphasizes type safety when connecting channels of different types with each other: the consistency of your pipeline scaffolding can be verified at build time.
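To illustrate the build-time check, here is a minimal stdlib-only sketch. It does not use this package: the names runner, stage, itoa, shout and run are local stand-ins invented for the illustration, and the bus is left unused.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// runner is a local stand-in with the runner shape described above.
type runner[IN, OUT, BUS any] func(context.Context, <-chan IN, chan<- OUT, chan<- BUS) error

// stage starts r in a goroutine and returns its output channel,
// which is closed once r returns. Errors are dropped to keep the sketch short.
func stage[IN, OUT, BUS any](ctx context.Context, r runner[IN, OUT, BUS], in <-chan IN, bus chan<- BUS) <-chan OUT {
	out := make(chan OUT)
	go func() {
		defer close(out)
		_ = r(ctx, in, out, bus)
	}()
	return out
}

// itoa turns integers into strings.
func itoa(ctx context.Context, in <-chan int, out chan<- string, _ chan<- struct{}) error {
	for v := range in {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- strconv.Itoa(v):
		}
	}
	return nil
}

// shout appends an exclamation mark to every string.
func shout(ctx context.Context, in <-chan string, out chan<- string, _ chan<- struct{}) error {
	for v := range in {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- v + "!":
		}
	}
	return nil
}

// run wires source -> itoa -> shout and gathers the results.
func run(values ...int) []string {
	ctx := context.Background()
	source := make(chan int)
	go func() {
		defer close(source)
		for _, v := range values {
			source <- v
		}
	}()

	asStrings := stage[int, string, struct{}](ctx, itoa, source, nil)
	shouted := stage[string, string, struct{}](ctx, shout, asStrings, nil)
	// stage[string, string, struct{}](ctx, shout, source, nil) would not compile:
	// a chan int cannot be used as a <-chan string.

	var results []string
	for s := range shouted {
		results = append(results, s)
	}
	return results
}

func main() {
	fmt.Println(run(1, 2, 3)) // [1! 2! 3!]
}
```

Because the element types are part of each stage's type, a mismatched connection is rejected by the compiler rather than discovered at run time.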
Error handling ¶
Pipelines are not akin to "promises" as there is no handling of the rejected state. Blocking errors should be handled by having the runner return an error. Non-blocking errors should be handled by sending some notification to the bus channel.
The package lets you define and manipulate pipelines with different characteristics:
* Pipeline is a plain IN/OUT pipeline
* FeederPipeline is a pipeline that only produces output (e.g. from some other non-channel input, such as an io.Reader or DB records)
* CollectorPipeline is a pipeline that only consumes input (e.g. it produces some final non-channel output)
* FanInPipeline implements a (many IN -> single OUT) fan-in pattern
* FanOutPipeline implements a (single IN -> many OUT) fan-out pattern
* JoinerPipeline provides a means to implement a 2-way join pattern: (IN, OTHER) -> OUT
* ChainedPipeline provides a means to declare pipelines easily using the BeginsWith, Then and Finally builders
Index ¶
- type BusCollector
- func (c BusCollector) Bus() chan BUS
- func (c BusCollector) Input() chan IN
- func (c BusCollector) Output() chan OUT
- func (c BusCollector) RunWithContext(ctx context.Context) func() error
- func (c BusCollector) SetBus(bus chan BUS)
- func (c BusCollector) SetInput(in chan IN)
- func (c BusCollector) SetOutput(out chan OUT)
- func (p *BusCollector[BUS]) WithBusListener(listener BusListener[BUS]) *BusCollector[BUS]
- func (p *BusCollector[BUS]) WithInputs(inputs ...<-chan BUS) *BusCollector[BUS]
- func (p *BusCollector[BUS]) WithInputsFrom(busers ...Buser[BUS]) *BusCollector[BUS]
- type BusListener
- type Buser
- type ChainedPipeline
- func (c ChainedPipeline) Bus() chan BUS
- func (p *ChainedPipeline[IN, OUT, BUS]) Finally(end *CollectorPipeline[OUT, BUS]) *FinalChainedPipeline[OUT, BUS]
- func (c ChainedPipeline) Input() chan IN
- func (c ChainedPipeline) Output() chan OUT
- func (p *ChainedPipeline[IN, OUT, BUS]) RunWithContext(ctx context.Context) func() error
- func (c ChainedPipeline) SetBus(bus chan BUS)
- func (c ChainedPipeline) SetInput(in chan IN)
- func (c ChainedPipeline) SetOutput(out chan OUT)
- func (p *ChainedPipeline[IN, OUT, BUS]) WithInputFrom(producer Producer[IN]) *ChainedPipeline[IN, OUT, BUS]
- func (p *ChainedPipeline[IN, OUT, BUS]) WithOutputTo(consumer SettableConsumer[OUT]) *ChainedPipeline[IN, OUT, BUS]
- type Collection
- func (c *Collection[BUS]) Add(piped ...Runnable[BUS])
- func (c *Collection[BUS]) AddBusCollector(collector *BusCollector[BUS])
- func (c *Collection[BUS]) AddNamed(name string, piped Runnable[BUS])
- func (c *Collection[BUS]) AddNamedBusCollector(name string, collector *BusCollector[BUS])
- func (c *Collection[BUS]) AllRunnables() []Runnable[BUS]
- func (c *Collection[BUS]) AllRunnersWithContext(ctx context.Context) []func() error
- func (c *Collection[BUS]) Append(piped ...Runnable[BUS]) *Collection[BUS]
- func (c *Collection[BUS]) AppendNamed(name string, piped Runnable[BUS]) *Collection[BUS]
- func (c *Collection[BUS]) Lookup(name string) (NamedRunnable[BUS], bool)
- func (c *Collection[BUS]) RunInGroup(ctx context.Context, group *errgroup.Group)
- type Collector
- type CollectorPipeline
- func (c CollectorPipeline) Bus() chan BUS
- func (c CollectorPipeline) Input() chan IN
- func (c CollectorPipeline) Output() chan OUT
- func (c CollectorPipeline) RunWithContext(ctx context.Context) func() error
- func (c CollectorPipeline) SetBus(bus chan BUS)
- func (c CollectorPipeline) SetInput(in chan IN)
- func (c CollectorPipeline) SetOutput(out chan OUT)
- func (p *CollectorPipeline[IN, BUS]) WithBus(bus chan BUS) *CollectorPipeline[IN, BUS]
- func (p *CollectorPipeline[IN, BUS]) WithCollector(collector Collector[IN, BUS]) *CollectorPipeline[IN, BUS]
- func (p *CollectorPipeline[IN, BUS]) WithInput(in chan IN) *CollectorPipeline[IN, BUS]
- func (p *CollectorPipeline[IN, BUS]) WithInputFrom(producer Producer[IN]) *CollectorPipeline[IN, BUS]
- type Consumer
- type FanHook
- type FanInOption
- type FanInPipeline
- func (c FanInPipeline) Bus() chan BUS
- func (c FanInPipeline) Input() chan IN
- func (c FanInPipeline) Output() chan OUT
- func (c FanInPipeline) RunWithContext(ctx context.Context) func() error
- func (c FanInPipeline) SetBus(bus chan BUS)
- func (c FanInPipeline) SetInput(in chan IN)
- func (p *FanInPipeline[INOUT, BUS]) SetInputs(inputs ...<-chan INOUT)
- func (c FanInPipeline) SetOutput(out chan OUT)
- func (p *FanInPipeline[INOUT, BUS]) WithBus(bus chan BUS) *FanInPipeline[INOUT, BUS]
- func (p *FanInPipeline[INOUT, BUS]) WithFanInFrom(producers ...Producer[INOUT]) *FanInPipeline[INOUT, BUS]
- func (p *FanInPipeline[INOUT, BUS]) WithFanInOptions(opts ...FanInOption[INOUT, BUS]) *FanInPipeline[INOUT, BUS]
- func (p *FanInPipeline[INOUT, BUS]) WithInputs(inputs ...<-chan INOUT) *FanInPipeline[INOUT, BUS]
- func (p *FanInPipeline[INOUT, BUS]) WithOutput(out chan INOUT) *FanInPipeline[INOUT, BUS]
- func (p *FanInPipeline[INOUT, BUS]) WithOutputTo(consumer SettableConsumer[INOUT]) *FanInPipeline[INOUT, BUS]
- type FanOutOption
- type FanOutPipeline
- func (c FanOutPipeline) Bus() chan BUS
- func (c FanOutPipeline) Input() chan IN
- func (c FanOutPipeline) Output() chan OUT
- func (c FanOutPipeline) RunWithContext(ctx context.Context) func() error
- func (c FanOutPipeline) SetBus(bus chan BUS)
- func (p *FanOutPipeline[INOUT, BUS]) SetInput(_ chan INOUT) *FanOutPipeline[INOUT, BUS]
- func (p *FanOutPipeline[INOUT, BUS]) SetOutput(_ chan INOUT) *FanOutPipeline[INOUT, BUS]
- func (p *FanOutPipeline[INOUT, BUS]) WithBus(bus chan BUS) *FanOutPipeline[INOUT, BUS]
- func (p *FanOutPipeline[INOUT, BUS]) WithFanOutOptions(opts ...FanOutOption[INOUT, BUS]) *FanOutPipeline[INOUT, BUS]
- func (p *FanOutPipeline[INOUT, BUS]) WithFanOutTo(consumers ...Consumer[INOUT]) *FanOutPipeline[INOUT, BUS]
- func (p *FanOutPipeline[INOUT, BUS]) WithInput(in chan INOUT) *FanOutPipeline[INOUT, BUS]
- func (p *FanOutPipeline[INOUT, BUS]) WithInputFrom(producer Producer[INOUT]) *FanOutPipeline[INOUT, BUS]
- type Feeder
- type FeederPipeline
- func (c FeederPipeline) Bus() chan BUS
- func (c FeederPipeline) Input() chan IN
- func (c FeederPipeline) Output() chan OUT
- func (c FeederPipeline) RunWithContext(ctx context.Context) func() error
- func (c FeederPipeline) SetBus(bus chan BUS)
- func (c FeederPipeline) SetInput(in chan IN)
- func (c FeederPipeline) SetOutput(out chan OUT)
- func (p *FeederPipeline[OUT, BUS]) WithBus(bus chan BUS) *FeederPipeline[OUT, BUS]
- func (p *FeederPipeline[OUT, BUS]) WithFeeder(feeder Feeder[OUT, BUS]) *FeederPipeline[OUT, BUS]
- func (p *FeederPipeline[OUT, BUS]) WithOutput(out chan OUT) *FeederPipeline[OUT, BUS]
- type FinalChainedPipeline
- func (p *FinalChainedPipeline[OUT, BUS]) Bus() chan BUS
- func (p *FinalChainedPipeline[OUT, BUS]) Input() chan OUT
- func (p *FinalChainedPipeline[OUT, BUS]) RunWithContext(ctx context.Context) func() error
- func (p *FinalChainedPipeline[OUT, BUS]) SetInput(in chan OUT)
- func (p *FinalChainedPipeline[OUT, BUS]) WithInput(in chan OUT) *FinalChainedPipeline[OUT, BUS]
- type InitialChainedPipeline
- func (p *InitialChainedPipeline[OUT, BUS]) BeginsWith(start *FeederPipeline[OUT, BUS]) *ChainedPipeline[dummy, OUT, BUS]
- func (p *InitialChainedPipeline[OUT, BUS]) Bus() chan BUS
- func (p *InitialChainedPipeline[OUT, BUS]) Output() chan OUT
- func (p *InitialChainedPipeline[OUT, BUS]) RunWithContext(ctx context.Context) func() error
- func (p *InitialChainedPipeline[OUT, BUS]) SetOutput(out chan OUT)
- func (p *InitialChainedPipeline[OUT, BUS]) WithOutput(out chan OUT) *InitialChainedPipeline[OUT, BUS]
- type Joiner
- type JoinerPipeline
- func (c JoinerPipeline) Bus() chan BUS
- func (c JoinerPipeline) Input() chan IN
- func (c JoinerPipeline) Output() chan OUT
- func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) RunWithContext(ctx context.Context) func() error
- func (c JoinerPipeline) SetBus(bus chan BUS)
- func (c JoinerPipeline) SetInput(in chan IN)
- func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) SetOther(other chan OTHER)
- func (c JoinerPipeline) SetOutput(out chan OUT)
- func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithBus(bus chan BUS) *JoinerPipeline[IN, OTHER, OUT, BUS]
- func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputs(in chan IN, other chan OTHER) *JoinerPipeline[IN, OTHER, OUT, BUS]
- func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputsFrom(producer Producer[IN], otherProducer Producer[OTHER]) *JoinerPipeline[IN, OTHER, OUT, BUS]
- func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithJoiner(joiner Joiner[IN, OTHER, OUT, BUS]) *JoinerPipeline[IN, OTHER, OUT, BUS]
- func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithOutput(out chan OUT) *JoinerPipeline[IN, OTHER, OUT, BUS]
- type NOBUS
- type NOBUSCHAN
- type NOINPUT
- type NOOUTPUT
- type NamedRunnable
- type Option
- type Pipeline
- func (c Pipeline) Bus() chan BUS
- func (c Pipeline) Input() chan IN
- func (c Pipeline) Output() chan OUT
- func (c Pipeline) RunWithContext(ctx context.Context) func() error
- func (c Pipeline) SetBus(bus chan BUS)
- func (c Pipeline) SetInput(in chan IN)
- func (c Pipeline) SetOutput(out chan OUT)
- func (p *Pipeline[IN, OUT, BUS]) WithBus(bus chan BUS) *Pipeline[IN, OUT, BUS]
- func (p *Pipeline[IN, OUT, BUS]) WithInput(in chan IN) *Pipeline[IN, OUT, BUS]
- func (p *Pipeline[IN, OUT, BUS]) WithInputFrom(producer Producer[IN]) *Pipeline[IN, OUT, BUS]
- func (p *Pipeline[IN, OUT, BUS]) WithOutput(out chan OUT) *Pipeline[IN, OUT, BUS]
- func (p *Pipeline[IN, OUT, BUS]) WithOutputTo(consumer SettableConsumer[OUT]) *Pipeline[IN, OUT, BUS]
- func (p *Pipeline[IN, OUT, BUS]) WithRunner(runner Runner[IN, OUT, BUS]) *Pipeline[IN, OUT, BUS]
- type Producer
- type Runnable
- type Runner
- type SettableConsumer
- type SettableProducer
Types ¶
type BusCollector ¶
type BusCollector[BUS any] struct {
	// contains filtered or unexported fields
}
BusCollector is a fan-in collector on many BUS channels, to listen on all bus notifications from other pipelines.
func NewBusCollector ¶
func NewBusCollector[BUS any](opts ...Option) *BusCollector[BUS]
NewBusCollector builds a BusCollector to listen on bus channels.
func (BusCollector) RunWithContext ¶
func (*BusCollector[BUS]) WithBusListener ¶
func (p *BusCollector[BUS]) WithBusListener(listener BusListener[BUS]) *BusCollector[BUS]
func (*BusCollector[BUS]) WithInputs ¶
func (p *BusCollector[BUS]) WithInputs(inputs ...<-chan BUS) *BusCollector[BUS]
func (*BusCollector[BUS]) WithInputsFrom ¶
func (p *BusCollector[BUS]) WithInputsFrom(busers ...Buser[BUS]) *BusCollector[BUS]
type BusListener ¶
BusListener defines a special collector for BUS entries.
type ChainedPipeline ¶
ChainedPipeline lets you construct chains of asynchronous pipelines with the Then idiom.
The input of the chain is specified with "WithInput".
The whole chain may run asynchronously with RunWithContext(), which blocks until all the elements in the chain are done.
The input of a ChainedPipeline corresponds to the input of its first pipeline. The output of a ChainedPipeline corresponds to the output of its last pipeline.
func NewChained ¶
func NewChained[IN any, OUT any, BUS any](opts ...Option) *ChainedPipeline[IN, OUT, BUS]
func Then ¶
func Then[IN any, OUT any, CHAINEDOUT any, BUS any](current *ChainedPipeline[IN, OUT, BUS], next *Pipeline[OUT, CHAINEDOUT, BUS]) *ChainedPipeline[IN, CHAINEDOUT, BUS]
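Then chains the next pipeline onto the current chain, with explicitly declared types.
Note that, unfortunately, this cannot be a fluent method of ChainedPipeline as of go1.18: Go methods cannot declare type parameters of their own, and Then needs the extra CHAINEDOUT parameter.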
func (*ChainedPipeline[IN, OUT, BUS]) Finally ¶
func (p *ChainedPipeline[IN, OUT, BUS]) Finally(end *CollectorPipeline[OUT, BUS]) *FinalChainedPipeline[OUT, BUS]
Finally terminates a chained pipeline with some final collector pipeline.
func (*ChainedPipeline[IN, OUT, BUS]) RunWithContext ¶
func (p *ChainedPipeline[IN, OUT, BUS]) RunWithContext(ctx context.Context) func() error
RunWithContext executes all the runners of a chain of pipelines as goroutines.
The execution is interrupted if the context is cancelled.
It blocks until all inner runners exit.
func (*ChainedPipeline[IN, OUT, BUS]) WithInputFrom ¶
func (p *ChainedPipeline[IN, OUT, BUS]) WithInputFrom(producer Producer[IN]) *ChainedPipeline[IN, OUT, BUS]
WithInputFrom sets the input channel for this chain.
func (*ChainedPipeline[IN, OUT, BUS]) WithOutputTo ¶
func (p *ChainedPipeline[IN, OUT, BUS]) WithOutputTo(consumer SettableConsumer[OUT]) *ChainedPipeline[IN, OUT, BUS]
WithOutputTo sets the input of the given consumer to the output of the current chain.
type Collection ¶
type Collection[BUS any] struct {
	// contains filtered or unexported fields
}
Collection is a chained list of named runnable pipelines with some common BUS type to handle out-of-band notifications.
func NewCollection ¶
func NewCollection[BUS any]() *Collection[BUS]
func (*Collection[BUS]) AddBusCollector ¶
func (c *Collection[BUS]) AddBusCollector(collector *BusCollector[BUS])
AddBusCollector adds a BusCollector pipeline connected to all the bus channels published by the pipelines currently in the collection.
func (*Collection[BUS]) AddNamed ¶
func (c *Collection[BUS]) AddNamed(name string, piped Runnable[BUS])
func (*Collection[BUS]) AddNamedBusCollector ¶
func (c *Collection[BUS]) AddNamedBusCollector(name string, collector *BusCollector[BUS])
func (*Collection[BUS]) AllRunnables ¶
func (c *Collection[BUS]) AllRunnables() []Runnable[BUS]
AllRunnables returns all the runnable pipelines in the collection.
func (*Collection[BUS]) AllRunnersWithContext ¶
AllRunnersWithContext returns all runners as a collection of func() error suitable to execute in some errgroup.Group.
func (*Collection[BUS]) Append ¶
func (c *Collection[BUS]) Append(piped ...Runnable[BUS]) *Collection[BUS]
func (*Collection[BUS]) AppendNamed ¶
func (c *Collection[BUS]) AppendNamed(name string, piped Runnable[BUS]) *Collection[BUS]
func (*Collection[BUS]) Lookup ¶
func (c *Collection[BUS]) Lookup(name string) (NamedRunnable[BUS], bool)
Lookup searches the collection for a pipeline with the given name.
func (*Collection[BUS]) RunInGroup ¶
RunInGroup starts the pipeline inside some provided errgroup.Group.
The caller is responsible for waiting on the group: RunInGroup only launches all runners as goroutines.
The context passed should be the context produced when creating the group.
type Collector ¶
Collector is the input-only function executed inside a CollectorPipeline.
The collector SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed.
type CollectorPipeline ¶
CollectorPipeline is a dead-end pipeline, only consuming from input and producing no output channel.
This is a termination point in your pipeline dependency graph.
func NewCollector ¶
func NewCollector[IN any, BUS any](opts ...Option) *CollectorPipeline[IN, BUS]
NewCollector builds a CollectorPipeline that reads from a channel of type IN.
func (CollectorPipeline) RunWithContext ¶
func (*CollectorPipeline[IN, BUS]) WithBus ¶
func (p *CollectorPipeline[IN, BUS]) WithBus(bus chan BUS) *CollectorPipeline[IN, BUS]
func (*CollectorPipeline[IN, BUS]) WithCollector ¶
func (p *CollectorPipeline[IN, BUS]) WithCollector(collector Collector[IN, BUS]) *CollectorPipeline[IN, BUS]
func (*CollectorPipeline[IN, BUS]) WithInput ¶
func (p *CollectorPipeline[IN, BUS]) WithInput(in chan IN) *CollectorPipeline[IN, BUS]
func (*CollectorPipeline[IN, BUS]) WithInputFrom ¶
func (p *CollectorPipeline[IN, BUS]) WithInputFrom(producer Producer[IN]) *CollectorPipeline[IN, BUS]
type Consumer ¶
type Consumer[IN any] interface { Input() chan IN }
Consumer knows what its input channel is.
type FanHook ¶
FanHook is a hook function executed by the FanInPipeline or the FanOutPipeline runner.
type FanInOption ¶
FanInOption alters the behavior of the fan-in runner.
func WithFanInHooks ¶
func WithFanInHooks[INOUT any, BUS any](hooks ...FanHook[INOUT, BUS]) FanInOption[INOUT, BUS]
type FanInPipeline ¶
FanInPipeline is a pipeline that gathers several input channels into one single output.
The FanInPipeline does not transform the data type.
There is a default runner implemented, with a few FanIn options available to introduce hooks, which could be used for example to:
* collect metrics and logging
* apply some data transform
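The fan-in pattern with a transforming hook can be sketched with the standard library alone. This is not the package's default runner: hook, fanIn, emit and mergeSorted are hypothetical names, and the hook signature is an assumption made for the example.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// hook is a hypothetical hook: it sees every message and may transform it.
type hook[T any] func(T) T

// fanIn copies every message from all inputs to out, applying h when it is set.
// out is closed once all inputs are drained or the context is cancelled.
func fanIn[T any](ctx context.Context, out chan<- T, h hook[T], inputs ...<-chan T) {
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, isOpen := <-in:
					if !isOpen {
						return
					}
					if h != nil {
						v = h(v)
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(in)
	}
	wg.Wait()
	close(out)
}

// emit returns a closed, pre-filled channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// mergeSorted fans in all inputs, multiplies each value by 10 in a hook,
// and sorts the result because the arrival order is not deterministic.
func mergeSorted(inputs ...<-chan int) []int {
	out := make(chan int)
	go fanIn[int](context.Background(), out, func(v int) int { return v * 10 }, inputs...)

	var merged []int
	for v := range out {
		merged = append(merged, v)
	}
	sort.Ints(merged)
	return merged
}

func main() {
	fmt.Println(mergeSorted(emit(1, 2), emit(3))) // [10 20 30]
}
```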
func (FanInPipeline) RunWithContext ¶
func (*FanInPipeline[INOUT, BUS]) SetInputs ¶
func (p *FanInPipeline[INOUT, BUS]) SetInputs(inputs ...<-chan INOUT)
func (*FanInPipeline[INOUT, BUS]) WithBus ¶
func (p *FanInPipeline[INOUT, BUS]) WithBus(bus chan BUS) *FanInPipeline[INOUT, BUS]
func (*FanInPipeline[INOUT, BUS]) WithFanInFrom ¶
func (p *FanInPipeline[INOUT, BUS]) WithFanInFrom(producers ...Producer[INOUT]) *FanInPipeline[INOUT, BUS]
func (*FanInPipeline[INOUT, BUS]) WithFanInOptions ¶
func (p *FanInPipeline[INOUT, BUS]) WithFanInOptions(opts ...FanInOption[INOUT, BUS]) *FanInPipeline[INOUT, BUS]
func (*FanInPipeline[INOUT, BUS]) WithInputs ¶
func (p *FanInPipeline[INOUT, BUS]) WithInputs(inputs ...<-chan INOUT) *FanInPipeline[INOUT, BUS]
func (*FanInPipeline[INOUT, BUS]) WithOutput ¶
func (p *FanInPipeline[INOUT, BUS]) WithOutput(out chan INOUT) *FanInPipeline[INOUT, BUS]
func (*FanInPipeline[INOUT, BUS]) WithOutputTo ¶
func (p *FanInPipeline[INOUT, BUS]) WithOutputTo(consumer SettableConsumer[INOUT]) *FanInPipeline[INOUT, BUS]
WithOutputTo alters the provided consumer to set its input to the current pipeline's output.
type FanOutPipeline ¶
FanOutPipeline takes one input and duplicates the input messages to several output channels.
FanOutPipeline does not change the data type.
There is a default runner implemented, with a few options available to introduce hooks, which could be used to:
* collect metrics and logging
* clone output messages if they pass references that are mutated downstream
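The cloning concern can be illustrated with a stdlib-only fan-out. The fanOut and demo functions and the cloneFn parameter are hypothetical; the package's own hook mechanism may differ.

```go
package main

import (
	"context"
	"fmt"
)

// fanOut duplicates every message from in to all outputs.
// When cloneFn is set, each output receives its own copy of the message.
// All outputs are closed when in is drained or the context is cancelled.
func fanOut[T any](ctx context.Context, in <-chan T, cloneFn func(T) T, outputs ...chan<- T) {
	defer func() {
		for _, out := range outputs {
			close(out)
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case v, isOpen := <-in:
			if !isOpen {
				return
			}
			for _, out := range outputs {
				msg := v
				if cloneFn != nil {
					msg = cloneFn(v)
				}
				select {
				case <-ctx.Done():
					return
				case out <- msg:
				}
			}
		}
	}
}

// demo sends one slice to two consumers, then mutates the first consumer's copy.
func demo() (first, second []int) {
	in := make(chan []int, 1)
	in <- []int{1, 2, 3}
	close(in)

	a, b := make(chan []int, 1), make(chan []int, 1)
	cloneSlice := func(s []int) []int { return append([]int(nil), s...) }
	fanOut[[]int](context.Background(), in, cloneSlice, a, b)

	first, second = <-a, <-b
	first[0] = 99 // stays local to the first consumer thanks to the clone

	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // [99 2 3] [1 2 3]
}
```

Without the clone function, both consumers would share the same backing array and the second one would observe the mutation.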
func (FanOutPipeline) RunWithContext ¶
func (*FanOutPipeline[INOUT, BUS]) SetInput ¶
func (p *FanOutPipeline[INOUT, BUS]) SetInput(_ chan INOUT) *FanOutPipeline[INOUT, BUS]
func (*FanOutPipeline[INOUT, BUS]) SetOutput ¶
func (p *FanOutPipeline[INOUT, BUS]) SetOutput(_ chan INOUT) *FanOutPipeline[INOUT, BUS]
func (*FanOutPipeline[INOUT, BUS]) WithBus ¶
func (p *FanOutPipeline[INOUT, BUS]) WithBus(bus chan BUS) *FanOutPipeline[INOUT, BUS]
func (*FanOutPipeline[INOUT, BUS]) WithFanOutOptions ¶
func (p *FanOutPipeline[INOUT, BUS]) WithFanOutOptions(opts ...FanOutOption[INOUT, BUS]) *FanOutPipeline[INOUT, BUS]
func (*FanOutPipeline[INOUT, BUS]) WithFanOutTo ¶
func (p *FanOutPipeline[INOUT, BUS]) WithFanOutTo(consumers ...Consumer[INOUT]) *FanOutPipeline[INOUT, BUS]
func (*FanOutPipeline[INOUT, BUS]) WithInput ¶
func (p *FanOutPipeline[INOUT, BUS]) WithInput(in chan INOUT) *FanOutPipeline[INOUT, BUS]
func (*FanOutPipeline[INOUT, BUS]) WithInputFrom ¶
func (p *FanOutPipeline[INOUT, BUS]) WithInputFrom(producer Producer[INOUT]) *FanOutPipeline[INOUT, BUS]
type Feeder ¶
Feeder is the output-only function executed inside a FeederPipeline.
The feeder SHOULD relinquish resources and exit when the context is cancelled.
If the feeder takes care of closing the output channel, the executing pipeline should explicitly not do so redundantly (create the pipeline with "WithAutoCloseOutput(false)" in this case).
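The reason is that closing a channel twice panics in Go. The sketch below shows the two valid combinations; feeder, runFeeder, countTo, selfClosing and collect are hypothetical stand-ins, and the autoClose flag only mimics the documented option.

```go
package main

import (
	"context"
	"fmt"
)

// feeder is a local stand-in with the Feeder shape.
type feeder[OUT, BUS any] func(context.Context, chan<- OUT, chan<- BUS) error

// runFeeder runs f and closes out afterwards only when autoClose is set.
func runFeeder[OUT, BUS any](ctx context.Context, f feeder[OUT, BUS], out chan OUT, bus chan<- BUS, autoClose bool) error {
	if autoClose {
		defer close(out)
	}
	return f(ctx, out, bus)
}

// countTo feeds 1..n and leaves the output channel open.
func countTo(n int) feeder[int, struct{}] {
	return func(ctx context.Context, out chan<- int, _ chan<- struct{}) error {
		for i := 1; i <= n; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- i:
			}
		}
		return nil
	}
}

// selfClosing wraps f so that the feeder itself closes its output.
func selfClosing(f feeder[int, struct{}]) feeder[int, struct{}] {
	return func(ctx context.Context, out chan<- int, bus chan<- struct{}) error {
		defer close(out)
		return f(ctx, out, bus)
	}
}

// collect runs the feeder in a goroutine and drains its output.
func collect(f feeder[int, struct{}], autoClose bool) []int {
	out := make(chan int)
	go func() {
		_ = runFeeder[int, struct{}](context.Background(), f, out, nil, autoClose)
	}()

	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(countTo(3), true))               // the pipeline closes the output
	fmt.Println(collect(selfClosing(countTo(3)), false)) // the feeder closes the output
}
```

Running a self-closing feeder with autoClose set to true would close the channel twice and panic, which is the redundancy the note above warns about.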
type FeederPipeline ¶
FeederPipeline is a pipeline that only outputs, with a message generating Feeder method.
A FeederPipeline is a starting point in your pipeline dependency graph. TODO: use inner to shut inapplicable methods
Example (WithBus) ¶
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/fredbi/go-patterns/pipelines"
	"golang.org/x/sync/errgroup"
)

type (
	exampleNotification struct {
		Msg   string
		Value int
	}

	exampleNotifications struct {
		mx    sync.Mutex
		inner []exampleNotification
	}
)

func (e *exampleNotifications) Add(in exampleNotification) {
	e.mx.Lock()
	defer e.mx.Unlock()

	e.inner = append(e.inner, in)
}

func (e *exampleNotifications) String() string {
	b := new(strings.Builder)
	e.mx.Lock()
	defer e.mx.Unlock()

	for _, in := range e.inner {
		fmt.Fprintf(b, "warn: notified of %s: %d\n", in.Msg, in.Value)
	}

	return b.String()
}

func (e *exampleNotifications) Sort() {
	sort.Slice(e.inner, func(i, j int) bool {
		return e.inner[i].Value < e.inner[j].Value
	})
}

var exampleGenerator = func(ctx context.Context, out chan<- int, _ pipelines.NOBUSCHAN) error {
	inputs := []int{1, 2, 3, 4}
	for _, generated := range inputs {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- generated:
		}
	}

	return nil
}

func main() {
	// This example creates a simple pipeline operation that feeds 4 integers to a collector that prints these out.
	// Even integers are rejected and generate a notification.
	generator := func(ctx context.Context, out chan<- int, _ chan<- exampleNotification) error {
		// Feeder[OUT any, BUS any] func(context.Context, chan<- OUT, chan<- BUS) error
		return exampleGenerator(ctx, out, nil)
	}

	oddCollector := func(ctx context.Context, in <-chan int, bus chan<- exampleNotification) error {
		// Collector[IN any, BUS any] func(context.Context, <-chan IN, chan<- BUS) error
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case received, isOpen := <-in:
				if !isOpen {
					return nil
				}

				if received%2 == 0 {
					notice := exampleNotification{
						Msg:   "rejected even input",
						Value: received,
					}

					select {
					case <-ctx.Done():
						return ctx.Err()
					case bus <- notice:
					}

					continue
				}

				fmt.Printf("received: %d\n", received)
			}
		}
	}

	pipes := pipelines.NewCollection[exampleNotification]()

	feeder := pipelines.NewFeeder[int, exampleNotification]().
		WithFeeder(generator) // a feeder that produces integers

	final := pipelines.NewCollector[int, exampleNotification]().
		WithInputFrom(feeder).       // connects the collector pipeline to the feeder
		WithCollector(oddCollector) // a collector that prints odd integers

	pipes.Add(feeder, final)

	notificationsOutlet := new(exampleNotifications)

	// registers a bus listener to process out-of-band notifications
	// NOTE: bus messages are processed in parallel
	pipes.AddBusCollector(
		// func NewBusCollector[BUS any](opts ...Option) *BusCollector[BUS]
		pipelines.NewBusCollector[exampleNotification]().
			WithBusListener(
				// BusListener[BUS any] func(context.Context, BUS) error
				func(ctx context.Context, in exampleNotification) error {
					notificationsOutlet.Add(in)

					return nil
				},
			),
	)

	// executes the pipeline in some goroutines group
	group, ctx := errgroup.WithContext(context.Background())
	pipes.RunInGroup(ctx, group)

	if err := group.Wait(); err != nil {
		fmt.Printf("err: %v\n", err)
	}

	// Notice that bus notices are processed asynchronously in parallel.
	// Therefore, their ordering is not guaranteed.
	notificationsOutlet.Sort()
	fmt.Println(notificationsOutlet.String())
}
Output:
received: 1
received: 3
warn: notified of rejected even input: 2
warn: notified of rejected even input: 4
Example (WithoutBus) ¶
package main

import (
	"context"
	"fmt"

	"github.com/fredbi/go-patterns/pipelines"
	"golang.org/x/sync/errgroup"
)

var (
	exampleGenerator = func(ctx context.Context, out chan<- int, _ pipelines.NOBUSCHAN) error {
		inputs := []int{1, 2, 3, 4}
		for _, generated := range inputs {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- generated:
			}
		}

		return nil
	}

	exampleCollector = func(ctx context.Context, in <-chan int, _ pipelines.NOBUSCHAN) error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case received, isOpen := <-in:
				if !isOpen {
					return nil
				}

				fmt.Printf("received: %d\n", received)
			}
		}
	}
)

func main() {
	// This example creates a simple pipeline operation that feeds 4 integers to a collector that prints these out.
	pipes := pipelines.NewCollection[pipelines.NOBUS]()

	feeder := pipelines.NewFeeder[int, pipelines.NOBUS]().
		WithFeeder(exampleGenerator) // a feeder that produces integers
	pipes.Add(feeder)

	final := pipelines.NewCollector[int, pipelines.NOBUS]().
		WithInputFrom(feeder).
		WithCollector(exampleCollector) // a collector that prints integers
	pipes.Add(final)

	// executes the pipeline in some goroutines group
	group, ctx := errgroup.WithContext(context.Background())
	pipes.RunInGroup(ctx, group)

	if err := group.Wait(); err != nil {
		fmt.Printf("err: %v\n", err)
	}
}
Output:
received: 1
received: 2
received: 3
received: 4
func NewFeeder ¶
func NewFeeder[OUT any, BUS any](opts ...Option) *FeederPipeline[OUT, BUS]
NewFeeder builds a new FeederPipeline.
func (FeederPipeline) RunWithContext ¶
func (*FeederPipeline[OUT, BUS]) WithBus ¶
func (p *FeederPipeline[OUT, BUS]) WithBus(bus chan BUS) *FeederPipeline[OUT, BUS]
func (*FeederPipeline[OUT, BUS]) WithFeeder ¶
func (p *FeederPipeline[OUT, BUS]) WithFeeder(feeder Feeder[OUT, BUS]) *FeederPipeline[OUT, BUS]
TODO: should be part of the constructor
func (*FeederPipeline[OUT, BUS]) WithOutput ¶
func (p *FeederPipeline[OUT, BUS]) WithOutput(out chan OUT) *FeederPipeline[OUT, BUS]
type FinalChainedPipeline ¶
FinalChainedPipeline is an input-only chained pipeline, which is returned when chaining with "Finally".
The output cannot be defined on FinalChainedPipeline.
func (*FinalChainedPipeline[OUT, BUS]) Bus ¶
func (p *FinalChainedPipeline[OUT, BUS]) Bus() chan BUS
func (*FinalChainedPipeline[OUT, BUS]) Input ¶
func (p *FinalChainedPipeline[OUT, BUS]) Input() chan OUT
func (*FinalChainedPipeline[OUT, BUS]) RunWithContext ¶
type InitialChainedPipeline ¶
InitialChainedPipeline is an output-only chained pipeline.
The input cannot be defined on InitialChainedPipeline.
func NewInitialChained ¶
NewInitialChained builds a new initial chain of pipelines.
func (*InitialChainedPipeline[OUT, BUS]) BeginsWith ¶
func (p *InitialChainedPipeline[OUT, BUS]) BeginsWith(start *FeederPipeline[OUT, BUS]) *ChainedPipeline[dummy, OUT, BUS]
BeginsWith feeds a chained pipeline with some initial input pipeline.
func (*InitialChainedPipeline[OUT, BUS]) Bus ¶
func (p *InitialChainedPipeline[OUT, BUS]) Bus() chan BUS
func (*InitialChainedPipeline[OUT, BUS]) Output ¶
func (p *InitialChainedPipeline[OUT, BUS]) Output() chan OUT
func (*InitialChainedPipeline[OUT, BUS]) RunWithContext ¶
func (*InitialChainedPipeline[OUT, BUS]) SetOutput ¶
func (p *InitialChainedPipeline[OUT, BUS]) SetOutput(out chan OUT)
func (*InitialChainedPipeline[OUT, BUS]) WithOutput ¶
func (p *InitialChainedPipeline[OUT, BUS]) WithOutput(out chan OUT) *InitialChainedPipeline[OUT, BUS]
type Joiner ¶
type Joiner[IN any, OTHER any, OUT any, BUS any] func(context.Context, <-chan IN, <-chan OTHER, chan<- OUT, chan<- BUS) error
Joiner is the two-way join executed inside a JoinerPipeline.
The joiner SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed.
If the joiner takes care of closing the output channel, the executing pipeline should explicitly not do so redundantly (create the pipeline with "WithAutoCloseOutput(false)" in this case).
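As a sketch of a function with the Joiner shape, the example below pairs the n-th value of each input. The zip and join functions are hypothetical, the bus is unused (struct{} placeholder), and stopping as soon as either input closes is a choice made for this example only.

```go
package main

import (
	"context"
	"fmt"
)

// zip has the Joiner shape: it pairs each name with the age received at the same rank.
// It returns when the context is cancelled or when either input is closed.
func zip(ctx context.Context, names <-chan string, ages <-chan int, out chan<- string, _ chan<- struct{}) error {
	for {
		var (
			name   string
			age    int
			isOpen bool
		)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case name, isOpen = <-names:
			if !isOpen {
				return nil
			}
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case age, isOpen = <-ages:
			if !isOpen {
				return nil
			}
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- fmt.Sprintf("%s is %d", name, age):
		}
	}
}

// join runs zip synchronously over pre-filled, closed channels.
func join(names []string, ages []int) []string {
	nameCh := make(chan string, len(names))
	for _, n := range names {
		nameCh <- n
	}
	close(nameCh)

	ageCh := make(chan int, len(ages))
	for _, a := range ages {
		ageCh <- a
	}
	close(ageCh)

	out := make(chan string, len(names))
	_ = zip(context.Background(), nameCh, ageCh, out, nil)
	close(out)

	var joined []string
	for s := range out {
		joined = append(joined, s)
	}
	return joined
}

func main() {
	for _, line := range join([]string{"ada", "bob"}, []int{36, 41, 50}) {
		fmt.Println(line)
	}
}
```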
type JoinerPipeline ¶
type JoinerPipeline[IN any, OTHER any, OUT any, BUS any] struct {
	// contains filtered or unexported fields
}
JoinerPipeline is a pipeline that takes input from 2 channels of different types and applies a Joiner function to this input.
The implementation of the Joiner must be provided.
func (*JoinerPipeline[IN, OTHER, OUT, BUS]) RunWithContext ¶
func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) RunWithContext(ctx context.Context) func() error
func (*JoinerPipeline[IN, OTHER, OUT, BUS]) SetOther ¶
func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) SetOther(other chan OTHER)
func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithBus ¶
func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithBus(bus chan BUS) *JoinerPipeline[IN, OTHER, OUT, BUS]
func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputs ¶
func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputs(in chan IN, other chan OTHER) *JoinerPipeline[IN, OTHER, OUT, BUS]
func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputsFrom ¶
func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputsFrom(producer Producer[IN], otherProducer Producer[OTHER]) *JoinerPipeline[IN, OTHER, OUT, BUS]
func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithJoiner ¶
func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithJoiner(joiner Joiner[IN, OTHER, OUT, BUS]) *JoinerPipeline[IN, OTHER, OUT, BUS]
func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithOutput ¶
func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithOutput(out chan OUT) *JoinerPipeline[IN, OTHER, OUT, BUS]
type NOBUSCHAN ¶
type NOBUSCHAN = chan<- NOBUS
NOBUSCHAN is an alias for chan<- NOBUS when the BUS feature is unused.
type NamedRunnable ¶
NamedRunnable is a Runnable identifiable by a name.
type Option ¶
type Option func(*options)
Option to tune the behavior of a pipeline.
func WithAutoCloseOutput ¶
WithAutoCloseOutput sets the behavior to close the output channel after the runner is complete.
This defaults to true and can be disabled if the runner is already taking care of closing its output channel.
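Option follows the functional options pattern. The sketch below shows how such options are typically applied over defaults; the options struct, its field names, the lower-case constructors and the resolve helper are all assumptions for illustration, since the package keeps its own options type unexported and the exact signatures are not shown here.

```go
package main

import "fmt"

// options is a hypothetical settings struct with made-up field names.
type options struct {
	autoCloseOutput bool
	outputBuffers   int
}

// option mirrors the "func(*options)" shape of the documented Option type.
type option func(*options)

// withAutoCloseOutput enables or disables closing the output after the runner completes.
func withAutoCloseOutput(enabled bool) option {
	return func(o *options) { o.autoCloseOutput = enabled }
}

// withOutputBuffers sets the size of the output channel buffer (assumed to be an int).
func withOutputBuffers(size int) option {
	return func(o *options) { o.outputBuffers = size }
}

// resolve starts from the defaults and applies the options in order.
func resolve(opts ...option) options {
	o := options{autoCloseOutput: true} // auto-close defaults to true, as documented above
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", resolve())
	fmt.Printf("%+v\n", resolve(withAutoCloseOutput(false), withOutputBuffers(8)))
}
```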
func WithBusBuffers ¶
WithBusBuffers sets a buffered bus channel.
func WithInputBuffers ¶
WithInputBuffers sets a buffered input channel.
func WithOutputBuffers ¶
WithOutputBuffers sets a buffered output channel.
type Pipeline ¶
Pipeline is a generic pipe from channel IN to channel OUT and reporting errors or notices on channel BUS.
func NewPipeline ¶
NewPipeline builds a generic pipeline that collects from a channel of type IN and outputs into a channel of type OUT, with out-of-band notifications on a channel of type BUS.
func (Pipeline) RunWithContext ¶
func (*Pipeline[IN, OUT, BUS]) WithInputFrom ¶
func (*Pipeline[IN, OUT, BUS]) WithOutput ¶
func (*Pipeline[IN, OUT, BUS]) WithOutputTo ¶
func (p *Pipeline[IN, OUT, BUS]) WithOutputTo(consumer SettableConsumer[OUT]) *Pipeline[IN, OUT, BUS]
WithOutputTo alters the provided consumer to set its input to the current pipeline's output.
func (*Pipeline[IN, OUT, BUS]) WithRunner ¶
WithRunner returns a pipeline with a runner function to transform IN into OUT.
type Producer ¶
type Producer[OUT any] interface { Output() chan OUT }
Producer knows what its output channel is.
type Runnable ¶
Runnable knows how to execute a runner with a context.
Runnables SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed. They are not expected to close the channels.
Whenever the bus feature is not put to use, you may use the placeholder type NOBUS: "Runnable[NOBUS]"
type Runner ¶
type Runner[IN any, OUT any, BUS any] func(context.Context, <-chan IN, chan<- OUT, chan<- BUS) error
Runner is the function executed inside a pipeline.
The runner SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed.
If the runner takes care of closing the output channel, the executing pipeline should explicitly not do so redundantly (create the pipeline with "WithAutoCloseOutput(false)" in this case).
type SettableConsumer ¶
SettableConsumer is a Consumer that may have its input set.
type SettableProducer ¶
SettableProducer is a Producer that may have its output set.