Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Payload ¶
type Payload interface { // Clone returns a new Payload that's a deep-copy of the original. Clone() Payload // MarkAsProcessed called by the pipeline when it reaches the output sink // or discarded by the pipeline. MarkAsProcessed() }
Payload is implementaed by values that can be sent to the pipeline.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func New ¶
func New(stages ...StageRunner) *Pipeline
New return a new Pipeline instance where input payloads will traverse each one of the stages
func (*Pipeline) Process ¶
Process reads the contents of the specified source, sends them through the various stages of the pipeline and directs the results to the specified sink and returns back any errors that may have occurred.
Calls to Process block until:
- all data from the source has been processed OR
- an error occurs OR
- the supplied context expires/cancelled
It is safe to call Process concurrently with different sources and sinks.
type Processor ¶
type Processor interface { // Process takes the input Payload and return a new Payload to be sent either to the // next stage or the output sink. Process can also apt to prevent the Payload from reaching // the next stage by returning nil. Process(context.Context, Payload) (Payload, error) }
Processor is implemented by types that can process the Payload as part of pipeline stage.
type ProcessorFunc ¶
ProcessorFunc is adapter to Process
type StageParams ¶
type StageParams interface { // StageIndex returns the position of a stage in the pipeline. StageIndex() int // Input returns a channel for reading the input Payload into the stage. Input() <-chan Payload // Output returns a channel for writing the stage output. Output() chan<- Payload // Error returns a channel for writing the errors that were encountered // during the stage execution. Error() chan<- error }
type StageRunner ¶
type StageRunner interface { // Run implement the process logic of a stage. Run reads input payload from Input channel // and writes its output to Output channel. // Calls to Run expected to block until one of the following occurs: // - Input channel is closed. // - Its context got cancelled // - Error happen while processing the payload. Run(context.Context, StageParams) }
StageRunner implemented by types that can be chained together to form multi-stage pipeline.
type WorkerParams ¶
func (*WorkerParams) Error ¶
func (wp *WorkerParams) Error() chan<- error
func (*WorkerParams) Input ¶
func (wp *WorkerParams) Input() <-chan Payload
func (*WorkerParams) Output ¶
func (wp *WorkerParams) Output() chan<- Payload
func (*WorkerParams) StageIndex ¶
func (wp *WorkerParams) StageIndex() int