pipeline

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 9, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package pipeline provides a pipeline for processing data.

The pipeline package offers a convenient way to process data using a series of stages. Each stage in the pipeline performs a specific operation on the data and passes it to the next stage. This allows for a modular and flexible approach to data processing.

The package moves data between stages over channels, so callers do not have to write their own synchronisation. Because each stage reads from one channel and writes to another, stages can run concurrently, which can improve throughput for computationally intensive tasks.
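The channel-based flow described here can be illustrated with the standard library alone. The following is a minimal sketch, not the package's implementation: `generate`, `square`, and `sumAll` are hypothetical stage names chosen for illustration, and each stage runs in its own goroutine and closes its output when done.

```go
package main

import "fmt"

// generate is a hypothetical source stage: it emits 1..n and then
// closes its output so downstream stages know when to stop.
func generate(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// square is a hypothetical transform stage running in its own goroutine.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// sumAll is a hypothetical sink: it drains the channel and returns the total.
func sumAll(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	// 1 + 4 + 9 + 16
	fmt.Println(sumAll(square(generate(4)))) // prints 30
}
```

Closing each output channel is what lets the `range` loops terminate, so no extra synchronisation is needed between stages in this sketch.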

The package also handles errors for you: the pipeline stops on the first error it encounters, which prevents further processing and makes it easier to identify the failing stage.
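One common way to get stop-on-first-error behaviour in Go is to cancel a shared context when a stage fails. The sketch below assumes that approach for illustration only. The source does not show how the package implements it, and `runUntilError` and `errBad` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errBad is a hypothetical error used to trigger the stop.
var errBad = errors.New("bad item")

// runUntilError is a hypothetical helper: a producer goroutine feeds items
// to a consumer loop, and the first error returned by process cancels the
// context so the producer stops sending.
func runUntilError(items []int, process func(int) error) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, it := range items {
			select {
			case ch <- it:
			case <-ctx.Done():
				return // a downstream failure was reported; stop producing
			}
		}
	}()

	processed := 0
	for it := range ch {
		if err := process(it); err != nil {
			cancel()
			return processed, err
		}
		processed++
	}
	return processed, nil
}

func main() {
	n, err := runUntilError([]int{1, 2, 3, 4}, func(v int) error {
		if v == 3 {
			return errBad
		}
		return nil
	})
	fmt.Println(n, err) // prints: 2 bad item
}
```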

In short, the package combines channel-based data flow, concurrent stages, and stop-on-first-error handling.

Constants

This section is empty.

Variables

var (
	// ErrPipelineMustBeSet is returned when the pipeline is not set.
	ErrPipelineMustBeSet = errors.New("pipe must be set")
	// ErrInputMustBeSet is returned when the input is not set.
	ErrInputMustBeSet = errors.New("input must be set")
	// ErrSplitterTotal is returned when the total is not set.
	ErrSplitterTotal = errors.New("total must be greater than 0")
)
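Sentinel errors like these are normally checked with `errors.Is`, which also matches when the error has been wrapped. The sketch below uses a local stand-in, `errInputMustBeSet`, rather than the package's exported variable. `addStep` and `isMissingInput` are hypothetical helpers, and whether the package wraps its errors is not stated in the source.

```go
package main

import (
	"errors"
	"fmt"
)

// errInputMustBeSet is a local stand-in for pipeline.ErrInputMustBeSet.
var errInputMustBeSet = errors.New("input must be set")

// addStep is a hypothetical function that wraps the sentinel with context
// when no input is provided.
func addStep(name string, input *int) error {
	if input == nil {
		return fmt.Errorf("step %q: %w", name, errInputMustBeSet)
	}
	return nil
}

// isMissingInput reports whether err is, or wraps, the sentinel.
func isMissingInput(err error) bool {
	return errors.Is(err, errInputMustBeSet)
}

func main() {
	err := addStep("parse", nil)
	fmt.Println(isMissingInput(err)) // prints true
	fmt.Println(err)                 // prints: step "parse": input must be set
}
```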

Functions

func AddMerger

func AddMerger[I any](pipe *Pipeline, name string, steps ...*model.Step[I]) (*model.Step[I], error)

AddMerger adds a merger step to the pipeline. It will merge the output of the steps into a single channel.
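Merging several channels into one is usually called fan-in. The following is a minimal standard-library sketch of that idea, not the package's code: `merge` and `fromSlice` are hypothetical helpers, and the order of merged values is not guaranteed.

```go
package main

import (
	"fmt"
	"sync"
)

// merge is a hypothetical fan-in helper: it forwards every value from each
// input to a single output and closes the output once all inputs are drained.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice is a hypothetical helper returning a closed, pre-filled channel.
func fromSlice[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	sum := 0
	for v := range merge(fromSlice(1, 2), fromSlice(3, 4, 5)) {
		sum += v
	}
	fmt.Println(sum) // prints 15
}
```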

func AddRootStep

func AddRootStep[O any](
	pipe *Pipeline,
	name string,
	stepFn func(ctx context.Context, rootChan chan<- O) error,
	opts ...StepOption[O],
) (*model.Step[O], error)

AddRootStep adds a root step to the pipeline. The step function produces the pipeline's initial values by sending them on rootChan.

func AddSink

func AddSink[I any](pipe *Pipeline, name string, input *model.Step[I], sinkFn func(ctx context.Context, input I) error) error

AddSink adds a sink step to the pipeline. It will consume the input channel and run the sink function.

func AddSinkFromChan

func AddSinkFromChan[I any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	stepFn func(ctx context.Context, input <-chan I) error,
) error

AddSinkFromChan adds a sink step to the pipeline. The step function receives the input channel directly and consumes it.

func AddStepFromChan

func AddStepFromChan[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	stepFromChan StepFromChanFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)

AddStepFromChan adds a step whose function reads from an input channel and writes to an output channel.
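A function with the `StepFromChanFn` shape might look like the sketch below. It assumes the function must not close the output channel itself; the source does not say who closes it. `runningTotal` and the `totalsOf` driver are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// runningTotal has the shape of StepFromChanFn[int, int]: it reads ints
// from input and writes the cumulative sum to output after each value.
func runningTotal(ctx context.Context, input <-chan int, output chan int) error {
	total := 0
	for v := range input {
		total += v
		select {
		case output <- total:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// totalsOf is a hypothetical driver standing in for the pipeline: it feeds
// the values, runs the step, closes the output, and collects the results.
func totalsOf(vals ...int) ([]int, error) {
	input := make(chan int, len(vals))
	for _, v := range vals {
		input <- v
	}
	close(input)

	// Buffered so the step never blocks in this single-goroutine driver.
	output := make(chan int, len(vals))
	err := runningTotal(context.Background(), input, output)
	close(output)

	var got []int
	for v := range output {
		got = append(got, v)
	}
	return got, err
}

func main() {
	got, err := totalsOf(1, 2, 3)
	fmt.Println(got, err) // prints: [1 3 6] <nil>
}
```

Working on the channel directly suits steps that need state across items, such as batching or running aggregates.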

func AddStepOneToMany

func AddStepOneToMany[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	oneToMany OneToManyFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)

AddStepOneToMany adds a step that takes one input and produces many outputs.
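As an illustration of the one-to-many shape, the sketch below defines a local type `oneToManyFn` that mirrors `OneToManyFn` and applies it to every value on a channel. `applyOneToMany`, `splitWords`, and `wordsOf` are hypothetical; the package's own step runner is not shown in the source.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// oneToManyFn mirrors the shape of OneToManyFn from the listing.
type oneToManyFn[I, O any] func(context.Context, I) ([]O, error)

// applyOneToMany is a hypothetical driver: it calls fn for each input and
// flattens the returned slices, stopping at the first error.
func applyOneToMany[I, O any](ctx context.Context, in <-chan I, fn oneToManyFn[I, O]) ([]O, error) {
	var out []O
	for v := range in {
		items, err := fn(ctx, v)
		if err != nil {
			return nil, err
		}
		out = append(out, items...)
	}
	return out, nil
}

// splitWords turns one line into many words.
func splitWords(_ context.Context, line string) ([]string, error) {
	return strings.Fields(line), nil
}

// wordsOf feeds the lines through applyOneToMany using splitWords.
func wordsOf(lines ...string) ([]string, error) {
	in := make(chan string, len(lines))
	for _, l := range lines {
		in <- l
	}
	close(in)
	return applyOneToMany[string, string](context.Background(), in, splitWords)
}

func main() {
	words, err := wordsOf("a b", "c")
	fmt.Println(words, err) // prints: [a b c] <nil>
}
```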

func AddStepOneToOne

func AddStepOneToOne[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	oneToOne OneToOneFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)

AddStepOneToOne adds a step that takes one input and produces one output.

func AddStepOneToOneOrZero

func AddStepOneToOneOrZero[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	oneToOne OneToOneFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)

AddStepOneToOneOrZero adds a step that takes one input and produces at most one output. If the function returns the zero value of O, that output is ignored.
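The zero-value rule can be sketched as a filter-while-mapping loop. This example constrains the output type to `comparable` so it can be compared with its zero value. That is an assumption made for the sketch, since the package signature uses `O any` and the source does not show how it detects zero values. `mapSkipZero` and `evenOrZero` are hypothetical.

```go
package main

import "fmt"

// mapSkipZero is a hypothetical helper: it applies fn to each input and
// drops results equal to the zero value of O.
func mapSkipZero[I any, O comparable](in []I, fn func(I) (O, error)) ([]O, error) {
	var zero O
	var out []O
	for _, v := range in {
		res, err := fn(v)
		if err != nil {
			return nil, err
		}
		if res == zero {
			continue // zero value: nothing is forwarded
		}
		out = append(out, res)
	}
	return out, nil
}

// evenOrZero returns v when it is even and 0 (the zero value) otherwise.
func evenOrZero(v int) (int, error) {
	if v%2 == 0 {
		return v, nil
	}
	return 0, nil
}

func main() {
	out, err := mapSkipZero([]int{1, 2, 3, 4}, evenOrZero)
	fmt.Println(out, err) // prints: [2 4] <nil>
}
```

A consequence of this rule is that a legitimate result equal to the zero value, such as 0 or an empty string, cannot be passed downstream by such a step.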

Types

type OneToManyFn

type OneToManyFn[I, O any] func(context.Context, I) ([]O, error)

OneToManyFn is a function that takes an input and produces many outputs.

type OneToOneFn

type OneToOneFn[I, O any] func(context.Context, I) (O, error)

OneToOneFn is a function that takes an input and produces an output.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a pipeline of steps.

func New

func New(ctx context.Context, opts ...model.PipelineOption) (*Pipeline, error)

New creates a new pipeline.

func (*Pipeline) Run

func (p *Pipeline) Run() error

Run starts the pipeline and waits for it to finish.

type Splitter

type Splitter[I any] struct {
	Total int
	// contains filtered or unexported fields
}

Splitter is a step that splits the input into multiple outputs.

func AddSplitter

func AddSplitter[I any](pipe *Pipeline, name string, input *model.Step[I], total int, opts ...SplitterOption[I]) (*Splitter[I], error)

AddSplitter adds a splitter step to the pipeline. It will split the input into multiple outputs based on the total.

func AddSplitterFn

func AddSplitterFn[I any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	fns []SplitterFn[I],
	opts ...SplitterOption[I],
) (*Splitter[I], error)

AddSplitterFn adds a splitter step to the pipeline. It will split the input into multiple outputs based on the provided functions.
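One plausible reading of function-based splitting is that each output has its own predicate and receives the inputs for which that predicate returns true. The sketch below assumes that reading; the source does not confirm the routing rule. `splitterFn` mirrors `SplitterFn`, and `splitBy`, `isEven`, and `isBig` are hypothetical. Slices stand in for channels to keep the example short.

```go
package main

import "fmt"

// splitterFn mirrors the shape of SplitterFn from the listing.
type splitterFn[I any] func(input I) (bool, error)

// splitBy is a hypothetical routing helper: it returns one slice per
// predicate, containing the inputs that predicate chose to keep.
func splitBy[I any](inputs []I, fns []splitterFn[I]) ([][]I, error) {
	outs := make([][]I, len(fns))
	for _, v := range inputs {
		for i, fn := range fns {
			keep, err := fn(v)
			if err != nil {
				return nil, err
			}
			if keep {
				outs[i] = append(outs[i], v)
			}
		}
	}
	return outs, nil
}

func isEven(v int) (bool, error) { return v%2 == 0, nil }
func isBig(v int) (bool, error)  { return v > 2, nil }

func main() {
	outs, err := splitBy([]int{1, 2, 3, 4}, []splitterFn[int]{isEven, isBig})
	fmt.Println(outs, err) // prints: [[2 4] [3 4]] <nil>
}
```

Under this reading an input can reach several outputs (4 above) or none (1 above).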

func (*Splitter[I]) Get

func (s *Splitter[I]) Get() (*model.Step[I], bool)

Get returns the next split step.

type SplitterFn

type SplitterFn[I any] func(input I) (bool, error)

SplitterFn is a function that returns whether to keep the input or not.

type SplitterOption

type SplitterOption[I any] func(s *Splitter[I])

SplitterOption is a function that modifies a Splitter.

func SplitterBufferSize

func SplitterBufferSize[I any](bufferSize int) SplitterOption[I]

SplitterBufferSize sets the buffer size of the Splitter. Each split step will have a buffer of this size.

type StepFromChanFn

type StepFromChanFn[I, O any] func(ctx context.Context, input <-chan I, output chan O) error

StepFromChanFn is a function that reads from an input channel and writes to an output channel.

type StepOption

type StepOption[O any] func(s *model.Step[O])

StepOption is a function that modifies a Step.

func StepConcurrency

func StepConcurrency[O any](concurrent int) StepOption[O]

StepConcurrency sets the concurrency of the step.
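Step concurrency is commonly realised as a fixed number of worker goroutines reading from the same input channel. The sketch below assumes that model; the source does not describe how the package applies the setting. `runConcurrent` and `sumSquares` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runConcurrent is a hypothetical worker pool: `workers` goroutines read
// from in, apply fn, and write to a shared output that is closed once all
// workers have finished. Output order is not preserved.
func runConcurrent[I, O any](in <-chan I, workers int, fn func(I) O) <-chan O {
	out := make(chan O)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumSquares squares 1..n with the given number of workers and sums the
// results, which does not depend on the order they arrive in.
func sumSquares(n, workers int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	total := 0
	for v := range runConcurrent[int, int](in, workers, func(v int) int { return v * v }) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(10, 3)) // prints 385
}
```

Because results may arrive out of order, a concurrent step in this style suits work where ordering does not matter or is restored downstream.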

func StepKeepOpen

func StepKeepOpen[O any]() StepOption[O]

StepKeepOpen does not close the input channel.

Directories

Path Synopsis
Package drawer provides a way to draw a pipeline graph.
Package measure provides a way to measure the performance of a pipeline.
