Documentation ¶
Index ¶
- Constants
- Variables
- func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)
- func NewORM(db *gorm.DB) *orm
- func NewRunner(orm ORM, config Config, ethClient eth.Client, txManager TxManager) *runner
- func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
- type AddressParam
- type AddressSliceParam
- type AnyTask
- type BaseTask
- type BoolParam
- type BridgeTask
- type BytesParam
- type CBORParseTask
- type Config
- type DecimalParam
- type DecimalSliceParam
- type DivideTask
- type ETHABIDecodeLogTask
- type ETHABIDecodeTask
- type ETHABIEncodeTask
- type ETHCallTask
- type ETHTxTask
- type ErrRunPanicked
- type FinalResult
- type GetterFunc
- type Graph
- type GraphNode
- type HTTPTask
- type HashSliceParam
- type JSONParseTask
- type JSONPathParam
- type JSONSerializable
- type KeyStore
- type Keypath
- type MapParam
- type MaybeInt32Param
- type MaybeUint64Param
- type MeanTask
- type MedianTask
- type ModeTask
- type MultiplyTask
- type ORM
- type PanicTask
- type Pipeline
- type PipelineParamUnmarshaler
- type PossibleErrorResponses
- type Result
- type Run
- type RunErrors
- type RunStatus
- type RunWithResults
- type Runner
- type SliceParam
- type Spec
- type StringParam
- type SumTask
- type Task
- type TaskRun
- type TaskRunResult
- type TaskRunResults
- type TaskType
- type TxManager
- type URLParam
- type Uint64Param
- type VRFTask
- type Vars
Constants ¶
const (
InputTaskKey = "input"
)
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") ErrParameterEmpty = errors.New("parameter is empty") ErrTooManyErrors = errors.New("too many errors") ErrTimeout = errors.New("timeout") ErrTaskRunFailed = errors.New("task run failed") )
var ( // PromPipelineTaskExecutionTime reports how long each pipeline task took to execute // TODO: Make private again after // https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_task_execution_time", Help: "How long each pipeline task took to execute", }, []string{"job_id", "job_name", "task_type"}, ) PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_run_errors", Help: "Number of errors for each pipeline spec", }, []string{"job_id", "job_name"}, ) PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_run_total_time_to_completion", Help: "How long each pipeline run took to finish (from the moment it was created)", }, []string{"job_id", "job_name"}, ) PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_tasks_total_finished", Help: "The total number of pipeline tasks which have finished", }, []string{"job_id", "job_name", "task_type", "status"}, ) )
var ( ErrKeypathNotFound = errors.New("keypath not found") ErrKeypathTooDeep = errors.New("keypath too deep (maximum 2 keys)") ErrVarsRoot = errors.New("cannot get/set the root of a pipeline.Vars") )
var (
ErrNoSuchBridge = errors.New("no such bridge exists")
)
Functions ¶
func CheckInputs ¶
func ResolveParam ¶
func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
Types ¶
type AddressParam ¶
func (*AddressParam) UnmarshalPipelineParam ¶
func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error
type AddressSliceParam ¶
func (*AddressSliceParam) UnmarshalPipelineParam ¶
func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error
type AnyTask ¶
type AnyTask struct {
BaseTask `mapstructure:",squash"`
}
AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.
type BaseTask ¶
type BaseTask struct { Index int32 `mapstructure:"index" json:"-" ` Timeout time.Duration `mapstructure:"timeout"` // contains filtered or unexported fields }
func NewBaseTask ¶
func (BaseTask) OutputIndex ¶
type BridgeTask ¶
type BridgeTask struct { BaseTask `mapstructure:",squash"` Name string `json:"name"` RequestData string `json:"requestData"` IncludeInputAtKey string `json:"includeInputAtKey"` // contains filtered or unexported fields }
func (*BridgeTask) Type ¶
func (t *BridgeTask) Type() TaskType
type BytesParam ¶
type BytesParam []byte
func (*BytesParam) UnmarshalPipelineParam ¶
func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error
type CBORParseTask ¶
func (*CBORParseTask) Type ¶
func (t *CBORParseTask) Type() TaskType
type Config ¶
type Config interface { BridgeResponseURL() *url.URL DatabaseMaximumTxDuration() time.Duration DatabaseURL() url.URL DefaultHTTPLimit() int64 DefaultHTTPTimeout() models.Duration DefaultMaxHTTPAttempts() uint DefaultHTTPAllowUnrestrictedNetworkAccess() bool EthGasLimitDefault() uint64 EthMaxQueuedTransactions() uint64 TriggerFallbackDBPollInterval() time.Duration JobPipelineMaxRunDuration() time.Duration JobPipelineReaperInterval() time.Duration JobPipelineReaperThreshold() time.Duration }
type DecimalParam ¶
func (DecimalParam) Decimal ¶
func (d DecimalParam) Decimal() decimal.Decimal
func (*DecimalParam) UnmarshalPipelineParam ¶
func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error
type DecimalSliceParam ¶
func (*DecimalSliceParam) UnmarshalPipelineParam ¶
func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error
type DivideTask ¶
type DivideTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Divisor string `json:"divisor"` Precision string `json:"precision"` }
func (*DivideTask) Type ¶
func (t *DivideTask) Type() TaskType
type ETHABIDecodeLogTask ¶
type ETHABIDecodeLogTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` Topics string `json:"topics"` }
func (*ETHABIDecodeLogTask) Type ¶
func (t *ETHABIDecodeLogTask) Type() TaskType
type ETHABIDecodeTask ¶
type ETHABIDecodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
func (*ETHABIDecodeTask) Type ¶
func (t *ETHABIDecodeTask) Type() TaskType
type ETHABIEncodeTask ¶
type ETHABIEncodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
func (*ETHABIEncodeTask) Type ¶
func (t *ETHABIEncodeTask) Type() TaskType
type ETHCallTask ¶
type ETHCallTask struct { BaseTask `mapstructure:",squash"` Contract string `json:"contract"` Data string `json:"data"` // contains filtered or unexported fields }
func (*ETHCallTask) Type ¶
func (t *ETHCallTask) Type() TaskType
type ETHTxTask ¶
type ETHTxTask struct { BaseTask `mapstructure:",squash"` From string `json:"from"` To string `json:"to"` Data string `json:"data"` GasLimit string `json:"gasLimit"` TxMeta string `json:"txMeta"` // contains filtered or unexported fields }
type ErrRunPanicked ¶
type ErrRunPanicked struct {
// contains filtered or unexported fields
}
When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.
func (ErrRunPanicked) Error ¶
func (err ErrRunPanicked) Error() string
type FinalResult ¶
type FinalResult struct { Values []interface{} Errors []error }
FinalResult is the result of a Run
func (FinalResult) ErrorsDB ¶
func (result FinalResult) ErrorsDB() RunErrors
ErrorsDB dumps a result error for a pipeline_run
func (FinalResult) HasErrors ¶
func (result FinalResult) HasErrors() bool
HasErrors returns true if the final result has any errors
func (FinalResult) OutputsDB ¶
func (result FinalResult) OutputsDB() JSONSerializable
OutputsDB dumps a result output for a pipeline_run
func (FinalResult) SingularResult ¶
func (result FinalResult) SingularResult() (Result, error)
SingularResult returns a single result if the FinalResult only has one set of outputs/errors
type GetterFunc ¶
type GetterFunc func() (interface{}, error)
func From ¶
func From(getters ...interface{}) []GetterFunc
func Input ¶
func Input(inputs []Result, index int) GetterFunc
func Inputs ¶
func Inputs(inputs []Result) GetterFunc
func JSONWithVarExprs ¶
func JSONWithVarExprs(s string, vars Vars, allowErrors bool) GetterFunc
func NonemptyString ¶
func NonemptyString(s string) GetterFunc
func VarExpr ¶
func VarExpr(s string, vars Vars) GetterFunc
type Graph ¶
type Graph struct {
*simple.DirectedGraph
}
Graph fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.
func (*Graph) UnmarshalText ¶
type HTTPTask ¶
type HTTPTask struct { BaseTask `mapstructure:",squash"` Method string URL string RequestData string `json:"requestData"` AllowUnrestrictedNetworkAccess string // contains filtered or unexported fields }
type HashSliceParam ¶
func (*HashSliceParam) UnmarshalPipelineParam ¶
func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error
type JSONParseTask ¶
type JSONParseTask struct { BaseTask `mapstructure:",squash"` Path string `json:"path"` Data string `json:"data"` // Lax when disabled will return an error if the path does not exist // Lax when enabled will return nil with no error if the path does not exist Lax string }
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONPathParam ¶
type JSONPathParam []string
func (*JSONPathParam) UnmarshalPipelineParam ¶
func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error
type JSONSerializable ¶
type JSONSerializable struct { Val interface{} Null bool }
func (JSONSerializable) MarshalJSON ¶
func (js JSONSerializable) MarshalJSON() ([]byte, error)
func (*JSONSerializable) Scan ¶
func (js *JSONSerializable) Scan(value interface{}) error
func (*JSONSerializable) UnmarshalJSON ¶
func (js *JSONSerializable) UnmarshalJSON(bs []byte) error
type MaybeInt32Param ¶
type MaybeInt32Param struct {
// contains filtered or unexported fields
}
func (MaybeInt32Param) Int32 ¶
func (p MaybeInt32Param) Int32() (int32, bool)
func (*MaybeInt32Param) UnmarshalPipelineParam ¶
func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error
type MaybeUint64Param ¶
type MaybeUint64Param struct {
// contains filtered or unexported fields
}
func (MaybeUint64Param) Uint64 ¶
func (p MaybeUint64Param) Uint64() (uint64, bool)
func (*MaybeUint64Param) UnmarshalPipelineParam ¶
func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error
type MeanTask ¶
type MeanTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` Precision string `json:"precision"` }
type MedianTask ¶
type MedianTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type ModeTask ¶
type ModeTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
type MultiplyTask ¶
type MultiplyTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Times string `json:"times"` }
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ORM ¶
type ORM interface { CreateSpec(ctx context.Context, tx *gorm.DB, pipeline Pipeline, maxTaskTimeout models.Interval) (int32, error) InsertFinishedRun(db *gorm.DB, run Run, trrs []TaskRunResult, saveSuccessfulTaskRuns bool) (runID int64, err error) DeleteRunsOlderThan(threshold time.Duration) error FindRun(id int64) (Run, error) GetAllRuns() ([]Run, error) DB() *gorm.DB }
type Pipeline ¶
func (*Pipeline) UnmarshalText ¶
type PipelineParamUnmarshaler ¶
type PipelineParamUnmarshaler interface {
UnmarshalPipelineParam(val interface{}) error
}
type PossibleErrorResponses ¶
type Result ¶
type Result struct { Value interface{} Error error }
Result is the result of a TaskRun
func (Result) ErrorDB ¶
func (result Result) ErrorDB() null.String
ErrorDB dumps a single result error for a pipeline_task_run
func (Result) OutputDB ¶
func (result Result) OutputDB() JSONSerializable
OutputDB dumps a single result output for a pipeline_run or pipeline_task_run
type Run ¶
type Run struct { ID int64 `json:"-" gorm:"primary_key"` PipelineSpecID int32 `json:"-"` PipelineSpec Spec `json:"pipelineSpec"` Meta JSONSerializable `json:"meta"` // The errors are only ever strings // DB example: [null, null, "my error"] Errors RunErrors `json:"errors" gorm:"type:jsonb"` // The outputs can be anything. // DB example: [1234, {"a": 10}, null] Outputs JSONSerializable `json:"outputs" gorm:"type:jsonb"` CreatedAt time.Time `json:"createdAt"` FinishedAt *time.Time `json:"finishedAt"` PipelineTaskRuns []TaskRun `json:"taskRuns" gorm:"foreignkey:PipelineRunID;->"` }
type RunStatus ¶
type RunStatus int
RunStatus represents the status of a run
const ( // RunStatusUnknown is used when the run status cannot be determined. RunStatusUnknown RunStatus = iota // RunStatusInProgress is used for when a run is actively being executed. RunStatusInProgress // RunStatusErrored is used for when a run has errored and will not complete. RunStatusErrored // RunStatusCompleted is used for when a run has successfully completed execution. RunStatusCompleted )
type RunWithResults ¶
type RunWithResults struct { Run Run TaskRunResults TaskRunResults }
type Runner ¶
type Runner interface { service.Service // We expect spec.JobID and spec.JobName to be set for logging/prometheus. // ExecuteRun executes a new run in-memory according to a spec and returns the results. ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run Run, trrs TaskRunResults, err error) // InsertFinishedRun saves the run results in the database. InsertFinishedRun(db *gorm.DB, run Run, trrs TaskRunResults, saveSuccessfulTaskRuns bool) (int64, error) // ExecuteAndInsertFinishedRun executes a new run in-memory according to a spec, persists and saves the results. // It is a combination of ExecuteRun and InsertFinishedRun. // Note that the spec MUST have a DOT graph for this to work. ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error) // Test method for inserting completed non-pipeline job runs TestInsertFinishedRun(db *gorm.DB, jobID int32, jobName string, jobType string, specID int32) (int64, error) }
type SliceParam ¶
type SliceParam []interface{}
func (SliceParam) FilterErrors ¶
func (s SliceParam) FilterErrors() (SliceParam, int)
func (*SliceParam) UnmarshalPipelineParam ¶
func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error
type Spec ¶
type StringParam ¶
type StringParam string
func (*StringParam) UnmarshalPipelineParam ¶
func (s *StringParam) UnmarshalPipelineParam(val interface{}) error
type SumTask ¶
type SumTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
type Task ¶
type TaskRun ¶
type TaskRun struct { ID int64 `json:"-" gorm:"primary_key"` Type TaskType `json:"type"` PipelineRun Run `json:"-"` PipelineRunID int64 `json:"-"` Output *JSONSerializable `json:"output" gorm:"type:jsonb"` Error null.String `json:"error"` CreatedAt time.Time `json:"createdAt"` FinishedAt *time.Time `json:"finishedAt"` Index int32 `json:"index"` DotID string `json:"dotId"` }
type TaskRunResult ¶
type TaskRunResult struct { ID int64 Task Task TaskRun TaskRun Result Result CreatedAt time.Time FinishedAt time.Time }
TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet. TaskSpecID will always be non-zero.
func (*TaskRunResult) IsTerminal ¶
func (result *TaskRunResult) IsTerminal() bool
type TaskRunResults ¶
type TaskRunResults []TaskRunResult
TaskRunResults represents a collection of results for all task runs for one pipeline run
func (TaskRunResults) FinalResult ¶
func (trrs TaskRunResults) FinalResult() FinalResult
FinalResult pulls the FinalResult for the pipeline_run from the task runs. It needs to respect the output index of each task.
type TaskType ¶
type TaskType string
const ( TaskTypeHTTP TaskType = "http" TaskTypeBridge TaskType = "bridge" TaskTypeMean TaskType = "mean" TaskTypeMedian TaskType = "median" TaskTypeMode TaskType = "mode" TaskTypeSum TaskType = "sum" TaskTypeMultiply TaskType = "multiply" TaskTypeDivide TaskType = "divide" TaskTypeJSONParse TaskType = "jsonparse" TaskTypeCBORParse TaskType = "cborparse" TaskTypeAny TaskType = "any" TaskTypeVRF TaskType = "vrf" TaskTypeETHCall TaskType = "ethcall" TaskTypeETHTx TaskType = "ethtx" TaskTypeETHABIEncode TaskType = "ethabiencode" TaskTypeETHABIDecode TaskType = "ethabidecode" TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog" // Testing only. TaskTypePanic TaskType = "panic" )
type URLParam ¶
func (*URLParam) UnmarshalPipelineParam ¶
type Uint64Param ¶
type Uint64Param uint64
func (*Uint64Param) UnmarshalPipelineParam ¶
func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error
Source Files ¶
- common.go
- common_eth.go
- common_http.go
- graph.go
- models.go
- orm.go
- runner.go
- scheduler.go
- task.any.go
- task.base.go
- task.bridge.go
- task.cborparse.go
- task.divide.go
- task.eth_abi_decode.go
- task.eth_abi_decode_log.go
- task.eth_abi_encode.go
- task.eth_call.go
- task.eth_tx.go
- task.http.go
- task.jsonparse.go
- task.mean.go
- task.median.go
- task.mode.go
- task.multiply.go
- task.panic.go
- task.sum.go
- task.vrf.go
- task_params.go
- variables.go