Documentation
Types
type Pipeline
type Pipeline struct {
// contains filtered or unexported fields
}
func (*Pipeline) AddStateProcessorTree
func (p *Pipeline) AddStateProcessorTree(rootProcessor *PipelineNode)
func (*Pipeline) Node
func (p *Pipeline) Node(processor StateProcessor) *PipelineNode
func (*Pipeline) PrintStatus
func (p *Pipeline) PrintStatus()
func (*Pipeline) ProcessState
func (p *Pipeline) ProcessState(readCloser io.StateReadCloser) <-chan error
type PipelineNode
type PipelineNode struct {
	Processor StateProcessor
	Children  []*PipelineNode
	// contains filtered or unexported fields
}
func (*PipelineNode) Pipe
func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode
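The signatures above suggest that a processor tree is built by wrapping each processor in a node and nesting `Pipe` calls. The following is a minimal, self-contained sketch of that shape only. The `StateProcessor` stand-in, the `named` type, the `node` helper, and the body of `Pipe` are all assumptions made for illustration; the package's real implementation is not shown on this page.

```go
package main

import "fmt"

// StateProcessor is a hypothetical stand-in that keeps only the Name method.
type StateProcessor interface {
	Name() string
}

// PipelineNode mirrors the exported fields listed in the documentation.
type PipelineNode struct {
	Processor StateProcessor
	Children  []*PipelineNode
}

// Pipe appends children and returns the receiver so calls can be nested.
// This body is an assumption, not the package's actual implementation.
func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode {
	p.Children = append(p.Children, children...)
	return p
}

// named is a hypothetical processor used only for illustration.
type named string

func (n named) Name() string { return string(n) }

// node plays the role of (*Pipeline).Node: it wraps a processor in a node.
func node(p StateProcessor) *PipelineNode {
	return &PipelineNode{Processor: p}
}

// names returns the processor names in depth-first order.
func names(n *PipelineNode) []string {
	out := []string{n.Processor.Name()}
	for _, c := range n.Children {
		out = append(out, names(c)...)
	}
	return out
}

// buildTree assembles a small tree: one root with two branches.
func buildTree() *PipelineNode {
	return node(named("reader")).Pipe(
		node(named("filter")).Pipe(
			node(named("db-writer")),
		),
		node(named("stats")),
	)
}

func main() {
	fmt.Println(names(buildTree()))
}
```

In the real package the root returned by such a chain would presumably be handed to `AddStateProcessorTree`; that step is omitted here because it depends on unexported details.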
type ReduceStateProcessor
type ReduceStateProcessor struct{}
ReduceStateProcessor forwards to the writer the final state produced by applying all the ledger entries. Let's say that there are 3 ledger entries:
- Create account A (XLM balance = 20)
- Update XLM balance of A to 5
- Update XLM balance of A to 15
Running ReduceStateProcessor will add a single ledger entry:
- Create account A (XLM balance = 15)
to the writer.
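The reduction described above can be sketched as a fold that keeps only the last known state per account. This is an illustrative sketch, not the processor's actual code: the `change` and `entry` types and the `reduce` function are hypothetical simplifications, and only create and update changes are modelled because those are the only kinds the example mentions.

```go
package main

import "fmt"

// change is a hypothetical, simplified ledger entry change.
type change struct {
	Account string
	Balance int64 // XLM balance after the change is applied
}

// entry is the reduced, final state of one account.
type entry struct {
	Account string
	Balance int64
}

// reduce applies changes in order and returns one entry per account,
// holding the last balance seen. Accounts keep their first-seen order
// so the result is deterministic.
func reduce(changes []change) []entry {
	final := make(map[string]int64)
	var order []string
	for _, c := range changes {
		if _, seen := final[c.Account]; !seen {
			order = append(order, c.Account)
		}
		final[c.Account] = c.Balance
	}
	out := make([]entry, 0, len(order))
	for _, a := range order {
		out = append(out, entry{Account: a, Balance: final[a]})
	}
	return out
}

func main() {
	changes := []change{
		{Account: "A", Balance: 20}, // create account A
		{Account: "A", Balance: 5},  // update balance to 5
		{Account: "A", Balance: 15}, // update balance to 15
	}
	for _, e := range reduce(changes) {
		fmt.Printf("Create account %s (XLM balance = %d)\n", e.Account, e.Balance)
	}
}
```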
type StateProcessor
type StateProcessor interface {
	// ProcessState is the main method of `StateProcessor`. It receives an `io.StateReadCloser`
	// that contains objects passed down the pipeline from the previous processor. Writes to
	// `io.StateWriteCloser` will be passed to the next processor. WARNING! `ProcessState`
	// should **always** call `Close()` on `io.StateWriteCloser` when no more objects will be
	// written and `Close()` on `io.StateReadCloser` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read `Store` godoc to understand how to use it.
	// The first argument `ctx` is a context with cancel. The processor should monitor the
	// `ctx.Done()` channel and exit when it returns a value. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	//
	// Given all the information above, `ProcessState` should always look like this:
	//
	//     func (p *Processor) ProcessState(ctx context.Context, store *pipeline.Store, r io.StateReadCloser, w io.StateWriteCloser) error {
	//         defer r.Close()
	//         defer w.Close()
	//
	//         // Some pre code...
	//
	//         for {
	//             entry, err := r.Read()
	//             if err != nil {
	//                 if err == io.EOF {
	//                     break
	//                 } else {
	//                     return errors.Wrap(err, "Error reading from StateReadCloser in [ProcessorName]")
	//                 }
	//             }
	//
	//             // Process entry...
	//
	//             // Write to StateWriteCloser if needed but exit if pipe is closed:
	//             err = w.Write(entry)
	//             if err != nil {
	//                 if err == io.ErrClosedPipe {
	//                     // Reader does not need more data
	//                     return nil
	//                 }
	//                 return errors.Wrap(err, "Error writing to StateWriteCloser in [ProcessorName]")
	//             }
	//
	//             // Return errors if needed...
	//
	//             // Exit when pipeline terminated due to an error in another processor...
	//             select {
	//             case <-ctx.Done():
	//                 return nil
	//             default:
	//                 continue
	//             }
	//         }
	//
	//         // Some post code...
	//
	//         return nil
	//     }
	ProcessState(context.Context, *Store, io.StateReadCloser, io.StateWriteCloser) error

	// IsConcurrent defines whether the processing pipeline should start a single instance
	// of the processor or multiple instances. Multiple instances will read
	// from the same StateReader and write to the same StateWriter.
	// Example: the processor can insert entries to a DB in a single job but it
	// will probably be faster with multiple DB writers (especially when you want
	// to do some data conversions before inserting).
	IsConcurrent() bool

	// RequiresInput defines whether the processor requires input data (StateReader). If not,
	// it will receive an empty reader, its parent processor will write to "void" and
	// writes to `writer` will go to "void".
	// This is useful for processors responsible for saving aggregated data that don't
	// need state objects.
	// TODO!
	RequiresInput() bool

	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string
}
StateProcessor defines the methods required by the state processing pipeline.
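The `IsConcurrent` comment says that multiple instances read from the same reader and write to the same writer. One way to picture that arrangement is a fan-out over shared channels, sketched below. This is a sketch under stated assumptions rather than the pipeline's implementation: plain Go channels stand in for `io.StateReadCloser` and `io.StateWriteCloser`, and `runInstances` and `sumOfDoubles` are hypothetical names. The shared output is closed once, after every instance has finished.

```go
package main

import (
	"fmt"
	"sync"
)

// runInstances starts n copies of process. All copies read from the same
// input channel and write to the same output channel. The output is closed
// exactly once, after every copy has returned.
func runInstances(n int, in <-chan int, process func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- process(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumOfDoubles feeds 1..10 through n instances that double each value
// and returns the sum of everything written to the shared output.
func sumOfDoubles(n int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 10; i++ {
			in <- i
		}
	}()
	sum := 0
	for v := range runInstances(n, in, func(v int) int { return v * 2 }) {
		sum += v
	}
	return sum
}

func main() {
	// The total does not depend on the number of instances;
	// only the order in which values arrive may differ.
	fmt.Println(sumOfDoubles(1), sumOfDoubles(4))
}
```

Because instances interleave, a concurrent processor in this style cannot rely on the order of the objects it receives or emits.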