pipeline

package
v0.1.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2020 License: Apache-2.0 Imports: 7 Imported by: 11

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrMissingStages carries the message asking the caller to provide
	// stages to run concurrently.
	// NOTE(review): presumably returned when a concurrent-stage call such as
	// AddConcurrentStages receives no stages — confirm against the implementation.
	ErrMissingStages = errors.New("provide stages to run concurrently")
	// ErrMissingStage carries the message "no stage to run".
	// NOTE(review): presumably returned when a required stage is absent — confirm.
	ErrMissingStage  = errors.New("no stage to run")
)

Functions

func NewAsyncStageWithTasks added in v0.1.8

func NewAsyncStageWithTasks(name StageName, tasks ...Task) *stage

NewAsyncStageWithTasks creates a stage with tasks that will run concurrently

func NewCustomStage added in v0.1.7

func NewCustomStage(name StageName, runner stageRunner) *stage

NewCustomStage creates a stage that is run by the given custom stageRunner

func NewStageWithTasks added in v0.1.8

func NewStageWithTasks(name StageName, tasks ...Task) *stage

NewStageWithTasks creates a stage with tasks that will run one by one

Types

type CustomPipeline added in v0.1.7

// CustomPipeline extends Pipeline with methods for adding caller-supplied
// stages.
type CustomPipeline interface {
	Pipeline

	// AddStage adds a single stage to the pipeline.
	AddStage(stage *stage)
	// AddConcurrentStages adds a group of stages to the pipeline.
	// NOTE(review): presumably the stages in one call run concurrently with
	// each other, as the name suggests — confirm against the implementation.
	AddConcurrentStages(stages ...*stage)
}

CustomPipeline is implemented by types that want to create a pipeline by adding their own stages

func NewCustom added in v0.1.7

func NewCustom(payloadFactor PayloadFactory) CustomPipeline

NewCustom creates a new pipeline that satisfies CustomPipeline

type DefaultPipeline added in v0.1.7

// DefaultPipeline extends Pipeline with methods that configure a stage
// identified by its StageName.
type DefaultPipeline interface {
	Pipeline

	// SetTasks assigns tasks to the stage named stageName.
	// NOTE(review): presumably the tasks run one by one, mirroring
	// NewStageWithTasks — confirm.
	SetTasks(stageName StageName, tasks ...Task)
	// SetAsyncTasks assigns tasks to the stage named stageName.
	// NOTE(review): presumably the tasks run concurrently, mirroring
	// NewAsyncStageWithTasks — confirm.
	SetAsyncTasks(stageName StageName, tasks ...Task)
	// SetCustomStage assigns a custom stageRunner to the stage named stageName.
	SetCustomStage(stageName StageName, stageRunnerFunc stageRunner)
}

DefaultPipeline is implemented by types that only want to configure existing stages in a pipeline

func NewDefault added in v0.1.7

func NewDefault(payloadFactor PayloadFactory) DefaultPipeline

NewDefault creates a new DefaultPipeline with all default stages set in default run order

type Logger

// Logger is the logging hook accepted by Pipeline.SetLogger.
type Logger interface {
	// Info logs an info-level message.
	Info(string)

	// Debug logs a debug-level message.
	Debug(string)
}

Logger is implemented by types that want to hook up to logging mechanism in engine

type Options

// Options holds the per-run settings passed to Pipeline.Start, Pipeline.Run
// and Stage.Run.
type Options struct {
	// StagesBlacklist holds the list of stages to turn off.
	StagesBlacklist []StageName

	// TaskWhitelist holds the names of the indexing tasks that will be executed.
	// NOTE(review): behaviour for an empty whitelist is not visible here — confirm.
	TaskWhitelist []TaskName
}

type Payload

// Payload is the value carried through the pipeline for a single height.
type Payload interface {
	// MarkAsProcessed is invoked by the pipeline when the payload
	// reaches the end of execution for the current height.
	MarkAsProcessed()
}

Payload is implemented by values that can be sent through a pipeline.

type PayloadFactory

// PayloadFactory creates Payload values for the pipeline.
type PayloadFactory interface {
	// GetPayload returns a new payload.
	// NOTE(review): the int64 argument is presumably the height the payload
	// is created for — confirm.
	GetPayload(int64) Payload
}

PayloadFactory is implemented by objects which know how to create a payload for every height

type Pipeline

// Pipeline is the base interface embedded by both CustomPipeline and
// DefaultPipeline.
type Pipeline interface {
	// SetLogger sets the Logger used by the pipeline.
	SetLogger(l Logger)
	// AddStageBefore adds stage relative to the stage named existingStageName.
	// NOTE(review): presumably inserted immediately before it — confirm.
	AddStageBefore(existingStageName StageName, stage *stage)
	// AddStageAfter adds stage relative to the stage named existingStageName.
	// NOTE(review): presumably inserted immediately after it — confirm.
	AddStageAfter(existingStageName StageName, stage *stage)
	// RetryStage configures retrying for the stage named existingStageName.
	// NOTE(review): presumably only errors for which isTransient returns true
	// are retried, up to maxRetries times — confirm.
	RetryStage(existingStageName StageName, isTransient func(error) bool, maxRetries int)
	// Start runs the pipeline with the given source, sink and options.
	// NOTE(review): presumably iterates over the heights provided by source
	// until it is exhausted or ctx is cancelled — confirm.
	Start(ctx context.Context, source Source, sink Sink, options *Options) error
	// Run runs the pipeline for the given height and returns a Payload and an error.
	// NOTE(review): presumably a single-height run without a Source or Sink — confirm.
	Run(ctx context.Context, height int64, options *Options) (Payload, error)
}

type Sink

// Sink receives payloads from the pipeline.
type Sink interface {
	// Consume consumes the payload.
	Consume(context.Context, Payload) error
}

Sink is executed as a last stage in the pipeline

type Source

// Source supplies the heights that the pipeline processes.
type Source interface {
	// Next gets the next height.
	// NOTE(review): the bool result presumably reports whether another height
	// is available — confirm.
	Next(context.Context, Payload) bool

	// Current returns the current height.
	Current() int64

	// Err returns an error, if any.
	Err() error

	// Skip returns a bool telling whether the given stage should be skipped.
	Skip(StageName) bool
}

Source is executed before processing of individual heights. It is responsible for getting start and end height.

func NewSource added in v0.1.14

func NewSource() Source

type Stage

// Stage is a single runnable step of the pipeline.
type Stage interface {
	// Run runs the stage with the given context, payload and options.
	Run(context.Context, Payload, *Options) error
}

Stage is implemented by types which invoke StageRunner

type StageName

// StageName is the string identifier of a pipeline stage.
type StageName string
const (
	// CtxStats is the context key used by StatsRecorder.
	// Note: it has no explicit type, so it is an untyped string constant,
	// not a StageName.
	CtxStats = "stats"

	// StageSetup is the setup stage (Chore): performs setup tasks.
	StageSetup StageName = "stage_setup"

	// StageFetcher is the fetcher stage (Syncing): fetches data for indexing.
	StageFetcher StageName = "stage_fetcher"

	// StageParser is the parser stage (Syncing): parses and normalizes fetched
	// data to a single structure.
	StageParser StageName = "stage_parser"

	// StageValidator is the validator stage (Syncing): validates parsed data.
	StageValidator StageName = "stage_validator"

	// StageSyncer is the syncer stage (Syncing): saves data to the datastore.
	StageSyncer StageName = "stage_syncer"

	// StageSequencer is the sequencer stage (Indexing): creates sequences from
	// synced data (syncable).
	StageSequencer StageName = "stage_sequencer"

	// StageAggregator is the aggregator stage (Indexing): creates aggregates
	// from synced data (syncable).
	StageAggregator StageName = "stage_aggregator"

	// StagePersistor is the persistor stage (Indexing): persists data to the
	// datastore.
	StagePersistor StageName = "stage_persistor"

	// StageCleanup is the cleanup stage (Chore): cleans up after execution.
	StageCleanup StageName = "stage_cleanup"
)

type StageRunnerFunc

// StageRunnerFunc is a function type taking a context, a Payload and a
// TaskValidator and returning an error.
type StageRunnerFunc func(context.Context, Payload, TaskValidator) error

StageRunnerFunc is an adapter to allow the use of plain functions as stageRunner

func (StageRunnerFunc) Run

Run calls f with the supplied context, payload, and task validator.

type Stat

// Stat holds timing and outcome information for a unit of execution.
type Stat struct {
	// StartTime is the start timestamp.
	// NOTE(review): presumably set by NewStat — confirm.
	StartTime time.Time
	// EndTime is the end timestamp.
	// NOTE(review): presumably set by SetCompleted — confirm.
	EndTime   time.Time
	// Duration is the elapsed time.
	// NOTE(review): presumably EndTime minus StartTime, filled in by
	// SetCompleted — confirm.
	Duration  time.Duration
	// Success records the outcome.
	// NOTE(review): presumably the value passed to SetCompleted — confirm.
	Success   bool
}

func NewStat

func NewStat() *Stat

func (*Stat) SetCompleted

func (s *Stat) SetCompleted(success bool)

SetCompleted completes stat duration

type StatsRecorder

// StatsRecorder embeds Stat, so Stat's fields and methods are promoted to
// StatsRecorder.
type StatsRecorder struct {
	Stat
}

func NewStatsRecorder

func NewStatsRecorder() *StatsRecorder

StatsRecorder is responsible for recording statistics during pipeline execution. TODO: Add stats for every stage and every task

type Task

// Task is a unit of work that can be run with a context and a Payload.
type Task interface {
	// Run runs the task with the given context and payload.
	Run(context.Context, Payload) error

	// GetName gets the name of the task.
	GetName() string
}

Task is implemented by types that want to be executed inside of a stage

func RetryingTask

func RetryingTask(st Task, isTransient func(error) bool, maxRetries int) Task

RetryingTask implements retry mechanism for Task

type TaskName added in v0.1.5

// TaskName is the string name of a task; Options.TaskWhitelist is a slice of
// TaskName.
type TaskName string

type TaskValidator

// TaskValidator is a function type that takes a string and returns a bool.
// NOTE(review): the string is presumably a task name and the result
// presumably reports whether that task may run — confirm.
type TaskValidator func(string) bool

TaskValidator is a type for validating task by provided task name

Directories

Path Synopsis
Package mock_pipeline is a generated GoMock package.
Package mock_pipeline is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL