Documentation ¶
Index ¶
- Constants
- Variables
- func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)
- func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig) *orm
- func NewRunner(orm ORM, config Config, chainSet evm.ChainSet, ethks ETHKeyStore, ...) *runner
- func ParseETHABIArgsString(theABI []byte, isLog bool) (args abi.Arguments, indexedArgs abi.Arguments, _ error)
- func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
- type AddressParam
- type AddressSliceParam
- type AnyTask
- type BaseTask
- func (t *BaseTask) Base() *BaseTask
- func (t BaseTask) DotID() string
- func (t BaseTask) ID() int
- func (t BaseTask) Inputs() []TaskDependency
- func (t BaseTask) OutputIndex() int32
- func (t BaseTask) Outputs() []Task
- func (t BaseTask) TaskMaxBackoff() time.Duration
- func (t BaseTask) TaskMinBackoff() time.Duration
- func (t BaseTask) TaskRetries() uint32
- func (t BaseTask) TaskTimeout() (time.Duration, bool)
- type BoolParam
- type BridgeTask
- type BytesParam
- type CBORParseTask
- type ConditionalTask
- type Config
- type DecimalParam
- type DecimalSliceParam
- type DivideTask
- type ETHABIDecodeLogTask
- type ETHABIDecodeTask
- type ETHABIEncodeTask
- type ETHABIEncodeTask2
- type ETHCallTask
- type ETHKeyStore
- type ETHTxTask
- type ErrRunPanicked
- type EstimateGasLimitTask
- type FailTask
- type FinalResult
- type GasEstimator
- type GetterFunc
- func From(getters ...interface{}) []GetterFunc
- func Input(inputs []Result, index int) GetterFunc
- func Inputs(inputs []Result) GetterFunc
- func JSONWithVarExprs(jsExpr string, vars Vars, allowErrors bool) GetterFunc
- func NonemptyString(s string) GetterFunc
- func VarExpr(expr string, vars Vars) GetterFunc
- type Graph
- type GraphEdge
- type GraphNode
- type HTTPTask
- type HashSliceParam
- type JSONParseTask
- type JSONPathParam
- type JSONSerializable
- type Keypath
- type LowercaseTask
- type MapParam
- type MaybeBigIntParam
- type MaybeInt32Param
- type MaybeUint64Param
- type MeanTask
- type MedianTask
- type MemoTask
- type MergeTask
- type Method
- type ModeTask
- type MultiplyTask
- type ORM
- type ObjectParam
- type ObjectType
- type PanicTask
- type Pipeline
- type PipelineParamUnmarshaler
- type PossibleErrorResponses
- type Result
- type ResumeRequest
- type Run
- func (r *Run) ByDotID(id string) *TaskRun
- func (r Run) GetID() string
- func (r Run) HasErrors() bool
- func (r Run) HasFatalErrors() bool
- func (r *Run) SetID(value string) error
- func (r *Run) Status() RunStatus
- func (r *Run) StringAllErrors() []*string
- func (r *Run) StringFatalErrors() []*string
- func (r *Run) StringOutputs() ([]*string, error)
- type RunErrors
- type RunInfo
- type RunStatus
- type Runner
- type SliceParam
- type Spec
- type StringParam
- type StringSliceParam
- type SumTask
- type Task
- type TaskDependency
- type TaskRun
- type TaskRunResult
- type TaskRunResults
- type TaskType
- type URLParam
- type Uint64Param
- type UppercaseTask
- type VRFKeyStore
- type VRFTask
- type VRFTaskV2
- type Vars
Constants ¶
const (
InputTaskKey = "input"
)
const KeypathSeparator = "."
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") ErrInputTaskErrored = errors.New("input task errored") ErrParameterEmpty = errors.New("parameter is empty") ErrIndexOutOfRange = errors.New("index out of range") ErrTooManyErrors = errors.New("too many errors") ErrTimeout = errors.New("timeout") ErrTaskRunFailed = errors.New("task run failed") ErrCancelled = errors.New("task run cancelled (fail early)") )
var ( // PromPipelineTaskExecutionTime reports how long each pipeline task took to execute // TODO: Make private again after // https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_task_execution_time", Help: "How long each pipeline task took to execute", }, []string{"job_id", "job_name", "task_id", "task_type"}, ) PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_run_errors", Help: "Number of errors for each pipeline spec", }, []string{"job_id", "job_name"}, ) PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_run_total_time_to_completion", Help: "How long each pipeline run took to finish (from the moment it was created)", }, []string{"job_id", "job_name"}, ) PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_tasks_total_finished", Help: "The total number of pipeline tasks which have finished", }, []string{"job_id", "job_name", "task_id", "task_type", "status"}, ) )
var ( ErrDivideByZero = errors.New("divide by zero") ErrDivisionOverlow = errors.New("division overflow") )
var ( ErrKeypathNotFound = errors.New("keypath not found") ErrVarsRoot = errors.New("cannot get/set the root of a pipeline.Vars") ErrVarsSetNested = errors.New("cannot set a nested key of a pipeline.Vars") )
var (
ErrInvalidMultiplier = errors.New("Invalid multiplier")
)
var (
ErrMultiplyOverlow = errors.New("multiply overflow")
)
var ErrOverflow = errors.New("overflow")
var (
ErrWrongKeypath = errors.New("wrong keypath format")
)
Functions ¶
func CheckInputs ¶ added in v0.10.8
func NewRunner ¶
func NewRunner(orm ORM, config Config, chainSet evm.ChainSet, ethks ETHKeyStore, vrfks VRFKeyStore, lggr logger.Logger, httpClient, unrestrictedHTTPClient *http.Client) *runner
func ParseETHABIArgsString ¶ added in v1.1.0
func ResolveParam ¶ added in v0.10.8
func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
Types ¶
type AddressParam ¶ added in v0.10.9
func (*AddressParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error
type AddressSliceParam ¶ added in v0.10.9
func (*AddressSliceParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error
type AnyTask ¶ added in v0.10.0
type AnyTask struct {
BaseTask `mapstructure:",squash"`
}
AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.
type BaseTask ¶
type BaseTask struct { Index int32 `mapstructure:"index" json:"-" ` Timeout *time.Duration `mapstructure:"timeout"` FailEarly bool `mapstructure:"failEarly"` Retries null.Uint32 `mapstructure:"retries"` MinBackoff time.Duration `mapstructure:"minBackoff"` MaxBackoff time.Duration `mapstructure:"maxBackoff"` // contains filtered or unexported fields }
func NewBaseTask ¶ added in v0.9.9
func (BaseTask) Inputs ¶ added in v0.10.9
func (t BaseTask) Inputs() []TaskDependency
func (BaseTask) OutputIndex ¶
func (BaseTask) TaskMaxBackoff ¶ added in v1.0.0
func (BaseTask) TaskMinBackoff ¶ added in v1.0.0
func (BaseTask) TaskRetries ¶ added in v1.0.0
type BoolParam ¶ added in v0.10.8
type BoolParam bool
func (*BoolParam) UnmarshalPipelineParam ¶ added in v0.10.8
type BridgeTask ¶
type BridgeTask struct { BaseTask `mapstructure:",squash"` Name string `json:"name"` RequestData string `json:"requestData"` IncludeInputAtKey string `json:"includeInputAtKey"` Async string `json:"async"` // contains filtered or unexported fields }
Return types:
string
func (*BridgeTask) Type ¶
func (t *BridgeTask) Type() TaskType
type BytesParam ¶ added in v0.10.8
type BytesParam []byte
func (*BytesParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error
type CBORParseTask ¶ added in v0.10.9
type CBORParseTask struct { BaseTask `mapstructure:",squash"` Data string `json:"data"` Mode string `json:"mode"` }
Return types:
map[string]interface{} with potential value types: float64 string bool map[string]interface{} []interface{} nil
func (*CBORParseTask) Type ¶ added in v0.10.9
func (t *CBORParseTask) Type() TaskType
type ConditionalTask ¶ added in v1.5.0
ConditionalTask checks if data is false for now this is all we need but in the future we can expand this to handle more general conditional statements
func (*ConditionalTask) Type ¶ added in v1.5.0
func (t *ConditionalTask) Type() TaskType
type Config ¶
type Config interface { BridgeResponseURL() *url.URL DatabaseURL() url.URL DefaultHTTPLimit() int64 DefaultHTTPTimeout() models.Duration TriggerFallbackDBPollInterval() time.Duration JobPipelineMaxRunDuration() time.Duration JobPipelineReaperInterval() time.Duration JobPipelineReaperThreshold() time.Duration }
type DecimalParam ¶ added in v0.10.8
func (DecimalParam) Decimal ¶ added in v0.10.8
func (d DecimalParam) Decimal() decimal.Decimal
func (*DecimalParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error
type DecimalSliceParam ¶ added in v0.10.8
func (*DecimalSliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error
type DivideTask ¶ added in v0.10.9
type DivideTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Divisor string `json:"divisor"` Precision string `json:"precision"` }
Return types:
*decimal.Decimal
func (*DivideTask) Type ¶ added in v0.10.9
func (t *DivideTask) Type() TaskType
type ETHABIDecodeLogTask ¶ added in v0.10.9
type ETHABIDecodeLogTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` Topics string `json:"topics"` }
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeLogTask) Type ¶ added in v0.10.9
func (t *ETHABIDecodeLogTask) Type() TaskType
type ETHABIDecodeTask ¶ added in v0.10.9
type ETHABIDecodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeTask) Type ¶ added in v0.10.9
func (t *ETHABIDecodeTask) Type() TaskType
type ETHABIEncodeTask ¶ added in v0.10.9
type ETHABIEncodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
[]byte
func (*ETHABIEncodeTask) Type ¶ added in v0.10.9
func (t *ETHABIEncodeTask) Type() TaskType
type ETHABIEncodeTask2 ¶ added in v1.1.0
type ETHABIEncodeTask2 struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
[]byte
func (*ETHABIEncodeTask2) Type ¶ added in v1.1.0
func (t *ETHABIEncodeTask2) Type() TaskType
type ETHCallTask ¶ added in v0.10.9
type ETHCallTask struct { BaseTask `mapstructure:",squash"` Contract string `json:"contract"` From string `json:"from"` Data string `json:"data"` Gas string `json:"gas"` GasPrice string `json:"gasPrice"` GasTipCap string `json:"gasTipCap"` GasFeeCap string `json:"gasFeeCap"` ExtractRevertReason bool `json:"extractRevertReason"` EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"` // contains filtered or unexported fields }
Return types:
[]byte
func (*ETHCallTask) Type ¶ added in v0.10.9
func (t *ETHCallTask) Type() TaskType
type ETHKeyStore ¶ added in v0.10.11
type ETHTxTask ¶ added in v0.10.9
type ETHTxTask struct { BaseTask `mapstructure:",squash"` From string `json:"from"` To string `json:"to"` Data string `json:"data"` GasLimit string `json:"gasLimit"` TxMeta string `json:"txMeta"` MinConfirmations string `json:"minConfirmations"` // FailOnRevert, if set, will error the task if the transaction reverted on-chain // If unset, the receipt will be passed as output // It has no effect if minConfirmations == 0 FailOnRevert string `json:"failOnRevert"` EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"` TransmitChecker string `json:"transmitChecker"` // contains filtered or unexported fields }
Return types:
nil
type ErrRunPanicked ¶ added in v0.10.4
type ErrRunPanicked struct {
// contains filtered or unexported fields
}
When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.
func (ErrRunPanicked) Error ¶ added in v0.10.9
func (err ErrRunPanicked) Error() string
type EstimateGasLimitTask ¶ added in v1.0.0
type EstimateGasLimitTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` From string `json:"from"` To string `json:"to"` Multiplier string `json:"multiplier"` Data string `json:"data"` EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"` // contains filtered or unexported fields }
Return types:
uint64
func (*EstimateGasLimitTask) Type ¶ added in v1.0.0
func (t *EstimateGasLimitTask) Type() TaskType
type FailTask ¶ added in v1.1.0
FailTask is like the Panic task but without all the drama and stack unwinding of a panic
type FinalResult ¶ added in v0.9.10
FinalResult is the result of a Run
func (FinalResult) HasErrors ¶ added in v0.9.10
func (result FinalResult) HasErrors() bool
HasErrors returns true if the final result has any errors
func (FinalResult) HasFatalErrors ¶ added in v1.1.0
func (result FinalResult) HasFatalErrors() bool
HasFatalErrors returns true if the final result has any errors
func (FinalResult) SingularResult ¶ added in v0.9.10
func (result FinalResult) SingularResult() (Result, error)
SingularResult returns a single result if the FinalResult only has one set of outputs/errors
type GasEstimator ¶ added in v1.0.0
type GetterFunc ¶ added in v0.10.8
type GetterFunc func() (interface{}, error)
GetterFunc is a function that either returns a value or an error.
func From ¶ added in v0.10.8
func From(getters ...interface{}) []GetterFunc
From creates []GetterFunc from a mix of getters or bare values.
func Input ¶ added in v0.10.8
func Input(inputs []Result, index int) GetterFunc
Input creates a getter returning inputs[index] value, or error if index is out of range.
func Inputs ¶ added in v0.10.8
func Inputs(inputs []Result) GetterFunc
Inputs creates a getter returning array of Result.Value (or Result.Error where not nil).
func JSONWithVarExprs ¶ added in v0.10.8
func JSONWithVarExprs(jsExpr string, vars Vars, allowErrors bool) GetterFunc
JSONWithVarExprs creates a getter that unmarshals jsExpr string as JSON, and interpolates all variables expressions found in jsExpr from Vars. The getter returns the unmarshalled object having expressions interpolated from Vars. allowErrors flag indicates if interpolating values stored in Vars can be errors. jsExpr example: {"requestId": $(decode_log.requestId), "payment": $(decode_log.payment)}
func NonemptyString ¶ added in v0.10.8
func NonemptyString(s string) GetterFunc
NonemptyString creates a getter to ensure the string is non-empty.
func VarExpr ¶ added in v0.10.8
func VarExpr(expr string, vars Vars) GetterFunc
VarExpr creates a getter interpolating expr value using the given Vars. The expression allows whitespace on both ends that will be trimmed. Expr examples: $(foo.bar), $(arr.1), $(bar)
type Graph ¶ added in v0.10.9
type Graph struct {
*simple.DirectedGraph
}
tree fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.
func (*Graph) AddImplicitDependenciesAsEdges ¶ added in v1.2.0
func (g *Graph) AddImplicitDependenciesAsEdges()
Looks at node attributes and searches for implicit dependencies on other nodes expressed as attribute values. Adds those dependencies as implicit edges in the graph.
func (*Graph) IsImplicitEdge ¶ added in v1.2.0
Indicates whether there's an implicit edge from uid -> vid. Implicit edged are ones that weren't added via the TOML spec, but via the pipeline parsing code
func (*Graph) UnmarshalText ¶ added in v0.10.9
type GraphEdge ¶ added in v1.2.0
func (*GraphEdge) IsImplicit ¶ added in v1.2.0
func (*GraphEdge) SetIsImplicit ¶ added in v1.2.0
type GraphNode ¶ added in v0.10.9
func (*GraphNode) Attributes ¶ added in v0.10.9
func (*GraphNode) SetAttribute ¶ added in v0.10.9
type HTTPTask ¶
type HTTPTask struct { BaseTask `mapstructure:",squash"` Method string URL string RequestData string `json:"requestData"` AllowUnrestrictedNetworkAccess string Headers string // contains filtered or unexported fields }
Return types:
string
type HashSliceParam ¶ added in v0.10.9
func (*HashSliceParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error
type JSONParseTask ¶
type JSONParseTask struct { BaseTask `mapstructure:",squash"` Path string `json:"path"` Separator string `json:"separator"` Data string `json:"data"` // Lax when disabled will return an error if the path does not exist // Lax when enabled will return nil with no error if the path does not exist Lax string }
Return types:
float64 string bool map[string]interface{} []interface{} nil
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONPathParam ¶ added in v0.10.9
type JSONPathParam []string
func NewJSONPathParam ¶ added in v1.4.0
func NewJSONPathParam(sep string) JSONPathParam
NewJSONPathParam returns a new JSONPathParam using the given separator, or the default if empty.
func (*JSONPathParam) UnmarshalPipelineParam ¶ added in v0.10.9
func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error
UnmarshalPipelineParam unmarshals a slice of strings from val. If val is a string or []byte, it is split on a separator. The default separator is ',' but can be overridden by initializing via NewJSONPathParam.
type JSONSerializable ¶
type JSONSerializable struct { Val interface{} Valid bool }
func (*JSONSerializable) Empty ¶ added in v0.10.10
func (js *JSONSerializable) Empty() bool
func (JSONSerializable) MarshalJSON ¶
func (js JSONSerializable) MarshalJSON() ([]byte, error)
MarshalJSON implements custom marshaling logic
func (*JSONSerializable) Scan ¶
func (js *JSONSerializable) Scan(value interface{}) error
func (*JSONSerializable) UnmarshalJSON ¶
func (js *JSONSerializable) UnmarshalJSON(bs []byte) error
UnmarshalJSON implements custom unmarshaling logic
type Keypath ¶ added in v0.10.8
type Keypath struct { NumParts int // can be 0, 1 or 2 Part0 string // can be empty string if NumParts is 0 Part1 string // can be empty string if NumParts is 0 or 1 }
Keypath contains keypath parsed by NewKeypathFromString.
func NewKeypathFromString ¶ added in v1.4.0
NewKeypathFromString creates a new Keypath from the given string. Returns error if it fails to parse the given keypath string.
type LowercaseTask ¶ added in v1.2.0
Return types:
string
func (*LowercaseTask) Type ¶ added in v1.2.0
func (t *LowercaseTask) Type() TaskType
type MapParam ¶ added in v0.10.8
type MapParam map[string]interface{}
MapParam accepts maps or JSON-encoded strings
func (*MapParam) UnmarshalPipelineParam ¶ added in v0.10.8
type MaybeBigIntParam ¶ added in v1.1.0
type MaybeBigIntParam struct {
// contains filtered or unexported fields
}
func NewMaybeBigIntParam ¶ added in v1.4.0
func NewMaybeBigIntParam(n *big.Int) MaybeBigIntParam
NewMaybeBigIntParam creates a new instance of MaybeBigIntParam
func (MaybeBigIntParam) BigInt ¶ added in v1.1.0
func (p MaybeBigIntParam) BigInt() *big.Int
func (*MaybeBigIntParam) UnmarshalPipelineParam ¶ added in v1.1.0
func (p *MaybeBigIntParam) UnmarshalPipelineParam(val interface{}) error
type MaybeInt32Param ¶ added in v0.10.9
type MaybeInt32Param struct {
// contains filtered or unexported fields
}
func NewMaybeInt32Param ¶ added in v1.4.0
func NewMaybeInt32Param(n int32, isSet bool) MaybeInt32Param
NewMaybeInt32Param creates new instance of MaybeInt32Param
func (MaybeInt32Param) Int32 ¶ added in v0.10.9
func (p MaybeInt32Param) Int32() (int32, bool)
func (*MaybeInt32Param) UnmarshalPipelineParam ¶ added in v0.10.9
func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error
type MaybeUint64Param ¶ added in v0.10.8
type MaybeUint64Param struct {
// contains filtered or unexported fields
}
func NewMaybeUint64Param ¶ added in v1.4.0
func NewMaybeUint64Param(n uint64, isSet bool) MaybeUint64Param
NewMaybeUint64Param creates new instance of MaybeUint64Param
func (MaybeUint64Param) Uint64 ¶ added in v0.10.8
func (p MaybeUint64Param) Uint64() (uint64, bool)
func (*MaybeUint64Param) UnmarshalPipelineParam ¶ added in v0.10.8
func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error
type MeanTask ¶ added in v0.10.9
type MeanTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` Precision string `json:"precision"` }
Return types:
*decimal.Decimal
type MedianTask ¶
type MedianTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
*decimal.Decimal
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type MergeTask ¶ added in v1.1.0
type MergeTask struct { BaseTask `mapstructure:",squash"` Left string `json:"left"` Right string `json:"right"` }
Return types:
map[string]interface{}
type Method ¶ added in v1.1.0
go-ethereum's abi.Method doesn't implement json.Marshal for Type, but otherwise would have worked fine, in any case we only care about these...
type ModeTask ¶ added in v0.10.9
type ModeTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
map[string]interface{}{ "results": []interface{} containing any other type other pipeline tasks can return "occurrences": (int64) }
type MultiplyTask ¶
type MultiplyTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Times string `json:"times"` }
Return types:
*decimal.Decimal
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ORM ¶
type ORM interface { CreateSpec(pipeline Pipeline, maxTaskTimeout models.Interval, qopts ...pg.QOpt) (int32, error) CreateRun(run *Run, qopts ...pg.QOpt) (err error) InsertRun(run *Run, qopts ...pg.QOpt) error DeleteRun(id int64) error StoreRun(run *Run, qopts ...pg.QOpt) (restart bool, err error) UpdateTaskRunResult(taskID uuid.UUID, result Result) (run Run, start bool, err error) InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error) // InsertFinishedRuns inserts all the given runs into the database. // If saveSuccessfulTaskRuns is false, only errored runs are saved. InsertFinishedRuns(run []*Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error) DeleteRunsOlderThan(context.Context, time.Duration) error FindRun(id int64) (Run, error) GetAllRuns() ([]Run, error) GetUnfinishedRuns(context.Context, time.Time, func(run Run) error) error GetQ() pg.Q }
type ObjectParam ¶ added in v1.1.0
type ObjectParam struct { Type ObjectType BoolValue BoolParam DecimalValue DecimalParam StringValue StringParam SliceValue SliceParam MapValue MapParam }
ObjectParam represents a kind of any type that could be used by the memo task
func (ObjectParam) Marshal ¶ added in v1.1.0
func (o ObjectParam) Marshal() (string, error)
func (ObjectParam) MarshalJSON ¶ added in v1.1.0
func (o ObjectParam) MarshalJSON() ([]byte, error)
func (ObjectParam) String ¶ added in v1.1.0
func (o ObjectParam) String() string
func (*ObjectParam) UnmarshalPipelineParam ¶ added in v1.1.0
func (o *ObjectParam) UnmarshalPipelineParam(val interface{}) error
type ObjectType ¶ added in v1.1.0
type ObjectType int
const ( NilType ObjectType = iota BoolType DecimalType StringType SliceType MapType )
type Pipeline ¶ added in v0.10.9
func (*Pipeline) MinTimeout ¶ added in v0.10.9
func (*Pipeline) RequiresPreInsert ¶ added in v1.0.0
func (*Pipeline) UnmarshalText ¶ added in v0.10.9
type PipelineParamUnmarshaler ¶ added in v0.10.8
type PipelineParamUnmarshaler interface {
UnmarshalPipelineParam(val interface{}) error
}
type PossibleErrorResponses ¶
type Result ¶
type Result struct { Value interface{} Error error }
Result is the result of a TaskRun
func (Result) ErrorDB ¶ added in v0.9.10
func (result Result) ErrorDB() null.String
ErrorDB dumps a single result error for a pipeline_task_run
func (Result) OutputDB ¶ added in v0.9.10
func (result Result) OutputDB() JSONSerializable
OutputDB dumps a single result output for a pipeline_run or pipeline_task_run
type ResumeRequest ¶ added in v1.1.0
type ResumeRequest struct { Error null.String `json:"error"` Value json.RawMessage `json:"value"` }
func (ResumeRequest) ToResult ¶ added in v1.1.0
func (rr ResumeRequest) ToResult() (Result, error)
type Run ¶
type Run struct { ID int64 `json:"-"` PipelineSpecID int32 `json:"-"` PipelineSpec Spec `json:"pipelineSpec"` Meta JSONSerializable `json:"meta"` // The errors are only ever strings // DB example: [null, null, "my error"] AllErrors RunErrors `json:"all_errors"` FatalErrors RunErrors `json:"fatal_errors"` Inputs JSONSerializable `json:"inputs"` // Its expected that Output.Val is of type []interface{}. // DB example: [1234, {"a": 10}, null] Outputs JSONSerializable `json:"outputs"` CreatedAt time.Time `json:"createdAt"` FinishedAt null.Time `json:"finishedAt"` PipelineTaskRuns []TaskRun `json:"taskRuns"` State RunStatus `json:"state"` Pending bool // FailSilently is used to signal that a task with the failEarly flag has failed, and we want to not put this in the db FailSilently bool }
func (Run) HasFatalErrors ¶ added in v1.1.0
func (*Run) StringAllErrors ¶ added in v1.1.0
func (*Run) StringFatalErrors ¶ added in v1.1.0
func (*Run) StringOutputs ¶ added in v1.1.0
type RunErrors ¶ added in v0.10.4
type RunErrors []null.String
type RunStatus ¶ added in v0.10.3
type RunStatus string
RunStatus represents the status of a run
const ( // RunStatusUnknown is the when the run status cannot be determined. RunStatusUnknown RunStatus = "unknown" // RunStatusRunning is used for when a run is actively being executed. RunStatusRunning RunStatus = "running" // RunStatusSuspended is used when a run is paused and awaiting further results. RunStatusSuspended RunStatus = "suspended" // RunStatusErrored is used for when a run has errored and will not complete. RunStatusErrored RunStatus = "errored" // RunStatusCompleted is used for when a run has successfully completed execution. RunStatusCompleted RunStatus = "completed" )
func (RunStatus) Completed ¶ added in v0.10.3
Completed returns true if the status is RunStatusCompleted.
type Runner ¶
type Runner interface { services.ServiceCtx // Run is a blocking call that will execute the run until no further progress can be made. // If `incomplete` is true, the run is only partially complete and is suspended, awaiting to be resumed when more data comes in. // Note that `saveSuccessfulTaskRuns` value is ignored if the run contains async tasks. Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx pg.Queryer) error) (incomplete bool, err error) ResumeRun(taskID uuid.UUID, value interface{}, err error) error // We expect spec.JobID and spec.JobName to be set for logging/prometheus. // ExecuteRun executes a new run in-memory according to a spec and returns the results. ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run Run, trrs TaskRunResults, err error) // InsertFinishedRun saves the run results in the database. InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error InsertFinishedRuns(runs []*Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error // ExecuteAndInsertFinishedRun executes a new run in-memory according to a spec, persists and saves the results. // It is a combination of ExecuteRun and InsertFinishedRun. // Note that the spec MUST have a DOT graph for this to work. ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error) OnRunFinished(func(*Run)) }
type SliceParam ¶ added in v0.10.8
type SliceParam []interface{}
func (SliceParam) FilterErrors ¶ added in v0.10.8
func (s SliceParam) FilterErrors() (SliceParam, int)
func (*SliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error
type Spec ¶
type StringParam ¶ added in v0.10.8
type StringParam string
func (*StringParam) String ¶ added in v1.5.0
func (s *StringParam) String() string
func (*StringParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *StringParam) UnmarshalPipelineParam(val interface{}) error
type StringSliceParam ¶ added in v0.10.8
type StringSliceParam []string
func (*StringSliceParam) UnmarshalPipelineParam ¶ added in v0.10.8
func (s *StringSliceParam) UnmarshalPipelineParam(val interface{}) error
type SumTask ¶ added in v0.10.9
type SumTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
*decimal.Decimal
type Task ¶
type Task interface { Type() TaskType ID() int DotID() string Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo) Base() *BaseTask Outputs() []Task Inputs() []TaskDependency OutputIndex() int32 TaskTimeout() (time.Duration, bool) TaskRetries() uint32 TaskMinBackoff() time.Duration TaskMaxBackoff() time.Duration }
type TaskDependency ¶ added in v1.2.0
Wraps the input Task for the given dependent task along with a bool variable PropagateResult, which Indicates whether result of InputTask should be propagated to its dependent task. If the edge between these tasks was an implicit edge, then results are not propagated. This is because some tasks cannot handle an input from an edge which wasn't specified in the spec.
type TaskRun ¶
type TaskRun struct { ID uuid.UUID `json:"id"` Type TaskType `json:"type"` PipelineRun Run `json:"-"` PipelineRunID int64 `json:"-"` Output JSONSerializable `json:"output"` Error null.String `json:"error"` CreatedAt time.Time `json:"createdAt"` FinishedAt null.Time `json:"finishedAt"` Index int32 `json:"index"` DotID string `json:"dotId"` // contains filtered or unexported fields }
type TaskRunResult ¶ added in v0.9.10
type TaskRunResult struct { ID uuid.UUID Task Task TaskRun TaskRun Result Result Attempts uint CreatedAt time.Time FinishedAt null.Time // contains filtered or unexported fields }
TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet TaskSpecID will always be non-zero
func (*TaskRunResult) IsPending ¶ added in v0.10.10
func (result *TaskRunResult) IsPending() bool
func (*TaskRunResult) IsTerminal ¶ added in v0.9.10
func (result *TaskRunResult) IsTerminal() bool
type TaskRunResults ¶ added in v0.9.10
type TaskRunResults []TaskRunResult
TaskRunResults represents a collection of results for all task runs for one pipeline run
func (TaskRunResults) FinalResult ¶ added in v0.9.10
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult
FinalResult pulls the FinalResult for the pipeline_run from the task runs It needs to respect the output index of each task
type TaskType ¶
type TaskType string
const ( TaskTypeHTTP TaskType = "http" TaskTypeBridge TaskType = "bridge" TaskTypeMean TaskType = "mean" TaskTypeMedian TaskType = "median" TaskTypeMode TaskType = "mode" TaskTypeSum TaskType = "sum" TaskTypeMultiply TaskType = "multiply" TaskTypeDivide TaskType = "divide" TaskTypeJSONParse TaskType = "jsonparse" TaskTypeCBORParse TaskType = "cborparse" TaskTypeAny TaskType = "any" TaskTypeVRF TaskType = "vrf" TaskTypeVRFV2 TaskType = "vrfv2" TaskTypeEstimateGasLimit TaskType = "estimategaslimit" TaskTypeETHCall TaskType = "ethcall" TaskTypeETHTx TaskType = "ethtx" TaskTypeETHABIEncode TaskType = "ethabiencode" TaskTypeETHABIEncode2 TaskType = "ethabiencode2" TaskTypeETHABIDecode TaskType = "ethabidecode" TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog" TaskTypeMerge TaskType = "merge" TaskTypeLowercase TaskType = "lowercase" TaskTypeUppercase TaskType = "uppercase" TaskTypeConditional TaskType = "conditional" // Testing only. TaskTypePanic TaskType = "panic" TaskTypeMemo TaskType = "memo" TaskTypeFail TaskType = "fail" )
type URLParam ¶ added in v0.10.8
func (*URLParam) UnmarshalPipelineParam ¶ added in v0.10.8
type Uint64Param ¶ added in v0.10.8
type Uint64Param uint64
func (*Uint64Param) UnmarshalPipelineParam ¶ added in v0.10.8
func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error
type UppercaseTask ¶ added in v1.2.0
Return types:
string
func (*UppercaseTask) Type ¶ added in v1.2.0
func (t *UppercaseTask) Type() TaskType
type VRFKeyStore ¶ added in v0.10.11
type VRFTask ¶ added in v0.10.8
type VRFTask struct { BaseTask `mapstructure:",squash"` PublicKey string `json:"publicKey"` RequestBlockHash string `json:"requestBlockHash"` RequestBlockNumber string `json:"requestBlockNumber"` Topics string `json:"topics"` // contains filtered or unexported fields }
type VRFTaskV2 ¶ added in v1.0.0
type VRFTaskV2 struct { BaseTask `mapstructure:",squash"` PublicKey string `json:"publicKey"` RequestBlockHash string `json:"requestBlockHash"` RequestBlockNumber string `json:"requestBlockNumber"` Topics string `json:"topics"` // contains filtered or unexported fields }
type Vars ¶ added in v0.10.8
type Vars struct {
// contains filtered or unexported fields
}
func NewVarsFrom ¶ added in v0.10.8
NewVarsFrom creates new Vars from the given map. If the map is nil, a new map instance will be created.
func (Vars) Copy ¶ added in v0.10.9
Copy makes a copy of Vars by copying the underlying map. Used by scheduler for new tasks to avoid data races.
Source Files ¶
- common.go
- common_eth.go
- common_http.go
- getters.go
- graph.go
- keypath.go
- models.go
- orm.go
- runner.go
- scheduler.go
- task.any.go
- task.base.go
- task.bridge.go
- task.cborparse.go
- task.conditional.go
- task.divide.go
- task.estimategas.go
- task.eth_abi_decode.go
- task.eth_abi_decode_log.go
- task.eth_abi_encode.go
- task.eth_abi_encode_2.go
- task.eth_call.go
- task.eth_tx.go
- task.fail.go
- task.http.go
- task.jsonparse.go
- task.lowercase.go
- task.mean.go
- task.median.go
- task.memo.go
- task.merge.go
- task.mode.go
- task.multiply.go
- task.panic.go
- task.sum.go
- task.uppercase.go
- task.vrf.go
- task.vrfv2.go
- task_object_params.go
- task_params.go
- variables.go