Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Data ¶
type Data interface {
    // Clone returns a new Data that is a deep-copy of the original.
    Clone() Data
}
Data is implemented by values that can be sent through a pipeline.
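For illustration, a minimal Data implementation might wrap a single line of text. The lineRecord type below is hypothetical and not part of this package, and the sketch assumes the package is imported under the name pipeline.

    // lineRecord is a hypothetical Data implementation carrying one line of text.
    type lineRecord struct {
        line string
    }

    // Clone returns a deep copy so that downstream stages can mutate the copy
    // without affecting the original.
    func (r *lineRecord) Clone() pipeline.Data {
        return &lineRecord{line: r.line}
    }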
type InputSource ¶
type InputSource interface {
    // Next fetches the next data element from the source. If no more items are
    // available or an error occurs, calls to Next return false.
    Next(context.Context) bool

    // Data returns the next data to be processed.
    Data() Data

    // Error returns the last error observed by the source.
    Error() error
}
InputSource is implemented by types that generate Data instances which can be used as inputs to a Pipeline instance.
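A sketch of an InputSource backed by an in-memory slice, again using a hypothetical type name (sliceSource) and assuming the package is imported as pipeline:

    // sliceSource is a hypothetical InputSource that emits Data items from a slice.
    type sliceSource struct {
        items []pipeline.Data
        next  pipeline.Data
    }

    // Next stages the next item for Data and reports whether one was available.
    func (s *sliceSource) Next(_ context.Context) bool {
        if len(s.items) == 0 {
            return false
        }
        s.next, s.items = s.items[0], s.items[1:]
        return true
    }

    // Data returns the item staged by the most recent successful call to Next.
    func (s *sliceSource) Data() pipeline.Data { return s.next }

    // Error always returns nil; reading from an in-memory slice cannot fail.
    func (s *sliceSource) Error() error { return nil }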
type OutputSink ¶
type OutputSink interface {
    // Consume processes a Data instance that has been emitted out of
    // a Pipeline instance.
    Consume(context.Context, Data) error
}
OutputSink is implemented by types that can operate as the tail of a pipeline.
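A sketch of an OutputSink that simply collects everything the pipeline emits. The collectorSink type is hypothetical and the snippet uses the standard context and sync packages:

    // collectorSink is a hypothetical OutputSink that accumulates emitted Data items.
    type collectorSink struct {
        mu    sync.Mutex
        items []pipeline.Data
    }

    // Consume appends the emitted Data item to the collected results. The mutex
    // is a precaution in case Consume is ever invoked concurrently.
    func (c *collectorSink) Consume(_ context.Context, d pipeline.Data) error {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.items = append(c.items, d)
        return nil
    }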
type Pipeline ¶
Pipeline is an abstract and extendable asynchronous data pipeline with concurrent tasks at each stage. Each pipeline is constructed from an InputSource, an OutputSink, and zero or more Stage instances for processing.
func NewPipeline ¶
NewPipeline returns a new data pipeline instance whose inputs traverse each of the provided Stage instances.
func (*Pipeline) DataItemCount ¶ added in v0.2.0
DataItemCount returns the number of data items currently on the pipeline.
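One possible use is a monitoring goroutine that periodically samples the pipeline's backlog. The sketch below assumes p is a *Pipeline returned by NewPipeline and uses the standard log and time packages:

    // Periodically report how many data items are still in flight.
    go func() {
        for range time.Tick(time.Second) {
            log.Printf("items in flight: %d", p.DataItemCount())
        }
    }()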
func (*Pipeline) Execute ¶
func (p *Pipeline) Execute(ctx context.Context, src InputSource, sink OutputSink) error
Execute is equivalent to ExecuteBuffered with a bufsize parameter of 1.
func (*Pipeline) ExecuteBuffered ¶
func (p *Pipeline) ExecuteBuffered(ctx context.Context, src InputSource, sink OutputSink, bufsize int) error
ExecuteBuffered reads data from the InputSource, sends it through each of the Stage instances, and finishes with the OutputSink. Any errors that occur during execution are returned. ExecuteBuffered blocks until all data from the InputSource has been processed, an error occurs, or the context expires.
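A minimal end-to-end sketch tying the pieces together. It assumes NewPipeline is variadic over Stage values, that FIFO wraps a single Task, and that the hypothetical types from the other sketches (sliceSource, collectorSink, lineRecord, upperCaseTask) are in scope:

    func runExample(ctx context.Context) error {
        // Assumed constructor shape: NewPipeline(stages ...Stage) *Pipeline.
        p := pipeline.NewPipeline(
            pipeline.FIFO(&upperCaseTask{}), // hypothetical Task; see the Task example below
        )

        src := &sliceSource{items: []pipeline.Data{&lineRecord{line: "hello"}}}
        sink := &collectorSink{}

        // Execute blocks until src is drained, an error occurs, or ctx expires.
        return p.Execute(ctx, src, sink)
    }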
type Stage ¶
type Stage interface {
    // ID returns the optional identifier assigned to this stage.
    ID() string

    // Run executes the processing logic for this stage by reading
    // data from the input channel, processing the data and sending
    // the results to the output channel. Run blocks until the stage
    // input channel is closed, the context expires, or an error occurs.
    Run(context.Context, StageParams)
}
Stages are executed in sequential order to form a multi-stage data pipeline.
func Broadcast ¶
Broadcast returns a Stage that passes a copy of each incoming Data item to all specified tasks and emits their outputs to the next stage.
func DynamicPool ¶
DynamicPool returns a Stage that maintains a dynamic pool that can scale up to max parallel tasks, processing incoming inputs in parallel and emitting their outputs to the next stage.
func FIFO ¶
FIFO returns a Stage that processes incoming data in a first-in first-out fashion. Each input is passed to the specified Task and its output is emitted to the next Stage.
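The factory signatures are not shown in this excerpt; the sketch below assumes FIFO takes a single Task, DynamicPool takes a maximum worker count followed by a Task, and Broadcast is variadic over Tasks:

    // Assumed signatures; check the package source for the exact parameters.
    stages := []pipeline.Stage{
        // Process items one at a time, preserving arrival order.
        pipeline.FIFO(&upperCaseTask{}),
        // Process up to 8 items concurrently (parameter order is assumed).
        pipeline.DynamicPool(8, &upperCaseTask{}),
        // Fan each item out to every listed task (variadic form is assumed).
        pipeline.Broadcast(&upperCaseTask{}, &upperCaseTask{}),
    }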
type StageParams ¶
type StageParams interface {
    // Pipeline returns the pipeline executing this stage.
    Pipeline() *Pipeline

    // Position returns the position of this stage in the pipeline.
    Position() int

    // Input returns the input channel for this stage.
    Input() <-chan Data

    // Output returns the output channel for this stage.
    Output() chan<- Data

    // DataQueue returns the alternative data queue for this stage.
    DataQueue() queue.Queue

    // Error returns the queue that reports errors encountered by the stage.
    Error() queue.Queue

    // Registry returns a map of stage names to stage input channels.
    Registry() StageRegistry
}
StageParams provides the information needed for executing a pipeline Stage. The Pipeline passes a StageParams instance to the Run method of each stage.
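Most pipelines are built from the FIFO, DynamicPool, and Broadcast stages above, but any type satisfying Stage can be used. A minimal custom stage sketch, using the hypothetical passthroughStage name and assuming the package is imported as pipeline:

    // passthroughStage is a hypothetical Stage that forwards every input unchanged.
    type passthroughStage struct {
        id string
    }

    // ID returns the identifier assigned to this stage.
    func (s *passthroughStage) ID() string { return s.id }

    // Run copies items from the stage input to the stage output until the input
    // channel is closed or the context expires.
    func (s *passthroughStage) Run(ctx context.Context, params pipeline.StageParams) {
        for {
            select {
            case <-ctx.Done():
                return
            case d, ok := <-params.Input():
                if !ok {
                    return
                }
                select {
                case params.Output() <- d:
                case <-ctx.Done():
                    return
                }
            }
        }
    }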
type StageRegistry ¶
StageRegistry is a map of stage identifiers to input channels.
type Task ¶
type Task interface {
    // Process operates on the input data and returns a new Data value to be
    // forwarded to the next pipeline stage. Task instances may also opt to
    // prevent the data from reaching the rest of the pipeline by returning
    // a nil data value instead.
    Process(context.Context, Data, TaskParams) (Data, error)
}
Task is implemented by types that can process Data as part of a pipeline stage.
type TaskParams ¶
type TaskParams interface {
    // Pipeline returns the pipeline executing this task.
    Pipeline() *Pipeline

    // Registry returns a map of stage names to stage input channels.
    Registry() StageRegistry
}
TaskParams provides access to pipeline mechanisms needed by a Task. The Stage passes a TaskParams instance to the Process method of each task.
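A minimal Task sketch that upper-cases the hypothetical lineRecord payload from the Data example. The upperCaseTask type is not part of this package, and the snippet uses the standard fmt and strings packages:

    // upperCaseTask is a hypothetical Task that upper-cases lineRecord payloads.
    type upperCaseTask struct{}

    // Process converts the payload to upper case and forwards it. Returning a
    // nil Data with a nil error would instead drop the item from the pipeline.
    func (t *upperCaseTask) Process(_ context.Context, d pipeline.Data, _ pipeline.TaskParams) (pipeline.Data, error) {
        rec, ok := d.(*lineRecord)
        if !ok {
            return nil, fmt.Errorf("unexpected data type %T", d)
        }
        rec.line = strings.ToUpper(rec.line)
        return rec, nil
    }

The TaskParams argument is ignored here; a task that needs to route data to a specific named stage could look up that stage's input channel via the StageRegistry returned by Registry().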