pipe

package
v0.11.0
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2024 License: Apache-2.0 Imports: 4 Imported by: 5

Documentation

Overview

Package pipe provides functionality to create nodes and interconnect them. A Node is a function container that can be connected via channels to other nodes. A node can send data to multiple nodes, and receive data from multiple nodes.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddFinal

func AddFinal[IMPL NodesMap, IN any](p *Builder[IMPL], field FinalPtr[IMPL, IN], fn FinalFunc[IN], opts ...Option)

AddFinal creates a Final node given the provided FinalFunc. The node will be assigned to the field of the NodesMap whose pointer is returned by the provided FinalPtr function. The options related to the connection to that Final node can be overridden. Otherwise the global options passed to the pipeline Builder are used.

func AddFinalProvider

func AddFinalProvider[IMPL NodesMap, IN any](p *Builder[IMPL], field FinalPtr[IMPL, IN], provider FinalProvider[IN])

AddFinalProvider registers a FinalProvider into the pipeline Builder. The function returned by the FinalProvider will be assigned to the NodesMap field whose pointer is returned by the passed FinalPtr function.

func AddMiddle

func AddMiddle[IMPL NodesMap, IN, OUT any](p *Builder[IMPL], field MiddlePtr[IMPL, IN, OUT], fn MiddleFunc[IN, OUT], opts ...Option)

AddMiddle creates a Middle node given the provided MiddleFunc. The node will be assigned to the field of the NodesMap whose pointer is returned by the provided MiddlePtr function. The options related to the connection to that Middle node can be overridden. Otherwise the global options passed to the pipeline Builder are used.

func AddMiddleProvider

func AddMiddleProvider[IMPL NodesMap, IN, OUT any](p *Builder[IMPL], field MiddlePtr[IMPL, IN, OUT], provider MiddleProvider[IN, OUT])

AddMiddleProvider registers a MiddleProvider into the pipeline Builder. The function returned by the MiddleProvider will be assigned to the NodesMap field whose pointer is returned by the passed MiddlePtr function.

func AddStart

func AddStart[IMPL NodesMap, OUT any](p *Builder[IMPL], field StartPtr[IMPL, OUT], fn StartFunc[OUT])

AddStart creates a Start node given the provided StartFunc. The node will be assigned to the field of the NodesMap whose pointer is returned by the provided StartPtr function.

func AddStartProvider

func AddStartProvider[IMPL NodesMap, OUT any](p *Builder[IMPL], field StartPtr[IMPL, OUT], provider StartProvider[OUT])

AddStartProvider registers a StartProvider into the pipeline Builder. The function returned by the StartProvider will be assigned to the NodesMap field whose pointer is returned by the passed StartPtr function.

Types

type Builder

type Builder[IMPL NodesMap] struct {
	// contains filtered or unexported fields
}

Builder provides tools and functions to create a pipeline and add nodes and node providers to it.

func NewBuilder

func NewBuilder[IMPL NodesMap](nodesMap IMPL, defaultOpts ...Option) *Builder[IMPL]

NewBuilder creates a pipeline builder whose nodes and connections are defined by the passed NodesMap implementation. It accepts a set of default options that apply to all the nodes and connections in the pipeline.

func (*Builder[IMPL]) Build

func (b *Builder[IMPL]) Build() (*Runner, error)

Build returns a pipe Runner that is ready to Start processing data until all the nodes are Done.

type Final

type Final[IN any] interface {
	Receiver[IN]
}

Final nodes go at the end of the pipeline. They can only receive data from the pipeline, although they may export that data by other means outside the pipes library.

type FinalFunc

type FinalFunc[IN any] func(in <-chan IN)

FinalFunc is a function that receives a readable channel as its only argument. It must process the inputs from the input channel until it's closed.
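
As an illustration of that contract, the following is a minimal sketch that uses only the standard library. The FinalFunc type is redeclared locally with the documented signature so the sketch compiles on its own; sumInto and feed are hypothetical helpers, and feed is only a hand-written stand-in for whatever the library does to deliver data to the node.

```go
package main

import "fmt"

// FinalFunc mirrors the documented signature; it is redeclared locally so
// that this sketch compiles without the pipe package.
type FinalFunc[IN any] func(in <-chan IN)

// sumInto is a hypothetical helper. The returned FinalFunc adds every received
// value to *total and returns only once the input channel is closed.
func sumInto(total *int) FinalFunc[int] {
	return func(in <-chan int) {
		for v := range in {
			*total += v
		}
	}
}

// feed is a hypothetical harness: it sends the values to fn over an
// unbuffered channel, closes the channel, and waits for fn to return.
func feed[IN any](fn FinalFunc[IN], values ...IN) {
	ch := make(chan IN)
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn(ch)
	}()
	for _, v := range values {
		ch <- v
	}
	close(ch)
	<-done
}

func main() {
	total := 0
	feed(sumInto(&total), 1, 2, 3)
	fmt.Println(total) // prints 6
}
```

Ranging over the channel is the simplest way to satisfy the "until it's closed" requirement, because the loop ends exactly when the input channel is closed and drained.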

func IgnoreFinal

func IgnoreFinal[IN any]() FinalFunc[IN]

IgnoreFinal is a convenience function to explicitly specify that the returned FinalFunc is going to be ignored/bypassed by the pipes library.

type FinalProvider

type FinalProvider[IN any] func() (FinalFunc[IN], error)

FinalProvider is a function that returns a FinalFunc to be used as Final node in a pipeline. It also might return an error if there is a problem with the configuration or instantiation of the function.

If both the returned function and the error are nil, the final node will be ignored, which is equivalent to not adding it to the pipeline.

For readability, don't do:

return nil, nil

instead, use the equivalent convenience function:

return IgnoreFinal[T](), nil

type FinalPtr

type FinalPtr[IMPL NodesMap, IN any] func(IMPL) *Final[IN]

FinalPtr is a function that, given a NodesMap, returns a pointer to a Final node. That pointer is used as the storage destination when this function is passed as an argument to the AddFinalProvider or AddFinal functions.

type Middle

type Middle[IN, OUT any] interface {
	Final[IN]
	Start[OUT]
}

Middle nodes go in between Start, Final or other Middle nodes. They can send and receive data.

type MiddleFunc

type MiddleFunc[IN, OUT any] func(in <-chan IN, out chan<- OUT)

MiddleFunc is a function that receives a readable channel as first argument, and a writable channel as second argument. It must process the inputs from the input channel until it's closed.
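
The following standard-library sketch shows one possible function with that shape. The MiddleFunc type is redeclared locally with the documented signature; upper and apply are hypothetical names. Closing the output channel by hand inside apply is an assumption of this sketch and says nothing about how the library manages channel lifetimes.

```go
package main

import (
	"fmt"
	"strings"
)

// MiddleFunc mirrors the documented signature; it is redeclared locally so
// that this sketch compiles without the pipe package.
type MiddleFunc[IN, OUT any] func(in <-chan IN, out chan<- OUT)

// upper is a hypothetical node function: it forwards each input string in
// upper case and returns once the input channel is closed.
func upper(in <-chan string, out chan<- string) {
	for s := range in {
		out <- strings.ToUpper(s)
	}
}

// apply is a hypothetical harness that runs fn over the inputs and collects
// everything fn forwards.
func apply[IN, OUT any](fn MiddleFunc[IN, OUT], inputs ...IN) []OUT {
	in := make(chan IN)
	out := make(chan OUT)
	go func() {
		defer close(in)
		for _, v := range inputs {
			in <- v
		}
	}()
	go func() {
		// Closing out here is an assumption of this sketch.
		defer close(out)
		fn(in, out)
	}()
	var got []OUT
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(apply[string, string](upper, "a", "b")) // prints [A B]
}
```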

func Bypass

func Bypass[INOUT any]() MiddleFunc[INOUT, INOUT]

Bypass is a convenience function to explicitly specify that the returned MiddleFunc is going to be ignored/bypassed by the pipes library.

type MiddleProvider

type MiddleProvider[IN, OUT any] func() (MiddleFunc[IN, OUT], error)

MiddleProvider is a function that returns a MiddleFunc to be used as Middle node in a pipeline. It also might return an error if there is a problem with the configuration or instantiation of the function.

If the IN and OUT types are different, the returned function can't be nil unless an error is returned.

If the IN and OUT types are the same, and both the returned function and the error are nil, the middle node will be bypassed. This is equivalent to not adding it to the pipeline: its Sender nodes are connected directly to its Receiver nodes.

For readability, don't do:

return nil, nil

instead, use the equivalent convenience function:

return Bypass[T](), nil
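
To illustrate the error path, here is a minimal sketch of a provider that validates its configuration before returning a node function. Both types are redeclared locally with the documented signatures so the sketch compiles on its own; prefixer and its prefix parameter are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local mirrors of the documented types, redeclared so that this sketch
// compiles without the pipe package.
type MiddleFunc[IN, OUT any] func(in <-chan IN, out chan<- OUT)
type MiddleProvider[IN, OUT any] func() (MiddleFunc[IN, OUT], error)

// prefixer is a hypothetical provider factory. The provider it returns fails
// on an empty prefix; otherwise it yields a function that prepends the prefix
// to every string it forwards.
func prefixer(prefix string) MiddleProvider[string, string] {
	return func() (MiddleFunc[string, string], error) {
		if strings.TrimSpace(prefix) == "" {
			return nil, errors.New("prefixer: prefix must not be empty")
		}
		return func(in <-chan string, out chan<- string) {
			for s := range in {
				out <- prefix + s
			}
		}, nil
	}
}

func main() {
	_, err := prefixer("")()
	fmt.Println(err) // prints prefixer: prefix must not be empty

	fn, err := prefixer("> ")()
	fmt.Println(fn != nil, err) // prints true <nil>
}
```

Deferring validation to the provider keeps configuration problems out of the node function itself, so they can be reported as an error rather than discovered while data is flowing.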

type MiddlePtr

type MiddlePtr[IMPL NodesMap, IN, OUT any] func(IMPL) *Middle[IN, OUT]

MiddlePtr is a function that, given a NodesMap, returns a pointer to a Middle node. That pointer is used as the storage destination when this function is passed as an argument to the AddMiddleProvider or AddMiddle functions.

type NodesMap

type NodesMap interface {
	// Connect runs the code that connects the nodes of a pipeline. It is invoked
	// by the Builder before returning the pipeline Runner.
	Connect()
}

NodesMap is any data structure that stores references to the nodes of a pipeline, and specifies how to connect them by means of its Connect method. Example:

type MyPipeline struct {
	Load      pipe.Start[string]
	Transform pipe.Middle[string, string]
	Store     pipe.Final[string]
}
func (m *MyPipeline) Connect() {
	m.Load.SendTo(m.Transform)
	m.Transform.SendTo(m.Store)
}

The fields are assigned to nodes by the Builder, by means of AddStart, AddStartProvider, AddMiddle, AddMiddleProvider, AddFinal and AddFinalProvider.

type Option

type Option func(options *creationOptions)

Option allows overriding the default properties of the nodes and connections of a pipeline.

func ChannelBufferLen

func ChannelBufferLen(length int) Option

ChannelBufferLen is an Option that allows specifying the length of the input channels for a given node. The default value is 0, which means that the channels are unbuffered.
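
The difference between an unbuffered and a buffered channel can be seen with plain Go channels. The sketch below does not use the pipe package and makes no claim about how the option is applied internally; sendsWithoutReceiver is a hypothetical helper.

```go
package main

import "fmt"

// sendsWithoutReceiver reports how many values can be sent on a channel with
// the given buffer length before a send would block, when nothing is
// receiving from it.
func sendsWithoutReceiver(bufferLen int) int {
	ch := make(chan int, bufferLen)
	sent := 0
	for {
		select {
		case ch <- sent:
			sent++
		default:
			// The send would block: the buffer is full, or the channel
			// is unbuffered and has no receiver.
			return sent
		}
	}
}

func main() {
	fmt.Println(sendsWithoutReceiver(0)) // prints 0
	fmt.Println(sendsWithoutReceiver(3)) // prints 3
}
```

With a length of 0 every send waits for a receiver, so a slow node slows down the nodes that send to it; a larger length lets senders run ahead by that many items.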

type Receiver

type Receiver[IN any] interface {
	// contains filtered or unexported methods
}

Receiver is any node that can receive data from another node: Middle or Final nodes.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner stores all the configured nodes of a pipeline once their nodes are instantiated (as specified by AddStart, AddStartProvider, AddMiddle, AddMiddleProvider, AddFinal, AddFinalProvider) and connected according to the Connect function of the NodesMap that is provided to the Builder.

func (*Runner) Done

func (b *Runner) Done() <-chan struct{}

Done returns a channel that is closed when all the nodes of the pipeline have stopped processing data. That is, the functions running the node logic have returned.
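
The "closed when every function has returned" behaviour is a common Go pattern. The sketch below reproduces it with the standard library only; startAll and countFinished are hypothetical names, and this is not the Runner's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// startAll is a hypothetical stand-in for a runner: it runs each function in
// its own goroutine and returns a channel that is closed once all of them
// have returned.
func startAll(fns ...func()) <-chan struct{} {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(len(fns))
	for _, fn := range fns {
		go func(f func()) {
			defer wg.Done()
			f()
		}(fn)
	}
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

// countFinished starts n trivial node functions, waits on the done channel,
// and reports how many of them ran to completion.
func countFinished(n int) int32 {
	var finished atomic.Int32
	fns := make([]func(), n)
	for i := range fns {
		fns[i] = func() { finished.Add(1) }
	}
	<-startAll(fns...)
	return finished.Load()
}

func main() {
	fmt.Println(countFinished(3)) // prints 3
}
```

With a real Runner, the equivalent wait is a receive on the channel returned by Done after calling Start.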

func (*Runner) Start

func (b *Runner) Start()

Start starts the pipeline processing in the background.

type Sender

type Sender[OUT any] interface {
	// SendTo connects a Sender with a group of Receiver instances.
	SendTo(r ...Receiver[OUT]) // TODO: fail if there is any middle or final node not being destination of any "SendTo"
}

Sender is any node that can send data to another node: Start or Middle.

type Start

type Start[OUT any] interface {
	Sender[OUT]
}

Start nodes insert data into the pipeline. They can only send data to the pipeline, although they may acquire data by other means outside the pipes library.

type StartFunc

type StartFunc[OUT any] func(out chan<- OUT)

StartFunc is a function that receives a writable channel as its only argument, and sends values to that channel for an indefinite amount of time.
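
A minimal standard-library sketch of a function with this shape follows. The StartFunc type is redeclared locally with the documented signature; countTo and drain are hypothetical helpers. The example emits a finite sequence so that it terminates, and closing the channel by hand inside drain is an assumption of this sketch rather than a description of the library.

```go
package main

import "fmt"

// StartFunc mirrors the documented signature; it is redeclared locally so
// that this sketch compiles without the pipe package.
type StartFunc[OUT any] func(out chan<- OUT)

// countTo is a hypothetical helper that returns a StartFunc emitting the
// integers 1 through n and then returning.
func countTo(n int) StartFunc[int] {
	return func(out chan<- int) {
		for i := 1; i <= n; i++ {
			out <- i
		}
	}
}

// drain is a hypothetical harness: it runs fn in a goroutine and collects
// everything fn sends until fn returns.
func drain[OUT any](fn StartFunc[OUT]) []OUT {
	ch := make(chan OUT)
	go func() {
		// Closing the channel here is an assumption of this sketch.
		defer close(ch)
		fn(ch)
	}()
	var got []OUT
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain(countTo(3))) // prints [1 2 3]
}
```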

func IgnoreStart

func IgnoreStart[OUT any]() StartFunc[OUT]

IgnoreStart is a convenience function to explicitly specify that the returned StartFunc is going to be ignored/bypassed by the pipes library.

type StartProvider

type StartProvider[OUT any] func() (StartFunc[OUT], error)

StartProvider is a function that returns a StartFunc to be used as Start node in a pipeline. It also might return an error if there is a problem with the configuration or instantiation of the function.

If both the returned function and the error are nil, the start node will be ignored, which is equivalent to not adding it to the pipeline.

For readability, don't do:

return nil, nil

instead, use the equivalent convenience function:

return IgnoreStart[T](), nil

type StartPtr

type StartPtr[IMPL NodesMap, OUT any] func(IMPL) *Start[OUT]

StartPtr is a function that, given a NodesMap, returns a pointer to a Start node. That pointer is used as the storage destination when this function is passed as an argument to the AddStartProvider or AddStart functions.

Directories

Path Synopsis
internal
