Documentation ¶
Index ¶
- func GetLedgerHeaderFromContext(ctx context.Context) xdr.LedgerHeaderHistoryEntry
- func GetLedgerSequenceFromContext(ctx context.Context) uint32
- func GetLedgerUpgradeChangesFromContext(ctx context.Context) []io.Change
- func LedgerNode(processor LedgerProcessor) *supportPipeline.PipelineNode
- func StateNode(processor StateProcessor) *supportPipeline.PipelineNode
- type ContextKey
- type LedgerPipeline
- type LedgerProcessor
- type StatePipeline
- type StateProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetLedgerHeaderFromContext ¶
func GetLedgerHeaderFromContext(ctx context.Context) xdr.LedgerHeaderHistoryEntry
func GetLedgerSequenceFromContext ¶
func GetLedgerSequenceFromContext(ctx context.Context) uint32
func GetLedgerUpgradeChangesFromContext ¶
func GetLedgerUpgradeChangesFromContext(ctx context.Context) []io.Change
func LedgerNode ¶
func LedgerNode(processor LedgerProcessor) *supportPipeline.PipelineNode
func StateNode ¶
func StateNode(processor StateProcessor) *supportPipeline.PipelineNode
Types ¶
type ContextKey ¶
type ContextKey string
const (
	LedgerSequenceContextKey       ContextKey = "ledger_sequence"
	LedgerHeaderContextKey         ContextKey = "ledger_header"
	LedgerUpgradeChangesContextKey ContextKey = "ledger_upgrade_changes"
)
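These keys are set by the pipeline on the context it makes available to processors and hooks, and the Get*FromContext functions above read the stored values back. A minimal sketch of a helper doing that, assuming the import path github.com/stellar/go/exp/ingest/pipeline and that ctx is a context provided by the ledger pipeline; logLedgerInfo and the header fields printed are illustrative:

package processors

import (
	"context"
	"fmt"

	"github.com/stellar/go/exp/ingest/pipeline"
)

// logLedgerInfo reads the values stored under the ContextKey constants
// above from a context provided by the ledger pipeline.
func logLedgerInfo(ctx context.Context) {
	sequence := pipeline.GetLedgerSequenceFromContext(ctx)
	header := pipeline.GetLedgerHeaderFromContext(ctx)
	upgrades := pipeline.GetLedgerUpgradeChangesFromContext(ctx)

	fmt.Printf("ledger %d closed at %d with %d upgrade changes\n",
		sequence, header.Header.ScpValue.CloseTime, len(upgrades))
}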
type LedgerPipeline ¶
type LedgerPipeline struct {
supportPipeline.Pipeline
}
func (*LedgerPipeline) Process ¶
func (p *LedgerPipeline) Process(reader io.LedgerReader) <-chan error
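A sketch of running a single-processor ledger pipeline and waiting on the error channel returned by Process. The import paths, the SetRoot method (inherited from the embedded supportPipeline.Pipeline) and the nil-on-success behaviour of the returned channel are assumptions; runLedgerPipeline is an illustrative helper:

package processors

import (
	"github.com/stellar/go/exp/ingest/io"
	"github.com/stellar/go/exp/ingest/pipeline"
)

// runLedgerPipeline wires one LedgerProcessor into a LedgerPipeline and
// blocks until processing of the ledger finishes or fails.
func runLedgerPipeline(processor pipeline.LedgerProcessor, reader io.LedgerReader) error {
	p := &pipeline.LedgerPipeline{}
	// SetRoot is assumed to come from the embedded supportPipeline.Pipeline.
	p.SetRoot(pipeline.LedgerNode(processor))

	// Process runs the pipeline asynchronously; receiving from the returned
	// channel blocks until it reports an error or is closed on success.
	return <-p.Process(reader)
}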
type LedgerProcessor ¶
type LedgerProcessor interface {
	// ProcessLedger is the main method of `LedgerProcessor`. It receives an `io.LedgerReader`
	// that contains objects passed down the pipeline from the previous processor. Writes to
	// `io.LedgerWriter` will be passed to the next processor. WARNING! `ProcessLedger`
	// should **always** call `Close()` on `io.LedgerWriter` when no more objects will be
	// written and `Close()` on `io.LedgerReader` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read the `Store` godoc to understand how to use it.
	// The first argument `ctx` is a context with cancel. The processor should monitor the
	// `ctx.Done()` channel and exit when it returns a value. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	// Please note that a processor can filter transactions (by not passing them to
	// `io.LedgerWriter`) but it cannot filter ledger upgrade changes
	// (`io.LedgerReader.ReadUpgradeChange`). All upgrade changes will be available
	// for the next processor to read.
	//
	// Given all the information above, `ProcessLedger` should always look like this:
	//
	//   func (p *Processor) ProcessLedger(ctx context.Context, store *pipeline.Store, r io.LedgerReader, w io.LedgerWriter) (err error) {
	//     defer func() {
	//       // io.LedgerReader.Close() returns an error if upgrade changes have not
	//       // been processed so it's worth checking the error.
	//       closeErr := r.Close()
	//       // Do not overwrite the previous error
	//       if err == nil {
	//         err = closeErr
	//       }
	//     }()
	//     defer w.Close()
	//
	//     // Some pre code...
	//
	//     for {
	//       entry, err := r.Read()
	//       if err != nil {
	//         if err == io.EOF {
	//           break
	//         } else {
	//           return errors.Wrap(err, "Error reading from LedgerReader in [ProcessorName]")
	//         }
	//       }
	//
	//       // Process entry...
	//
	//       // Write to LedgerWriter if needed but exit if pipe is closed:
	//       err = w.Write(entry)
	//       if err != nil {
	//         if err == io.ErrClosedPipe {
	//           // Reader does not need more data
	//           return nil
	//         }
	//         return errors.Wrap(err, "Error writing to LedgerWriter in [ProcessorName]")
	//       }
	//
	//       // Return errors if needed...
	//
	//       // Exit when pipeline terminated due to an error in another processor...
	//       select {
	//       case <-ctx.Done():
	//         return nil
	//       default:
	//         continue
	//       }
	//     }
	//
	//     for {
	//       change, err := r.ReadUpgradeChange()
	//       if err != nil {
	//         if err == stdio.EOF {
	//           break
	//         } else {
	//           return err
	//         }
	//       }
	//
	//       // Process ledger upgrade change...
	//     }
	//
	//     // Some post code...
	//
	//     return nil
	//   }
	ProcessLedger(context.Context, *supportPipeline.Store, io.LedgerReader, io.LedgerWriter) error

	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string

	// Reset resets the internal state of the processor. This is run by the pipeline
	// every time processing is done. It is extremely important to implement
	// this method, otherwise the internal state of the processor will be maintained
	// between pipeline runs and may result in invalid data.
	Reset()
}
LedgerProcessor defines the methods required by the ledger processing pipeline.
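Putting the three methods together, a complete (if trivial) processor could look like the sketch below. It counts transactions and forwards every entry unchanged; the type name, the import paths and the sentinel errors compared against (the standard library's io.EOF and io.ErrClosedPipe, aliased as stdio) are assumptions, not part of this package:

package processors

import (
	"context"
	stdio "io"

	"github.com/stellar/go/exp/ingest/io"
	supportPipeline "github.com/stellar/go/support/pipeline"
)

// CountingProcessor is a hypothetical LedgerProcessor that counts the
// transactions it sees and passes every entry to the next processor.
type CountingProcessor struct {
	count int
}

func (p *CountingProcessor) ProcessLedger(ctx context.Context, store *supportPipeline.Store, r io.LedgerReader, w io.LedgerWriter) (err error) {
	defer func() {
		// Close() reports an error if upgrade changes were not read.
		closeErr := r.Close()
		if err == nil {
			err = closeErr
		}
	}()
	defer w.Close()

	for {
		entry, err := r.Read()
		if err != nil {
			if err == stdio.EOF {
				break
			}
			return err
		}

		p.count++

		// Forward the entry; stop early if downstream no longer reads.
		if err = w.Write(entry); err != nil {
			if err == stdio.ErrClosedPipe {
				return nil
			}
			return err
		}

		// Exit if the pipeline was interrupted.
		select {
		case <-ctx.Done():
			return nil
		default:
		}
	}

	// Drain upgrade changes so Close() does not report an error.
	for {
		if _, err := r.ReadUpgradeChange(); err != nil {
			if err == stdio.EOF {
				break
			}
			return err
		}
	}

	return nil
}

func (p *CountingProcessor) Name() string { return "CountingProcessor" }

// Reset is called by the pipeline after every run; without it the count
// would leak between runs.
func (p *CountingProcessor) Reset() { p.count = 0 }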
type StatePipeline ¶
type StatePipeline struct {
supportPipeline.Pipeline
}
func (*StatePipeline) Process ¶
func (p *StatePipeline) Process(reader io.StateReader) <-chan error
type StateProcessor ¶
type StateProcessor interface {
	// ProcessState is the main method of `StateProcessor`. It receives an `io.StateReader`
	// that contains objects passed down the pipeline from the previous processor. Writes to
	// `io.StateWriter` will be passed to the next processor. WARNING! `ProcessState`
	// should **always** call `Close()` on `io.StateWriter` when no more objects will be
	// written and `Close()` on `io.StateReader` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read the `Store` godoc to understand how to use it.
	// The first argument `ctx` is a context with cancel. The processor should monitor the
	// `ctx.Done()` channel and exit when it returns a value. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	//
	// Given all the information above, `ProcessState` should always look like this:
	//
	//   func (p *Processor) ProcessState(ctx context.Context, store *pipeline.Store, r io.StateReader, w io.StateWriter) error {
	//     defer r.Close()
	//     defer w.Close()
	//
	//     // Some pre code...
	//
	//     for {
	//       entry, err := r.Read()
	//       if err != nil {
	//         if err == io.EOF {
	//           break
	//         } else {
	//           return errors.Wrap(err, "Error reading from StateReader in [ProcessorName]")
	//         }
	//       }
	//
	//       // Process entry...
	//
	//       // Write to StateWriter if needed but exit if pipe is closed:
	//       err = w.Write(entry)
	//       if err != nil {
	//         if err == io.ErrClosedPipe {
	//           // Reader does not need more data
	//           return nil
	//         }
	//         return errors.Wrap(err, "Error writing to StateWriter in [ProcessorName]")
	//       }
	//
	//       // Return errors if needed...
	//
	//       // Exit when pipeline terminated due to an error in another processor...
	//       select {
	//       case <-ctx.Done():
	//         return nil
	//       default:
	//         continue
	//       }
	//     }
	//
	//     // Some post code...
	//
	//     return nil
	//   }
	ProcessState(context.Context, *supportPipeline.Store, io.StateReader, io.StateWriter) error

	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string

	// Reset resets the internal state of the processor. This is run by the pipeline
	// every time processing is done. It is extremely important to implement
	// this method, otherwise the internal state of the processor will be maintained
	// between pipeline runs and may result in invalid data.
	Reset()
}
StateProcessor defines the methods required by the state processing pipeline.
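In larger ingestion setups several state processors are composed into a tree of nodes. A sketch of such a composition is below; SetRoot and Pipe are assumed to come from the embedded supportPipeline types, the import path is assumed from the stellar/go repository layout, and the three processors are placeholders for real StateProcessor implementations:

package processors

import (
	"github.com/stellar/go/exp/ingest/pipeline"
)

// buildStatePipeline composes a root StateProcessor with two children.
// Entries written by the root are passed on to the child nodes.
func buildStatePipeline(root, accounts, offers pipeline.StateProcessor) *pipeline.StatePipeline {
	p := &pipeline.StatePipeline{}
	p.SetRoot(
		pipeline.StateNode(root).Pipe(
			pipeline.StateNode(accounts),
			pipeline.StateNode(offers),
		),
	)
	return p
}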