Documentation ¶
Index ¶
- Constants
- Variables
- func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)
- func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error)
- func NewORM(db *gorm.DB, config Config) *orm
- func NewRunner(orm ORM, config Config) *runner
- func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
- type AnyTask
- type BaseTask
- type BoolParam
- type BridgeTask
- type BytesParam
- type Config
- type DecimalParam
- type DecimalSliceParam
- type FinalResult
- type GetterFunc
- type HTTPTask
- type JSONParseTask
- type JSONSerializable
- type Keypath
- type MapParam
- type MaybeUint64Param
- type MedianTask
- type MultiplyTask
- type ORM
- type PanicTask
- type PipelineParamUnmarshaler
- type PossibleErrorResponses
- type Result
- type Run
- type RunErrors
- type RunStatus
- type RunWithResults
- type Runner
- type SafeTx
- type SliceParam
- type Spec
- type StringParam
- type StringSliceParam
- type Task
- type TaskDAG
- type TaskDAGNode
- type TaskRun
- type TaskRunResult
- type TaskRunResults
- type TaskType
- type URLParam
- type Uint64Param
- type VRFTask
- type Vars
Constants ¶
const (
InputTaskKey = "input"
)
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") ErrParameterEmpty = errors.New("parameter is empty") ErrTooManyErrors = errors.New("too many errors") )
var ( // TODO: Make private again after // https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_task_execution_time", Help: "How long each pipeline task took to execute", }, []string{"job_id", "job_name", "task_type"}, ) PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_run_errors", Help: "Number of errors for each pipeline spec", }, []string{"job_id", "job_name"}, ) PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_run_total_time_to_completion", Help: "How long each pipeline run took to finish (from the moment it was created)", }, []string{"job_id", "job_name"}, ) PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_tasks_total_finished", Help: "The total number of pipeline tasks which have finished", }, []string{"job_id", "job_name", "task_type", "status"}, ) ErrRunPanicked = errors.New("pipeline run panicked") )
var ( ErrKeypathNotFound = errors.New("keypath not found") ErrKeypathTooDeep = errors.New("keypath too deep (maximum 2 keys)") ErrVarsRoot = errors.New("cannot get/set the root of a pipeline.Vars") )
var (
ErrNoSuchBridge = errors.New("no such bridge exists")
)
Functions ¶
func CheckInputs ¶ added in v0.10.8
func FindBridge ¶
FindBridge find a bridge using the given database
func ResolveParam ¶ added in v0.10.8
func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
Types ¶
type AnyTask ¶ added in v0.10.0
type AnyTask struct {
BaseTask `mapstructure:",squash"`
}
AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.
type BaseTask ¶
type BaseTask struct { Index int32 `mapstructure:"index" json:"-" ` Timeout time.Duration `mapstructure:"timeout"` // contains filtered or unexported fields }
func NewBaseTask ¶ added in v0.9.9
func (BaseTask) NumPredecessors ¶ added in v0.10.8
func (BaseTask) OutputIndex ¶
func (BaseTask) OutputTask ¶
func (*BaseTask) SetOutputTask ¶
type BoolParam ¶ added in v0.10.8
type BoolParam bool
func (*BoolParam) UnmarshalPipelineParam ¶ added in v0.10.8
type BridgeTask ¶
type BridgeTask struct { BaseTask `mapstructure:",squash"` Name string `json:"name"` RequestData string `json:"requestData"` IncludeInputAtKey string `json:"includeInputAtKey"` // contains filtered or unexported fields }
func (*BridgeTask) Run ¶
func (t *BridgeTask) Run(ctx context.Context, vars Vars, meta JSONSerializable, inputs []Result) Result
func (*BridgeTask) Type ¶
func (t *BridgeTask) Type() TaskType
type BytesParam ¶ added in v0.10.8
type BytesParam string
func (*BytesParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error
type Config ¶
type Config interface { BridgeResponseURL() *url.URL DatabaseMaximumTxDuration() time.Duration DatabaseURL() url.URL DefaultHTTPLimit() int64 DefaultHTTPTimeout() models.Duration DefaultMaxHTTPAttempts() uint DefaultHTTPAllowUnrestrictedNetworkAccess() bool TriggerFallbackDBPollInterval() time.Duration JobPipelineMaxRunDuration() time.Duration JobPipelineReaperInterval() time.Duration JobPipelineReaperThreshold() time.Duration }
type DecimalParam ¶ added in v0.10.8
func (DecimalParam) Decimal ¶ added in v0.10.8
func (d DecimalParam) Decimal() decimal.Decimal
func (*DecimalParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error
type DecimalSliceParam ¶ added in v0.10.8
func (*DecimalSliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error
type FinalResult ¶ added in v0.9.10
type FinalResult struct { Values []interface{} Errors []error }
FinalResult is the result of a Run
func (FinalResult) ErrorsDB ¶ added in v0.9.10
func (result FinalResult) ErrorsDB() RunErrors
ErrorsDB dumps a result error for a pipeline_run
func (FinalResult) HasErrors ¶ added in v0.9.10
func (result FinalResult) HasErrors() bool
HasErrors returns true if the final result has any errors
func (FinalResult) OutputsDB ¶ added in v0.9.10
func (result FinalResult) OutputsDB() JSONSerializable
OutputsDB dumps a result output for a pipeline_run
func (FinalResult) SingularResult ¶ added in v0.9.10
func (result FinalResult) SingularResult() (Result, error)
SingularResult returns a single result if the FinalResult only has one set of outputs/errors
type GetterFunc ¶ added in v0.10.8
type GetterFunc func() (interface{}, error)
func From ¶ added in v0.10.8
func From(getters ...interface{}) []GetterFunc
func Input ¶ added in v0.10.8
func Input(inputs []Result, index int) GetterFunc
func Inputs ¶ added in v0.10.8
func Inputs(inputs []Result) GetterFunc
func JSONWithVarExprs ¶ added in v0.10.8
func JSONWithVarExprs(s string, vars Vars, allowErrors bool) GetterFunc
func NonemptyString ¶ added in v0.10.8
func NonemptyString(s string) GetterFunc
func VarExpr ¶ added in v0.10.8
func VarExpr(s string, vars Vars) GetterFunc
type HTTPTask ¶
type HTTPTask struct { BaseTask `mapstructure:",squash"` Method string URL string RequestData string `json:"requestData"` AllowUnrestrictedNetworkAccess string // contains filtered or unexported fields }
type JSONParseTask ¶
type JSONParseTask struct { BaseTask `mapstructure:",squash"` Path string `json:"path"` Data string `json:"data"` // Lax when disabled will return an error if the path does not exist // Lax when enabled will return nil with no error if the path does not exist Lax string }
func (*JSONParseTask) Run ¶
func (t *JSONParseTask) Run(_ context.Context, vars Vars, _ JSONSerializable, inputs []Result) (result Result)
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONSerializable ¶
type JSONSerializable struct { Val interface{} Null bool }
func (JSONSerializable) MarshalJSON ¶
func (js JSONSerializable) MarshalJSON() ([]byte, error)
func (*JSONSerializable) Scan ¶
func (js *JSONSerializable) Scan(value interface{}) error
func (*JSONSerializable) UnmarshalJSON ¶
func (js *JSONSerializable) UnmarshalJSON(bs []byte) error
type MapParam ¶ added in v0.10.8
type MapParam map[string]interface{}
func (*MapParam) UnmarshalPipelineParam ¶ added in v0.10.8
type MaybeUint64Param ¶ added in v0.10.8
type MaybeUint64Param struct {
// contains filtered or unexported fields
}
func (MaybeUint64Param) Uint64 ¶ added in v0.10.8
func (u MaybeUint64Param) Uint64() (uint64, bool)
func (*MaybeUint64Param) UnmarshalPipelineParam ¶ added in v0.10.8
func (u *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error
type MedianTask ¶
type MedianTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
func (*MedianTask) Run ¶
func (t *MedianTask) Run(_ context.Context, vars Vars, _ JSONSerializable, inputs []Result) (result Result)
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type MultiplyTask ¶
type MultiplyTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Times string `json:"times"` }
func (*MultiplyTask) Run ¶
func (t *MultiplyTask) Run(_ context.Context, vars Vars, _ JSONSerializable, inputs []Result) (result Result)
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ORM ¶
type ORM interface { CreateSpec(ctx context.Context, tx *gorm.DB, taskDAG TaskDAG, maxTaskTimeout models.Interval) (int32, error) InsertFinishedRun(db *gorm.DB, run Run, trrs []TaskRunResult, saveSuccessfulTaskRuns bool) (runID int64, err error) DeleteRunsOlderThan(threshold time.Duration) error FindBridge(name models.TaskType) (models.BridgeType, error) FindRun(id int64) (Run, error) GetAllRuns() ([]Run, error) DB() *gorm.DB }
type PipelineParamUnmarshaler ¶ added in v0.10.8
type PipelineParamUnmarshaler interface {
UnmarshalPipelineParam(val interface{}) error
}
type PossibleErrorResponses ¶
type Result ¶
type Result struct { Value interface{} Error error }
Result is the result of a TaskRun
func (Result) ErrorDB ¶ added in v0.9.10
func (result Result) ErrorDB() null.String
ErrorDB dumps a single result error for a pipeline_task_run
func (Result) OutputDB ¶ added in v0.9.10
func (result Result) OutputDB() JSONSerializable
OutputDB dumps a single result output for a pipeline_run or pipeline_task_run
type Run ¶
type Run struct { ID int64 `json:"-" gorm:"primary_key"` PipelineSpecID int32 `json:"-"` PipelineSpec Spec `json:"pipelineSpec"` Meta JSONSerializable `json:"meta"` // The errors are only ever strings // DB example: [null, null, "my error"] Errors RunErrors `json:"errors" gorm:"type:jsonb"` // The outputs can be anything. // DB example: [1234, {"a": 10}, null] Outputs JSONSerializable `json:"outputs" gorm:"type:jsonb"` CreatedAt time.Time `json:"createdAt"` FinishedAt *time.Time `json:"finishedAt"` PipelineTaskRuns []TaskRun `json:"taskRuns" gorm:"foreignkey:PipelineRunID;->"` }
type RunStatus ¶ added in v0.10.3
type RunStatus int
RunStatus represents the status of a run
const ( // RunStatusUnknown is the when the run status cannot be determined. RunStatusUnknown RunStatus = iota // RunStatusInProgress is used for when a run is actively being executed. RunStatusInProgress // RunStatusErrored is used for when a run has errored and will not complete. RunStatusErrored // RunStatusCompleted is used for when a run has successfully completed execution. RunStatusCompleted )
func (RunStatus) Completed ¶ added in v0.10.3
Completed returns true if the status is RunStatusCompleted.
type RunWithResults ¶ added in v0.10.3
type RunWithResults struct { Run Run TaskRunResults TaskRunResults }
type Runner ¶
type Runner interface { service.Service // We expect spec.JobID and spec.JobName to be set for logging/prometheus. // ExecuteRun executes a new run in-memory according to a spec and returns the results. ExecuteRun(ctx context.Context, spec Spec, pipelineInput interface{}, meta JSONSerializable, l logger.Logger) (run Run, trrs TaskRunResults, err error) // InsertFinishedRun saves the run results in the database. InsertFinishedRun(db *gorm.DB, run Run, trrs TaskRunResults, saveSuccessfulTaskRuns bool) (int64, error) // ExecuteAndInsertNewRun executes a new run in-memory according to a spec, persists and saves the results. // It is a combination of ExecuteRun and InsertFinishedRun. // Note that the spec MUST have a DOT graph for this to work. ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, pipelineInput interface{}, meta JSONSerializable, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error) // Test method for inserting completed non-pipeline job runs TestInsertFinishedRun(db *gorm.DB, jobID int32, jobName string, jobType string, specID int32) (int64, error) }
type SafeTx ¶ added in v0.10.4
type SafeTx struct {
// contains filtered or unexported fields
}
Bundled tx and txmutex for multiple goroutines inside the same transaction. This mutex is necessary to work to avoid concurrent database calls inside the same transaction to fail. With the pq driver: `pq: unexpected Parse response 'C'` With the pgx driver: `conn busy`.
type SliceParam ¶ added in v0.10.8
type SliceParam []interface{}
func (SliceParam) FilterErrors ¶ added in v0.10.8
func (s SliceParam) FilterErrors() (SliceParam, int)
func (*SliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error
type Spec ¶
type Spec struct { ID int32 `gorm:"primary_key"` DotDagSource string `json:"dotDagSource"` CreatedAt time.Time `json:"-"` MaxTaskDuration models.Interval `json:"-"` JobID int32 `gorm:"-" json:"-"` JobName string `gorm:"-" json:"-"` }
func (Spec) TasksInDependencyOrder ¶ added in v0.10.4
type StringParam ¶ added in v0.10.8
type StringParam string
func (*StringParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *StringParam) UnmarshalPipelineParam(val interface{}) error
type StringSliceParam ¶ added in v0.10.8
type StringSliceParam []string
func (*StringSliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (p *StringSliceParam) UnmarshalPipelineParam(val interface{}) error
type Task ¶
type TaskDAG ¶
type TaskDAG struct { *simple.DirectedGraph DOTSource string }
TaskDAG fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it. Once unmarshalled, calling `TaskDAG#TasksInDependencyOrder()` will return the unmarshaled tasks. NOTE: We only permit one child
func NewTaskDAG ¶
func NewTaskDAG() *TaskDAG
func (TaskDAG) MinTimeout ¶ added in v0.9.10
func (TaskDAG) TasksInDependencyOrder ¶
Returns a slice of Tasks starting at the outputs of the DAG and ending at the inputs. As you iterate through this slice, you can expect that any individual Task's outputs will already have been traversed.
func (*TaskDAG) UnmarshalText ¶
type TaskDAGNode ¶ added in v0.10.6
func NewTaskDAGNode ¶ added in v0.10.6
func (*TaskDAGNode) Attributes ¶ added in v0.10.6
func (n *TaskDAGNode) Attributes() []encoding.Attribute
func (*TaskDAGNode) DOTID ¶ added in v0.10.6
func (n *TaskDAGNode) DOTID() string
func (*TaskDAGNode) SetAttribute ¶ added in v0.10.6
func (n *TaskDAGNode) SetAttribute(attr encoding.Attribute) error
func (*TaskDAGNode) SetDAG ¶ added in v0.10.6
func (n *TaskDAGNode) SetDAG(g *TaskDAG)
func (*TaskDAGNode) SetDOTID ¶ added in v0.10.6
func (n *TaskDAGNode) SetDOTID(id string)
func (*TaskDAGNode) String ¶ added in v0.10.6
func (n *TaskDAGNode) String() string
type TaskRun ¶
type TaskRun struct { ID int64 `json:"-" gorm:"primary_key"` Type TaskType `json:"type"` PipelineRun Run `json:"-"` PipelineRunID int64 `json:"-"` Output *JSONSerializable `json:"output" gorm:"type:jsonb"` Error null.String `json:"error"` CreatedAt time.Time `json:"createdAt"` FinishedAt *time.Time `json:"finishedAt"` Index int32 `json:"index"` DotID string `json:"dotId"` }
type TaskRunResult ¶ added in v0.9.10
type TaskRunResult struct { ID int64 Task Task TaskRun TaskRun Result Result CreatedAt time.Time FinishedAt time.Time IsTerminal bool }
TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet TaskSpecID will always be non-zero
type TaskRunResults ¶ added in v0.9.10
type TaskRunResults []TaskRunResult
TaskRunResults represents a collection of results for all task runs for one pipeline run
func (TaskRunResults) FinalResult ¶ added in v0.9.10
func (trrs TaskRunResults) FinalResult() FinalResult
FinalResult pulls the FinalResult for the pipeline_run from the task runs It needs to respect the output index of each task
type URLParam ¶ added in v0.10.8
func (*URLParam) UnmarshalPipelineParam ¶ added in v0.10.8
type Uint64Param ¶ added in v0.10.8
type Uint64Param uint64
func (*Uint64Param) UnmarshalPipelineParam ¶ added in v0.10.8
func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error
type VRFTask ¶ added in v0.10.8
type VRFTask struct {
BaseTask `mapstructure:",squash"`
}
type Vars ¶ added in v0.10.8
type Vars struct {
// contains filtered or unexported fields
}