Documentation ¶
Index ¶
- Constants
- Variables
- func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)
- func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, ...) *orm
- func NewRunner(orm ORM, btORM bridges.ORM, cfg Config, bridgeCfg BridgeConfig, ...) *runner
- func ParseETHABIArgsString(theABI []byte, isLog bool) (args abi.Arguments, indexedArgs abi.Arguments, _ error)
- func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
- func SelectGasLimit(ge config.GasEstimator, jobType string, specGasLimit *uint32) uint64
- type AddressParam
- type AddressSliceParam
- type AnyTask
- type Base64DecodeTask
- type Base64EncodeTask
- type BaseTask
- func (t *BaseTask) Base() *BaseTask
- func (t BaseTask) DotID() string
- func (t BaseTask) ID() int
- func (t BaseTask) Inputs() []TaskDependency
- func (t BaseTask) OutputIndex() int32
- func (t BaseTask) Outputs() []Task
- func (t BaseTask) TaskMaxBackoff() time.Duration
- func (t BaseTask) TaskMinBackoff() time.Duration
- func (t BaseTask) TaskRetries() uint32
- func (t BaseTask) TaskTimeout() (time.Duration, bool)
- type BoolParam
- type BridgeConfig
- type BridgeTask
- type BytesParam
- type CBORParseTask
- type ConditionalTask
- type Config
- type DecimalParam
- type DecimalSliceParam
- type DivideTask
- type ETHABIDecodeLogTask
- type ETHABIDecodeTask
- type ETHABIEncodeTask
- type ETHABIEncodeTask2
- type ETHCallTask
- type ETHKeyStore
- type ETHTxTask
- type ErrRunPanicked
- type EstimateGasLimitTask
- type FailTask
- type FinalResult
- type GasEstimator
- type GetterFunc
- func From(getters ...interface{}) []GetterFunc
- func Input(inputs []Result, index int) GetterFunc
- func Inputs(inputs []Result) GetterFunc
- func JSONWithVarExprs(jsExpr string, vars Vars, allowErrors bool) GetterFunc
- func NonemptyString(s string) GetterFunc
- func ValidDurationInSeconds(s string) GetterFunc
- func VarExpr(expr string, vars Vars) GetterFunc
- type Graph
- type GraphEdge
- type GraphNode
- type HTTPTask
- type HashSliceParam
- type HexDecodeTask
- type HexEncodeTask
- type JSONParseTask
- type JSONPathParam
- type Keypath
- type LengthTask
- type LessThanTask
- type LookupTask
- type LowercaseTask
- type MapParam
- type MaybeBigIntParam
- type MaybeInt32Param
- type MaybeUint64Param
- type MeanTask
- type MedianTask
- type MemoTask
- type MergeTask
- type Method
- type ModeTask
- type MultiplyTask
- type ORM
- type ObjectParam
- type ObjectType
- type PanicTask
- type Pipeline
- type PipelineParamUnmarshaler
- type PossibleErrorResponses
- type Result
- type ResumeRequest
- type Run
- func (r *Run) ByDotID(id string) *TaskRun
- func (r Run) GetID() string
- func (r Run) HasErrors() bool
- func (r Run) HasFatalErrors() bool
- func (r *Run) SetID(value string) error
- func (r *Run) Status() RunStatus
- func (r *Run) StringAllErrors() []*string
- func (r *Run) StringFatalErrors() []*string
- func (r *Run) StringOutputs() ([]*string, error)
- type RunErrors
- type RunInfo
- type RunStatus
- type Runner
- type SliceParam
- type Spec
- type StringParam
- type StringSliceParam
- type SumTask
- type Task
- type TaskDependency
- type TaskRun
- type TaskRunResult
- type TaskRunResults
- type TaskType
- type URLParam
- type Uint64Param
- type UppercaseTask
- type VRFKeyStore
- type VRFTask
- type VRFTaskV2
- type VRFTaskV2Plus
- type Vars
Constants ¶
const ( BlockHeaderFeederJobType string = "blockheaderfeeder" BlockhashStoreJobType string = "blockhashstore" BootstrapJobType string = "bootstrap" CronJobType string = "cron" DirectRequestJobType string = "directrequest" FluxMonitorJobType string = "fluxmonitor" GatewayJobType string = "gateway" KeeperJobType string = "keeper" LegacyGasStationServerJobType string = "legacygasstationserver" LegacyGasStationSidecarJobType string = "legacygasstationsidecar" OffchainReporting2JobType string = "offchainreporting2" OffchainReportingJobType string = "offchainreporting" StreamJobType string = "stream" VRFJobType string = "vrf" WebhookJobType string = "webhook" WorkflowJobType string = "workflow" )
const (
InputTaskKey = "input"
)
const KeepersObservationSource = `` /* 3338-byte string literal not displayed */
KeepersObservationSource is the same for all keeper jobs and it is not persisted in DB
const KeypathSeparator = "."
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") ErrInputTaskErrored = errors.New("input task errored") ErrParameterEmpty = errors.New("parameter is empty") ErrIndexOutOfRange = errors.New("index out of range") ErrTooManyErrors = errors.New("too many errors") ErrTimeout = errors.New("timeout") ErrTaskRunFailed = errors.New("task run failed") ErrCancelled = errors.New("task run cancelled (fail early)") )
var ( // PromPipelineTaskExecutionTime reports how long each pipeline task took to execute // TODO: Make private again after // https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_task_execution_time", Help: "How long each pipeline task took to execute", }, []string{"job_id", "job_name", "task_id", "task_type"}, ) PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_run_errors", Help: "Number of errors for each pipeline spec", }, []string{"job_id", "job_name"}, ) PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pipeline_run_total_time_to_completion", Help: "How long each pipeline run took to finish (from the moment it was created)", }, []string{"job_id", "job_name"}, ) PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pipeline_tasks_total_finished", Help: "The total number of pipeline tasks which have finished", }, []string{"job_id", "job_name", "task_id", "task_type", "bridge_name", "status"}, ) )
var ( ErrDivideByZero = errors.New("divide by zero") ErrDivisionOverlow = errors.New("division overflow") )
var ( ErrKeypathNotFound = errors.New("keypath not found") ErrVarsRoot = errors.New("cannot get/set the root of a pipeline.Vars") ErrVarsSetNested = errors.New("cannot set a nested key of a pipeline.Vars") )
var ErrInvalidEVMChainID = errors.New("invalid EVM chain ID")
var (
ErrInvalidMultiplier = errors.New("Invalid multiplier")
)
var (
ErrMultiplyOverlow = errors.New("multiply overflow")
)
var ErrOverflow = errors.New("overflow")
var (
ErrWrongKeypath = errors.New("wrong keypath format")
)
Functions ¶
func CheckInputs ¶
func NewRunner ¶
func NewRunner(orm ORM, btORM bridges.ORM, cfg Config, bridgeCfg BridgeConfig, legacyChains legacyevm.LegacyChainContainer, ethks ETHKeyStore, vrfks VRFKeyStore, lggr logger.Logger, httpClient, unrestrictedHTTPClient *http.Client) *runner
func ParseETHABIArgsString ¶
func ResolveParam ¶
func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
func SelectGasLimit ¶
func SelectGasLimit(ge config.GasEstimator, jobType string, specGasLimit *uint32) uint64
Types ¶
type AddressParam ¶
func (*AddressParam) UnmarshalPipelineParam ¶
func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error
type AddressSliceParam ¶
func (*AddressSliceParam) UnmarshalPipelineParam ¶
func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error
type AnyTask ¶
type AnyTask struct {
BaseTask `mapstructure:",squash"`
}
AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.
type Base64DecodeTask ¶
Return types:
bytes
func (*Base64DecodeTask) Type ¶
func (t *Base64DecodeTask) Type() TaskType
type Base64EncodeTask ¶
Return types:
string
func (*Base64EncodeTask) Type ¶
func (t *Base64EncodeTask) Type() TaskType
type BaseTask ¶
type BaseTask struct { Index int32 `mapstructure:"index" json:"-" ` Timeout *time.Duration `mapstructure:"timeout"` FailEarly bool `mapstructure:"failEarly"` Retries null.Uint32 `mapstructure:"retries"` MinBackoff time.Duration `mapstructure:"minBackoff"` MaxBackoff time.Duration `mapstructure:"maxBackoff"` // contains filtered or unexported fields }
func NewBaseTask ¶
func (BaseTask) Inputs ¶
func (t BaseTask) Inputs() []TaskDependency
func (BaseTask) OutputIndex ¶
func (BaseTask) TaskMaxBackoff ¶
func (BaseTask) TaskMinBackoff ¶
func (BaseTask) TaskRetries ¶
type BridgeConfig ¶ added in v2.3.0
type BridgeTask ¶
type BridgeTask struct { BaseTask `mapstructure:",squash"` Name string `json:"name"` RequestData string `json:"requestData"` IncludeInputAtKey string `json:"includeInputAtKey"` Async string `json:"async"` CacheTTL string `json:"cacheTTL"` Headers string `json:"headers"` // contains filtered or unexported fields }
Return types:
string
func (*BridgeTask) Type ¶
func (t *BridgeTask) Type() TaskType
type BytesParam ¶
type BytesParam []byte
func (*BytesParam) UnmarshalPipelineParam ¶
func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error
type CBORParseTask ¶
type CBORParseTask struct { BaseTask `mapstructure:",squash"` Data string `json:"data"` Mode string `json:"mode"` }
Return types:
map[string]interface{} with potential value types: float64 string bool map[string]interface{} []interface{} nil
func (*CBORParseTask) Type ¶
func (t *CBORParseTask) Type() TaskType
type ConditionalTask ¶
ConditionalTask checks if data is false. For now this is all we need, but in the future we can expand this to handle more general conditional statements.
func (*ConditionalTask) Type ¶
func (t *ConditionalTask) Type() TaskType
type DecimalParam ¶
func (DecimalParam) Decimal ¶
func (d DecimalParam) Decimal() decimal.Decimal
func (*DecimalParam) UnmarshalPipelineParam ¶
func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error
type DecimalSliceParam ¶
func (*DecimalSliceParam) UnmarshalPipelineParam ¶
func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error
type DivideTask ¶
type DivideTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Divisor string `json:"divisor"` Precision string `json:"precision"` }
Return types:
*decimal.Decimal
func (*DivideTask) Type ¶
func (t *DivideTask) Type() TaskType
type ETHABIDecodeLogTask ¶
type ETHABIDecodeLogTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` Topics string `json:"topics"` }
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeLogTask) Type ¶
func (t *ETHABIDecodeLogTask) Type() TaskType
type ETHABIDecodeTask ¶
type ETHABIDecodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeTask) Type ¶
func (t *ETHABIDecodeTask) Type() TaskType
type ETHABIEncodeTask ¶
type ETHABIEncodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
[]byte
func (*ETHABIEncodeTask) Type ¶
func (t *ETHABIEncodeTask) Type() TaskType
type ETHABIEncodeTask2 ¶
type ETHABIEncodeTask2 struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
[]byte
func (*ETHABIEncodeTask2) Type ¶
func (t *ETHABIEncodeTask2) Type() TaskType
type ETHCallTask ¶
type ETHCallTask struct { BaseTask `mapstructure:",squash"` Contract string `json:"contract"` From string `json:"from"` Data string `json:"data"` Gas string `json:"gas"` GasPrice string `json:"gasPrice"` GasTipCap string `json:"gasTipCap"` GasFeeCap string `json:"gasFeeCap"` GasUnlimited string `json:"gasUnlimited"` ExtractRevertReason bool `json:"extractRevertReason"` EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"` Block string `json:"block"` // contains filtered or unexported fields }
Return types:
[]byte
func (*ETHCallTask) Type ¶
func (t *ETHCallTask) Type() TaskType
type ETHKeyStore ¶
type ETHTxTask ¶
type ETHTxTask struct { BaseTask `mapstructure:",squash"` From string `json:"from"` To string `json:"to"` Data string `json:"data"` GasLimit string `json:"gasLimit"` TxMeta string `json:"txMeta"` MinConfirmations string `json:"minConfirmations"` // FailOnRevert, if set, will error the task if the transaction reverted on-chain // If unset, the receipt will be passed as output // It has no effect if minConfirmations == 0 FailOnRevert string `json:"failOnRevert"` EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"` TransmitChecker string `json:"transmitChecker"` // contains filtered or unexported fields }
Return types:
nil
type ErrRunPanicked ¶
type ErrRunPanicked struct {
// contains filtered or unexported fields
}
When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.
func (ErrRunPanicked) Error ¶
func (err ErrRunPanicked) Error() string
type EstimateGasLimitTask ¶
type EstimateGasLimitTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` From string `json:"from"` To string `json:"to"` Multiplier string `json:"multiplier"` Data string `json:"data"` EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"` Block string `json:"block"` // contains filtered or unexported fields }
Return types:
uint64
func (*EstimateGasLimitTask) Type ¶
func (t *EstimateGasLimitTask) Type() TaskType
type FailTask ¶
FailTask is like the Panic task but without all the drama and stack unwinding of a panic
type FinalResult ¶
FinalResult is the result of a Run
func (FinalResult) CombinedError ¶
func (result FinalResult) CombinedError() error
func (FinalResult) HasErrors ¶
func (result FinalResult) HasErrors() bool
HasErrors returns true if the final result has any errors
func (FinalResult) HasFatalErrors ¶
func (result FinalResult) HasFatalErrors() bool
HasFatalErrors returns true if the final result has any fatal errors
func (FinalResult) SingularResult ¶
func (result FinalResult) SingularResult() (Result, error)
SingularResult returns a single result if the FinalResult only has one set of outputs/errors
type GasEstimator ¶
type GetterFunc ¶
type GetterFunc func() (interface{}, error)
GetterFunc is a function that either returns a value or an error.
func From ¶
func From(getters ...interface{}) []GetterFunc
From creates []GetterFunc from a mix of getters or bare values.
func Input ¶
func Input(inputs []Result, index int) GetterFunc
Input creates a getter returning inputs[index] value, or error if index is out of range.
func Inputs ¶
func Inputs(inputs []Result) GetterFunc
Inputs creates a getter returning array of Result.Value (or Result.Error where not nil).
func JSONWithVarExprs ¶
func JSONWithVarExprs(jsExpr string, vars Vars, allowErrors bool) GetterFunc
JSONWithVarExprs creates a getter that unmarshals jsExpr string as JSON, and interpolates all variable expressions found in jsExpr from Vars. The getter returns the unmarshalled object having expressions interpolated from Vars. allowErrors flag indicates if interpolating values stored in Vars can be errors. jsExpr example: {"requestId": $(decode_log.requestId), "payment": $(decode_log.payment)}
func NonemptyString ¶
func NonemptyString(s string) GetterFunc
NonemptyString creates a getter to ensure the string is non-empty.
func ValidDurationInSeconds ¶
func ValidDurationInSeconds(s string) GetterFunc
ValidDurationInSeconds creates a getter to ensure the string is a valid duration and return duration in seconds.
func VarExpr ¶
func VarExpr(expr string, vars Vars) GetterFunc
VarExpr creates a getter interpolating expr value using the given Vars. The expression allows whitespace on both ends that will be trimmed. Expr examples: $(foo.bar), $(arr.1), $(bar)
type Graph ¶
type Graph struct {
*simple.DirectedGraph
}
Graph fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.
func (*Graph) AddImplicitDependenciesAsEdges ¶
func (g *Graph) AddImplicitDependenciesAsEdges()
Looks at node attributes and searches for implicit dependencies on other nodes expressed as attribute values. Adds those dependencies as implicit edges in the graph.
func (*Graph) IsImplicitEdge ¶
Indicates whether there's an implicit edge from uid -> vid. Implicit edges are ones that weren't added via the TOML spec, but via the pipeline parsing code.
func (*Graph) UnmarshalText ¶
type GraphNode ¶
func (*GraphNode) Attributes ¶
type HTTPTask ¶
type HTTPTask struct { BaseTask `mapstructure:",squash"` Method string URL string RequestData string `json:"requestData"` AllowUnrestrictedNetworkAccess string Headers string // contains filtered or unexported fields }
Return types:
string
type HashSliceParam ¶
func (*HashSliceParam) UnmarshalPipelineParam ¶
func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error
type HexDecodeTask ¶
Return types:
bytes
func (*HexDecodeTask) Type ¶
func (t *HexDecodeTask) Type() TaskType
type HexEncodeTask ¶
Return types:
string
func (*HexEncodeTask) Type ¶
func (t *HexEncodeTask) Type() TaskType
type JSONParseTask ¶
type JSONParseTask struct { BaseTask `mapstructure:",squash"` Path string `json:"path"` Separator string `json:"separator"` Data string `json:"data"` // Lax when disabled will return an error if the path does not exist // Lax when enabled will return nil with no error if the path does not exist Lax string }
Return types:
float64 string bool map[string]interface{} []interface{} nil
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONPathParam ¶
type JSONPathParam []string
func NewJSONPathParam ¶
func NewJSONPathParam(sep string) JSONPathParam
NewJSONPathParam returns a new JSONPathParam using the given separator, or the default if empty.
func (*JSONPathParam) UnmarshalPipelineParam ¶
func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error
UnmarshalPipelineParam unmarshals a slice of strings from val. If val is a string or []byte, it is split on a separator. The default separator is ',' but can be overridden by initializing via NewJSONPathParam.
type Keypath ¶
type Keypath struct {
Parts []string
}
Keypath contains keypath parsed by NewKeypathFromString.
func NewKeypathFromString ¶
NewKeypathFromString creates a new Keypath from the given string. Returns error if it fails to parse the given keypath string.
type LengthTask ¶
Return types:
*decimal.Decimal
func (*LengthTask) Type ¶
func (t *LengthTask) Type() TaskType
type LessThanTask ¶
type LessThanTask struct { BaseTask `mapstructure:",squash"` Left string `json:"input"` Right string `json:"limit"` }
Return types:
bool
func (*LessThanTask) Type ¶
func (t *LessThanTask) Type() TaskType
type LookupTask ¶
Look up a field on a map
Return types:
interface{}
func (*LookupTask) Type ¶
func (t *LookupTask) Type() TaskType
type LowercaseTask ¶
Return types:
string
func (*LowercaseTask) Type ¶
func (t *LowercaseTask) Type() TaskType
type MapParam ¶
type MapParam map[string]interface{}
MapParam accepts maps or JSON-encoded strings
func (*MapParam) UnmarshalPipelineParam ¶
type MaybeBigIntParam ¶
type MaybeBigIntParam struct {
// contains filtered or unexported fields
}
func NewMaybeBigIntParam ¶
func NewMaybeBigIntParam(n *big.Int) MaybeBigIntParam
NewMaybeBigIntParam creates a new instance of MaybeBigIntParam
func (MaybeBigIntParam) BigInt ¶
func (p MaybeBigIntParam) BigInt() *big.Int
func (*MaybeBigIntParam) UnmarshalPipelineParam ¶
func (p *MaybeBigIntParam) UnmarshalPipelineParam(val interface{}) error
type MaybeInt32Param ¶
type MaybeInt32Param struct {
// contains filtered or unexported fields
}
func NewMaybeInt32Param ¶
func NewMaybeInt32Param(n int32, isSet bool) MaybeInt32Param
NewMaybeInt32Param creates new instance of MaybeInt32Param
func (MaybeInt32Param) Int32 ¶
func (p MaybeInt32Param) Int32() (int32, bool)
func (*MaybeInt32Param) UnmarshalPipelineParam ¶
func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error
type MaybeUint64Param ¶
type MaybeUint64Param struct {
// contains filtered or unexported fields
}
func NewMaybeUint64Param ¶
func NewMaybeUint64Param(n uint64, isSet bool) MaybeUint64Param
NewMaybeUint64Param creates new instance of MaybeUint64Param
func (MaybeUint64Param) Uint64 ¶
func (p MaybeUint64Param) Uint64() (uint64, bool)
func (*MaybeUint64Param) UnmarshalPipelineParam ¶
func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error
type MeanTask ¶
type MeanTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` Precision string `json:"precision"` }
Return types:
*decimal.Decimal
type MedianTask ¶
type MedianTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
*decimal.Decimal
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type MergeTask ¶
type MergeTask struct { BaseTask `mapstructure:",squash"` Left string `json:"left"` Right string `json:"right"` }
Return types:
map[string]interface{}
type Method ¶
go-ethereum's abi.Method doesn't implement json.Marshal for Type, but otherwise would have worked fine, in any case we only care about these...
type ModeTask ¶
type ModeTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
map[string]interface{}{ "results": []interface{} containing any other type other pipeline tasks can return "occurrences": (int64) }
type MultiplyTask ¶
type MultiplyTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Times string `json:"times"` }
Return types:
*decimal.Decimal
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ORM ¶
type ORM interface { services.Service CreateSpec(pipeline Pipeline, maxTaskTimeout models.Interval, qopts ...pg.QOpt) (int32, error) CreateRun(run *Run, qopts ...pg.QOpt) (err error) InsertRun(run *Run, qopts ...pg.QOpt) error DeleteRun(id int64) error StoreRun(run *Run, qopts ...pg.QOpt) (restart bool, err error) UpdateTaskRunResult(taskID uuid.UUID, result Result) (run Run, start bool, err error) InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error) InsertFinishedRunWithSpec(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error) // InsertFinishedRuns inserts all the given runs into the database. // If saveSuccessfulTaskRuns is false, only errored runs are saved. InsertFinishedRuns(run []*Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error) DeleteRunsOlderThan(context.Context, time.Duration) error FindRun(id int64) (Run, error) GetAllRuns() ([]Run, error) GetUnfinishedRuns(context.Context, time.Time, func(run Run) error) error GetQ() pg.Q }
type ObjectParam ¶
type ObjectParam struct { Type ObjectType BoolValue BoolParam DecimalValue DecimalParam StringValue StringParam SliceValue SliceParam MapValue MapParam }
ObjectParam represents a kind of any type that could be used by the memo task
func (ObjectParam) Marshal ¶
func (o ObjectParam) Marshal() (string, error)
func (ObjectParam) MarshalJSON ¶
func (o ObjectParam) MarshalJSON() ([]byte, error)
func (ObjectParam) String ¶
func (o ObjectParam) String() string
func (*ObjectParam) UnmarshalPipelineParam ¶
func (o *ObjectParam) UnmarshalPipelineParam(val interface{}) error
type ObjectType ¶
type ObjectType int
const ( NilType ObjectType = iota BoolType DecimalType StringType SliceType MapType )
type PipelineParamUnmarshaler ¶
type PipelineParamUnmarshaler interface {
UnmarshalPipelineParam(val interface{}) error
}
type PossibleErrorResponses ¶
type Result ¶
type Result struct { Value interface{} Error error }
Result is the result of a TaskRun
func (Result) ErrorDB ¶
func (result Result) ErrorDB() null.String
ErrorDB dumps a single result error for a pipeline_task_run
func (Result) OutputDB ¶
func (result Result) OutputDB() jsonserializable.JSONSerializable
OutputDB dumps a single result output for a pipeline_run or pipeline_task_run
type ResumeRequest ¶
type ResumeRequest struct { Error null.String `json:"error"` Value json.RawMessage `json:"value"` }
func (ResumeRequest) ToResult ¶
func (rr ResumeRequest) ToResult() (Result, error)
type Run ¶
type Run struct { ID int64 `json:"-"` JobID int32 `json:"-"` PipelineSpecID int32 `json:"-"` PruningKey int32 `json:"-"` // This currently refers to the upstream job ID PipelineSpec Spec `json:"pipelineSpec"` Meta jsonserializable.JSONSerializable `json:"meta"` // The errors are only ever strings // DB example: [null, null, "my error"] AllErrors RunErrors `json:"all_errors"` FatalErrors RunErrors `json:"fatal_errors"` Inputs jsonserializable.JSONSerializable `json:"inputs"` // Its expected that Output.Val is of type []interface{}. // DB example: [1234, {"a": 10}, null] Outputs jsonserializable.JSONSerializable `json:"outputs"` CreatedAt time.Time `json:"createdAt"` FinishedAt null.Time `json:"finishedAt"` PipelineTaskRuns []TaskRun `json:"taskRuns"` State RunStatus `json:"state"` Pending bool // FailSilently is used to signal that a task with the failEarly flag has failed, and we want to not put this in the db FailSilently bool }
func (Run) HasFatalErrors ¶
func (*Run) StringAllErrors ¶
func (*Run) StringFatalErrors ¶
func (*Run) StringOutputs ¶
type RunErrors ¶
type RunErrors []null.String
type RunStatus ¶
type RunStatus string
RunStatus represents the status of a run
const ( // RunStatusUnknown is used when the run status cannot be determined. RunStatusUnknown RunStatus = "unknown" // RunStatusRunning is used for when a run is actively being executed. RunStatusRunning RunStatus = "running" // RunStatusSuspended is used when a run is paused and awaiting further results. RunStatusSuspended RunStatus = "suspended" // RunStatusErrored is used for when a run has errored and will not complete. RunStatusErrored RunStatus = "errored" // RunStatusCompleted is used for when a run has successfully completed execution. RunStatusCompleted RunStatus = "completed" )
type Runner ¶
type Runner interface { services.Service // Run is a blocking call that will execute the run until no further progress can be made. // If `incomplete` is true, the run is only partially complete and is suspended, awaiting to be resumed when more data comes in. // Note that `saveSuccessfulTaskRuns` value is ignored if the run contains async tasks. Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx pg.Queryer) error) (incomplete bool, err error) ResumeRun(taskID uuid.UUID, value interface{}, err error) error // ExecuteRun executes a new run in-memory according to a spec and returns the results. // We expect spec.JobID and spec.JobName to be set for logging/prometheus. ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run *Run, trrs TaskRunResults, err error) // InsertFinishedRun saves the run results in the database. InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error InsertFinishedRuns(runs []*Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error // ExecuteAndInsertFinishedRun executes a new run in-memory according to a spec, persists and saves the results. // It is a combination of ExecuteRun and InsertFinishedRun. // Note that the spec MUST have a DOT graph for this to work. // This will persist the Spec in the DB if it doesn't have an ID. ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results TaskRunResults, err error) OnRunFinished(func(*Run)) InitializePipeline(spec Spec) (*Pipeline, error) }
type SliceParam ¶
type SliceParam []interface{}
func (SliceParam) FilterErrors ¶
func (s SliceParam) FilterErrors() (SliceParam, int)
func (*SliceParam) UnmarshalPipelineParam ¶
func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error
type Spec ¶
type Spec struct { ID int32 DotDagSource string `json:"dotDagSource"` CreatedAt time.Time `json:"-"` MaxTaskDuration models.Interval `json:"-"` GasLimit *uint32 `json:"-"` ForwardingAllowed bool `json:"-"` JobID int32 `json:"-"` JobName string `json:"-"` JobType string `json:"-"` Pipeline *Pipeline `json:"-" db:"-"` // This may be nil, or may be populated manually as a cache. There is no locking on this, so be careful }
func (*Spec) GetOrParsePipeline ¶ added in v2.9.0
func (*Spec) ParsePipeline ¶ added in v2.9.0
type StringParam ¶
type StringParam string
func (*StringParam) String ¶
func (s *StringParam) String() string
func (*StringParam) UnmarshalPipelineParam ¶
func (s *StringParam) UnmarshalPipelineParam(val interface{}) error
type StringSliceParam ¶
type StringSliceParam []string
func (*StringSliceParam) UnmarshalPipelineParam ¶
func (s *StringSliceParam) UnmarshalPipelineParam(val interface{}) error
type SumTask ¶
type SumTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
*decimal.Decimal
type Task ¶
type Task interface { Type() TaskType ID() int DotID() string Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo) Base() *BaseTask Outputs() []Task Inputs() []TaskDependency OutputIndex() int32 TaskTimeout() (time.Duration, bool) TaskRetries() uint32 TaskMinBackoff() time.Duration TaskMaxBackoff() time.Duration }
type TaskDependency ¶
Wraps the input Task for the given dependent task along with a bool variable PropagateResult, which indicates whether the result of InputTask should be propagated to its dependent task. If the edge between these tasks was an implicit edge, then results are not propagated. This is because some tasks cannot handle an input from an edge which wasn't specified in the spec.
type TaskRun ¶
type TaskRun struct { ID uuid.UUID `json:"id"` Type TaskType `json:"type"` PipelineRun Run `json:"-"` PipelineRunID int64 `json:"-"` Output jsonserializable.JSONSerializable `json:"output"` Error null.String `json:"error"` CreatedAt time.Time `json:"createdAt"` FinishedAt null.Time `json:"finishedAt"` Index int32 `json:"index"` DotID string `json:"dotId"` // contains filtered or unexported fields }
type TaskRunResult ¶
type TaskRunResult struct { ID uuid.UUID Task Task `json:"-"` TaskRun TaskRun `json:"-"` Result Result Attempts uint CreatedAt time.Time FinishedAt null.Time // contains filtered or unexported fields }
TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet. TaskSpecID will always be non-zero.
func (*TaskRunResult) IsPending ¶
func (result *TaskRunResult) IsPending() bool
func (*TaskRunResult) IsTerminal ¶
func (result *TaskRunResult) IsTerminal() bool
type TaskRunResults ¶
type TaskRunResults []TaskRunResult
TaskRunResults represents a collection of results for all task runs for one pipeline run
func (TaskRunResults) FinalResult ¶
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult
FinalResult pulls the FinalResult for the pipeline_run from the task runs. It needs to respect the output index of each task.
func (*TaskRunResults) GetNextTaskOf ¶
func (trrs *TaskRunResults) GetNextTaskOf(task TaskRunResult) *TaskRunResult
GetNextTaskOf returns the task with the next id or nil if it does not exist
func (TaskRunResults) GetTaskRunResultsFinishedAt ¶ added in v2.11.0
func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time
GetTaskRunResultsFinishedAt returns latest finishedAt time from TaskRunResults.
func (TaskRunResults) Terminals ¶ added in v2.10.0
func (trrs TaskRunResults) Terminals() (terminals []TaskRunResult)
Terminals returns all terminal task run results
type TaskType ¶
type TaskType string
const ( TaskTypeAny TaskType = "any" TaskTypeBase64Decode TaskType = "base64decode" TaskTypeBase64Encode TaskType = "base64encode" TaskTypeBridge TaskType = "bridge" TaskTypeCBORParse TaskType = "cborparse" TaskTypeConditional TaskType = "conditional" TaskTypeDivide TaskType = "divide" TaskTypeETHABIDecode TaskType = "ethabidecode" TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog" TaskTypeETHABIEncode TaskType = "ethabiencode" TaskTypeETHABIEncode2 TaskType = "ethabiencode2" TaskTypeETHCall TaskType = "ethcall" TaskTypeETHTx TaskType = "ethtx" TaskTypeEstimateGasLimit TaskType = "estimategaslimit" TaskTypeHTTP TaskType = "http" TaskTypeHexDecode TaskType = "hexdecode" TaskTypeHexEncode TaskType = "hexencode" TaskTypeJSONParse TaskType = "jsonparse" TaskTypeLength TaskType = "length" TaskTypeLessThan TaskType = "lessthan" TaskTypeLookup TaskType = "lookup" TaskTypeLowercase TaskType = "lowercase" TaskTypeMean TaskType = "mean" TaskTypeMedian TaskType = "median" TaskTypeMerge TaskType = "merge" TaskTypeMode TaskType = "mode" TaskTypeMultiply TaskType = "multiply" TaskTypeSum TaskType = "sum" TaskTypeUppercase TaskType = "uppercase" TaskTypeVRF TaskType = "vrf" TaskTypeVRFV2 TaskType = "vrfv2" TaskTypeVRFV2Plus TaskType = "vrfv2plus" // Testing only. TaskTypePanic TaskType = "panic" TaskTypeMemo TaskType = "memo" TaskTypeFail TaskType = "fail" )
type URLParam ¶
func (*URLParam) UnmarshalPipelineParam ¶
type Uint64Param ¶
type Uint64Param uint64
func (*Uint64Param) UnmarshalPipelineParam ¶
func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error
type UppercaseTask ¶
Return types:
string
func (*UppercaseTask) Type ¶
func (t *UppercaseTask) Type() TaskType
type VRFKeyStore ¶
type VRFTask ¶
type VRFTask struct { BaseTask `mapstructure:",squash"` PublicKey string `json:"publicKey"` RequestBlockHash string `json:"requestBlockHash"` RequestBlockNumber string `json:"requestBlockNumber"` Topics string `json:"topics"` // contains filtered or unexported fields }
type VRFTaskV2 ¶
type VRFTaskV2 struct { BaseTask `mapstructure:",squash"` PublicKey string `json:"publicKey"` RequestBlockHash string `json:"requestBlockHash"` RequestBlockNumber string `json:"requestBlockNumber"` Topics string `json:"topics"` // contains filtered or unexported fields }
type VRFTaskV2Plus ¶ added in v2.4.0
type VRFTaskV2Plus struct { BaseTask `mapstructure:",squash"` PublicKey string `json:"publicKey"` RequestBlockHash string `json:"requestBlockHash"` RequestBlockNumber string `json:"requestBlockNumber"` Topics string `json:"topics"` // contains filtered or unexported fields }
VRFTaskV2Plus is identical to VRFTaskV2 except that it uses the V2Plus VRF request commitment, which includes a boolean indicating whether native or link payment was used.
func (*VRFTaskV2Plus) Type ¶ added in v2.4.0
func (t *VRFTaskV2Plus) Type() TaskType
type Vars ¶
type Vars struct {
// contains filtered or unexported fields
}
func NewVarsFrom ¶
NewVarsFrom creates new Vars from the given map. If the map is nil, a new map instance will be created.
func (Vars) Copy ¶
Copy makes a copy of Vars by copying the underlying map. Used by scheduler for new tasks to avoid data races.
Source Files ¶
- common.go
- common_eth.go
- common_http.go
- getters.go
- graph.go
- keypath.go
- models.go
- orm.go
- runner.go
- scheduler.go
- task.any.go
- task.base.go
- task.base64decode.go
- task.base64encode.go
- task.bridge.go
- task.cborparse.go
- task.conditional.go
- task.divide.go
- task.estimategas.go
- task.eth_abi_decode.go
- task.eth_abi_decode_log.go
- task.eth_abi_encode.go
- task.eth_abi_encode_2.go
- task.eth_call.go
- task.eth_tx.go
- task.fail.go
- task.hexdecode.go
- task.hexencode.go
- task.http.go
- task.jsonparse.go
- task.length.go
- task.lessthan.go
- task.lookup.go
- task.lowercase.go
- task.mean.go
- task.median.go
- task.memo.go
- task.merge.go
- task.mode.go
- task.multiply.go
- task.panic.go
- task.sum.go
- task.uppercase.go
- task.vrf.go
- task.vrfv2.go
- task.vrfv2plus.go
- task_object_params.go
- task_params.go
- variables.go