Documentation ¶
Index ¶
- Variables
- func NewAsyncStageWithTasks(name StageName, tasks ...Task) *stage
- func NewCustomStage(name StageName, runner stageRunner) *stage
- func NewStageWithTasks(name StageName, tasks ...Task) *stage
- type CustomPipeline
- type DefaultPipeline
- type Logger
- type Options
- type Payload
- type PayloadFactory
- type Pipeline
- type Sink
- type Source
- type Stage
- type StageName
- type StageRunnerFunc
- type Stat
- type StatsRecorder
- type Task
- type TaskName
- type TaskValidator
Constants ¶
This section is empty.
Variables ¶
var (
	ErrMissingStages = errors.New("provide stages to run concurrently")
	ErrMissingStage  = errors.New("no stage to run")
)
Functions ¶
func NewAsyncStageWithTasks ¶ added in v0.1.8
func NewAsyncStageWithTasks(name StageName, tasks ...Task) *stage
NewAsyncStageWithTasks creates a stage whose tasks run concurrently.
func NewCustomStage ¶ added in v0.1.7
func NewCustomStage(name StageName, runner stageRunner) *stage
NewCustomStage creates a stage with a custom stageRunner.
func NewStageWithTasks ¶ added in v0.1.8
func NewStageWithTasks(name StageName, tasks ...Task) *stage
NewStageWithTasks creates a stage whose tasks run one by one.
Types ¶
type CustomPipeline ¶ added in v0.1.7
type CustomPipeline interface {
	Pipeline
	AddStage(stage *stage)
	AddConcurrentStages(stages ...*stage)
}
CustomPipeline is implemented by types that want to create a pipeline by adding their own stages
func NewCustom ¶ added in v0.1.7
func NewCustom(payloadFactor PayloadFactory) CustomPipeline
NewCustom creates a new pipeline that satisfies CustomPipeline
type DefaultPipeline ¶ added in v0.1.7
type DefaultPipeline interface {
	Pipeline
	SetTasks(stageName StageName, tasks ...Task)
	SetAsyncTasks(stageName StageName, tasks ...Task)
	SetCustomStage(stageName StageName, stageRunnerFunc stageRunner)
}
DefaultPipeline is implemented by types that only want to configure existing stages in a pipeline
func NewDefault ¶ added in v0.1.7
func NewDefault(payloadFactor PayloadFactory) DefaultPipeline
NewDefault creates a new DefaultPipeline with all default stages set in default run order
type Logger ¶
type Logger interface {
	// Info logs info message
	Info(string)

	// Debug logs debug message
	Debug(string)
}
Logger is implemented by types that want to hook into the engine's logging mechanism.
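As an illustration of the Logger contract, here is a minimal sketch that adapts the standard library's log.Logger. The interface is redeclared locally so the example compiles on its own; stdLogger, newStdLogger, the verbose flag, and render are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"strings"
)

// Logger mirrors the interface documented above so the sketch is self-contained.
type Logger interface {
	Info(string)
	Debug(string)
}

// stdLogger is a hypothetical adapter around the standard library's log.Logger.
type stdLogger struct {
	out     *log.Logger
	verbose bool // when false, Debug messages are dropped
}

func newStdLogger(w io.Writer, verbose bool) *stdLogger {
	// Flags are 0 so that no timestamp is prepended to each line.
	return &stdLogger{out: log.New(w, "", 0), verbose: verbose}
}

func (l *stdLogger) Info(msg string) { l.out.Println("INFO " + msg) }

func (l *stdLogger) Debug(msg string) {
	if l.verbose {
		l.out.Println("DEBUG " + msg)
	}
}

// render logs one message at each level and returns what was written.
func render(verbose bool) string {
	var sb strings.Builder
	var l Logger = newStdLogger(&sb, verbose)
	l.Info("pipeline started")
	l.Debug("height=42")
	return sb.String()
}

func main() {
	fmt.Print(render(true))
}
```

A value like this could then be handed to a pipeline through SetLogger.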
type Payload ¶
type Payload interface {
	// MarkAsProcessed is invoked by the pipeline when the payload
	// reaches the end of execution for the current height
	MarkAsProcessed()
}
Payload is implemented by values that can be sent through a pipeline.
type PayloadFactory ¶
PayloadFactory is implemented by objects that know how to create a Payload for every height.
type Pipeline ¶
type Pipeline interface {
	SetLogger(l Logger)
	AddStageBefore(existingStageName StageName, stage *stage)
	AddStageAfter(existingStageName StageName, stage *stage)
	RetryStage(existingStageName StageName, isTransient func(error) bool, maxRetries int)
	Start(ctx context.Context, source Source, sink Sink, options *Options) error
	Run(ctx context.Context, height int64, options *Options) (Payload, error)
}
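RetryStage takes an isTransient predicate of type func(error) bool together with a retry limit. The sketch below shows one way such a predicate might be written with errors.Is. The retry helper is only an illustrative stand-in for how a retrying stage could behave, not the package's implementation; errTemporary, errPermanent, retry, and flaky are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors used only for this illustration.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

// isTransient has the func(error) bool shape that RetryStage accepts.
// errors.Is also matches errors that wrap errTemporary.
func isTransient(err error) bool {
	return errors.Is(err, errTemporary)
}

// retry runs fn once and then up to maxRetries more times while the error
// is transient. It returns the number of attempts made and the last error.
// This is an assumed retry policy, not the library's.
func retry(maxRetries int, isTransient func(error) bool, fn func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		err := fn()
		if err == nil || !isTransient(err) || attempts > maxRetries {
			return attempts, err
		}
	}
}

// flaky returns a function that fails with err on its first n calls and
// succeeds afterwards.
func flaky(n int, err error) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return fmt.Errorf("fetch: %w", err)
		}
		return nil
	}
}

func main() {
	attempts, err := retry(3, isTransient, flaky(2, errTemporary))
	fmt.Println(attempts, err)

	attempts, err = retry(3, isTransient, flaky(2, errPermanent))
	fmt.Println(attempts, err)
}
```

Keeping the transient/permanent decision in a predicate lets the caller decide which failures are worth retrying without the pipeline knowing about specific error types.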
type Source ¶
type Source interface {
	// Next gets the next height
	Next(context.Context, Payload) bool

	// Current returns the current height
	Current() int64

	// Err returns the error, if any
	Err() error

	// Skip reports whether the given stage should be skipped
	Skip(StageName) bool
}
Source is executed before processing of individual heights. It is responsible for getting the start and end heights.
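To make the Source contract more concrete, here is a minimal sketch of a source that walks a fixed, inclusive range of heights. The interfaces are redeclared locally so the example is self-contained. It assumes the caller processes Current() first and then calls Next to advance, which is not stated above; rangeSource, newRangeSource, and collect are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Local mirrors of the documented types, so the sketch compiles on its own.
type StageName string

type Payload interface{ MarkAsProcessed() }

type Source interface {
	Next(context.Context, Payload) bool
	Current() int64
	Err() error
	Skip(StageName) bool
}

// rangeSource is a hypothetical Source covering heights start..end inclusive.
type rangeSource struct {
	current, end int64
	err          error
	skipped      map[StageName]bool
}

func newRangeSource(start, end int64, skipped ...StageName) *rangeSource {
	s := &rangeSource{current: start, end: end, skipped: map[StageName]bool{}}
	for _, name := range skipped {
		s.skipped[name] = true
	}
	return s
}

// Next advances to the following height. It returns false when the range is
// exhausted or the context is done; in the latter case Err reports why.
func (s *rangeSource) Next(ctx context.Context, _ Payload) bool {
	if err := ctx.Err(); err != nil {
		s.err = err
		return false
	}
	if s.current >= s.end {
		return false
	}
	s.current++
	return true
}

func (s *rangeSource) Current() int64           { return s.current }
func (s *rangeSource) Err() error               { return s.err }
func (s *rangeSource) Skip(name StageName) bool { return s.skipped[name] }

// collect drives the source with an assumed process-then-advance loop and
// returns every height it visited.
func collect(src Source) []int64 {
	ctx := context.Background()
	var out []int64
	for ok := true; ok; ok = src.Next(ctx, nil) {
		out = append(out, src.Current())
	}
	return out
}

func main() {
	src := newRangeSource(5, 7, "stage_cleanup")
	fmt.Println(collect(src), src.Skip("stage_cleanup"), src.Err())
}
```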
type StageName ¶
type StageName string
const (
	// Context key used by StatsRecorder
	CtxStats = "stats"

	// Setup stage (Chore): performs setup tasks
	StageSetup StageName = "stage_setup"

	// Fetcher stage (Syncing): fetches data for indexing
	StageFetcher StageName = "stage_fetcher"

	// Parser stage (Syncing): parses and normalizes fetched data to a single structure
	StageParser StageName = "stage_parser"

	// Validator stage (Syncing): validates parsed data
	StageValidator StageName = "stage_validator"

	// Syncer stage (Syncing): saves data to datastore
	StageSyncer StageName = "stage_syncer"

	// Sequencer stage (Indexing): Creates sequences from synced data (syncable)
	StageSequencer StageName = "stage_sequencer"

	// Aggregator stage (Indexing): Creates aggregates from synced data (syncable)
	StageAggregator StageName = "stage_aggregator"

	// StagePersistor stage (Indexing): Persists data to datastore
	StagePersistor StageName = "stage_persistor"

	// Cleanup stage (Chore): Cleans up after execution
	StageCleanup StageName = "stage_cleanup"
)
type StageRunnerFunc ¶
type StageRunnerFunc func(context.Context, Payload, TaskValidator) error
StageRunnerFunc is an adapter that allows plain functions to be used as a stageRunner.
func (StageRunnerFunc) Run ¶
func (srf StageRunnerFunc) Run(ctx context.Context, p Payload, f TaskValidator) error
Run calls srf(ctx, p, f).
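The adapter works the same way as http.HandlerFunc in the standard library: converting a function to StageRunnerFunc gives it a Run method. The sketch below redeclares the types locally so it runs on its own. Two things are assumptions because their declarations are not shown above: TaskValidator is taken to be func(string) bool, and the unexported stageRunner interface is inferred from the Run signature. notePayload, annotate, and runStage are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

type Payload interface{ MarkAsProcessed() }

// Assumption: the real TaskValidator declaration is not shown in this page.
type TaskValidator func(taskName string) bool

// Assumption: shape inferred from StageRunnerFunc.Run.
type stageRunner interface {
	Run(context.Context, Payload, TaskValidator) error
}

// StageRunnerFunc mirrors the documented adapter type.
type StageRunnerFunc func(context.Context, Payload, TaskValidator) error

// Run calls srf(ctx, p, f).
func (srf StageRunnerFunc) Run(ctx context.Context, p Payload, f TaskValidator) error {
	return srf(ctx, p, f)
}

// notePayload is a hypothetical payload that records which steps ran.
type notePayload struct {
	notes     []string
	processed bool
}

func (p *notePayload) MarkAsProcessed() { p.processed = true }

// annotate is a plain function with the StageRunnerFunc signature. It
// records each step name that the validator allows.
func annotate(_ context.Context, p Payload, allowed TaskValidator) error {
	np, ok := p.(*notePayload)
	if !ok {
		return fmt.Errorf("unexpected payload type %T", p)
	}
	for _, name := range []string{"fetch", "parse"} {
		if allowed(name) {
			np.notes = append(np.notes, name)
		}
	}
	return nil
}

// runStage accepts anything that satisfies stageRunner.
func runStage(r stageRunner, p Payload, allowed TaskValidator) error {
	return r.Run(context.Background(), p, allowed)
}

func main() {
	p := &notePayload{}
	onlyParse := func(name string) bool { return name == "parse" }
	err := runStage(StageRunnerFunc(annotate), p, onlyParse)
	fmt.Println(p.notes, err)
}
```

A function converted this way is the kind of value that NewCustomStage and SetCustomStage expect for their stageRunner parameter.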
type Stat ¶
func (*Stat) SetCompleted ¶
SetCompleted completes the stat duration.
type StatsRecorder ¶
type StatsRecorder struct {
Stat
}
func NewStatsRecorder ¶
func NewStatsRecorder() *StatsRecorder
StatsRecorder is responsible for recording statistics during pipeline execution.
TODO: Add stats for every stage and every task.
type Task ¶
type Task interface {
	// Run runs the task
	Run(context.Context, Payload) error

	// GetName gets the name of the task
	GetName() string
}
Task is implemented by types that want to be executed inside of a stage
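A minimal sketch of a Task implementation follows, with the interfaces redeclared locally so it runs on its own. runInOrder is only a stand-in for a stage that runs tasks one by one; stopping at the first error is an assumption, not documented behaviour. visitPayload, recordTask, and runInOrder are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local mirrors of the documented interfaces.
type Payload interface{ MarkAsProcessed() }

type Task interface {
	Run(context.Context, Payload) error
	GetName() string
}

// visitPayload is a hypothetical payload that records which tasks touched it.
type visitPayload struct{ visited []string }

func (p *visitPayload) MarkAsProcessed() {}

// recordTask is a hypothetical Task that appends its name to the payload,
// or fails when fail is set.
type recordTask struct {
	name string
	fail bool
}

func (t recordTask) GetName() string { return t.name }

func (t recordTask) Run(_ context.Context, p Payload) error {
	if t.fail {
		return errors.New("simulated failure")
	}
	vp, ok := p.(*visitPayload)
	if !ok {
		return fmt.Errorf("unexpected payload type %T", p)
	}
	vp.visited = append(vp.visited, t.name)
	return nil
}

// runInOrder runs tasks sequentially and stops at the first error
// (an assumed policy for this illustration).
func runInOrder(p Payload, tasks ...Task) error {
	ctx := context.Background()
	for _, t := range tasks {
		if err := t.Run(ctx, p); err != nil {
			return fmt.Errorf("task %s: %w", t.GetName(), err)
		}
	}
	return nil
}

func main() {
	p := &visitPayload{}
	err := runInOrder(p,
		recordTask{name: "fetch"},
		recordTask{name: "parse", fail: true},
		recordTask{name: "sync"},
	)
	fmt.Println(p.visited, err)
}
```

Tasks written this way are what NewStageWithTasks, NewAsyncStageWithTasks, SetTasks, and SetAsyncTasks take as their variadic arguments.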
type TaskValidator ¶
TaskValidator is a type for validating a task by the provided task name.