pipeline

package
v0.8.1
Published: Oct 13, 2021 License: Apache-2.0 Imports: 8 Imported by: 11

README

Pipeline

The pipeline package helps you build indexers using a simple DSL. Its goal is to give the indexing process a logical structure.

Default pipeline

Every default indexing pipeline has a fixed set of stages that you can hook into:

  • Setup stage: performs setup tasks
  • Syncer stage: creates the syncable
  • Fetcher stage: fetches data for indexing
  • Parser stage: parses and normalizes fetched data into a single structure
  • Validator stage: validates parsed data
  • Sequencer stage: creates sequences from fetched and/or parsed data
  • Aggregator stage: creates aggregates from fetched and/or parsed data
  • Persistor stage: saves data to the data store
  • Cleanup stage: cleans up after execution

Besides the stages, there are two additional components: Source and Sink. The Source provides the height iterator for the pipeline, and the Sink gathers output data that can be used after the pipeline has finished processing.

The flowchart below depicts all the stages that come with this package and the order in which they are executed.

[Figure: Pipeline flow]

Please note that all syncing phase stages are executed in sequence, whereas indexing phase stages are executed concurrently in order to speed up the indexing process.

Creating new pipeline

To create a new default pipeline use:

p := pipeline.NewDefault([payloadFactory])

Setting up tasks in stages

In order to set tasks for a specific stage you can use:

p.SetTasks(
  [Name of the stage],
  NewTask(),
  NewTask(),
)
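
The tasks passed to SetTasks have to satisfy the Task interface documented further down (Run and GetName). Below is a minimal sketch of such a task. The Payload and Task interfaces are redeclared locally so that the snippet compiles on its own, and heightPayload, fetcherTask, and fetchBlock are hypothetical names that are not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// Payload and Task mirror the interfaces documented for this package.
// They are redeclared here only to keep the sketch self-contained.
type Payload interface {
	MarkAsProcessed()
}

type Task interface {
	Run(context.Context, Payload) error
	GetName() string
}

// heightPayload is a hypothetical payload carrying data for one height.
type heightPayload struct {
	Height    int64
	Block     string
	Processed bool
}

func (p *heightPayload) MarkAsProcessed() { p.Processed = true }

// fetcherTask is a hypothetical task that fills in the payload's Block field.
type fetcherTask struct{}

func (t *fetcherTask) GetName() string { return "FetcherTask" }

func (t *fetcherTask) Run(ctx context.Context, p Payload) error {
	hp, ok := p.(*heightPayload)
	if !ok {
		return fmt.Errorf("unexpected payload type %T", p)
	}
	hp.Block = fmt.Sprintf("block-%d", hp.Height)
	return nil
}

// NewFetcherTask returns the task as a Task value.
func NewFetcherTask() Task { return &fetcherTask{} }

// fetchBlock runs the task once for the given height and returns the result.
func fetchBlock(height int64) (string, error) {
	p := &heightPayload{Height: height}
	if err := NewFetcherTask().Run(context.Background(), p); err != nil {
		return "", err
	}
	return p.Block, nil
}

func main() {
	block, err := fetchBlock(100)
	fmt.Println(block, err) // block-100 <nil>
}
```

The type assertion in Run is one way to get from the generic Payload to the concrete structure that your own payload factory creates.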

To set tasks that will run at the same time, use:

p.SetAsyncTasks(
  [Name of the stage],
  NewTask(),
  NewTask(),
)

If you want to run the tasks inside a stage in your own way, create your own implementation of StageRunnerFunc and pass it to SetCustomStage:

p.SetCustomStage(
  [Name of the stage],
  [custom StageRunnerFunc],
)

Starting pipeline

Once the stages are set up, we can run the pipeline:

options := &pipeline.Options{}
if err := p.Start(ctx, NewSource(), NewSink(), options); err != nil {
    return err
}

This will execute all the tasks for every height provided by the source created with NewSource(). If you want to run a one-off iteration of the pipeline for a specific height, you can use Run():

height := int64(100) // Run expects an int64 height
payload, err := p.Run(ctx, height, options)

It will return a payload collected for that one iteration of the source.
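
To make the roles of Source and Sink more concrete, here is a minimal sketch of a range-based source and an in-memory sink. The interfaces are redeclared locally from the reference below. rangeSource, memorySink, heightPayload, and drain are hypothetical, and the loop in drain is only an assumption about how a pipeline might consume a Source (read Current, process, then call Next), not the package's actual loop.

```go
package main

import (
	"context"
	"fmt"
)

type StageName string

type Payload interface{ MarkAsProcessed() }

type Source interface {
	Next(context.Context, Payload) bool
	Current() int64
	Err() error
	Skip(StageName) bool
}

type Sink interface {
	Consume(context.Context, Payload) error
}

// rangeSource is a hypothetical Source yielding heights start..end inclusive.
type rangeSource struct{ current, end int64 }

func newRangeSource(start, end int64) *rangeSource {
	return &rangeSource{current: start, end: end}
}

// Next advances to the next height and reports whether one was available.
func (s *rangeSource) Next(context.Context, Payload) bool {
	if s.current >= s.end {
		return false
	}
	s.current++
	return true
}

func (s *rangeSource) Current() int64      { return s.current }
func (s *rangeSource) Err() error          { return nil }
func (s *rangeSource) Skip(StageName) bool { return false }

// heightPayload is a hypothetical payload for a single height.
type heightPayload struct {
	Height    int64
	Processed bool
}

func (p *heightPayload) MarkAsProcessed() { p.Processed = true }

// memorySink is a hypothetical Sink that remembers every consumed height.
type memorySink struct{ heights []int64 }

func (s *memorySink) Consume(_ context.Context, p Payload) error {
	hp, ok := p.(*heightPayload)
	if !ok {
		return fmt.Errorf("unexpected payload type %T", p)
	}
	s.heights = append(s.heights, hp.Height)
	return nil
}

// drain stands in for the pipeline: one payload per height, handed to the sink.
func drain(src Source, sink Sink) error {
	ctx := context.Background()
	for {
		p := &heightPayload{Height: src.Current()}
		p.MarkAsProcessed()
		if err := sink.Consume(ctx, p); err != nil {
			return err
		}
		if !src.Next(ctx, p) {
			break
		}
	}
	return src.Err()
}

func main() {
	sink := &memorySink{}
	err := drain(newRangeSource(100, 102), sink)
	fmt.Println(sink.heights, err) // [100 101 102] <nil>
}
```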

Adding custom stages

If you want to perform some action but the provided stages are not a good logical fit for it, you can add custom stages before or after the existing ones. To do that, use:

  • AddStageBefore - adds a stage before the provided existing stage
  • AddStageAfter - adds a stage after the provided existing stage

Below is an example showing how to add a custom stage (as a func) after the Fetcher stage:

const (
    CustomStageName = "AfterFetcher"
)

afterFetcherFunc := pipeline.StageRunnerFunc(func(ctx context.Context, p pipeline.Payload, f pipeline.TaskValidator) error {
    //...
    return nil
})

p.AddStageAfter(
  pipeline.StageFetcher,
  pipeline.NewCustomStage(CustomStageName, afterFetcherFunc),
)

Retrying

The indexing pipeline provides two retrying mechanisms:

  • RetryStage - retries an entire stage if it returns an error
  • RetryingTask - retries an individual task if it returns an error

To retry a stage, call RetryStage with the name of an existing stage; to retry a task, wrap it with RetryingTask. Here is an example using RetryingTask:

p.SetTasks(
 pipeline.StageFetcher,
 pipeline.RetryingTask(NewFetcherTask(), func(err error) bool {
     // Make error always transient for simplicity
     return true
 }, 3),
)
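
The following sketch illustrates what a retrying wrapper with this shape might do: run the task, retry only when the isTransient callback accepts the error, and give up after a fixed number of attempts. It is not the package's implementation. retryTask, flakyTask, and runWithRetries are hypothetical, and treating the limit as the total number of attempts (rather than retries after the first attempt) is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Payload and Task are local copies of the documented interfaces.
type Payload interface{ MarkAsProcessed() }

type Task interface {
	Run(context.Context, Payload) error
	GetName() string
}

// retryTask is a hypothetical wrapper that re-runs the inner task.
type retryTask struct {
	inner       Task
	isTransient func(error) bool
	maxAttempts int // assumption: counts every attempt, including the first
}

func (r *retryTask) GetName() string { return r.inner.GetName() }

func (r *retryTask) Run(ctx context.Context, p Payload) error {
	var err error
	for attempt := 0; attempt < r.maxAttempts; attempt++ {
		// Stop on success or on an error that is not worth retrying.
		if err = r.inner.Run(ctx, p); err == nil || !r.isTransient(err) {
			return err
		}
	}
	return err
}

// flakyTask fails a fixed number of times before it succeeds.
type flakyTask struct{ failures, calls int }

func (t *flakyTask) GetName() string { return "FlakyTask" }

func (t *flakyTask) Run(context.Context, Payload) error {
	t.calls++
	if t.calls <= t.failures {
		return errors.New("temporary failure")
	}
	return nil
}

// runWithRetries reports how often the inner task ran and the final error.
func runWithRetries(failures, maxAttempts int) (int, error) {
	inner := &flakyTask{failures: failures}
	task := &retryTask{
		inner:       inner,
		isTransient: func(error) bool { return true },
		maxAttempts: maxAttempts,
	}
	err := task.Run(context.Background(), nil)
	return inner.calls, err
}

func main() {
	fmt.Println(runWithRetries(2, 3)) // 3 <nil>
	fmt.Println(runWithRetries(5, 3)) // 3 temporary failure
}
```

In real code the isTransient callback would usually inspect the error (for example a timeout versus a validation failure) instead of always returning true.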

Selective execution

The indexing pipeline lets you run stages and individual tasks selectively. There are two options for this purpose:

  • StagesBlacklist - list of stages to NOT execute
  • TaskWhitelist - list of indexing tasks to execute

To use these options, set them on the pipeline.Options value that you pass to Start or Run, like so:

options := &pipeline.Options{
    TaskWhitelist: []pipeline.TaskName{"SequencerTask"},
}
err := p.Start(ctx, NewSource(), NewSink(), options)

The example above runs only SequencerTask during indexing. This is useful when you want to reindex but only care about a specific set of data.
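
A custom StageRunnerFunc receives a TaskValidator, which is a func(string) bool. One plausible way a whitelist could be turned into such a validator is sketched below. whitelistValidator is a hypothetical helper, and the rule that an empty whitelist allows every task is an assumption, not documented behavior.

```go
package main

import "fmt"

// TaskName and TaskValidator are local copies of the documented types.
type TaskName string

type TaskValidator func(string) bool

// whitelistValidator is a hypothetical helper that builds a validator
// from a list of allowed task names.
func whitelistValidator(whitelist []TaskName) TaskValidator {
	// Assumption: no whitelist means every task may run.
	if len(whitelist) == 0 {
		return func(string) bool { return true }
	}
	allowed := make(map[string]struct{}, len(whitelist))
	for _, name := range whitelist {
		allowed[string(name)] = struct{}{}
	}
	return func(name string) bool {
		_, ok := allowed[name]
		return ok
	}
}

func main() {
	canRun := whitelistValidator([]TaskName{"SequencerTask"})
	fmt.Println(canRun("SequencerTask"), canRun("AggregatorTask")) // true false
}
```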

Custom pipeline

If the default pipeline stages and run order do not suit your needs, you can create an empty pipeline and add each stage individually.

p := pipeline.NewCustom(payloadFactory)

This creates a pipeline with no set stages. To add a stage, run:

p.AddStage(
  pipeline.NewStageWithTasks([Name of the stage], NewTask()),
)

To add a stage where tasks will run concurrently, run:

p.AddStage(
  pipeline.NewAsyncStageWithTasks([Name of the stage], NewTask1(), NewTask2()),
)

The order in which the stages will run is determined by the order in which they are added.

If you want to run stages concurrently, add them together using AddConcurrentStages:

p.AddConcurrentStages(
  pipeline.NewStageWithTasks(pipeline.StageAggregator, NewTask()),
  pipeline.NewStageWithTasks(pipeline.StageSequencer, NewTask()),
)
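
As an illustration of what running stages concurrently means, the sketch below starts every stage in its own goroutine and waits for all of them before continuing. It is not the package's implementation. stageFunc, runConcurrently, and errStageFailed are hypothetical names, and returning the first error in argument order is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// stageFunc is a hypothetical stand-in for a stage that can be run.
type stageFunc func() error

var errStageFailed = errors.New("stage failed")

// runConcurrently starts all stages at once and waits for every one of them.
func runConcurrently(stages ...stageFunc) error {
	errs := make([]error, len(stages))
	var wg sync.WaitGroup
	for i, s := range stages {
		wg.Add(1)
		go func(i int, s stageFunc) {
			defer wg.Done()
			errs[i] = s() // each goroutine writes only its own slot
		}(i, s)
	}
	wg.Wait()
	// Assumption: report the first error in the order the stages were given.
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var mu sync.Mutex
	finished := 0
	stage := func() error {
		mu.Lock()
		defer mu.Unlock()
		finished++
		return nil
	}
	err := runConcurrently(stage, stage)
	fmt.Println(finished, err) // 2 <nil>

	err = runConcurrently(stage, func() error { return errStageFailed })
	fmt.Println(err) // stage failed
}
```

Because the stages run at the same time, anything they share (here the finished counter) has to be protected, which is also worth keeping in mind for payload fields written by concurrent stages.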

Built-in metrics

The indexing pipeline comes with a set of built-in metrics:

Name                                Description
indexer_pipeline_task_duration      The total time spent processing an indexing task
indexer_pipeline_stage_duration     The total time spent processing an indexing stage
indexer_pipeline_height_duration    The total time spent indexing a height
indexer_pipeline_heights_total      The total number of successfully indexed heights
indexer_pipeline_errors_total       The total number of indexing errors

For more information about metrics, see the documentation of the metrics package.

Examples

The example folder contains example pipelines. To run them, use:

go run example/default/main.go

go run example/custom/main.go

Documentation

Constants

This section is empty.

Variables

var (
	// ErrInvalidLatestHeight is returned when the latest height is a negative number
	ErrInvalidLatestHeight = errors.New("lastest height is invalid")

	// ErrInvalidInitialHeight is returned when the initial height is a negative number
	ErrInvalidInitialHeight = errors.New("initial height is invalid")

	// ErrInvalidBatchSize is returned when the batch size is a negative number
	ErrInvalidBatchSize = errors.New("batch size is invalid")

	// ErrNothingToProcess is returned when there are no heights to process
	ErrNothingToProcess = errors.New("nothing to process")
)
var (
	ErrMissingStages = errors.New("provide stages to run concurrently")
	ErrMissingStage  = errors.New("no stage to run")
)

Functions

func NewAsyncStageWithTasks added in v0.1.8

func NewAsyncStageWithTasks(name StageName, tasks ...Task) *stage

NewAsyncStageWithTasks creates a stage with tasks that will run concurrently

func NewCustomStage added in v0.1.7

func NewCustomStage(name StageName, runner stageRunner) *stage

NewCustomStage creates a stage with custom stagerunner

func NewStageWithTasks added in v0.1.8

func NewStageWithTasks(name StageName, tasks ...Task) *stage

NewStageWithTasks creates a stage with tasks that will run one by one

Types

type CustomPipeline added in v0.1.7

type CustomPipeline interface {
	Pipeline

	AddStage(stage *stage)
	AddConcurrentStages(stages ...*stage)
}

CustomPipeline is implemented by types that want to create a pipeline by adding their own stages

func NewCustom added in v0.1.7

func NewCustom(payloadFactor PayloadFactory) CustomPipeline

NewCustom creates a new pipeline that satisfies CustomPipeline

type DefaultPipeline added in v0.1.7

type DefaultPipeline interface {
	Pipeline

	SetTasks(stageName StageName, tasks ...Task)
	SetAsyncTasks(stageName StageName, tasks ...Task)
	SetCustomStage(stageName StageName, stageRunnerFunc stageRunner)
}

DefaultPipeline is implemented by types that only want to configure existing stages in a pipeline

func NewDefault added in v0.1.7

func NewDefault(payloadFactor PayloadFactory) DefaultPipeline

NewDefault creates a new DefaultPipeline with all default stages set in default run order

type HeightRange added in v0.4.0

type HeightRange struct {
	// The most recent height
	LatestHeight int64

	// The last processed height
	LastHeight int64

	// The starting height when no height has been processed yet
	InitialHeight int64

	// The number of heights processed in one run
	BatchSize int64
}

HeightRange represents a range of heights to be processed

func (*HeightRange) EndHeight added in v0.4.0

func (hr *HeightRange) EndHeight() int64

EndHeight calculates the last height to process

func (*HeightRange) Length added in v0.4.0

func (hr *HeightRange) Length() int64

Length calculates the number of heights to process

func (*HeightRange) StartHeight added in v0.4.0

func (hr *HeightRange) StartHeight() int64

StartHeight calculates the first height to process

func (*HeightRange) Validate added in v0.4.0

func (hr *HeightRange) Validate(checkLength bool) error

Validate checks if the height range is valid
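
The arithmetic behind StartHeight, EndHeight, and Length is not spelled out in this reference. The sketch below shows one reading of the field comments, using a local heightRange type and lowercase methods to make clear that it is not the package's code. The rules it applies (resume one past LastHeight, otherwise begin at InitialHeight, and cap the end at LatestHeight) are assumptions.

```go
package main

import "fmt"

// heightRange is a local stand-in with the same fields as HeightRange.
type heightRange struct {
	LatestHeight  int64 // the most recent height
	LastHeight    int64 // the last processed height
	InitialHeight int64 // the starting height when nothing has been processed
	BatchSize     int64 // the number of heights processed in one run
}

// start assumes processing resumes one past the last processed height.
func (hr heightRange) start() int64 {
	if hr.LastHeight > 0 {
		return hr.LastHeight + 1
	}
	return hr.InitialHeight
}

// end assumes a positive batch size limits the run, capped at LatestHeight.
func (hr heightRange) end() int64 {
	if hr.BatchSize > 0 {
		if batchEnd := hr.start() + hr.BatchSize - 1; batchEnd < hr.LatestHeight {
			return batchEnd
		}
	}
	return hr.LatestHeight
}

// length counts the heights from start to end, both inclusive.
func (hr heightRange) length() int64 {
	return hr.end() - hr.start() + 1
}

func main() {
	resumed := heightRange{LatestHeight: 1000, LastHeight: 120, InitialHeight: 1, BatchSize: 50}
	fmt.Println(resumed.start(), resumed.end(), resumed.length()) // 121 170 50

	fresh := heightRange{LatestHeight: 30, InitialHeight: 10}
	fmt.Println(fresh.start(), fresh.end(), fresh.length()) // 10 30 21
}
```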

type Logger

type Logger interface {
	// Info logs info message
	Info(string)

	// Debug logs debug message
	Debug(string)
}

Logger is implemented by types that want to hook up to logging mechanism in engine

type Options

type Options struct {
	// StagesBlacklist holds list of stages to turn off
	StagesBlacklist []StageName

	// TaskWhitelist holds name of indexing tasks which will be executed
	TaskWhitelist []TaskName
}

type Payload

type Payload interface {
	// MarkAsProcessed is invoked by the pipeline when the payloadMock
	// reaches the end of execution for current height
	MarkAsProcessed()
}

Payload is implemented by values that can be sent through a pipeline.

type PayloadFactory

type PayloadFactory interface {
	// Gets new payloadMock
	GetPayload(int64) Payload
}

PayloadFactory is implemented by objects which know how to create payloadMock for every height
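
A minimal sketch of a Payload and a matching PayloadFactory might look like the following. The two interfaces are redeclared locally from the definitions above, while blockPayload and blockPayloadFactory are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// Payload and PayloadFactory are local copies of the documented interfaces.
type Payload interface {
	MarkAsProcessed()
}

type PayloadFactory interface {
	GetPayload(int64) Payload
}

// blockPayload is a hypothetical payload holding the state for one height.
type blockPayload struct {
	Height    int64
	Processed bool
}

func (p *blockPayload) MarkAsProcessed() { p.Processed = true }

// blockPayloadFactory is a hypothetical factory creating one payload per height.
type blockPayloadFactory struct{}

func (blockPayloadFactory) GetPayload(height int64) Payload {
	return &blockPayload{Height: height}
}

// Compile-time check that the factory satisfies the interface.
var _ PayloadFactory = blockPayloadFactory{}

func main() {
	p := blockPayloadFactory{}.GetPayload(42)
	p.MarkAsProcessed()
	fmt.Printf("%+v\n", *p.(*blockPayload)) // {Height:42 Processed:true}
}
```

Returning a pointer from GetPayload lets every task in the pipeline mutate the same payload instance for a given height.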

type Pipeline

type Pipeline interface {
	SetLogger(l Logger)
	AddStageBefore(existingStageName StageName, stage *stage)
	AddStageAfter(existingStageName StageName, stage *stage)
	RetryStage(existingStageName StageName, isTransient func(error) bool, maxRetries int)
	Start(ctx context.Context, source Source, sink Sink, options *Options) error
	Run(ctx context.Context, height int64, options *Options) (Payload, error)
}

type Sink

type Sink interface {
	// Consume consumes payloadMock
	Consume(context.Context, Payload) error
}

Sink is executed as a last stage in the pipeline

type Source

type Source interface {
	// Next gets next height
	Next(context.Context, Payload) bool

	// Current returns current height
	Current() int64

	// Err return error if any
	Err() error

	// Skip return bool to skip stage
	Skip(StageName) bool
}

Source is executed before processing of individual heights. It is responsible for getting start and end height.

func NewSource added in v0.1.14

func NewSource() Source

type Stage

type Stage interface {
	// Run Stage
	Run(context.Context, Payload, *Options) error
}

Stage is implemented by types which invoke StageRunner

type StageName

type StageName string
const (
	// Context key used by StatsRecorder
	CtxStats = "stats"

	// Setup stage (Chore): performs setup tasks
	StageSetup StageName = "stage_setup"

	// Fetcher stage (Syncing): fetches data for indexing
	StageFetcher StageName = "stage_fetcher"

	// Parser stage (Syncing): parses and normalizes fetched data to a single structure
	StageParser StageName = "stage_parser"

	// Validator stage (Syncing): validates parsed data
	StageValidator StageName = "stage_validator"

	// Syncer stage (Syncing): saves data to datastore
	StageSyncer StageName = "stage_syncer"

	// Sequencer stage (Indexing): Creates sequences from synced data (syncable)
	StageSequencer StageName = "stage_sequencer"

	// Aggregator stage (Indexing): Creates aggregates from synced data (syncable)
	StageAggregator StageName = "stage_aggregator"

	// StagePersistor stage (Indexing): Persists data to datastore
	StagePersistor StageName = "stage_persistor"

	// Cleanup stage (Chore): Cleans up after execution
	StageCleanup StageName = "stage_cleanup"
)

type StageRunnerFunc

type StageRunnerFunc func(context.Context, Payload, TaskValidator) error

StageRunnerFunc is an adapter to allow the use of plain functions as stageRunner

func (StageRunnerFunc) Run

Run calls f(ctx, p, f).
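
The adapter works the same way as http.HandlerFunc in the standard library: a function type gets a Run method that simply calls the function, so a plain func can be used wherever the runner interface is expected. The sketch below shows the pattern with local types. The shape of the unexported stageRunner interface is an assumption inferred from the StageRunnerFunc signature, and runStage and invokeThroughInterface are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Payload and TaskValidator are local copies of the documented types.
type Payload interface{ MarkAsProcessed() }

type TaskValidator func(string) bool

// stageRunner is an assumed shape for the package's unexported interface.
type stageRunner interface {
	Run(context.Context, Payload, TaskValidator) error
}

// StageRunnerFunc adapts a plain function to the stageRunner interface.
type StageRunnerFunc func(context.Context, Payload, TaskValidator) error

// Run calls the underlying function with the same arguments.
func (f StageRunnerFunc) Run(ctx context.Context, p Payload, v TaskValidator) error {
	return f(ctx, p, v)
}

// runStage is a hypothetical caller that only knows about the interface.
func runStage(r stageRunner) error {
	allowAll := func(string) bool { return true }
	return r.Run(context.Background(), nil, allowAll)
}

// invokeThroughInterface wraps a closure and runs it via the interface.
func invokeThroughInterface() (bool, error) {
	called := false
	runner := StageRunnerFunc(func(ctx context.Context, p Payload, v TaskValidator) error {
		called = v("AnyTask") // consult the validator before running a task
		return nil
	})
	err := runStage(runner)
	return called, err
}

func main() {
	fmt.Println(invokeThroughInterface()) // true <nil>
}
```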

type Stat

type Stat struct {
	StartTime time.Time
	EndTime   time.Time
	Duration  time.Duration
	Success   bool
}

func NewStat

func NewStat() *Stat

func (*Stat) SetCompleted

func (s *Stat) SetCompleted(success bool)

SetComplete completes stat duration

type StatsRecorder

type StatsRecorder struct {
	Stat
}

func NewStatsRecorder

func NewStatsRecorder() *StatsRecorder

StatsRecorder is responsible for recording statistics during pipeline execution.

TODO: Add stats for every stage and every task

type Task

type Task interface {
	//Run Task
	Run(context.Context, Payload) error

	// GetName gets name of task
	GetName() string
}

Task is implemented by types that want to be executed inside of a stage

func RetryingTask

func RetryingTask(st Task, isTransient func(error) bool, maxRetries int) Task

RetryingTask implements retry mechanism for Task

type TaskName added in v0.1.5

type TaskName string

type TaskValidator

type TaskValidator func(string) bool

TaskValidator is a type for validating task by provided task name

Directories

Path Synopsis
example
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.
