pipeline

package
v0.0.0-...-e97be17
Published: Oct 21, 2022 License: MIT Imports: 4 Imported by: 0

Documentation

Types

type Payload

type Payload interface {
	// Clone returns a new Payload that is a deep copy of the original.
	Clone() Payload

	// MarkAsProcessed is called by the pipeline when the payload either
	// reaches the output sink or is discarded.
	MarkAsProcessed()
}

Payload is implemented by values that can be sent through the pipeline.
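
To illustrate what a deep-copying Clone might look like, here is a minimal sketch. The package's import path is not shown on this page, so the Payload interface is redeclared locally; `textPayload` and its fields are hypothetical names chosen for the example.

```go
package main

import "fmt"

// Payload mirrors the interface documented above. It is redeclared here
// only so that the sketch compiles on its own.
type Payload interface {
	Clone() Payload
	MarkAsProcessed()
}

// textPayload is a hypothetical payload that carries a list of tags.
type textPayload struct {
	tags      []string
	processed bool
}

// Clone copies the tags slice so that the copy shares no backing array
// with the original.
func (p *textPayload) Clone() Payload {
	return &textPayload{tags: append([]string(nil), p.tags...)}
}

// MarkAsProcessed records that the pipeline is done with this payload.
func (p *textPayload) MarkAsProcessed() { p.processed = true }

func main() {
	orig := &textPayload{tags: []string{"a", "b"}}
	dup := orig.Clone().(*textPayload)
	dup.tags[0] = "changed"
	fmt.Println(orig.tags[0], dup.tags[0]) // prints "a changed"
}
```

Copying the slice rather than the slice header is what makes the clone independent; a shallow copy would let one stage's mutation leak into another.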

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func New

func New(stages ...StageRunner) *Pipeline

New returns a new Pipeline instance in which input payloads traverse each of the specified stages.

func (*Pipeline) Process

func (p *Pipeline) Process(ctx context.Context, source Source, sink Sink) error

Process reads the contents of the specified source, sends them through the stages of the pipeline, directs the results to the specified sink, and returns any errors that occurred.

Calls to Process block until:

  • all data from the source has been processed OR
  • an error occurs OR
  • the supplied context expires or is cancelled

It is safe to call Process concurrently with different sources and sinks.

type Processor

type Processor interface {
	// Process takes the input Payload and returns a new Payload to be sent either to the
	// next stage or to the output sink. Process can also opt to prevent the Payload from
	// reaching the next stage by returning nil.
	Process(context.Context, Payload) (Payload, error)
}

Processor is implemented by types that can process a Payload as part of a pipeline stage.

type ProcessorFunc

type ProcessorFunc func(context.Context, Payload) (Payload, error)

ProcessorFunc is an adapter that allows an ordinary function with the matching signature to be used as a Processor.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(ctx context.Context, p Payload) (Payload, error)

type Sink

type Sink interface {
	// Consume processes a Payload instance that has been emitted by the pipeline.
	Consume(context.Context, Payload) error
}

type Source

type Source interface {
	// Next fetches the next Payload. It returns false if an error occurs or
	// no more payloads are available.
	Next(context.Context) bool

	// Payload returns a Payload to be processed
	Payload() Payload

	// Error returns the last error observed by the source.
	Error() error
}

type StageParams

type StageParams interface {
	// StageIndex returns the position of a stage in the pipeline.
	StageIndex() int
	// Input returns a channel for reading the input Payload into the stage.
	Input() <-chan Payload
	// Output returns a channel for writing the stage output.
	Output() chan<- Payload
	// Error returns a channel for writing the errors that were encountered
	// during the stage execution.
	Error() chan<- error
}

type StageRunner

type StageRunner interface {
	// Run implements the processing logic of a stage. Run reads input payloads from the
	// Input channel and writes its output to the Output channel.
	// Calls to Run are expected to block until one of the following occurs:
	// - The Input channel is closed.
	// - The context is cancelled.
	// - An error occurs while processing a payload.
	Run(context.Context, StageParams)
}

StageRunner is implemented by types that can be chained together to form a multi-stage pipeline.
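
The blocking contract described for Run can be sketched as a select loop. This is a minimal illustration under stated assumptions, not the package's own runner: the interfaces are redeclared locally, `fifo`, `params`, `num`, `double`, and `runOnce` are hypothetical names, and both the non-blocking error send and the call to MarkAsProcessed on dropped payloads are design guesses.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented interfaces.
type Payload interface {
	Clone() Payload
	MarkAsProcessed()
}

type StageParams interface {
	StageIndex() int
	Input() <-chan Payload
	Output() chan<- Payload
	Error() chan<- error
}

// fifo is a hypothetical runner that handles payloads one at a time, in order.
type fifo struct {
	proc func(context.Context, Payload) (Payload, error)
}

// Run blocks until the input closes, the context is done, or proc fails.
func (r fifo) Run(ctx context.Context, sp StageParams) {
	for {
		select {
		case <-ctx.Done():
			return
		case in, ok := <-sp.Input():
			if !ok {
				return // input closed: no more payloads
			}
			out, err := r.proc(ctx, in)
			if err != nil {
				// Assumption: report the error without blocking, then stop.
				select {
				case sp.Error() <- fmt.Errorf("stage %d: %w", sp.StageIndex(), err):
				default:
				}
				return
			}
			if out == nil {
				in.MarkAsProcessed() // assumption: dropped payloads are marked here
				continue
			}
			select {
			case sp.Output() <- out:
			case <-ctx.Done():
				return
			}
		}
	}
}

// params is a hypothetical StageParams backed by plain channels.
type params struct {
	in, out chan Payload
	errs    chan error
}

func (p params) StageIndex() int        { return 0 }
func (p params) Input() <-chan Payload  { return p.in }
func (p params) Output() chan<- Payload { return p.out }
func (p params) Error() chan<- error    { return p.errs }

// num is a hypothetical payload wrapping an int.
type num struct{ v int }

func (n *num) Clone() Payload   { return &num{v: n.v} }
func (n *num) MarkAsProcessed() {}

func double(_ context.Context, p Payload) (Payload, error) {
	return &num{v: p.(*num).v * 2}, nil
}

// runOnce pushes a single value through one stage and returns the result.
func runOnce(v int) int {
	p := params{in: make(chan Payload, 1), out: make(chan Payload, 1), errs: make(chan error, 1)}
	p.in <- &num{v: v}
	close(p.in)
	r := fifo{proc: double}
	r.Run(context.Background(), p)
	return (<-p.out).(*num).v
}

func main() {
	fmt.Println(runOnce(21)) // prints "42"
}
```

Selecting on `ctx.Done()` both when receiving and when sending keeps the stage from blocking forever if a downstream stage stops reading after cancellation.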

type WorkerParams

type WorkerParams struct {
	Stage int

	InCh  <-chan Payload
	OutCh chan<- Payload
	ErrCh chan<- error
}

func (*WorkerParams) Error

func (wp *WorkerParams) Error() chan<- error

func (*WorkerParams) Input

func (wp *WorkerParams) Input() <-chan Payload

func (*WorkerParams) Output

func (wp *WorkerParams) Output() chan<- Payload

func (*WorkerParams) StageIndex

func (wp *WorkerParams) StageIndex() int
