Documentation ¶
Index ¶
- func NewStage(name StageName, runner StageRunner, stageType StageType) *stage
- type Logger
- type Options
- type Payload
- type PayloadFactory
- type Pipeline
- func (p *Pipeline) AddStageAfter(existingStageName StageName, name StageName, stageRunner StageRunner)
- func (p *Pipeline) AddStageBefore(existingStageName StageName, name StageName, stageRunner StageRunner)
- func (p *Pipeline) SetAggregatorStage(stageRunner StageRunner)
- func (p *Pipeline) SetCleanupStage(stageRunner StageRunner)
- func (p *Pipeline) SetFetcherStage(stageRunner StageRunner)
- func (p *Pipeline) SetOptions(o *Options)
- func (p *Pipeline) SetParserStage(stageRunner StageRunner)
- func (p *Pipeline) SetSequencerStage(stageRunner StageRunner)
- func (p *Pipeline) SetSetupStage(stageRunner StageRunner)
- func (p *Pipeline) SetSyncerStage(stageRunner StageRunner)
- func (p *Pipeline) SetValidatorStage(stageRunner StageRunner)
- func (p *Pipeline) Start(ctx context.Context, source Source, sink Sink) error
- type Sink
- type Source
- type Stage
- type StageName
- type StageRunner
- type StageRunnerFunc
- type StageType
- type Stat
- type StatsRecorder
- type Task
- type TaskFunc
- type TaskValidator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewStage ¶
func NewStage(name StageName, runner StageRunner, stageType StageType) *stage
Types ¶
type Logger ¶
type Logger interface { Info(interface{}, ...interface{}) Debug(interface{}, ...interface{}) DebugJSON(interface{}, ...interface{}) Warn(interface{}, ...interface{}) Err(error, ...interface{}) }
Logger is implemented by types wanting to hook up for logging
type Options ¶
type Options struct { // StagesWhitelist holds name of stages that will be executed StagesWhitelist []StageName // StagesBlacklist holds name of stages that will NOT be executed StagesBlacklist []StageName // IndexingTasksWhitelist holds name of indexing tasks which will be executed IndexingTasksWhitelist []string // IndexingTasksBlacklist holds name of indexing tasks which will NOT be executed IndexingTasksBlacklist []string }
type Payload ¶
type Payload interface { // Set current height to be processed SetCurrentHeight(int64) // MarkAsProcessed is invoked by the pipeline when the payload // reaches the end of execution for current height MarkAsProcessed() }
Payload is implemented by values that can be sent through a pipeline.
type PayloadFactory ¶
type PayloadFactory interface { // Gets new payload GetPayload() Payload }
PayloadFactory is implemented by objects which know how to create payload for every height
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline implements a modular, multi-stage pipeline
func New ¶
func New(payloadFactor PayloadFactory) *Pipeline
func (*Pipeline) AddStageAfter ¶
func (p *Pipeline) AddStageAfter(existingStageName StageName, name StageName, stageRunner StageRunner)
AddStageBefore adds custom stage after existing stage
func (*Pipeline) AddStageBefore ¶
func (p *Pipeline) AddStageBefore(existingStageName StageName, name StageName, stageRunner StageRunner)
AddStageBefore adds custom stage before existing stage
func (*Pipeline) SetAggregatorStage ¶
func (p *Pipeline) SetAggregatorStage(stageRunner StageRunner)
SetAggregatorStage add aggregator stage to list of available stages
func (*Pipeline) SetCleanupStage ¶
func (p *Pipeline) SetCleanupStage(stageRunner StageRunner)
SetCleanupStage add cleanup stage to list of available stages
func (*Pipeline) SetFetcherStage ¶
func (p *Pipeline) SetFetcherStage(stageRunner StageRunner)
SetFetcherStage add fetcher stage to list of available stages
func (*Pipeline) SetOptions ¶
SetOptions sets pipeline options
func (*Pipeline) SetParserStage ¶
func (p *Pipeline) SetParserStage(stageRunner StageRunner)
SetParserStage add parser stage to list of available stages
func (*Pipeline) SetSequencerStage ¶
func (p *Pipeline) SetSequencerStage(stageRunner StageRunner)
SetSequencerStage add sequencer stage to list of available stages
func (*Pipeline) SetSetupStage ¶
func (p *Pipeline) SetSetupStage(stageRunner StageRunner)
SetSetupStage add setup stage to list of available stages
func (*Pipeline) SetSyncerStage ¶
func (p *Pipeline) SetSyncerStage(stageRunner StageRunner)
SetSyncerStage add syncer stage to list of available stages
func (*Pipeline) SetValidatorStage ¶
func (p *Pipeline) SetValidatorStage(stageRunner StageRunner)
SetValidatorStage add validator stage to list of available stages
type Source ¶
type Source interface { // Next gets next height Next(context.Context, Payload) bool // Current returns current height Current() int64 // Err return error if any Err() error }
Source is executed before processing of individual heights. It is responsible for getting start and end height.
type StageName ¶
type StageName string
const ( // Context key used by StatsRecorder CtxStats = "stats" // Setup stage (Chore): performs setup tasks StageSetup StageName = "stage_setup" // Fetcher stage (Syncing): fetches data for indexing StageFetcher StageName = "stage_fetcher" // Parser stage (Syncing): parses and normalizes fetched data to a single structure StageParser StageName = "stage_parser" // Validator stage (Syncing): validates parsed data StageValidator StageName = "stage_validator" // Syncer stage (Syncing): saves data to datastore StageSyncer StageName = "stage_syncer" // Sequencer stage (Indexing): Creates sequences from synced data (syncable) StageSequencer StageName = "stage_sequencer" // Aggregator stage (Indexing): Creates aggregates from synced data (syncable) StageAggregator StageName = "stage_aggregator" // Cleanup stage (Chore): Cleans up after execution StageCleanup StageName = "stage_cleanup" )
type StageRunner ¶
type StageRunner interface { // Run StageRunner Run(context.Context, Payload, TaskValidator) error }
StageRunner is implemented by types that know how to run tasks
func RetryingStageRunner ¶
func RetryingStageRunner(sr StageRunner, isTransient func(error) bool, maxRetries int) StageRunner
RetryingStageRunner implement retry mechanism for StageRunner
type StageRunnerFunc ¶
type StageRunnerFunc func(context.Context, Payload, TaskValidator) error
StageRunnerFunc is an adapter to allow the use of plain functions as StageRunner
func (StageRunnerFunc) Run ¶
func (srf StageRunnerFunc) Run(ctx context.Context, p Payload, f TaskValidator) error
Run calls f(ctx, p, f).
type Stat ¶
func (*Stat) SetCompleted ¶
SetComplete completes stat duration
type StatsRecorder ¶
type StatsRecorder struct {
Stat
}
func NewStatsRecorder ¶
func NewStatsRecorder() *StatsRecorder
StateRecorder is responsible for recording statistics during pipeline execution TODO: Add stats for every stage and every task
type TaskValidator ¶
TaskValidator is a type for validating task by provided task name