pipeline

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2020 License: Apache-2.0 Imports: 12 Imported by: 11

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrVersionsDirNotSet = errors.New("versions directory not set")
)

Functions

func NewStage

func NewStage(name StageName, runner StageRunner, stageType StageType) *stage

func NewVersionReader added in v0.1.4

func NewVersionReader(dir string) *versionReader

NewVersionReader VersionReader constructor

Types

type Mode added in v0.1.4

type Mode byte
const (
	ModeAll Mode = iota
	ModeUp
	ModeVersion
)

type Options

type Options struct {
	// TaskWhitelist holds name of indexing tasks which will be executed
	TaskWhitelist VersionTasks
}

type Payload

type Payload interface {
	// MarkAsProcessed is invoked by the pipeline when the payload
	// reaches the end of execution for current height
	MarkAsProcessed()
}

Payload is implemented by values that can be sent through a pipeline.

type PayloadFactory

type PayloadFactory interface {
	// Gets new payload
	GetPayload(int64) Payload
}

PayloadFactory is implemented by objects which know how to create payload for every height

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline implements a modular, multi-stage pipeline

func New

func New(payloadFactor PayloadFactory) *Pipeline

func (*Pipeline) AddStageAfter

func (p *Pipeline) AddStageAfter(existingStageName StageName, name StageName, stageRunner StageRunner)

AddStageBefore adds custom stage after existing stage

func (*Pipeline) AddStageBefore

func (p *Pipeline) AddStageBefore(existingStageName StageName, name StageName, stageRunner StageRunner)

AddStageBefore adds custom stage before existing stage

func (*Pipeline) SetAggregatorStage

func (p *Pipeline) SetAggregatorStage(stageRunner StageRunner)

SetAggregatorStage add aggregator stage to list of available stages

func (*Pipeline) SetCleanupStage

func (p *Pipeline) SetCleanupStage(stageRunner StageRunner)

SetCleanupStage add cleanup stage to list of available stages

func (*Pipeline) SetFetcherStage

func (p *Pipeline) SetFetcherStage(stageRunner StageRunner)

SetFetcherStage add fetcher stage to list of available stages

func (*Pipeline) SetOptions

func (p *Pipeline) SetOptions(o *Options)

SetOptions sets pipeline options

func (*Pipeline) SetParserStage

func (p *Pipeline) SetParserStage(stageRunner StageRunner)

SetParserStage add parser stage to list of available stages

func (*Pipeline) SetSequencerStage

func (p *Pipeline) SetSequencerStage(stageRunner StageRunner)

SetSequencerStage add sequencer stage to list of available stages

func (*Pipeline) SetSetupStage

func (p *Pipeline) SetSetupStage(stageRunner StageRunner)

SetSetupStage add setup stage to list of available stages

func (*Pipeline) SetSyncerStage

func (p *Pipeline) SetSyncerStage(stageRunner StageRunner)

SetSyncerStage add syncer stage to list of available stages

func (*Pipeline) SetValidatorStage

func (p *Pipeline) SetValidatorStage(stageRunner StageRunner)

SetValidatorStage add validator stage to list of available stages

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context, source Source, sink Sink, options *Options) error

Start starts the pipeline

type Sink

type Sink interface {
	// Consume consumes payload
	Consume(context.Context, Payload) error
}

Sink is executed as a last stage in the pipeline

type Source

type Source interface {
	// Next gets next height
	Next(context.Context, Payload) bool

	// Current returns current height
	Current() int64

	// Err return error if any
	Err() error
}

Source is executed before processing of individual heights. It is responsible for getting start and end height.

type Stage

type Stage interface {
	// Run Stage
	Run(context.Context, Payload, *Options) error
}

Stage is implemented by types which invoke StageRunner

type StageName

type StageName string
const (
	// Context key used by StatsRecorder
	CtxStats = "stats"

	// Setup stage (Chore): performs setup tasks
	StageSetup StageName = "stage_setup"

	// Fetcher stage (Syncing): fetches data for indexing
	StageFetcher StageName = "stage_fetcher"

	// Parser stage (Syncing): parses and normalizes fetched data to a single structure
	StageParser StageName = "stage_parser"

	// Validator stage (Syncing): validates parsed data
	StageValidator StageName = "stage_validator"

	// Syncer stage (Syncing): saves data to datastore
	StageSyncer StageName = "stage_syncer"

	// Sequencer stage (Indexing): Creates sequences from synced data (syncable)
	StageSequencer StageName = "stage_sequencer"

	// Aggregator stage (Indexing): Creates aggregates from synced data (syncable)
	StageAggregator StageName = "stage_aggregator"

	// Cleanup stage (Chore): Cleans up after execution
	StageCleanup StageName = "stage_cleanup"
)

type StageRunner

type StageRunner interface {
	// Run StageRunner
	Run(context.Context, Payload, TaskValidator) error
}

StageRunner is implemented by types that know how to run tasks

func AsyncRunner

func AsyncRunner(tasks ...Task) StageRunner

AsyncRunner runs tasks concurrently

func RetryingStageRunner

func RetryingStageRunner(sr StageRunner, isTransient func(error) bool, maxRetries int) StageRunner

RetryingStageRunner implement retry mechanism for StageRunner

func SyncRunner

func SyncRunner(tasks ...Task) StageRunner

SyncRunner runs tasks one by one

type StageRunnerFunc

type StageRunnerFunc func(context.Context, Payload, TaskValidator) error

StageRunnerFunc is an adapter to allow the use of plain functions as StageRunner

func (StageRunnerFunc) Run

Run calls f(ctx, p, f).

type StageType

type StageType int64
const (
	StageTypeChore StageType = iota
	StageTypeSyncing
	StageTypeIndexing

	// StageTypeCustom it is a stage added dynamically by the user
	StageTypeCustom
)

type Stat

type Stat struct {
	StartTime time.Time
	EndTime   time.Time
	Duration  time.Duration
	Success   bool
}

func NewStat

func NewStat() *Stat

func (*Stat) SetCompleted

func (s *Stat) SetCompleted(success bool)

SetComplete completes stat duration

type StatsRecorder

type StatsRecorder struct {
	Stat
}

func NewStatsRecorder

func NewStatsRecorder() *StatsRecorder

StateRecorder is responsible for recording statistics during pipeline execution TODO: Add stats for every stage and every task

type Task

type Task interface {
	//Run Task
	Run(context.Context, Payload) error

	// GetName gets name of task
	GetName() string
}

Task is implemented by types that want to be executed inside of a stage

func RetryingTask

func RetryingTask(st Task, isTransient func(error) bool, maxRetries int) Task

RetryingTask implement retry mechanism for Task

type TaskValidator

type TaskValidator func(string) bool

TaskValidator is a type for validating task by provided task name

type VersionTasks added in v0.1.4

type VersionTasks []string

VersionTasks slice of task names

Directories

Path Synopsis
Package mock_pipeline is a generated GoMock package.
Package mock_pipeline is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL