package pipeline
v0.0.0-...-18d85b2
Warning

This package is not in the latest version of its module.

Published: Dec 6, 2023 License: Apache-2.0, Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

TODO explain here how to write wrappers to use without casting from `interface{}`.

Index

Constants

This section is empty.

Variables

var ErrShutdown = errors.New("Pipeline shutdown")

ErrShutdown is the error sent to post-processing hooks when the pipeline has been shut down.

Functions

This section is empty.

Types

type BufferedReadWriter

type BufferedReadWriter struct {
	// contains filtered or unexported fields
}

BufferedReadWriter implements Reader and Writer and acts like a pipe. All writes are queued in a buffered channel until they are consumed.

It is used internally by Pipeline but is also helpful for testing.

func (*BufferedReadWriter) Close

func (b *BufferedReadWriter) Close() error

Close can be called in either a `Writer` or a `Reader` context.

In a `Reader` context, it means that no more values will be read, so the writer can stop writing to the buffer: subsequent calls to `Write()` return `io.ErrClosedPipe`.

In a `Writer` context, it means that no more values will be written, so the reader should return all queued values and then start returning `io.EOF`.

func (*BufferedReadWriter) GetContext

func (b *BufferedReadWriter) GetContext() context.Context

func (*BufferedReadWriter) QueuedEntries

func (b *BufferedReadWriter) QueuedEntries() int

func (*BufferedReadWriter) Read

func (b *BufferedReadWriter) Read() (interface{}, error)

func (*BufferedReadWriter) Write

func (b *BufferedReadWriter) Write(entry interface{}) error

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func New

func New(rootProcessor *PipelineNode) *Pipeline

func (*Pipeline) AddPostProcessingHook

func (p *Pipeline) AddPostProcessingHook(hook func(context.Context, error) error)

AddPostProcessingHook adds a post-processing hook. The context passed to the hook is the main reader context. The error is nil if processing was successful, ErrShutdown if the pipeline was shut down, and non-nil otherwise.

func (*Pipeline) AddPreProcessingHook

func (p *Pipeline) AddPreProcessingHook(hook func(context.Context) (context.Context, error))

AddPreProcessingHook adds a pre-processing hook. The context passed to the hook is the main reader context.

func (*Pipeline) IsRunning

func (p *Pipeline) IsRunning() bool

IsRunning returns true if the pipeline is running.

func (*Pipeline) PrintStatus

func (p *Pipeline) PrintStatus()

func (*Pipeline) Process

func (p *Pipeline) Process(reader Reader) <-chan error

Process starts the pipeline. The returned channel receives an error if one occurred in any of the processors or any of the pipeline hooks, and receives ErrShutdown if the pipeline was shut down.

func (*Pipeline) SetRoot

func (p *Pipeline) SetRoot(rootProcessor *PipelineNode)

func (*Pipeline) Shutdown

func (p *Pipeline) Shutdown()

Shutdown stops processing. Note that post-processing hooks receive ErrShutdown when Shutdown() is called.

type PipelineInterface

type PipelineInterface interface {
	SetRoot(rootProcessor *PipelineNode)
	// AddPreProcessingHook adds a pre-processing hook function. The returned
	// context.Context will be passed to the processors. If an error is returned,
	// the pipeline will not start processing data.
	AddPreProcessingHook(hook func(context.Context) (context.Context, error))
	AddPostProcessingHook(hook func(context.Context, error) error)
	Shutdown()
	PrintStatus()
}

PipelineInterface defines the common pipeline methods of structs that embed Pipeline.
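
Go promotes the methods of an embedded field, so a struct that embeds `*Pipeline` satisfies an interface made of those methods without forwarding code. The sketch uses a trimmed-down local `Pipeline` stand-in and a hypothetical two-method `Lifecycle` interface in place of the real types; `MyPipeline` is likewise a made-up name.

```go
package main

import "fmt"

// Pipeline is a trimmed-down stand-in for pipeline.Pipeline.
type Pipeline struct {
	running bool
}

func (p *Pipeline) Shutdown()       { p.running = false }
func (p *Pipeline) IsRunning() bool { return p.running }
func (p *Pipeline) PrintStatus()    { fmt.Println("running:", p.running) }

// Lifecycle is a hypothetical two-method subset of PipelineInterface.
type Lifecycle interface {
	Shutdown()
	PrintStatus()
}

// MyPipeline embeds *Pipeline, so Shutdown, IsRunning and PrintStatus are
// promoted and MyPipeline satisfies Lifecycle without forwarding methods.
type MyPipeline struct {
	*Pipeline
	name string
}

// Compile-time check that the promoted methods satisfy the interface.
var _ Lifecycle = MyPipeline{}

func main() {
	mp := MyPipeline{Pipeline: &Pipeline{running: true}, name: "example"}
	var l Lifecycle = mp
	l.Shutdown()
	l.PrintStatus()
	fmt.Println(mp.name)
}
```

Embedding the pointer rather than the value means every copy of `MyPipeline` shares one underlying pipeline state.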

type PipelineNode

type PipelineNode struct {
	// Remember to update reset() method if you ever add a new field to this struct!
	Processor Processor
	Children  []*PipelineNode
	// contains filtered or unexported fields
}

func Node

func Node(processor Processor) *PipelineNode

func (*PipelineNode) Pipe

func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode

type Processor

type Processor interface {
	// Process is the main method of `Processor`. It receives a `Reader`
	// that yields the objects passed down the pipeline by the previous processor.
	// Objects written to `Writer` are passed to the next processor. WARNING! `Process`
	// should **always** call `Close()` on `Writer` when no more objects will be
	// written, and `Close()` on `Reader` when reading is finished.
	// Data required by subsequent processors (such as aggregated data) should be
	// saved in `Store`. See the `Store` godoc for how to use it.
	// The first argument, `ctx`, is a cancellable context. A processor should monitor
	// the `ctx.Done()` channel and exit when it is closed. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	//
	// Given all of the above, `Process` should always follow this structure:
	//
	//    func (p *MyProcessor) Process(ctx context.Context, store *pipeline.Store, r pipeline.Reader, w pipeline.Writer) error {
	//    	defer r.Close()
	//    	defer w.Close()
	//
	//    	// Some pre code...
	//
	//    	for {
	//    		entry, err := r.Read()
	//    		if err != nil {
	//    			if err == io.EOF {
	//    				break
	//    			} else {
	//    				return errors.Wrap(err, "Error reading from Reader in [ProcessorName]")
	//    			}
	//    		}
	//
	//    		// Process entry...
	//
	//    		// Write to Writer if needed but exit if pipe is closed:
	//    		err = w.Write(entry)
	//    		if err != nil {
	//    			if err == io.ErrClosedPipe {
	//    				// Reader does not need more data
	//    				return nil
	//    			}
	//    			return errors.Wrap(err, "Error writing to Writer in [ProcessorName]")
	//    		}
	//
	//    		// Return errors if needed...
	//
	//    		// Exit when pipeline terminated due to an error in another processor...
	//    		select {
	//    		case <-ctx.Done():
	//    			return nil
	//    		default:
	//    			continue
	//    		}
	//    	}
	//
	//    	// Some post code...
	//
	//    	return nil
	//    }
	Process(context.Context, *Store, Reader, Writer) error
	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string
	// Reset resets the internal state of the processor. The pipeline runs it
	// every time before the pipeline starts running.
	// It is extremely important to implement this method; otherwise the internal
	// state of the processor will be kept between pipeline runs and may
	// result in invalid data.
	Reset()
}

Processor defines methods required by the processing pipeline.

type Reader

type Reader interface {
	// GetContext returns the context with values of the current reader. It can be
	// helpful for providing data to structs that wrap `Reader`.
	GetContext() context.Context
	// Read should return the next entry. If there are no more
	// entries, it should return an `io.EOF` error.
	Read() (interface{}, error)
	// Close should be called when reading is finished. This is especially
	// helpful when there are still some entries available, so the reader can stop
	// streaming them.
	Close() error
}

Reader is the interface a processor uses to read the entries passed down the pipeline by the previous processor.

type Store

type Store struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Store holds data associated with a pipeline execution. Its `Lock()` and `Unlock()` methods (from the embedded `sync.Mutex`) must be called around both `Put` and `Get` calls when accessing the `Store`.

Example (incrementing a number):

	s.Lock()
	v := s.Get("value")
	s.Put("value", v.(int)+1)
	s.Unlock()

func (*Store) Get

func (s *Store) Get(name string) interface{}

func (*Store) Put

func (s *Store) Put(name string, value interface{})

type Writer

type Writer interface {
	// Write passes an entry to the next processor. It can return
	// `io.ErrClosedPipe` when the pipe between processors has been closed, meaning
	// that the next processor does not need more data. In that case the current
	// processor can terminate, since further entries written to the `Writer`
	// would not be read.
	Write(interface{}) error
	// Close should be called when there are no more entries
	// to write.
	Close() error
}

Writer is the interface a processor uses to pass entries to the next processor.
