pipelines

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package pipelines exposes a generic implementation of asynchronous pipelines.

A pipeline is defined by: * an input channel of some type: <-chan IN * an output channel of some type: chan <- OUT * an optional outgoing out-of-band notification bus channel: chan <- BUS * a runner function func(context.CONTEXT, <- chan IN, chan <- OUT, chan <- BUS)

This implementation puts forward type safety when connecting channels of different types with each other: the consistency of your pipeline scaffolding should be ensured at build time.

Error handling

Pipelines are not akin to "promises" as there is no handling of the rejected state. Blocking errors should be handled by having the runner return an error. Non-blocking errors should be handled by sending some notification to the bus channel.

The package allows to define and manipulate pipeline with different characteristics: * Pipeline is a plain IN/OUT pipeline * FeederPipeline is a pipeline that only produces output (e.g. from some other non-channel input, such as a io.Reader or DB records) * CollectorPipeline is a pipeline that only consumes input (e.g. it produces some final non-channel output) * FanInPipeline implements a (many IN -> single OUT) fan-in pattern * FanOutPipeline implements a (single IN -> many OUT) fan-out pattern * JoinerPipeline provides a means to implement a 2-way join pattern: (IN, OTHER) -> OUT. * ChainedPipeline provides a means to declare pipelines easily using the BeginsWith, Then and Finally builder methods.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BusCollector

type BusCollector[BUS any] struct {
	// contains filtered or unexported fields
}

BusCollector is a fan-in collector on many BUS channels, to listen on all bus notifications from other pipelines.

func NewBusCollector

func NewBusCollector[BUS any](opts ...Option) *BusCollector[BUS]

NewBusCollector builds a BusCollector to listen on bus channels.

func (BusCollector) Bus

func (c BusCollector) Bus() chan BUS

func (BusCollector) Input

func (c BusCollector) Input() chan IN

func (BusCollector) Output

func (c BusCollector) Output() chan OUT

func (BusCollector) RunWithContext

func (c BusCollector) RunWithContext(ctx context.Context) func() error

func (BusCollector) SetBus

func (c BusCollector) SetBus(bus chan BUS)

func (BusCollector) SetInput

func (c BusCollector) SetInput(in chan IN)

func (BusCollector) SetOutput

func (c BusCollector) SetOutput(out chan OUT)

func (*BusCollector[BUS]) WithBusListener

func (p *BusCollector[BUS]) WithBusListener(listener BusListener[BUS]) *BusCollector[BUS]

func (*BusCollector[BUS]) WithInputs

func (p *BusCollector[BUS]) WithInputs(inputs ...<-chan BUS) *BusCollector[BUS]

func (*BusCollector[BUS]) WithInputsFrom

func (p *BusCollector[BUS]) WithInputsFrom(busers ...Buser[BUS]) *BusCollector[BUS]

type BusListener

type BusListener[BUS any] func(context.Context, BUS) error

BusListener defines a special collector for BUS entries.

type Buser

type Buser[BUS any] interface {
	Bus() chan BUS
}

Buser knows about its bus channel

type ChainedPipeline

type ChainedPipeline[IN any, OUT any, BUS any] struct {
	// contains filtered or unexported fields
}

ChainedPipeline allows to construct chains of asynchronous pipelines with the Then idiom.

The input of the chain is specified with "WithInput".

The whole chain may run asynchronously with RunWithContext(), which blocks until all the elements in the chain are done.

The input of a ChainedPipeline corresponds to the input of its first pipeline. The output of a ChainedPipeline corresponds to the output of its last pipeline.

func NewChained

func NewChained[IN any, OUT any, BUS any](opts ...Option) *ChainedPipeline[IN, OUT, BUS]

func Then

func Then[IN any, OUT any, CHAINEDOUT any, BUS any](current *ChainedPipeline[IN, OUT, BUS], next *Pipeline[OUT, CHAINEDOUT, BUS]) *ChainedPipeline[IN, CHAINEDOUT, BUS]

Then allows to chain outputs by declaring explicit types.

Notice that unfortunately, we can't make this a fluent method of ChainedPipeline as of go1.18.

func (ChainedPipeline) Bus

func (c ChainedPipeline) Bus() chan BUS

func (*ChainedPipeline[IN, OUT, BUS]) Finally

func (p *ChainedPipeline[IN, OUT, BUS]) Finally(end *CollectorPipeline[OUT, BUS]) *FinalChainedPipeline[OUT, BUS]

Finally terminates a chained pipeline with some final collector pipeline.

func (ChainedPipeline) Input

func (c ChainedPipeline) Input() chan IN

func (ChainedPipeline) Output

func (c ChainedPipeline) Output() chan OUT

func (*ChainedPipeline[IN, OUT, BUS]) RunWithContext

func (p *ChainedPipeline[IN, OUT, BUS]) RunWithContext(ctx context.Context) func() error

RunWithContext executes all the runners of a chain of pipelines as goroutines.

The execution is interrupted if the context is cancelled.

It blocks until all inner runners exit.

func (ChainedPipeline) SetBus

func (c ChainedPipeline) SetBus(bus chan BUS)

func (ChainedPipeline) SetInput

func (c ChainedPipeline) SetInput(in chan IN)

func (ChainedPipeline) SetOutput

func (c ChainedPipeline) SetOutput(out chan OUT)

func (*ChainedPipeline[IN, OUT, BUS]) WithInputFrom

func (p *ChainedPipeline[IN, OUT, BUS]) WithInputFrom(producer Producer[IN]) *ChainedPipeline[IN, OUT, BUS]

WithInputFrom sets the the input channel for this chain.

func (*ChainedPipeline[IN, OUT, BUS]) WithOutputTo

func (p *ChainedPipeline[IN, OUT, BUS]) WithOutputTo(consumer SettableConsumer[OUT]) *ChainedPipeline[IN, OUT, BUS]

WithOutputTo sets the input of the given consumer to the output of the current chain.

type Collection

type Collection[BUS any] struct {
	// contains filtered or unexported fields
}

Collection is a chained list of named runnable pipelines with some common BUS type to handle out-of-band notifications.

func NewCollection

func NewCollection[BUS any]() *Collection[BUS]

func (*Collection[BUS]) Add

func (c *Collection[BUS]) Add(piped ...Runnable[BUS])

func (*Collection[BUS]) AddBusCollector

func (c *Collection[BUS]) AddBusCollector(collector *BusCollector[BUS])

AddBusCollector adds a BusCollector pipeline connected to all the bus channels published by the pipelines currently in the collection.

func (*Collection[BUS]) AddNamed

func (c *Collection[BUS]) AddNamed(name string, piped Runnable[BUS])

func (*Collection[BUS]) AddNamedBusCollector

func (c *Collection[BUS]) AddNamedBusCollector(name string, collector *BusCollector[BUS])

func (*Collection[BUS]) AllRunnables

func (c *Collection[BUS]) AllRunnables() []Runnable[BUS]

AllRunnables returns all the runnable pipelines in the collection.

func (*Collection[BUS]) AllRunnersWithContext

func (c *Collection[BUS]) AllRunnersWithContext(ctx context.Context) []func() error

AllRunnersWithContext returns all runners as a collection of func() error suitable to execute in some errgroup.Group.

func (*Collection[BUS]) Append

func (c *Collection[BUS]) Append(piped ...Runnable[BUS]) *Collection[BUS]

func (*Collection[BUS]) AppendNamed

func (c *Collection[BUS]) AppendNamed(name string, piped Runnable[BUS]) *Collection[BUS]

func (*Collection[BUS]) Lookup

func (c *Collection[BUS]) Lookup(name string) (NamedRunnable[BUS], bool)

Lookup in a collection for a given pipeline by its name.

func (*Collection[BUS]) RunInGroup

func (c *Collection[BUS]) RunInGroup(ctx context.Context, group *errgroup.Group)

RunInGroup starts the pipeline inside some provided errgroup.Group.

The caller is responsible for waiting on the group: RunInGroup only launches all runners as goroutines.

The context passed should be the context produced when creating the group.

type Collector

type Collector[IN any, BUS any] func(context.Context, <-chan IN, chan<- BUS) error

Collector is the input-only function executed inside a CollectorPipeline.

The collector SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed.

type CollectorPipeline

type CollectorPipeline[IN any, BUS any] struct {
	// contains filtered or unexported fields
}

CollectorPipeline is a dead-end pipeline, only consuming from input and producing not output channel.

This is a termination point in your pipeline dependency graph.

func NewCollector

func NewCollector[IN any, BUS any](opts ...Option) *CollectorPipeline[IN, BUS]

NewCollector builds a CollectorPipeline that reads from a channel of type IN.

func (CollectorPipeline) Bus

func (c CollectorPipeline) Bus() chan BUS

func (CollectorPipeline) Input

func (c CollectorPipeline) Input() chan IN

func (CollectorPipeline) Output

func (c CollectorPipeline) Output() chan OUT

func (CollectorPipeline) RunWithContext

func (c CollectorPipeline) RunWithContext(ctx context.Context) func() error

func (CollectorPipeline) SetBus

func (c CollectorPipeline) SetBus(bus chan BUS)

func (CollectorPipeline) SetInput

func (c CollectorPipeline) SetInput(in chan IN)

func (CollectorPipeline) SetOutput

func (c CollectorPipeline) SetOutput(out chan OUT)

func (*CollectorPipeline[IN, BUS]) WithBus

func (p *CollectorPipeline[IN, BUS]) WithBus(bus chan BUS) *CollectorPipeline[IN, BUS]

func (*CollectorPipeline[IN, BUS]) WithCollector

func (p *CollectorPipeline[IN, BUS]) WithCollector(collector Collector[IN, BUS]) *CollectorPipeline[IN, BUS]

func (*CollectorPipeline[IN, BUS]) WithInput

func (p *CollectorPipeline[IN, BUS]) WithInput(in chan IN) *CollectorPipeline[IN, BUS]

func (*CollectorPipeline[IN, BUS]) WithInputFrom

func (p *CollectorPipeline[IN, BUS]) WithInputFrom(producer Producer[IN]) *CollectorPipeline[IN, BUS]

type Consumer

type Consumer[IN any] interface {
	Input() chan IN
}

Consumer knows what is the input channel.

type FanHook

type FanHook[INOUT any, BUS any] func(context.Context, INOUT, chan<- BUS) error

FanHook is a hook function executed by the FanInPipeline or the FanOutPipeline runner.

type FanInOption

type FanInOption[INOUT any, BUS any] func(*fanInOptions[INOUT, BUS])

FanInOption alters the behavior of the fan-in runner.

func WithFanInHooks

func WithFanInHooks[INOUT any, BUS any](hooks ...FanHook[INOUT, BUS]) FanInOption[INOUT, BUS]

type FanInPipeline

type FanInPipeline[INOUT any, BUS any] struct {
	// contains filtered or unexported fields
}

FanInPipeline is a pipeline that gathers several input channels into one single ouput.

The FanInPipeline does not transform the data type.

There is a default runner implemented, with a few FanIn options available to introduce hooks, which could be used for example to: * collect metrics and logging * apply some data transform

func NewFanIn

func NewFanIn[INOUT any, BUS any](opts ...Option) *FanInPipeline[INOUT, BUS]

func (FanInPipeline) Bus

func (c FanInPipeline) Bus() chan BUS

func (FanInPipeline) Input

func (c FanInPipeline) Input() chan IN

func (FanInPipeline) Output

func (c FanInPipeline) Output() chan OUT

func (FanInPipeline) RunWithContext

func (c FanInPipeline) RunWithContext(ctx context.Context) func() error

func (FanInPipeline) SetBus

func (c FanInPipeline) SetBus(bus chan BUS)

func (FanInPipeline) SetInput

func (c FanInPipeline) SetInput(in chan IN)

func (*FanInPipeline[INOUT, BUS]) SetInputs

func (p *FanInPipeline[INOUT, BUS]) SetInputs(inputs ...<-chan INOUT)

func (FanInPipeline) SetOutput

func (c FanInPipeline) SetOutput(out chan OUT)

func (*FanInPipeline[INOUT, BUS]) WithBus

func (p *FanInPipeline[INOUT, BUS]) WithBus(bus chan BUS) *FanInPipeline[INOUT, BUS]

func (*FanInPipeline[INOUT, BUS]) WithFanInFrom

func (p *FanInPipeline[INOUT, BUS]) WithFanInFrom(producers ...Producer[INOUT]) *FanInPipeline[INOUT, BUS]

func (*FanInPipeline[INOUT, BUS]) WithFanInOptions

func (p *FanInPipeline[INOUT, BUS]) WithFanInOptions(opts ...FanInOption[INOUT, BUS]) *FanInPipeline[INOUT, BUS]

func (*FanInPipeline[INOUT, BUS]) WithInputs

func (p *FanInPipeline[INOUT, BUS]) WithInputs(inputs ...<-chan INOUT) *FanInPipeline[INOUT, BUS]

func (*FanInPipeline[INOUT, BUS]) WithOutput

func (p *FanInPipeline[INOUT, BUS]) WithOutput(out chan INOUT) *FanInPipeline[INOUT, BUS]

func (*FanInPipeline[INOUT, BUS]) WithOutputTo

func (p *FanInPipeline[INOUT, BUS]) WithOutputTo(consumer SettableConsumer[INOUT]) *FanInPipeline[INOUT, BUS]

WithOutputTo alters the provided consumer to set its input to the current pipeline's output.

type FanOutOption

type FanOutOption[INOUT any, BUS any] func(*fanOutOptions[INOUT, BUS])

FanOutOption alters the behavior of the fan-out runner.

func WithFanOutHooks

func WithFanOutHooks[INOUT any, BUS any](hooks ...FanHook[INOUT, BUS]) FanOutOption[INOUT, BUS]

type FanOutPipeline

type FanOutPipeline[INOUT any, BUS any] struct {
	// contains filtered or unexported fields
}

FanOutPipeline takes one input and duplicate the input messages to several output channels.

FanOutPipeline does not change the data type.

There is a default runner implemented, with a few options available to introduce hooks, which could be used to: * collect metrics & logging * cloning output messages if they pass references that are mutated downstream

func NewFanOut

func NewFanOut[INOUT any, BUS any](opts ...Option) *FanOutPipeline[INOUT, BUS]

func (FanOutPipeline) Bus

func (c FanOutPipeline) Bus() chan BUS

func (FanOutPipeline) Input

func (c FanOutPipeline) Input() chan IN

func (FanOutPipeline) Output

func (c FanOutPipeline) Output() chan OUT

func (FanOutPipeline) RunWithContext

func (c FanOutPipeline) RunWithContext(ctx context.Context) func() error

func (FanOutPipeline) SetBus

func (c FanOutPipeline) SetBus(bus chan BUS)

func (*FanOutPipeline[INOUT, BUS]) SetInput

func (p *FanOutPipeline[INOUT, BUS]) SetInput(_ chan INOUT) *FanOutPipeline[INOUT, BUS]

func (*FanOutPipeline[INOUT, BUS]) SetOutput

func (p *FanOutPipeline[INOUT, BUS]) SetOutput(_ chan INOUT) *FanOutPipeline[INOUT, BUS]

func (*FanOutPipeline[INOUT, BUS]) WithBus

func (p *FanOutPipeline[INOUT, BUS]) WithBus(bus chan BUS) *FanOutPipeline[INOUT, BUS]

func (*FanOutPipeline[INOUT, BUS]) WithFanOutOptions

func (p *FanOutPipeline[INOUT, BUS]) WithFanOutOptions(opts ...FanOutOption[INOUT, BUS]) *FanOutPipeline[INOUT, BUS]

func (*FanOutPipeline[INOUT, BUS]) WithFanOutTo

func (p *FanOutPipeline[INOUT, BUS]) WithFanOutTo(consumers ...Consumer[INOUT]) *FanOutPipeline[INOUT, BUS]

func (*FanOutPipeline[INOUT, BUS]) WithInput

func (p *FanOutPipeline[INOUT, BUS]) WithInput(in chan INOUT) *FanOutPipeline[INOUT, BUS]

func (*FanOutPipeline[INOUT, BUS]) WithInputFrom

func (p *FanOutPipeline[INOUT, BUS]) WithInputFrom(producer Producer[INOUT]) *FanOutPipeline[INOUT, BUS]

type Feeder

type Feeder[OUT any, BUS any] func(context.Context, chan<- OUT, chan<- BUS) error

Feeder is the output-only function executed inside a FeederPipeline.

The feeder SHOULD relinquish resources and exit when the context is cancelled.

If the feeder takes care of closing the ouput channel, the executing pipeline should explictly no do so redundantly (create the pipeline with "WithAutoCloseOutput(false)" in this case).

type FeederPipeline

type FeederPipeline[OUT any, BUS any] struct {
	// contains filtered or unexported fields
}

FeederPipeline is a pipeline that only outputs, with a message generating Feeder method.

A FeederPipeline is a starting point in your pipeline dependency graph. TODO: use inner to shut inapplicable methods

Example (WithBus)
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/fredbi/go-patterns/pipelines"
	"golang.org/x/sync/errgroup"
)

type (
	exampleNotification struct {
		Msg   string
		Value int
	}

	exampleNotifications struct {
		mx    sync.Mutex
		inner []exampleNotification
	}
)

func (e *exampleNotifications) Add(in exampleNotification) {
	e.mx.Lock()
	defer e.mx.Unlock()

	e.inner = append(e.inner, in)
}

func (e *exampleNotifications) String() string {
	b := new(strings.Builder)
	e.mx.Lock()
	defer e.mx.Unlock()

	for _, in := range e.inner {
		fmt.Fprintf(b, "warn: notified of %s: %d\n", in.Msg, in.Value)
	}

	return b.String()
}

func (e *exampleNotifications) Sort() {
	sort.Slice(e.inner, func(i, j int) bool {
		return e.inner[i].Value < e.inner[j].Value
	})
}

var exampleGenerator = func(ctx context.Context, out chan<- int, _ pipelines.NOBUSCHAN) error {

	inputs := []int{1, 2, 3, 4}
	for _, generated := range inputs {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- generated:
		}
	}

	return nil
}

func main() {
	// This example creates a simple pipeline operation that feeds 3 integers to a collector that prints these out.
	// Even integer are rejected an generate a notification.

	generator := func(ctx context.Context, out chan<- int, _ chan<- exampleNotification) error {
		// Feeder[OUT any, BUS any] func(context.Context, chan<- OUT, chan<- BUS) error
		return exampleGenerator(ctx, out, nil)
	}

	oddCollector := func(ctx context.Context, in <-chan int, bus chan<- exampleNotification) error {
		// Collector[IN any, BUS any] func(context.Context, <-chan IN, chan<- BUS) error
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case received, isOpen := <-in:
				if !isOpen {
					return nil
				}

				if received%2 == 0 {
					notice := exampleNotification{
						Msg:   "rejected even input",
						Value: received,
					}
					select {
					case <-ctx.Done():
						return ctx.Err()
					case bus <- notice:
					}

					continue
				}

				fmt.Printf("received: %d\n", received)
			}
		}
	}

	pipes := pipelines.NewCollection[exampleNotification]()

	feeder := pipelines.NewFeeder[int, exampleNotification]().
		WithFeeder(generator) // a feeder that produces integers

	final := pipelines.NewCollector[int, exampleNotification]().
		WithInputFrom(feeder).      // connects the collector pipeline to the feeder
		WithCollector(oddCollector) // a collector that prints odd integers

	pipes.Add(feeder, final)

	notificationsOutlet := new(exampleNotifications)

	// registers a bus listener to process out-of-band notifications
	// NOTE: bus messages are processed in parallel
	pipes.AddBusCollector(
		// func NewBusCollector[BUS any](opts ...Option) *BusCollector[BUS]{
		pipelines.NewBusCollector[exampleNotification]().
			WithBusListener(
				// BusListener[BUS any] func(context.Context, BUS) error
				func(ctx context.Context, in exampleNotification) error {
					notificationsOutlet.Add(in)

					return nil
				},
			),
	)

	// executes the pipeline in some goroutines group
	group, ctx := errgroup.WithContext(context.Background())
	pipes.RunInGroup(ctx, group)

	if err := group.Wait(); err != nil {
		fmt.Printf("err: %v\n", err)
	}

	// Notice that bus notices are processed asynchronously in parallel.
	// Therefore, their ordering is not guaranteed.
	notificationsOutlet.Sort()

	fmt.Println(notificationsOutlet.String())

}
Output:

received: 1
received: 3
warn: notified of rejected even input: 2
warn: notified of rejected even input: 4
Example (WithoutBus)
package main

import (
	"context"
	"fmt"

	"github.com/fredbi/go-patterns/pipelines"
	"golang.org/x/sync/errgroup"
)

var (
	exampleGenerator = func(ctx context.Context, out chan<- int, _ pipelines.NOBUSCHAN) error {

		inputs := []int{1, 2, 3, 4}
		for _, generated := range inputs {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- generated:
			}
		}

		return nil
	}

	exampleCollector = func(ctx context.Context, in <-chan int, _ pipelines.NOBUSCHAN) error {

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case received, isOpen := <-in:
				if !isOpen {
					return nil
				}
				fmt.Printf("received: %d\n", received)
			}
		}
	}
)

func main() {
	// This example creates a simple pipeline operation that feeds 3 integers to a collector that prints these out.
	pipes := pipelines.NewCollection[pipelines.NOBUS]()

	feeder := pipelines.NewFeeder[int, pipelines.NOBUS]().
		WithFeeder(exampleGenerator) // a feeder that produces integers
	pipes.Add(feeder)

	final := pipelines.NewCollector[int, pipelines.NOBUS]().
		WithInputFrom(feeder).
		WithCollector(exampleCollector) // a collector that prints integers
	pipes.Add(final)

	// executes the pipeline in some goroutines group
	group, ctx := errgroup.WithContext(context.Background())
	pipes.RunInGroup(ctx, group)

	if err := group.Wait(); err != nil {
		fmt.Printf("err: %v\n", err)
	}

}
Output:

received: 1
received: 2
received: 3
received: 4

func NewFeeder

func NewFeeder[OUT any, BUS any](opts ...Option) *FeederPipeline[OUT, BUS]

NewFeeder builds a new FeederPipeline.

func (FeederPipeline) Bus

func (c FeederPipeline) Bus() chan BUS

func (FeederPipeline) Input

func (c FeederPipeline) Input() chan IN

func (FeederPipeline) Output

func (c FeederPipeline) Output() chan OUT

func (FeederPipeline) RunWithContext

func (c FeederPipeline) RunWithContext(ctx context.Context) func() error

func (FeederPipeline) SetBus

func (c FeederPipeline) SetBus(bus chan BUS)

func (FeederPipeline) SetInput

func (c FeederPipeline) SetInput(in chan IN)

func (FeederPipeline) SetOutput

func (c FeederPipeline) SetOutput(out chan OUT)

func (*FeederPipeline[OUT, BUS]) WithBus

func (p *FeederPipeline[OUT, BUS]) WithBus(bus chan BUS) *FeederPipeline[OUT, BUS]

func (*FeederPipeline[OUT, BUS]) WithFeeder

func (p *FeederPipeline[OUT, BUS]) WithFeeder(feeder Feeder[OUT, BUS]) *FeederPipeline[OUT, BUS]

TODO: should be part of the constructor

func (*FeederPipeline[OUT, BUS]) WithOutput

func (p *FeederPipeline[OUT, BUS]) WithOutput(out chan OUT) *FeederPipeline[OUT, BUS]

type FinalChainedPipeline

type FinalChainedPipeline[IN any, BUS any] struct {
	// contains filtered or unexported fields
}

FinalChainedPipeline is an input-only chained pipeline, that is returned when chaining with "Eventually".

The output cannot be defined on FinalChainedPipeline.

func (*FinalChainedPipeline[OUT, BUS]) Bus

func (p *FinalChainedPipeline[OUT, BUS]) Bus() chan BUS

func (*FinalChainedPipeline[OUT, BUS]) Input

func (p *FinalChainedPipeline[OUT, BUS]) Input() chan OUT

func (*FinalChainedPipeline[OUT, BUS]) RunWithContext

func (p *FinalChainedPipeline[OUT, BUS]) RunWithContext(ctx context.Context) func() error

func (*FinalChainedPipeline[OUT, BUS]) SetInput

func (p *FinalChainedPipeline[OUT, BUS]) SetInput(in chan OUT)

func (*FinalChainedPipeline[OUT, BUS]) WithInput

func (p *FinalChainedPipeline[OUT, BUS]) WithInput(in chan OUT) *FinalChainedPipeline[OUT, BUS]

type InitialChainedPipeline

type InitialChainedPipeline[OUT any, BUS any] struct {
	// contains filtered or unexported fields
}

InitialChainedPipeline is an output-only chained pipeline.

The input cannot be defined on InitialChainedPipeline.

func NewInitialChained

func NewInitialChained[OUT any, BUS any](opts ...Option) *InitialChainedPipeline[OUT, BUS]

NewInitialChained builds a new initial chain of pipelines.

func (*InitialChainedPipeline[OUT, BUS]) BeginsWith

func (p *InitialChainedPipeline[OUT, BUS]) BeginsWith(start *FeederPipeline[OUT, BUS]) *ChainedPipeline[dummy, OUT, BUS]

BeginsWith feeds a chained pipeline with some initial input pipeline.

func (*InitialChainedPipeline[OUT, BUS]) Bus

func (p *InitialChainedPipeline[OUT, BUS]) Bus() chan BUS

func (*InitialChainedPipeline[OUT, BUS]) Output

func (p *InitialChainedPipeline[OUT, BUS]) Output() chan OUT

func (*InitialChainedPipeline[OUT, BUS]) RunWithContext

func (p *InitialChainedPipeline[OUT, BUS]) RunWithContext(ctx context.Context) func() error

func (*InitialChainedPipeline[OUT, BUS]) SetOutput

func (p *InitialChainedPipeline[OUT, BUS]) SetOutput(out chan OUT)

func (*InitialChainedPipeline[OUT, BUS]) WithOutput

func (p *InitialChainedPipeline[OUT, BUS]) WithOutput(out chan OUT) *InitialChainedPipeline[OUT, BUS]

type Joiner

type Joiner[IN any, OTHER any, OUT any, BUS any] func(context.Context, <-chan IN, <-chan OTHER, chan<- OUT, chan<- BUS) error

Joiner is the two-ways join executed inside a JoinerPipeline.

The joiner SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed.

If the joiner takes care of closing the ouput channel, the executing pipeline should explictly no do so redundantly (create the pipeline with "WithAutoCloseOutput(false)" in this case).

type JoinerPipeline

type JoinerPipeline[IN any, OTHER any, OUT any, BUS any] struct {
	// contains filtered or unexported fields
}

JoinerPipeline is a pipeline that takes input for 2 channels of different types and applies a Joiner method to this input.

The implementation of the Joiner must be provided.

func NewJoiner

func NewJoiner[IN any, OTHER any, OUT any, BUS any](opts ...Option) *JoinerPipeline[IN, OTHER, OUT, BUS]

func (JoinerPipeline) Bus

func (c JoinerPipeline) Bus() chan BUS

func (JoinerPipeline) Input

func (c JoinerPipeline) Input() chan IN

func (JoinerPipeline) Output

func (c JoinerPipeline) Output() chan OUT

func (*JoinerPipeline[IN, OTHER, OUT, BUS]) RunWithContext

func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) RunWithContext(ctx context.Context) func() error

func (JoinerPipeline) SetBus

func (c JoinerPipeline) SetBus(bus chan BUS)

func (JoinerPipeline) SetInput

func (c JoinerPipeline) SetInput(in chan IN)

func (*JoinerPipeline[IN, OTHER, OUT, BUS]) SetOther

func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) SetOther(other chan OTHER)

func (JoinerPipeline) SetOutput

func (c JoinerPipeline) SetOutput(out chan OUT)

func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithBus

func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithBus(bus chan BUS) *JoinerPipeline[IN, OTHER, OUT, BUS]

func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputs

func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputs(in chan IN, other chan OTHER) *JoinerPipeline[IN, OTHER, OUT, BUS]

func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputsFrom

func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithInputsFrom(producer Producer[IN], otherProducer Producer[OTHER]) *JoinerPipeline[IN, OTHER, OUT, BUS]

func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithJoiner

func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithJoiner(joiner Joiner[IN, OTHER, OUT, BUS]) *JoinerPipeline[IN, OTHER, OUT, BUS]

func (*JoinerPipeline[IN, OTHER, OUT, BUS]) WithOutput

func (p *JoinerPipeline[IN, OTHER, OUT, BUS]) WithOutput(out chan OUT) *JoinerPipeline[IN, OTHER, OUT, BUS]

type NOBUS

type NOBUS dummy

NOBUS is a placeholder when the BUS feature is unused.

type NOBUSCHAN

type NOBUSCHAN = chan<- NOBUS

NOBUSCHAN is an alias for chan <- NOBUS when the BUS feature is unused.

type NOINPUT

type NOINPUT = dummy

type NOOUTPUT

type NOOUTPUT = dummy

type NamedRunnable

type NamedRunnable[BUS any] interface {
	Runnable[BUS]
	Name() string
}

NamedRunnable is a Runnable identifiable by a name.

type Option

type Option func(*options)

Option to tune the behavior of a pipeline.

func WithAutoCloseOutput

func WithAutoCloseOutput(enabled bool) Option

WithAutoCloseOutput sets the behavior to close the output channel after the runner is complete.

This defaults to true and can be disabled if the runner is already taking care of closing its output channel.

func WithBusBuffers

func WithBusBuffers(channelBuffers int) Option

WithBusBuffers sets a buffered bus channel.

func WithInputBuffers

func WithInputBuffers(channelBuffers int) Option

WithInputBuffers sets a buffered input channel.

func WithName

func WithName(name string) Option

func WithOutputBuffers

func WithOutputBuffers(channelBuffers int) Option

WithOutputBuffers sets a buffered output cha,nel.

type Pipeline

type Pipeline[IN any, OUT any, BUS any] struct {
	// contains filtered or unexported fields
}

Pipeline is a generic pipe from channel IN to channel OUT and reporting errors or notices on channel BUS.

func NewPipeline

func NewPipeline[IN any, OUT any, BUS any](opts ...Option) *Pipeline[IN, OUT, BUS]

NewPipeline builds a generic pipeline that collects from a channel of type IN and outputs into a channel of type OUT, without out-of-band notifications on channel of type BUS.

func (Pipeline) Bus

func (c Pipeline) Bus() chan BUS

func (Pipeline) Input

func (c Pipeline) Input() chan IN

func (Pipeline) Output

func (c Pipeline) Output() chan OUT

func (Pipeline) RunWithContext

func (c Pipeline) RunWithContext(ctx context.Context) func() error

func (Pipeline) SetBus

func (c Pipeline) SetBus(bus chan BUS)

func (Pipeline) SetInput

func (c Pipeline) SetInput(in chan IN)

func (Pipeline) SetOutput

func (c Pipeline) SetOutput(out chan OUT)

func (*Pipeline[IN, OUT, BUS]) WithBus

func (p *Pipeline[IN, OUT, BUS]) WithBus(bus chan BUS) *Pipeline[IN, OUT, BUS]

func (*Pipeline[IN, OUT, BUS]) WithInput

func (p *Pipeline[IN, OUT, BUS]) WithInput(in chan IN) *Pipeline[IN, OUT, BUS]

func (*Pipeline[IN, OUT, BUS]) WithInputFrom

func (p *Pipeline[IN, OUT, BUS]) WithInputFrom(producer Producer[IN]) *Pipeline[IN, OUT, BUS]

func (*Pipeline[IN, OUT, BUS]) WithOutput

func (p *Pipeline[IN, OUT, BUS]) WithOutput(out chan OUT) *Pipeline[IN, OUT, BUS]

func (*Pipeline[IN, OUT, BUS]) WithOutputTo

func (p *Pipeline[IN, OUT, BUS]) WithOutputTo(consumer SettableConsumer[OUT]) *Pipeline[IN, OUT, BUS]

WithOutputTo alters the provided consumer to set its input to the current pipeline's output.

func (*Pipeline[IN, OUT, BUS]) WithRunner

func (p *Pipeline[IN, OUT, BUS]) WithRunner(runner Runner[IN, OUT, BUS]) *Pipeline[IN, OUT, BUS]

WithRunner returns a pipeline with a runner function to transform IN into OUT.

type Producer

type Producer[OUT any] interface {
	Output() chan OUT
}

Producer knows what is the output channel.

type Runnable

type Runnable[BUS any] interface {
	RunWithContext(context.Context) func() error
	Buser[BUS]
}

Runnable knows how to execute a runner with a context.

Runnables SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed. They are not expected to close the channels.

Whenever the bus feature is not put to use, you may use the placeholder type NOBUS: "Runnable[NOBUS]"

type Runner

type Runner[IN any, OUT any, BUS any] func(context.Context, <-chan IN, chan<- OUT, chan<- BUS) error

Runner is the function executed inside a pipeline.

The runner SHOULD relinquish resources and exit when the context is cancelled or when the input channel is closed.

If the runner takes care of closing the ouput channel, the executing pipeline should explictly no do so redundantly (create the pipeline with "WithAutoCloseOutput(false)" in this case).

type SettableConsumer

type SettableConsumer[IN any] interface {
	Consumer[IN]
	SetInput(chan IN)
}

SettableConsumer is a Consumer that may have its input set.

type SettableProducer

type SettableProducer[OUT any] interface {
	Producer[OUT]
	SetOutput(chan OUT)
}

SettableProducer is a Producer that may have its output set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL