pipeline

package
v0.10.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2021 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	InputTaskKey = "input"
)

Variables

View Source
var (
	ErrWrongInputCardinality = errors.New("wrong number of task inputs")
	ErrBadInput              = errors.New("bad input for task")
	ErrParameterEmpty        = errors.New("parameter is empty")
	ErrTooManyErrors         = errors.New("too many errors")
)
View Source
var (
	// TODO: Make private again after
	// https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline
	PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pipeline_task_execution_time",
		Help: "How long each pipeline task took to execute",
	},
		[]string{"job_id", "job_name", "task_type"},
	)
	PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "pipeline_run_errors",
		Help: "Number of errors for each pipeline spec",
	},
		[]string{"job_id", "job_name"},
	)
	PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pipeline_run_total_time_to_completion",
		Help: "How long each pipeline run took to finish (from the moment it was created)",
	},
		[]string{"job_id", "job_name"},
	)
	PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "pipeline_tasks_total_finished",
		Help: "The total number of pipeline tasks which have finished",
	},
		[]string{"job_id", "job_name", "task_type", "status"},
	)
	ErrRunPanicked = errors.New("pipeline run panicked")
)
View Source
var (
	ErrKeypathNotFound = errors.New("keypath not found")
	ErrKeypathTooDeep  = errors.New("keypath too deep (maximum 2 keys)")
	ErrVarsRoot        = errors.New("cannot get/set the root of a pipeline.Vars")
)
View Source
var (
	ErrNoSuchBridge = errors.New("no such bridge exists")
)

Functions

func CheckInputs added in v0.10.8

func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)

func FindBridge

func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error)

FindBridge finds a bridge using the given database.

func NewORM

func NewORM(db *gorm.DB, config Config) *orm

func NewRunner

func NewRunner(orm ORM, config Config) *runner

func ResolveParam added in v0.10.8

func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error

Types

type AnyTask added in v0.10.0

type AnyTask struct {
	BaseTask `mapstructure:",squash"`
}

AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.

func (*AnyTask) Run added in v0.10.0

func (t *AnyTask) Run(_ context.Context, _ Vars, _ JSONSerializable, inputs []Result) (result Result)

func (*AnyTask) Type added in v0.10.0

func (t *AnyTask) Type() TaskType

type BaseTask

type BaseTask struct {
	Index   int32         `mapstructure:"index" json:"-" `
	Timeout time.Duration `mapstructure:"timeout"`
	// contains filtered or unexported fields
}

func NewBaseTask added in v0.9.9

func NewBaseTask(dotID string, t Task, index int32, numPredecessors int) BaseTask

func (BaseTask) DotID

func (t BaseTask) DotID() string

func (BaseTask) NumPredecessors added in v0.10.8

func (t BaseTask) NumPredecessors() int

func (BaseTask) OutputIndex

func (t BaseTask) OutputIndex() int32

func (BaseTask) OutputTask

func (t BaseTask) OutputTask() Task

func (*BaseTask) SetOutputTask

func (t *BaseTask) SetOutputTask(outputTask Task)

func (BaseTask) TaskTimeout added in v0.9.7

func (t BaseTask) TaskTimeout() (time.Duration, bool)

type BoolParam added in v0.10.8

type BoolParam bool

func (*BoolParam) UnmarshalPipelineParam added in v0.10.8

func (b *BoolParam) UnmarshalPipelineParam(val interface{}) error

type BridgeTask

type BridgeTask struct {
	BaseTask `mapstructure:",squash"`

	Name              string `json:"name"`
	RequestData       string `json:"requestData"`
	IncludeInputAtKey string `json:"includeInputAtKey"`
	// contains filtered or unexported fields
}

func (*BridgeTask) Run

func (t *BridgeTask) Run(ctx context.Context, vars Vars, meta JSONSerializable, inputs []Result) Result

func (*BridgeTask) Type

func (t *BridgeTask) Type() TaskType

type BytesParam added in v0.10.8

type BytesParam string

func (*BytesParam) UnmarshalPipelineParam added in v0.10.8

func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error

type Config

type Config interface {
	BridgeResponseURL() *url.URL
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() url.URL
	DefaultHTTPLimit() int64
	DefaultHTTPTimeout() models.Duration
	DefaultMaxHTTPAttempts() uint
	DefaultHTTPAllowUnrestrictedNetworkAccess() bool
	TriggerFallbackDBPollInterval() time.Duration
	JobPipelineMaxRunDuration() time.Duration
	JobPipelineReaperInterval() time.Duration
	JobPipelineReaperThreshold() time.Duration
}

type DecimalParam added in v0.10.8

type DecimalParam decimal.Decimal

func (DecimalParam) Decimal added in v0.10.8

func (d DecimalParam) Decimal() decimal.Decimal

func (*DecimalParam) UnmarshalPipelineParam added in v0.10.8

func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error

type DecimalSliceParam added in v0.10.8

type DecimalSliceParam []decimal.Decimal

func (*DecimalSliceParam) UnmarshalPipelineParam added in v0.10.8

func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error

type FinalResult added in v0.9.10

type FinalResult struct {
	Values []interface{}
	Errors []error
}

FinalResult is the result of a Run

func (FinalResult) ErrorsDB added in v0.9.10

func (result FinalResult) ErrorsDB() RunErrors

ErrorsDB dumps a result error for a pipeline_run

func (FinalResult) HasErrors added in v0.9.10

func (result FinalResult) HasErrors() bool

HasErrors returns true if the final result has any errors

func (FinalResult) OutputsDB added in v0.9.10

func (result FinalResult) OutputsDB() JSONSerializable

OutputsDB dumps a result output for a pipeline_run

func (FinalResult) SingularResult added in v0.9.10

func (result FinalResult) SingularResult() (Result, error)

SingularResult returns a single result if the FinalResult only has one set of outputs/errors

type GetterFunc added in v0.10.8

type GetterFunc func() (interface{}, error)

func From added in v0.10.8

func From(getters ...interface{}) []GetterFunc

func Input added in v0.10.8

func Input(inputs []Result, index int) GetterFunc

func Inputs added in v0.10.8

func Inputs(inputs []Result) GetterFunc

func JSONWithVarExprs added in v0.10.8

func JSONWithVarExprs(s string, vars Vars, allowErrors bool) GetterFunc

func NonemptyString added in v0.10.8

func NonemptyString(s string) GetterFunc

func VarExpr added in v0.10.8

func VarExpr(s string, vars Vars) GetterFunc

type HTTPTask

type HTTPTask struct {
	BaseTask                       `mapstructure:",squash"`
	Method                         string
	URL                            string
	RequestData                    string `json:"requestData"`
	AllowUnrestrictedNetworkAccess string
	// contains filtered or unexported fields
}

func (*HTTPTask) Run

func (t *HTTPTask) Run(ctx context.Context, vars Vars, _ JSONSerializable, inputs []Result) Result

func (*HTTPTask) Type

func (t *HTTPTask) Type() TaskType

type JSONParseTask

type JSONParseTask struct {
	BaseTask `mapstructure:",squash"`
	Path     string `json:"path"`
	Data     string `json:"data"`
	// Lax controls what happens when the path does not exist:
	// when disabled, an error is returned;
	// when enabled, nil is returned with no error.
	Lax string
}

func (*JSONParseTask) Run

func (t *JSONParseTask) Run(_ context.Context, vars Vars, _ JSONSerializable, inputs []Result) (result Result)

func (*JSONParseTask) Type

func (t *JSONParseTask) Type() TaskType

type JSONSerializable

type JSONSerializable struct {
	Val  interface{}
	Null bool
}

func (JSONSerializable) MarshalJSON

func (js JSONSerializable) MarshalJSON() ([]byte, error)

func (*JSONSerializable) Scan

func (js *JSONSerializable) Scan(value interface{}) error

func (*JSONSerializable) UnmarshalJSON

func (js *JSONSerializable) UnmarshalJSON(bs []byte) error

func (JSONSerializable) Value

func (js JSONSerializable) Value() (driver.Value, error)

type Keypath added in v0.10.8

type Keypath [2][]byte

func (Keypath) NumParts added in v0.10.8

func (keypath Keypath) NumParts() int

func (Keypath) String added in v0.10.8

func (keypath Keypath) String() string

type MapParam added in v0.10.8

type MapParam map[string]interface{}

func (*MapParam) UnmarshalPipelineParam added in v0.10.8

func (m *MapParam) UnmarshalPipelineParam(val interface{}) error

type MaybeUint64Param added in v0.10.8

type MaybeUint64Param struct {
	// contains filtered or unexported fields
}

func (MaybeUint64Param) Uint64 added in v0.10.8

func (u MaybeUint64Param) Uint64() (uint64, bool)

func (*MaybeUint64Param) UnmarshalPipelineParam added in v0.10.8

func (u *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error

type MedianTask

type MedianTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

func (*MedianTask) Run

func (t *MedianTask) Run(_ context.Context, vars Vars, _ JSONSerializable, inputs []Result) (result Result)

func (*MedianTask) Type

func (t *MedianTask) Type() TaskType

type MultiplyTask

type MultiplyTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
	Times    string `json:"times"`
}

func (*MultiplyTask) Run

func (t *MultiplyTask) Run(_ context.Context, vars Vars, _ JSONSerializable, inputs []Result) (result Result)

func (*MultiplyTask) Type

func (t *MultiplyTask) Type() TaskType

type ORM

type ORM interface {
	CreateSpec(ctx context.Context, tx *gorm.DB, taskDAG TaskDAG, maxTaskTimeout models.Interval) (int32, error)
	InsertFinishedRun(db *gorm.DB, run Run, trrs []TaskRunResult, saveSuccessfulTaskRuns bool) (runID int64, err error)
	DeleteRunsOlderThan(threshold time.Duration) error
	FindBridge(name models.TaskType) (models.BridgeType, error)
	FindRun(id int64) (Run, error)
	GetAllRuns() ([]Run, error)
	DB() *gorm.DB
}

type PanicTask added in v0.10.4

type PanicTask struct {
	BaseTask `mapstructure:",squash"`
	Msg      string
}

func (*PanicTask) Run added in v0.10.4

func (t *PanicTask) Run(_ context.Context, vars Vars, _ JSONSerializable, _ []Result) (result Result)

func (*PanicTask) Type added in v0.10.4

func (t *PanicTask) Type() TaskType

type PipelineParamUnmarshaler added in v0.10.8

type PipelineParamUnmarshaler interface {
	UnmarshalPipelineParam(val interface{}) error
}

type PossibleErrorResponses

type PossibleErrorResponses struct {
	Error        string `json:"error"`
	ErrorMessage string `json:"errorMessage"`
}

type Result

type Result struct {
	Value interface{}
	Error error
}

Result is the result of a TaskRun

func (Result) ErrorDB added in v0.9.10

func (result Result) ErrorDB() null.String

ErrorDB dumps a single result error for a pipeline_task_run

func (Result) OutputDB added in v0.9.10

func (result Result) OutputDB() JSONSerializable

OutputDB dumps a single result output for a pipeline_run or pipeline_task_run

type Run

type Run struct {
	ID             int64            `json:"-" gorm:"primary_key"`
	PipelineSpecID int32            `json:"-"`
	PipelineSpec   Spec             `json:"pipelineSpec"`
	Meta           JSONSerializable `json:"meta"`
	// The errors are only ever strings
	// DB example: [null, null, "my error"]
	Errors RunErrors `json:"errors" gorm:"type:jsonb"`
	// The outputs can be anything.
	// DB example: [1234, {"a": 10}, null]
	Outputs          JSONSerializable `json:"outputs" gorm:"type:jsonb"`
	CreatedAt        time.Time        `json:"createdAt"`
	FinishedAt       *time.Time       `json:"finishedAt"`
	PipelineTaskRuns []TaskRun        `json:"taskRuns" gorm:"foreignkey:PipelineRunID;->"`
}

func (Run) GetID added in v0.9.6

func (r Run) GetID() string

func (Run) HasErrors added in v0.9.10

func (r Run) HasErrors() bool

func (*Run) SetID added in v0.9.6

func (r *Run) SetID(value string) error

func (*Run) Status added in v0.10.3

func (r *Run) Status() RunStatus

Status determines the status of the run.

func (Run) TableName

func (Run) TableName() string

type RunErrors added in v0.10.4

type RunErrors []null.String

func (RunErrors) HasError added in v0.10.4

func (re RunErrors) HasError() bool

func (*RunErrors) Scan added in v0.10.4

func (re *RunErrors) Scan(value interface{}) error

func (RunErrors) Value added in v0.10.4

func (re RunErrors) Value() (driver.Value, error)

type RunStatus added in v0.10.3

type RunStatus int

RunStatus represents the status of a run

const (
	// RunStatusUnknown is used when the run status cannot be determined.
	RunStatusUnknown RunStatus = iota
	// RunStatusInProgress is used when a run is actively being executed.
	RunStatusInProgress
	// RunStatusErrored is used when a run has errored and will not complete.
	RunStatusErrored
	// RunStatusCompleted is used when a run has successfully completed execution.
	RunStatusCompleted
)

func (RunStatus) Completed added in v0.10.3

func (s RunStatus) Completed() bool

Completed returns true if the status is RunStatusCompleted.

func (RunStatus) Errored added in v0.10.3

func (s RunStatus) Errored() bool

Errored returns true if the status is RunStatusErrored.

func (RunStatus) Finished added in v0.10.3

func (s RunStatus) Finished() bool

Finished returns true if the status is final and can't be changed.

type RunWithResults added in v0.10.3

type RunWithResults struct {
	Run            Run
	TaskRunResults TaskRunResults
}

type Runner

type Runner interface {
	service.Service

	// ExecuteRun executes a new run in-memory according to a spec and returns the results.
	// We expect spec.JobID and spec.JobName to be set for logging/prometheus.
	ExecuteRun(ctx context.Context, spec Spec, pipelineInput interface{}, meta JSONSerializable, l logger.Logger) (run Run, trrs TaskRunResults, err error)
	// InsertFinishedRun saves the run results in the database.
	InsertFinishedRun(db *gorm.DB, run Run, trrs TaskRunResults, saveSuccessfulTaskRuns bool) (int64, error)

	// ExecuteAndInsertFinishedRun executes a new run in-memory according to a spec, then persists the results.
	// It is a combination of ExecuteRun and InsertFinishedRun.
	// Note that the spec MUST have a DOT graph for this to work.
	ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, pipelineInput interface{}, meta JSONSerializable, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error)

	// TestInsertFinishedRun is a test method for inserting completed non-pipeline job runs.
	TestInsertFinishedRun(db *gorm.DB, jobID int32, jobName string, jobType string, specID int32) (int64, error)
}

type SafeTx added in v0.10.4

type SafeTx struct {
	// contains filtered or unexported fields
}

SafeTx bundles a tx and a txmutex for use by multiple goroutines inside the same transaction. The mutex is necessary to prevent concurrent database calls inside the same transaction from failing. With the pq driver the failure is `pq: unexpected Parse response 'C'`; with the pgx driver it is `conn busy`.

type SliceParam added in v0.10.8

type SliceParam []interface{}

func (SliceParam) FilterErrors added in v0.10.8

func (s SliceParam) FilterErrors() (SliceParam, int)

func (*SliceParam) UnmarshalPipelineParam added in v0.10.8

func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error

type Spec

type Spec struct {
	ID              int32           `gorm:"primary_key"`
	DotDagSource    string          `json:"dotDagSource"`
	CreatedAt       time.Time       `json:"-"`
	MaxTaskDuration models.Interval `json:"-"`

	JobID   int32  `gorm:"-" json:"-"`
	JobName string `gorm:"-" json:"-"`
}

func (Spec) TableName

func (Spec) TableName() string

func (Spec) TasksInDependencyOrder added in v0.10.4

func (s Spec) TasksInDependencyOrder() ([]Task, error)

type StringParam added in v0.10.8

type StringParam string

func (*StringParam) UnmarshalPipelineParam added in v0.10.8

func (s *StringParam) UnmarshalPipelineParam(val interface{}) error

type StringSliceParam added in v0.10.8

type StringSliceParam []string

func (*StringSliceParam) UnmarshalPipelineParam added in v0.10.8

func (p *StringSliceParam) UnmarshalPipelineParam(val interface{}) error

type Task

type Task interface {
	Type() TaskType
	DotID() string
	Run(ctx context.Context, vars Vars, meta JSONSerializable, inputs []Result) Result
	OutputTask() Task
	SetOutputTask(task Task)
	OutputIndex() int32
	TaskTimeout() (time.Duration, bool)
	NumPredecessors() int
}

func UnmarshalTaskFromMap

func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, dotID string, config Config, txdb *gorm.DB, txdbMutex *sync.Mutex, numPredecessors int) (_ Task, err error)

type TaskDAG

type TaskDAG struct {
	*simple.DirectedGraph
	DOTSource string
}

TaskDAG fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it. Once unmarshaled, calling `TaskDAG#TasksInDependencyOrder()` will return the unmarshaled tasks. NOTE: We only permit one child.

func NewTaskDAG

func NewTaskDAG() *TaskDAG

func (*TaskDAG) HasCycles

func (g *TaskDAG) HasCycles() bool

func (TaskDAG) MinTimeout added in v0.9.10

func (g TaskDAG) MinTimeout() (time.Duration, bool, error)

func (*TaskDAG) NewNode

func (g *TaskDAG) NewNode() graph.Node

func (TaskDAG) TasksInDependencyOrder

func (g TaskDAG) TasksInDependencyOrder() ([]Task, error)

TasksInDependencyOrder returns a slice of Tasks, starting at the outputs of the DAG and ending at the inputs. As you iterate through this slice, you can expect that any individual Task's outputs will already have been traversed.

func (*TaskDAG) UnmarshalText

func (g *TaskDAG) UnmarshalText(bs []byte) (err error)

type TaskDAGNode added in v0.10.6

type TaskDAGNode struct {
	graph.Node
	// contains filtered or unexported fields
}

func NewTaskDAGNode added in v0.10.6

func NewTaskDAGNode(n graph.Node, dotID string, attrs map[string]string) *TaskDAGNode

func (*TaskDAGNode) Attributes added in v0.10.6

func (n *TaskDAGNode) Attributes() []encoding.Attribute

func (*TaskDAGNode) DOTID added in v0.10.6

func (n *TaskDAGNode) DOTID() string

func (*TaskDAGNode) SetAttribute added in v0.10.6

func (n *TaskDAGNode) SetAttribute(attr encoding.Attribute) error

func (*TaskDAGNode) SetDAG added in v0.10.6

func (n *TaskDAGNode) SetDAG(g *TaskDAG)

func (*TaskDAGNode) SetDOTID added in v0.10.6

func (n *TaskDAGNode) SetDOTID(id string)

func (*TaskDAGNode) String added in v0.10.6

func (n *TaskDAGNode) String() string

type TaskRun

type TaskRun struct {
	ID            int64             `json:"-" gorm:"primary_key"`
	Type          TaskType          `json:"type"`
	PipelineRun   Run               `json:"-"`
	PipelineRunID int64             `json:"-"`
	Output        *JSONSerializable `json:"output" gorm:"type:jsonb"`
	Error         null.String       `json:"error"`
	CreatedAt     time.Time         `json:"createdAt"`
	FinishedAt    *time.Time        `json:"finishedAt"`
	Index         int32             `json:"index"`
	DotID         string            `json:"dotId"`
}

func (TaskRun) GetDotID added in v0.10.4

func (tr TaskRun) GetDotID() string

func (TaskRun) GetID added in v0.9.6

func (tr TaskRun) GetID() string

func (TaskRun) Result

func (tr TaskRun) Result() Result

func (*TaskRun) SetID added in v0.9.6

func (tr *TaskRun) SetID(value string) error

func (TaskRun) TableName

func (TaskRun) TableName() string

type TaskRunResult added in v0.9.10

type TaskRunResult struct {
	ID         int64
	Task       Task
	TaskRun    TaskRun
	Result     Result
	CreatedAt  time.Time
	FinishedAt time.Time
	IsTerminal bool
}

TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet. TaskSpecID will always be non-zero.

type TaskRunResults added in v0.9.10

type TaskRunResults []TaskRunResult

TaskRunResults represents a collection of results for all task runs for one pipeline run

func (TaskRunResults) FinalResult added in v0.9.10

func (trrs TaskRunResults) FinalResult() FinalResult

FinalResult pulls the FinalResult for the pipeline_run from the task runs. It needs to respect the output index of each task.

type TaskType

type TaskType string
// TaskType values declared by this package.
const (
	TaskTypeHTTP      TaskType = "http"
	TaskTypeBridge    TaskType = "bridge"
	TaskTypeMedian    TaskType = "median"
	TaskTypeMultiply  TaskType = "multiply"
	TaskTypeJSONParse TaskType = "jsonparse"
	TaskTypeAny       TaskType = "any"
	TaskTypeVRF       TaskType = "vrf"

	// TaskTypePanic is for testing only.
	TaskTypePanic TaskType = "panic"
)

func (TaskType) String added in v0.10.6

func (t TaskType) String() string

type URLParam added in v0.10.8

type URLParam url.URL

func (*URLParam) String added in v0.10.8

func (u *URLParam) String() string

func (*URLParam) UnmarshalPipelineParam added in v0.10.8

func (u *URLParam) UnmarshalPipelineParam(val interface{}) error

type Uint64Param added in v0.10.8

type Uint64Param uint64

func (*Uint64Param) UnmarshalPipelineParam added in v0.10.8

func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error

type VRFTask added in v0.10.8

type VRFTask struct {
	BaseTask `mapstructure:",squash"`
}

func (*VRFTask) Run added in v0.10.8

func (t *VRFTask) Run(_ context.Context, vars Vars, _ JSONSerializable, inputs []Result) (result Result)

func (*VRFTask) Type added in v0.10.8

func (t *VRFTask) Type() TaskType

type Vars added in v0.10.8

type Vars struct {
	// contains filtered or unexported fields
}

func NewVarsFrom added in v0.10.8

func NewVarsFrom(m map[string]interface{}) Vars

func (Vars) Get added in v0.10.8

func (vars Vars) Get(keypathStr string) (interface{}, error)

func (Vars) Set added in v0.10.8

func (vars Vars) Set(dotID string, value interface{})

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL