Documentation ¶
Index ¶
- Variables
- func GetRowConvTransformFunc(rc *rowconv.RowConverter) func(row.Row, ReadableMap) ([]*TransformedRowResult, string)
- func GetTransFailureDetails(err error) string
- func GetTransFailureRow(err error) row.Row
- func GetTransFailureSqlRow(err error) sql.Row
- func GetTransFailureTransName(err error) string
- func IsTransformFailure(err error) bool
- type BadRowCallback
- type ImmutableProperties
- type InFunc
- type NamedTransform
- type OutFunc
- type Pipeline
- func (p *Pipeline) Abort()
- func (p *Pipeline) AddStage(stage NamedTransform)
- func (p *Pipeline) InjectRow(stageName string, r row.Row)
- func (p *Pipeline) InjectRowWithProps(stageName string, r row.Row, props map[string]interface{})
- func (p *Pipeline) IsStopping() bool
- func (p *Pipeline) NoMore()
- func (p *Pipeline) RunAfter(f func())
- func (p *Pipeline) SetBadRowCallback(callback BadRowCallback)
- func (p *Pipeline) SetOutput(outFunc OutFunc)
- func (p *Pipeline) Start()
- func (p *Pipeline) StopWithErr(err error)
- func (p *Pipeline) Wait() error
- type ReadableMap
- type RowWithProps
- type SinkFunc
- type SourceFunc
- type TransformCollection
- type TransformFunc
- type TransformRowFailure
- type TransformRowFunc
- type TransformedRowResult
Constants ¶
This section is empty.
Variables ¶
var NoProps = ImmutableProperties{}
NoProps is an empty ImmutableProperties struct
Functions ¶
func GetRowConvTransformFunc ¶
func GetRowConvTransformFunc(rc *rowconv.RowConverter) func(row.Row, ReadableMap) ([]*TransformedRowResult, string)
GetRowConvTransformFunc can be used to wrap a RowConverter and use that RowConverter in a pipeline.
func GetTransFailureDetails ¶
func GetTransFailureDetails(err error) string
GetTransFailureDetails extracts the details string from an error that is an instance of a TransformRowFailure
func GetTransFailureRow ¶
func GetTransFailureRow(err error) row.Row
GetTransFailureRow extracts the row that failed from an error that is an instance of a TransformRowFailure
func GetTransFailureSqlRow ¶
func GetTransFailureSqlRow(err error) sql.Row
GetTransFailureSqlRow extracts the SQL row that failed from an error that is an instance of a TransformRowFailure
func GetTransFailureTransName ¶
func GetTransFailureTransName(err error) string
GetTransFailureTransName extracts the name of the transform that failed from an error that is an instance of a TransformRowFailure
func IsTransformFailure ¶
func IsTransformFailure(err error) bool
IsTransformFailure will return true if the error is an instance of a TransformRowFailure
Types ¶
type BadRowCallback ¶
type BadRowCallback func(*TransformRowFailure) (quit bool)
BadRowCallback is a callback function that is called when a bad row is encountered. Returning true from this function quits the entire pipeline.
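As an illustration of the quit flag, here is a sketch of a callback that tolerates a fixed number of bad rows and then asks for the pipeline to stop. The `failure` type, the `badRowCallback` type, and `quitAfter` are hypothetical stand-ins that only mirror the shape of BadRowCallback. The counter is not synchronized, so the sketch assumes the callback is invoked from one goroutine at a time.

```go
package main

import "fmt"

// failure is a hypothetical stand-in for TransformRowFailure.
type failure struct{ details string }

// badRowCallback mirrors the shape of BadRowCallback over the stand-in type.
type badRowCallback func(*failure) (quit bool)

// quitAfter returns a callback that logs each bad row and returns true
// (quit) once more than limit bad rows have been seen.
func quitAfter(limit int) badRowCallback {
	seen := 0
	return func(f *failure) bool {
		seen++
		fmt.Printf("bad row %d: %s\n", seen, f.details)
		return seen > limit
	}
}

func main() {
	cb := quitAfter(1)
	fmt.Println(cb(&failure{details: "missing column"}))
	fmt.Println(cb(&failure{details: "bad type"}))
}
```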
type ImmutableProperties ¶
type ImmutableProperties struct {
// contains filtered or unexported fields
}
ImmutableProperties is a map of properties which can't be edited after creation
func (ImmutableProperties) Get ¶
func (ip ImmutableProperties) Get(propName string) (interface{}, bool)
Get retrieves an element from the map, along with a bool reporting whether a property with that name exists
func (ImmutableProperties) Set ¶
func (ip ImmutableProperties) Set(updates map[string]interface{}) ImmutableProperties
Set will create a new ImmutableProperties struct whose values are the original properties combined with the provided updates
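The copy-on-update behavior described for Set can be sketched as follows. The `props` type is a hypothetical stand-in for ImmutableProperties, whose internals are unexported and not shown here, so the details are an assumption about one reasonable way to get these semantics.

```go
package main

import "fmt"

// props is a hypothetical stand-in for ImmutableProperties: the map is
// unexported and never modified after construction.
type props struct {
	m map[string]interface{}
}

// Get returns the value stored under name and whether it was present.
func (p props) Get(name string) (interface{}, bool) {
	v, ok := p.m[name]
	return v, ok
}

// Set copies the existing entries, applies updates on top, and returns the
// new value. The receiver is left unchanged.
func (p props) Set(updates map[string]interface{}) props {
	merged := make(map[string]interface{}, len(p.m)+len(updates))
	for k, v := range p.m {
		merged[k] = v
	}
	for k, v := range updates {
		merged[k] = v
	}
	return props{m: merged}
}

func main() {
	empty := props{}
	withSrc := empty.Set(map[string]interface{}{"source": "a.csv"})
	_, inEmpty := empty.Get("source")
	v, inNew := withSrc.Get("source")
	fmt.Println(inEmpty, v, inNew)
}
```

Because every Set returns a fresh value, a stage can attach properties to a row without affecting copies held by earlier stages.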
type InFunc ¶
type InFunc func(p *Pipeline, ch chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, noMoreChan <-chan struct{})
InFunc is a pipeline input function that reads row data from a source and puts it in a channel.
func InFuncForChannel ¶
InFuncForChannel returns an InFunc that reads off the channel given.
func ProcFuncForReader ¶
func ProcFuncForReader(ctx context.Context, rd table.TableReader) InFunc
ProcFuncForReader adapts a standard TableReader to work as an InFunc for a pipeline
func ProcFuncForSourceFunc ¶
func ProcFuncForSourceFunc(sourceFunc SourceFunc) InFunc
ProcFuncForSourceFunc is a helper method that creates an InFunc for a given SourceFunc. It takes care of channel processing, stop conditions, and error handling.
type NamedTransform ¶
type NamedTransform struct {
	// The name of the transform. If an error occurs during processing, this name will be provided as the TransformName
	// in any TransformRowFailure error.
	Name string
	// Func is the TransformFunc being applied
	Func TransformFunc
}
NamedTransform is a struct containing a TransformFunc and the name of the transform being applied. If an error occurs during processing this name will be provided as the TransformName in any TransformRowFailure error.
func NewNamedTransform ¶
func NewNamedTransform(name string, transRowFunc TransformRowFunc) NamedTransform
NewNamedTransform returns a NamedTransform built from a name and a TransformRowFunc. The returned NamedTransform has its Func member set to a TransformFunc that handles input, output, and stop channel processing as well as error handling, and that calls the given TransformRowFunc for every row.
type OutFunc ¶
type OutFunc func(p *Pipeline, ch <-chan RowWithProps, badRowChan chan<- *TransformRowFailure)
OutFunc is a pipeline output function that takes the data the pipeline has processed off of the channel.
func ProcFuncForSinkFunc ¶
ProcFuncForSinkFunc is a helper method that creates an OutFunc for a given SinkFunc. It takes care of channel processing, stop conditions, and error handling.
func ProcFuncForWriter ¶
func ProcFuncForWriter(ctx context.Context, wr table.TableWriter) OutFunc
ProcFuncForWriter adapts a standard TableWriter to work as an OutFunc for a pipeline
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is a struct that manages the operation of a row processing pipeline, where data is read from some source and written to a channel by the InFunc. An optional series of transformation functions read from this output as their input, passing output to the next stage, ultimately to the OutFunc. Each transform has a name, and is referred to as a stage in the pipeline.
Pipelines can be constructed in phases, with different call sites adding transformations or even redirecting output as required. Once a pipeline is started with Start(), all configuration methods will panic.
Pipelines can be supplied with callbacks to run after they complete, which happens when output has finished writing, or when Abort() or StopWithErr() is called.
Pipelines must be cleaned up by a call to either Wait, Abort, or StopWithErr, all of which run any deferred functions registered with the pipeline via calls to RunAfter (e.g. closing readers and writers).
Ironically, not even a little thread safe.
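The flow described above (an input function writing to a channel, a chain of stages, and an output function draining the last channel) can be sketched with plain goroutines and channels. This is an illustration of the general shape only, not the Pipeline type itself: rows are plain strings, and `source`, `stage`, `sink`, and `run` are hypothetical names. Bad-row handling and stop signalling are left out.

```go
package main

import (
	"fmt"
	"strings"
)

// source plays the role of an InFunc: it writes rows to a channel and closes
// the channel when the input is exhausted.
func source(rows []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, r := range rows {
			out <- r
		}
	}()
	return out
}

// stage plays the role of one named transform: it reads from the previous
// stage, applies f, and passes the result on.
func stage(in <-chan string, f func(string) string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for r := range in {
			out <- f(r)
		}
	}()
	return out
}

// sink plays the role of an OutFunc: it drains the final channel.
func sink(in <-chan string) []string {
	var got []string
	for r := range in {
		got = append(got, r)
	}
	return got
}

// run wires source -> trim -> upper -> sink.
func run(rows []string) []string {
	trimmed := stage(source(rows), strings.TrimSpace)
	upper := stage(trimmed, strings.ToUpper)
	return sink(upper)
}

func main() {
	fmt.Println(run([]string{" a ", "b"}))
}
```

Each stage runs in its own goroutine, and row order is preserved because every stage processes its input sequentially.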
func NewAsyncPipeline ¶
func NewAsyncPipeline(inFunc InFunc, outFunc OutFunc, stages *TransformCollection, badRowCB BadRowCallback) *Pipeline
NewAsyncPipeline creates a Pipeline from a given InFunc, OutFunc, TransformCollection, and a BadRowCallback.
func NewPartialPipeline ¶
NewPartialPipeline creates a pipeline stub that doesn't have an output func set on it yet. An OutFunc must be applied via a call to SetOutput before calling Start().
func (*Pipeline) AddStage ¶
func (p *Pipeline) AddStage(stage NamedTransform)
AddStage adds a new named transform to the set of stages
func (*Pipeline) InjectRow ¶
func (p *Pipeline) InjectRow(stageName string, r row.Row)
InjectRow injects a row at a particular stage in the pipeline. The row will be processed before other pipeline input arrives.
func (*Pipeline) InjectRowWithProps ¶
func (p *Pipeline) InjectRowWithProps(stageName string, r row.Row, props map[string]interface{})
func (*Pipeline) IsStopping ¶
func (p *Pipeline) IsStopping() bool
IsStopping returns true if the pipeline is currently stopping
func (*Pipeline) NoMore ¶
func (p *Pipeline) NoMore()
NoMore signals that the pipeline has no more input to process. Must be called exactly once by the consumer when there are no more input rows to process.
func (*Pipeline) RunAfter ¶
func (p *Pipeline) RunAfter(f func())
Schedules the given function to run after the pipeline completes.
func (*Pipeline) SetBadRowCallback ¶
func (p *Pipeline) SetBadRowCallback(callback BadRowCallback)
SetBadRowCallback sets the callback to run when a bad row is encountered to the callback given
func (*Pipeline) Start ¶
func (p *Pipeline) Start()
Starts the pipeline processing. Panics if the pipeline hasn't been set up completely yet.
func (*Pipeline) StopWithErr ¶
func (p *Pipeline) StopWithErr(err error)
StopWithErr provides a method by which the pipeline can be stopped when an error is encountered. This would typically be done in InFuncs and OutFuncs.
type ReadableMap ¶
type ReadableMap interface {
	// Get retrieves an element from the map, and a bool which says if there was a property that exists with that
	// name at all
	Get(propName string) (interface{}, bool)
}
ReadableMap is an interface that provides read only access to map properties
type RowWithProps ¶
type RowWithProps struct {
	Row   row.Row
	Props ImmutableProperties
}
RowWithProps is a struct that couples a row being processed by a pipeline with properties. These properties work as a means of passing data about a row between stages in a pipeline.
func NewRowWithProps ¶
func NewRowWithProps(r row.Row, props map[string]interface{}) RowWithProps
NewRowWithProps creates a RowWithProps from a row and a map of properties
type SinkFunc ¶
type SinkFunc func(row.Row, ReadableMap) error
SinkFunc is a function that will process the final transformed rows from a pipeline. This function will be called once for every row that makes it through the pipeline
type SourceFunc ¶
type SourceFunc func() (row.Row, ImmutableProperties, error)
SourceFunc is a function that will return a new row for each successive call until all its rows are exhausted, at which point io.EOF should be returned
func SourceFuncForRows ¶
func SourceFuncForRows(rows []row.Row) SourceFunc
SourceFuncForRows returns a source func that yields the rows given in order. Suitable for very small result sets that are statically defined or otherwise fit easily into memory.
type TransformCollection ¶
type TransformCollection struct {
	// Transforms is a slice of named transforms stored in the order they will be applied
	Transforms []NamedTransform
}
TransformCollection is a collection of transforms to be applied in order in a pipeline
func NewTransformCollection ¶
func NewTransformCollection(namedTransforms ...NamedTransform) *TransformCollection
NewTransformCollection creates a TransformCollection from NamedTransforms
func (*TransformCollection) AppendTransforms ¶
func (tc *TransformCollection) AppendTransforms(nt ...NamedTransform)
AppendTransforms will mutate the internal slice of transforms by appending the given transforms to the slice of Transforms
func (*TransformCollection) NumTransforms ¶
func (tc *TransformCollection) NumTransforms() int
NumTransforms returns the number of NamedTransforms in the collection
func (*TransformCollection) TransformAt ¶
func (tc *TransformCollection) TransformAt(idx int) NamedTransform
TransformAt returns the NamedTransform at a given index
type TransformFunc ¶
type TransformFunc func(inChan <-chan RowWithProps, outChan chan<- RowWithProps, badRowChan chan<- *TransformRowFailure, stopChan <-chan struct{})
TransformFunc reads rows from the inChan, transforms them, and then writes them to the outChan. If an error occurs processing a row a TransformRowFailure will be written to the failure channel, and if the stopChan is closed it should exit all processing.
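A function with this shape usually loops over a select that watches both the input channel and the stop channel. The sketch below shows one way to write such a loop under simplifying assumptions: rows are plain strings, `failure` is a hypothetical stand-in for TransformRowFailure, and `nonEmpty` and `runNonEmpty` are hypothetical names. It is not the implementation that NewNamedTransform produces.

```go
package main

import "fmt"

// failure is a hypothetical stand-in for *TransformRowFailure.
type failure struct {
	row     string
	details string
}

// nonEmpty is shaped like a TransformFunc over string rows: it forwards
// non-empty rows, reports empty ones on bad, and returns as soon as stop is
// closed or the input channel is closed.
func nonEmpty(in <-chan string, out chan<- string, bad chan<- *failure, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case r, ok := <-in:
			if !ok {
				return // input exhausted
			}
			if r == "" {
				select {
				case bad <- &failure{row: r, details: "empty row"}:
				case <-stop:
					return
				}
				continue
			}
			select {
			case out <- r:
			case <-stop:
				return
			}
		}
	}
}

// runNonEmpty feeds rows through nonEmpty using buffered channels and returns
// the forwarded rows and the number of reported failures.
func runNonEmpty(rows []string) ([]string, int) {
	in := make(chan string, len(rows))
	out := make(chan string, len(rows))
	bad := make(chan *failure, len(rows))
	stop := make(chan struct{})
	for _, r := range rows {
		in <- r
	}
	close(in)
	nonEmpty(in, out, bad, stop)
	close(out)
	var good []string
	for r := range out {
		good = append(good, r)
	}
	return good, len(bad)
}

func main() {
	good, failures := runNonEmpty([]string{"a", "", "b"})
	fmt.Println(good, failures)
}
```

Every send is wrapped in its own select so that a closed stop channel unblocks the function even when a downstream reader has gone away.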
type TransformRowFailure ¶
TransformRowFailure is an error implementation that stores the row that failed to transform, the transform that failed and some details of the error
func (*TransformRowFailure) Error ¶
func (trf *TransformRowFailure) Error() string
Error returns a string containing details of the error that occurred
type TransformRowFunc ¶
type TransformRowFunc func(inRow row.Row, props ReadableMap) (rowData []*TransformedRowResult, badRowDetails string)
TransformRowFunc processes a single row and its properties and can return 0 or more TransformedRowResults per row. If the row being processed is bad it should return nil, and a string containing details of the row problem.
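To make the "0 or more results, or nil plus details" contract concrete, here is a sketch of a row function that splits one input row into several output rows. It works on plain strings and omits the props argument. The `result` type is a simplified stand-in for TransformedRowResult, and `splitRow` and the `split_from` property key are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// result is a simplified stand-in for TransformedRowResult.
type result struct {
	rowData         string
	propertyUpdates map[string]interface{}
}

// splitRow is shaped like a TransformRowFunc over string rows: one input row
// can yield zero, one, or many results. A row it cannot handle yields nil
// plus a non-empty details string.
func splitRow(inRow string) ([]*result, string) {
	if strings.Contains(inRow, "!") {
		return nil, "row contains '!'"
	}
	var results []*result
	for _, part := range strings.Split(inRow, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue // drop empty fragments, possibly yielding zero rows
		}
		results = append(results, &result{
			rowData:         part,
			propertyUpdates: map[string]interface{}{"split_from": inRow},
		})
	}
	return results, ""
}

func main() {
	res, details := splitRow("a, b")
	fmt.Println(len(res), details == "")
	_, details = splitRow("bad!")
	fmt.Println(details)
}
```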
type TransformedRowResult ¶
type TransformedRowResult struct {
	// RowData is the new row that should be passed on to the next stage
	RowData row.Row
	// PropertyUpdates are mutations that should be applied to the row's properties
	PropertyUpdates map[string]interface{}
}
TransformedRowResult is what will be returned from each stage of a transform