pipeline

package
v0.0.0-...-672f35d
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2019 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func (*Pipeline) AddStateProcessorTree

func (p *Pipeline) AddStateProcessorTree(rootProcessor *PipelineNode)

func (*Pipeline) Node

func (p *Pipeline) Node(processor StateProcessor) *PipelineNode

func (*Pipeline) PrintStatus

func (p *Pipeline) PrintStatus()

func (*Pipeline) ProcessState

func (p *Pipeline) ProcessState(readCloser io.StateReadCloser) <-chan error

type PipelineNode

type PipelineNode struct {
	Processor StateProcessor
	Children  []*PipelineNode
	// contains filtered or unexported fields
}

func (*PipelineNode) Pipe

func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode

type ReduceStateProcessor

type ReduceStateProcessor struct {
}

ReduceStateProcessor forwards the final state produced by applying all the ledger entries to the writer. For example, suppose there are 3 ledger entries:

  • Create account A (XLM balance = 20)
  • Update XLM balance of A to 5
  • Update XLM balance of A to 15

Running ReduceStateProcessor will add a single ledger entry:

  • Create account A (XLM balance = 15)

to the writer.
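The reduction described above can be sketched as follows. This is only an illustration under stated assumptions: `EntryChange` and `reduce` are hypothetical names, and entries are simplified to an account ID and a balance; the actual processor works on real ledger entries read from the pipeline.

```go
package main

import "fmt"

// EntryChange is a hypothetical, simplified stand-in for a ledger entry:
// it records the balance of an account after a create or update.
type EntryChange struct {
	Account string
	Balance int64
}

// reduce keeps only the last observed state of each account and returns
// one entry per account, in the order the accounts were first seen.
func reduce(changes []EntryChange) []EntryChange {
	last := make(map[string]int64)
	var order []string
	for _, c := range changes {
		if _, seen := last[c.Account]; !seen {
			order = append(order, c.Account)
		}
		last[c.Account] = c.Balance
	}

	out := make([]EntryChange, 0, len(order))
	for _, account := range order {
		out = append(out, EntryChange{Account: account, Balance: last[account]})
	}
	return out
}

func main() {
	changes := []EntryChange{{"A", 20}, {"A", 5}, {"A", 15}}
	for _, e := range reduce(changes) {
		// prints "Create account A (XLM balance = 15)"
		fmt.Printf("Create account %s (XLM balance = %d)\n", e.Account, e.Balance)
	}
}
```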

type StateProcessor

type StateProcessor interface {
	// ProcessState is the main method of `StateProcessor`. It receives an `io.StateReadCloser`
	// that contains objects passed down the pipeline from the previous processor. Writes to
	// `io.StateWriteCloser` will be passed to the next processor. WARNING! `ProcessState`
	// should **always** call `Close()` on `io.StateWriteCloser` when no more objects will be
	// written and `Close()` on `io.StateReadCloser` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read `Store` godoc to understand how to use it.
	// The first argument `ctx` is a context with cancel. Processor should monitor
	// `ctx.Done()` channel and exit when it is closed. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	//
	// Given all the information above, `ProcessState` should always look like this:
	//
	//    func (p *Processor) ProcessState(ctx context.Context, store *pipeline.Store, r io.StateReadCloser, w io.StateWriteCloser) error {
	//    	defer r.Close()
	//    	defer w.Close()
	//
	//    	// Some pre code...
	//
	//    	for {
	//    		entry, err := r.Read()
	//    		if err != nil {
	//    			if err == io.EOF {
	//    				break
	//    			} else {
	//    				return errors.Wrap(err, "Error reading from StateReadCloser in [ProcessorName]")
	//    			}
	//    		}
	//
	//    		// Process entry...
	//
	//    		// Write to StateWriteCloser if needed but exit if pipe is closed:
	//    		err = w.Write(entry)
	//    		if err != nil {
	//    			if err == io.ErrClosedPipe {
	//    				//    Reader does not need more data
	//    				return nil
	//    			}
	//    			return errors.Wrap(err, "Error writing to StateWriteCloser in [ProcessorName]")
	//    		}
	//
	//    		// Return errors if needed...
	//
	//    		// Exit when pipeline terminated due to an error in another processor...
	//    		select {
	//    		case <-ctx.Done():
	//    			return nil
	//    		default:
	//    			continue
	//    		}
	//    	}
	//
	//    	// Some post code...
	//
	//    	return nil
	//    }
	ProcessState(context.Context, *Store, io.StateReadCloser, io.StateWriteCloser) error
	// IsConcurrent defines if processing pipeline should start a single instance
	// of the processor or multiple instances. Multiple instances will read
	// from the same StateReader and write to the same StateWriter.
	// Example: the processor can insert entries to a DB in a single job but it
	// probably will be faster with multiple DB writers (especially when you want
	// to do some data conversions before inserting).
	IsConcurrent() bool
	// RequiresInput defines if processor requires input data (StateReader). If not,
	// it will receive an empty reader, its parent processor will write to "void" and
	// writes to `writer` will go to "void".
	// This is useful for processors responsible for saving aggregated data that don't
	// need state objects.
	// TODO!
	RequiresInput() bool
	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string
}

StateProcessor defines the methods required by the state processing pipeline.
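To show the read/write/close contract of `ProcessState` in a form that compiles on its own, here is a minimal sketch. It makes several assumptions: `StateReadCloser` and `StateWriteCloser` are local, string-based stand-ins for the real `io.StateReadCloser` and `io.StateWriteCloser`, `sliceReader`, `sliceWriter`, `passThrough` and `demo` are hypothetical helpers, the `*Store` argument is omitted, and `io.EOF` and `io.ErrClosedPipe` come from the standard library here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// Hypothetical stand-ins for the package's reader and writer interfaces.
type StateReadCloser interface {
	Read() (string, error)
	Close() error
}

type StateWriteCloser interface {
	Write(string) error
	Close() error
}

// sliceReader yields its entries in order and then returns io.EOF.
type sliceReader struct {
	entries []string
	closed  bool
}

func (r *sliceReader) Read() (string, error) {
	if len(r.entries) == 0 {
		return "", io.EOF
	}
	e := r.entries[0]
	r.entries = r.entries[1:]
	return e, nil
}

func (r *sliceReader) Close() error { r.closed = true; return nil }

// sliceWriter collects entries and rejects writes after Close.
type sliceWriter struct {
	entries []string
	closed  bool
}

func (w *sliceWriter) Write(e string) error {
	if w.closed {
		return io.ErrClosedPipe
	}
	w.entries = append(w.entries, e)
	return nil
}

func (w *sliceWriter) Close() error { w.closed = true; return nil }

// passThrough copies every entry from r to w. It always closes both ends,
// treats a closed downstream pipe as a normal exit, and stops when ctx is done.
func passThrough(ctx context.Context, r StateReadCloser, w StateWriteCloser) error {
	defer r.Close()
	defer w.Close()

	for {
		entry, err := r.Read()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return fmt.Errorf("reading in passThrough: %w", err)
		}

		if err = w.Write(entry); err != nil {
			if errors.Is(err, io.ErrClosedPipe) {
				return nil // the reader downstream does not need more data
			}
			return fmt.Errorf("writing in passThrough: %w", err)
		}

		select {
		case <-ctx.Done():
			return nil // pipeline was interrupted elsewhere
		default:
		}
	}
}

// demo runs passThrough over three entries and reports what happened.
func demo() ([]string, bool, bool, error) {
	r := &sliceReader{entries: []string{"a", "b", "c"}}
	w := &sliceWriter{}
	err := passThrough(context.Background(), r, w)
	return w.entries, r.closed, w.closed, err
}

func main() {
	fmt.Println(demo())
}
```

Deferring both `Close()` calls at the top of the function means every return path, including the error and cancellation paths, releases the reader and signals the next processor that no more objects are coming.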

type Store

type Store struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Store allows storing data connected to pipeline execution. It exposes `Lock()` and `Unlock()` methods that must be called when accessing the `Store` for both `Put` and `Get` calls.

Example (incrementing a number):

    s.Lock()
    v := s.Get("value")
    s.Put("value", v.(int)+1)
    s.Unlock()
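The reason the lock must span both `Get` and `Put` is easier to see with concurrent processors. The sketch below is an assumption-laden re-creation, not the package's code: this local `Store` mirrors only the documented surface (an embedded `sync.Mutex` plus `Get` and `Put`), its map field is a guess at the unexported fields, and `countConcurrently` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// Store is a hypothetical re-creation of the documented API. The map field
// is an assumption; the real unexported fields are not shown in the docs.
type Store struct {
	sync.Mutex
	values map[string]interface{}
}

// Get returns the value saved under name, or nil if there is none.
// Callers are expected to hold the lock.
func (s *Store) Get(name string) interface{} { return s.values[name] }

// Put saves value under name. Callers are expected to hold the lock.
func (s *Store) Put(name string, value interface{}) {
	if s.values == nil {
		s.values = make(map[string]interface{})
	}
	s.values[name] = value
}

// countConcurrently increments "value" once per goroutine. The lock is held
// across the whole read-modify-write, so no increment is lost.
func countConcurrently(workers int) int {
	s := &Store{}
	s.Lock()
	s.Put("value", 0)
	s.Unlock()

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Lock()
			defer s.Unlock()
			v := s.Get("value")
			s.Put("value", v.(int)+1)
		}()
	}
	wg.Wait()

	s.Lock()
	defer s.Unlock()
	return s.Get("value").(int)
}

func main() {
	fmt.Println(countConcurrently(100)) // prints 100
}
```

Locking only around `Get` or only around `Put` would let two goroutines read the same old value and overwrite each other's increment.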

func (*Store) Get

func (s *Store) Get(name string) interface{}

func (*Store) Put

func (s *Store) Put(name string, value interface{})
