pipeline

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2020 License: Apache-2.0 Imports: 7 Imported by: 11

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMissingStages = errors.New("provide stages to run concurrently")
)

Functions

func NewStage

func NewStage(name StageName, runner StageRunner) *stage

Types

type Logger

type Logger interface {
	// Info logs info message
	Info(string)

	// Debug logs debug message
	Debug(string)
}

Logger is implemented by types that want to hook up to logging mechanism in engine

type Options

type Options struct {
	// StagesBlacklist holds list of stages to turn off
	StagesBlacklist []StageName

	// TaskWhitelist holds name of indexing tasks which will be executed
	TaskWhitelist []TaskName
}

type Payload

type Payload interface {
	// MarkAsProcessed is invoked by the pipeline when the payloadMock
	// reaches the end of execution for current height
	MarkAsProcessed()
}

Payload is implemented by values that can be sent through a pipeline.

type PayloadFactory

type PayloadFactory interface {
	// Gets new payloadMock
	GetPayload(int64) Payload
}

PayloadFactory is implemented by objects which know how to create payloadMock for every height

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline implements a modular, multi-stage pipeline

func New

func New(payloadFactor PayloadFactory) *Pipeline

func (*Pipeline) AddStageAfter

func (p *Pipeline) AddStageAfter(existingStageName StageName, name StageName, stageRunner StageRunner)

AddStageBefore adds custom stage after existing stage

func (*Pipeline) AddStageBefore

func (p *Pipeline) AddStageBefore(existingStageName StageName, name StageName, stageRunner StageRunner)

AddStageBefore adds custom stage before existing stage

func (*Pipeline) Run added in v0.1.5

func (p *Pipeline) Run(ctx context.Context, height int64, options *Options) (Payload, error)

Run run one-off pipeline iteration for given height

func (*Pipeline) SetLogger added in v0.1.5

func (p *Pipeline) SetLogger(l Logger)

SetLogger sets logger

func (*Pipeline) SetOptions

func (p *Pipeline) SetOptions(o *Options)

SetOptions sets pipeline options

func (*Pipeline) SetStage added in v0.1.5

func (p *Pipeline) SetStage(stageName StageName, stageRunner StageRunner)

SetStage sets up stage runner for given stage

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context, source Source, sink Sink, options *Options) error

Start starts the pipeline

type Sink

type Sink interface {
	// Consume consumes payloadMock
	Consume(context.Context, Payload) error
}

Sink is executed as a last stage in the pipeline

type Source

type Source interface {
	// Next gets next height
	Next(context.Context, Payload) bool

	// Current returns current height
	Current() int64

	// Err return error if any
	Err() error
}

Source is executed before processing of individual heights. It is responsible for getting start and end height.

type Stage

type Stage interface {
	// Run Stage
	Run(context.Context, Payload, *Options) error
}

Stage is implemented by types which invoke StageRunner

type StageName

type StageName string
const (
	// Context key used by StatsRecorder
	CtxStats = "stats"

	// Setup stage (Chore): performs setup tasks
	StageSetup StageName = "stage_setup"

	// Fetcher stage (Syncing): fetches data for indexing
	StageFetcher StageName = "stage_fetcher"

	// Parser stage (Syncing): parses and normalizes fetched data to a single structure
	StageParser StageName = "stage_parser"

	// Validator stage (Syncing): validates parsed data
	StageValidator StageName = "stage_validator"

	// Syncer stage (Syncing): saves data to datastore
	StageSyncer StageName = "stage_syncer"

	// Sequencer stage (Indexing): Creates sequences from synced data (syncable)
	StageSequencer StageName = "stage_sequencer"

	// Aggregator stage (Indexing): Creates aggregates from synced data (syncable)
	StageAggregator StageName = "stage_aggregator"

	// StagePersistor stage (Indexing): Persists data to datastore
	StagePersistor StageName = "stage_persistor"

	// Cleanup stage (Chore): Cleans up after execution
	StageCleanup StageName = "stage_cleanup"
)

type StageRunner

type StageRunner interface {
	// Run StageRunner
	Run(context.Context, Payload, TaskValidator) error
}

StageRunner is implemented by types that know how to run tasks

func AsyncRunner

func AsyncRunner(tasks ...Task) StageRunner

AsyncRunner runs tasks concurrently

func RetryingStageRunner

func RetryingStageRunner(sr StageRunner, isTransient func(error) bool, maxRetries int) StageRunner

RetryingStageRunner implement retry mechanism for StageRunner

func SyncRunner

func SyncRunner(tasks ...Task) StageRunner

SyncRunner runs tasks one by one

type StageRunnerFunc

type StageRunnerFunc func(context.Context, Payload, TaskValidator) error

StageRunnerFunc is an adapter to allow the use of plain functions as StageRunner

func (StageRunnerFunc) Run

Run calls f(ctx, p, f).

type Stat

type Stat struct {
	StartTime time.Time
	EndTime   time.Time
	Duration  time.Duration
	Success   bool
}

func NewStat

func NewStat() *Stat

func (*Stat) SetCompleted

func (s *Stat) SetCompleted(success bool)

SetComplete completes stat duration

type StatsRecorder

type StatsRecorder struct {
	Stat
}

func NewStatsRecorder

func NewStatsRecorder() *StatsRecorder

StateRecorder is responsible for recording statistics during pipeline execution TODO: Add stats for every stage and every task

type Task

type Task interface {
	//Run Task
	Run(context.Context, Payload) error

	// GetName gets name of task
	GetName() string
}

Task is implemented by types that want to be executed inside of a stage

func RetryingTask

func RetryingTask(st Task, isTransient func(error) bool, maxRetries int) Task

RetryingTask implement retry mechanism for Task

type TaskName added in v0.1.5

type TaskName string

type TaskValidator

type TaskValidator func(string) bool

TaskValidator is a type for validating task by provided task name

Directories

Path Synopsis
Package mock_pipeline is a generated GoMock package.
Package mock_pipeline is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL