pipeline

package
v0.40.4

Warning: This package is not in the latest version of its module.

Published: May 19, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

NoProps is an empty ImmutableProperties struct

Functions

func GetRowConvTransformFunc

func GetRowConvTransformFunc(rc *rowconv.RowConverter) func(row.Row, ReadableMap) ([]*TransformedRowResult, string)

GetRowConvTransformFunc can be used to wrap a RowConverter and use that RowConverter in a pipeline.

func GetTransFailureDetails

func GetTransFailureDetails(err error) string

GetTransFailureDetails extracts the details string from an error that is an instance of a TransformRowFailure

func GetTransFailureRow

func GetTransFailureRow(err error) row.Row

GetTransFailureRow extracts the row that failed from an error that is an instance of a TransformRowFailure

func GetTransFailureSqlRow

func GetTransFailureSqlRow(err error) sql.Row

GetTransFailureSqlRow extracts the sql.Row that failed from an error that is an instance of a TransformRowFailure

func GetTransFailureTransName

func GetTransFailureTransName(err error) string

GetTransFailureTransName extracts the name of the transform that failed from an error that is an instance of a TransformRowFailure

func IsTransformFailure

func IsTransformFailure(err error) bool

IsTransformFailure returns true if the error is an instance of a TransformRowFailure

Types

type BadRowCallback

type BadRowCallback func(*TransformRowFailure) (quit bool)

BadRowCallback is a callback function that is called when a bad row is encountered. Returning true from this function quits the entire pipeline.
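As an illustration of the quit semantics, here is a sketch of a callback that logs every bad row and asks the pipeline to quit once a threshold is exceeded. Row and TransformRowFailure are simplified local stand-ins, and quitAfter is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for row.Row.
type Row []string

// TransformRowFailure mirrors the documented fields (SqlRow omitted).
type TransformRowFailure struct {
	Row           Row
	TransformName string
	Details       string
}

// BadRowCallback has the same shape as the package's callback type.
type BadRowCallback func(*TransformRowFailure) (quit bool)

// quitAfter returns a callback that logs each bad row and returns true
// (quit) once more than maxBad rows have failed.
func quitAfter(maxBad int, log func(string)) BadRowCallback {
	count := 0
	return func(trf *TransformRowFailure) bool {
		count++
		log(fmt.Sprintf("bad row %v in %s: %s", trf.Row, trf.TransformName, trf.Details))
		return count > maxBad
	}
}
```

With maxBad set to 0, the first bad row stops the pipeline; larger values tolerate a few malformed rows before giving up.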

type ImmutableProperties

type ImmutableProperties struct {
	// contains filtered or unexported fields
}

ImmutableProperties is a map of properties which can't be edited after creation

func (ImmutableProperties) Get

func (ip ImmutableProperties) Get(propName string) (interface{}, bool)

Get retrieves an element from the map, along with a bool that reports whether a property with that name exists at all

func (ImmutableProperties) Set

func (ip ImmutableProperties) Set(updates map[string]interface{}) ImmutableProperties

Set returns a new ImmutableProperties struct whose values are the original properties combined with the provided updates; the receiver itself is not modified
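The copy-on-write behavior of Get and Set can be sketched as below. The fields of the real struct are unexported, so this implementation is hypothetical; in particular, letting updates win when a key already exists is an assumption of this sketch.

```go
package main

// ImmutableProperties is a hypothetical copy-on-write property map.
type ImmutableProperties struct {
	props map[string]interface{}
}

// Get returns the value stored under propName and whether it exists.
// Reading from a nil map is safe in Go, so the zero value works.
func (ip ImmutableProperties) Get(propName string) (interface{}, bool) {
	v, ok := ip.props[propName]
	return v, ok
}

// Set copies the existing properties into a fresh map and applies the
// updates on top (updates win on key collisions in this sketch), so
// the receiver and any earlier copies are never changed.
func (ip ImmutableProperties) Set(updates map[string]interface{}) ImmutableProperties {
	merged := make(map[string]interface{}, len(ip.props)+len(updates))
	for k, v := range ip.props {
		merged[k] = v
	}
	for k, v := range updates {
		merged[k] = v
	}
	return ImmutableProperties{props: merged}
}
```

Because every Set allocates a new map, a property added by one stage cannot leak back into a value another stage is still holding.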

type InFunc

type InFunc func(p *Pipeline, ch chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, noMoreChan <-chan struct{})

InFunc is a pipeline input function that reads row data from a source and puts it in a channel.

func InFuncForChannel

func InFuncForChannel(rowChan <-chan row.Row) InFunc

InFuncForChannel returns an InFunc that reads rows from the given channel.

func ProcFuncForReader

func ProcFuncForReader(ctx context.Context, rd table.TableReader) InFunc

ProcFuncForReader adapts a standard TableReader to work as an InFunc for a pipeline

func ProcFuncForSourceFunc

func ProcFuncForSourceFunc(sourceFunc SourceFunc) InFunc

ProcFuncForSourceFunc is a helper function that creates an InFunc for a given SourceFunc. It takes care of channel processing, stop conditions, and error handling.

type NamedTransform

type NamedTransform struct {
	// Name is the name of the transform. If an error occurs during processing, this name will be provided as the
	// TransformName in any TransformRowFailure error.
	Name string

	// Func is the TransformFunc being applied
	Func TransformFunc
}

NamedTransform is a struct containing a TransformFunc and the name of the transform being applied. If an error occurs during processing this name will be provided as the TransformName in any TransformRowFailure error.

func NewNamedTransform

func NewNamedTransform(name string, transRowFunc TransformRowFunc) NamedTransform

NewNamedTransform returns a NamedTransform object from a name and a TransformRowFunc. The returned NamedTransform has its Func member set to a TransformFunc that handles input, output, and stop channel processing, along with error handling, and calls the given TransformRowFunc for every row.
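To make the channel handling concrete, here is a sketch of how a per-row function could be wrapped into a channel-driven TransformFunc. It is not the package's implementation: newNamedTransform and merge are hypothetical, Row is a stand-in for row.Row, properties are a plain map instead of ImmutableProperties/ReadableMap, and closing outChan when the stage finishes is an assumption of this sketch.

```go
package main

// Simplified stand-ins for the documented types.
type Row []string

type RowWithProps struct {
	Row   Row
	Props map[string]interface{}
}

type TransformRowFailure struct {
	Row           Row
	TransformName string
	Details       string
}

type TransformedRowResult struct {
	RowData         Row
	PropertyUpdates map[string]interface{}
}

type TransformRowFunc func(inRow Row, props map[string]interface{}) ([]*TransformedRowResult, string)

type TransformFunc func(inChan <-chan RowWithProps, outChan chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, stopChan <-chan struct{})

// merge returns a new map holding base overlaid with updates.
func merge(base, updates map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(base)+len(updates))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range updates {
		out[k] = v
	}
	return out
}

// newNamedTransform calls f once per input row, forwards every result,
// reports bad rows under name, and returns when input ends or stop closes.
func newNamedTransform(name string, f TransformRowFunc) TransformFunc {
	return func(inChan <-chan RowWithProps, outChan chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, stopChan <-chan struct{}) {
		defer close(outChan) // assumption: a finished stage closes its output
		for {
			select {
			case <-stopChan:
				return
			case r, ok := <-inChan:
				if !ok {
					return // upstream stage is done
				}
				results, badDetails := f(r.Row, r.Props)
				if badDetails != "" {
					select {
					case badRowChan <- &TransformRowFailure{Row: r.Row, TransformName: name, Details: badDetails}:
					case <-stopChan:
						return
					}
					continue
				}
				for _, res := range results {
					select {
					case outChan <- RowWithProps{Row: res.RowData, Props: merge(r.Props, res.PropertyUpdates)}:
					case <-stopChan:
						return
					}
				}
			}
		}
	}
}
```

Every blocking send also selects on stopChan, so a stage never hangs on a full channel after the pipeline has been told to stop.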

type OutFunc

type OutFunc func(p *Pipeline, ch <-chan RowWithProps, badRowChan chan<- *TransformRowFailure)

OutFunc is a pipeline output function that reads the rows the pipeline has processed off of the channel.

func ProcFuncForSinkFunc

func ProcFuncForSinkFunc(sinkFunc SinkFunc) OutFunc

ProcFuncForSinkFunc is a helper function that creates an OutFunc for a given SinkFunc. It takes care of channel processing, stop conditions, and error handling.

func ProcFuncForWriter

func ProcFuncForWriter(ctx context.Context, wr table.TableWriter) OutFunc

ProcFuncForWriter adapts a standard TableWriter to work as an OutFunc for a pipeline

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a struct that manages the operation of a row processing pipeline. The InFunc reads data from some source and writes it to a channel. An optional series of transformation functions then each read the previous stage's output as their input and pass their own output on to the next stage, ultimately reaching the OutFunc. Each transform has a name and is referred to as a stage in the pipeline.

Pipelines can be constructed in phases, with different call sites adding transformations or even redirecting output as required. Once a pipeline is started with Start(), all configuration methods will panic.

Pipelines can be supplied with callbacks to run after they complete, which happens when output has finished writing, or when Abort() or StopWithErr() is called.

Pipelines must be cleaned up by a call to either Wait, Abort, or StopWithErr, all of which run any deferred functions registered with the pipeline via calls to RunAfter (e.g. closing readers and writers).

Ironically, not even a little thread safe.

func NewAsyncPipeline

func NewAsyncPipeline(inFunc InFunc, outFunc OutFunc, stages *TransformCollection, badRowCB BadRowCallback) *Pipeline

NewAsyncPipeline creates a Pipeline from a given InFunc, OutFunc, TransformCollection, and a BadRowCallback.

func NewPartialPipeline

func NewPartialPipeline(inFunc InFunc) *Pipeline

NewPartialPipeline creates a pipeline stub that doesn't have an output func set on it yet. An OutFunc must be applied via a call to SetOutput before calling Start().

func (*Pipeline) Abort

func (p *Pipeline) Abort()

Abort signals the pipeline to stop processing.

func (*Pipeline) AddStage

func (p *Pipeline) AddStage(stage NamedTransform)

AddStage adds a new named transform to the set of stages

func (*Pipeline) InjectRow

func (p *Pipeline) InjectRow(stageName string, r row.Row)

InjectRow injects a row at a particular stage in the pipeline. The row will be processed before other pipeline input arrives.

func (*Pipeline) InjectRowWithProps

func (p *Pipeline) InjectRowWithProps(stageName string, r row.Row, props map[string]interface{})

func (*Pipeline) IsStopping

func (p *Pipeline) IsStopping() bool

IsStopping returns true if the pipeline is currently stopping

func (*Pipeline) NoMore

func (p *Pipeline) NoMore()

NoMore signals that the pipeline has no more input to process. Must be called exactly once by the consumer when there are no more input rows to process.

func (*Pipeline) RunAfter

func (p *Pipeline) RunAfter(f func())

RunAfter schedules the given function to run after the pipeline completes.

func (*Pipeline) SetBadRowCallback

func (p *Pipeline) SetBadRowCallback(callback BadRowCallback)

SetBadRowCallback sets the callback that is run when a bad row is encountered

func (*Pipeline) SetOutput

func (p *Pipeline) SetOutput(outFunc OutFunc)

SetOutput sets the output function to the function given

func (*Pipeline) Start

func (p *Pipeline) Start()

Start starts the pipeline processing. It panics if the pipeline hasn't been set up completely yet, for example if no OutFunc has been set via SetOutput.

func (*Pipeline) StopWithErr

func (p *Pipeline) StopWithErr(err error)

StopWithErr provides a way to stop the pipeline when an error is encountered. This is typically done from InFuncs and OutFuncs.

func (*Pipeline) Wait

func (p *Pipeline) Wait() error

Wait waits for the pipeline to complete and returns any error that occurred during its execution.

type ReadableMap

type ReadableMap interface {
	// Get retrieves an element from the map, and a bool which says if there was a property that exists with that
	// name at all
	Get(propName string) (interface{}, bool)
}

ReadableMap is an interface that provides read-only access to map properties

type RowWithProps

type RowWithProps struct {
	Row   row.Row
	Props ImmutableProperties
}

RowWithProps is a struct that couples a row being processed by a pipeline with properties. These properties work as a means of passing data about a row between stages in a pipeline.

func NewRowWithProps

func NewRowWithProps(r row.Row, props map[string]interface{}) RowWithProps

NewRowWithProps creates a RowWithProps from a row and a map of properties

type SinkFunc

type SinkFunc func(row.Row, ReadableMap) error

SinkFunc is a function that will process the final transformed rows from a pipeline. This function will be called once for every row that makes it through the pipeline

type SourceFunc

type SourceFunc func() (row.Row, ImmutableProperties, error)

SourceFunc is a function that will return a new row for each successive call until all its rows are exhausted, at which point io.EOF should be returned

func SourceFuncForRows

func SourceFuncForRows(rows []row.Row) SourceFunc

SourceFuncForRows returns a source func that yields the rows given in order. Suitable for very small result sets that are statically defined or otherwise fit easily into memory.

type TransformCollection

type TransformCollection struct {
	// Transforms is a slice of named transforms stored in the order they will be applied
	Transforms []NamedTransform
}

TransformCollection is a collection of transforms to be applied in order in a pipeline

func NewTransformCollection

func NewTransformCollection(namedTransforms ...NamedTransform) *TransformCollection

NewTransformCollection creates a TransformCollection from NamedTransforms

func (*TransformCollection) AppendTransforms

func (tc *TransformCollection) AppendTransforms(nt ...NamedTransform)

AppendTransforms mutates the internal slice of transforms by appending the given transforms to the end of Transforms

func (*TransformCollection) NumTransforms

func (tc *TransformCollection) NumTransforms() int

NumTransforms returns the number of NamedTransforms in the collection

func (*TransformCollection) TransformAt

func (tc *TransformCollection) TransformAt(idx int) NamedTransform

TransformAt returns the NamedTransform at a given index
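A simplified sketch of the collection's ordering semantics. To keep it short, each transform here is a hypothetical string-to-string function (rowFunc) rather than a channel-based TransformFunc, and the lowercase type and constructor names are illustrative only. This constructor also copies its arguments so a slice passed with ... is never aliased; whether the real NewTransformCollection does so is not stated here.

```go
package main

// rowFunc is a simplified, hypothetical stand-in for TransformFunc.
type rowFunc func(string) string

type namedTransform struct {
	Name string
	Func rowFunc
}

type transformCollection struct {
	Transforms []namedTransform // applied in slice order
}

// newTransformCollection copies nts so later appends cannot write into
// a backing array owned by the caller.
func newTransformCollection(nts ...namedTransform) *transformCollection {
	return &transformCollection{Transforms: append([]namedTransform(nil), nts...)}
}

// AppendTransforms mutates the collection by appending nts in order.
func (tc *transformCollection) AppendTransforms(nts ...namedTransform) {
	tc.Transforms = append(tc.Transforms, nts...)
}

func (tc *transformCollection) NumTransforms() int { return len(tc.Transforms) }

func (tc *transformCollection) TransformAt(idx int) namedTransform { return tc.Transforms[idx] }

// applyAll runs each transform in index order, like chained stages.
func (tc *transformCollection) applyAll(v string) string {
	for i := 0; i < tc.NumTransforms(); i++ {
		v = tc.TransformAt(i).Func(v)
	}
	return v
}
```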

type TransformFunc

type TransformFunc func(inChan <-chan RowWithProps, outChan chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, stopChan <-chan struct{})

TransformFunc reads rows from inChan, transforms them, and then writes them to outChan. If an error occurs while processing a row, a TransformRowFailure will be written to badRowChan, and if stopChan is closed the function should exit all processing.

type TransformRowFailure

type TransformRowFailure struct {
	Row           row.Row
	SqlRow        sql.Row
	TransformName string
	Details       string
}

TransformRowFailure is an error implementation that stores the row that failed to transform, the name of the transform that failed, and some details of the error

func (*TransformRowFailure) Error

func (trf *TransformRowFailure) Error() string

Error returns a string containing details of the error that occurred

type TransformRowFunc

type TransformRowFunc func(inRow row.Row, props ReadableMap) (rowData []*TransformedRowResult, badRowDetails string)

TransformRowFunc processes a single row and its properties, and can return zero or more TransformedRowResults per row. If the row being processed is bad, it should return nil and a string containing details of the problem.
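The three possible outcomes of a TransformRowFunc (many rows, no rows, or a bad row) can be shown with a hypothetical splitTags function for rows shaped like [id, "tag1;tag2"]. Row, ReadableMap, and TransformedRowResult are local stand-ins, and the "split_from" property name is made up for illustration.

```go
package main

import "strings"

// Simplified stand-ins for the documented types.
type Row []string

type ReadableMap interface {
	Get(propName string) (interface{}, bool)
}

type TransformedRowResult struct {
	RowData         Row
	PropertyUpdates map[string]interface{}
}

// splitTags emits one output row per non-empty tag, zero rows when there
// are no tags, and reports a bad row (nil plus details) when the input
// does not have exactly two columns. props is unused in this example.
func splitTags(inRow Row, props ReadableMap) ([]*TransformedRowResult, string) {
	if len(inRow) != 2 {
		return nil, "expected 2 columns"
	}
	var results []*TransformedRowResult
	for _, tag := range strings.Split(inRow[1], ";") {
		if tag == "" {
			continue
		}
		results = append(results, &TransformedRowResult{
			RowData:         Row{inRow[0], tag},
			PropertyUpdates: map[string]interface{}{"split_from": inRow[1]},
		})
	}
	return results, ""
}
```

Returning an empty result with no details filters a row out silently, while a non-empty details string routes it to the bad-row handling instead.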

type TransformedRowResult

type TransformedRowResult struct {
	// RowData is the new row that should be passed on to the next stage
	RowData row.Row

	// PropertyUpdates are mutations that should be applied to the row's properties
	PropertyUpdates map[string]interface{}
}

TransformedRowResult is a single result returned by a TransformRowFunc; each stage may return zero or more of them per input row
