pipeline

package
v1.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2022 License: MIT Imports: 58 Imported by: 0

Documentation

Index

Constants

View Source
const (
	InputTaskKey = "input"
)
View Source
const KeepersObservationSource = `` /* 2676-byte string literal not displayed */

KeepersObservationSource is the same for all keeper jobs and is not persisted in the DB.

View Source
const KeypathSeparator = "."

Variables

View Source
var (
	ErrWrongInputCardinality = errors.New("wrong number of task inputs")
	ErrBadInput              = errors.New("bad input for task")
	ErrInputTaskErrored      = errors.New("input task errored")
	ErrParameterEmpty        = errors.New("parameter is empty")
	ErrIndexOutOfRange       = errors.New("index out of range")
	ErrTooManyErrors         = errors.New("too many errors")
	ErrTimeout               = errors.New("timeout")
	ErrTaskRunFailed         = errors.New("task run failed")
	ErrCancelled             = errors.New("task run cancelled (fail early)")
)
View Source
var (
	// PromPipelineTaskExecutionTime reports how long each pipeline task took to execute
	// TODO: Make private again after
	// https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline
	PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pipeline_task_execution_time",
		Help: "How long each pipeline task took to execute",
	},
		[]string{"job_id", "job_name", "task_id", "task_type"},
	)
	PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "pipeline_run_errors",
		Help: "Number of errors for each pipeline spec",
	},
		[]string{"job_id", "job_name"},
	)
	PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pipeline_run_total_time_to_completion",
		Help: "How long each pipeline run took to finish (from the moment it was created)",
	},
		[]string{"job_id", "job_name"},
	)
	PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "pipeline_tasks_total_finished",
		Help: "The total number of pipeline tasks which have finished",
	},
		[]string{"job_id", "job_name", "task_id", "task_type", "status"},
	)
)
View Source
var (
	ErrDivideByZero    = errors.New("divide by zero")
	ErrDivisionOverlow = errors.New("division overflow")
)
View Source
var (
	ErrKeypathNotFound = errors.New("keypath not found")
	ErrVarsRoot        = errors.New("cannot get/set the root of a pipeline.Vars")
	ErrVarsSetNested   = errors.New("cannot set a nested key of a pipeline.Vars")
)
View Source
var (
	ErrInvalidMultiplier = errors.New("Invalid multiplier")
)
View Source
var (
	ErrMultiplyOverlow = errors.New("multiply overflow")
)
View Source
var ErrOverflow = errors.New("overflow")
View Source
var (
	ErrWrongKeypath = errors.New("wrong keypath format")
)

Functions

func CheckInputs added in v0.10.8

func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)

func NewORM

func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig) *orm

func NewRunner

func NewRunner(orm ORM, config Config, chainSet evm.ChainSet, ethks ETHKeyStore, vrfks VRFKeyStore, lggr logger.Logger, httpClient, unrestrictedHTTPClient *http.Client) *runner

func ParseETHABIArgsString added in v1.1.0

func ParseETHABIArgsString(theABI []byte, isLog bool) (args abi.Arguments, indexedArgs abi.Arguments, _ error)

func ResolveParam added in v0.10.8

func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error

Types

type AddressParam added in v0.10.9

type AddressParam common.Address

func (*AddressParam) UnmarshalPipelineParam added in v0.10.9

func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error

type AddressSliceParam added in v0.10.9

type AddressSliceParam []common.Address

func (*AddressSliceParam) UnmarshalPipelineParam added in v0.10.9

func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error

type AnyTask added in v0.10.0

type AnyTask struct {
	BaseTask `mapstructure:",squash"`
}

AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.

func (*AnyTask) Run added in v0.10.0

func (t *AnyTask) Run(_ context.Context, _ logger.Logger, _ Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*AnyTask) Type added in v0.10.0

func (t *AnyTask) Type() TaskType

type BaseTask

type BaseTask struct {
	Index     int32          `mapstructure:"index" json:"-" `
	Timeout   *time.Duration `mapstructure:"timeout"`
	FailEarly bool           `mapstructure:"failEarly"`

	Retries    null.Uint32   `mapstructure:"retries"`
	MinBackoff time.Duration `mapstructure:"minBackoff"`
	MaxBackoff time.Duration `mapstructure:"maxBackoff"`
	// contains filtered or unexported fields
}

func NewBaseTask added in v0.9.9

func NewBaseTask(id int, dotID string, inputs []TaskDependency, outputs []Task, index int32) BaseTask

func (*BaseTask) Base added in v0.10.9

func (t *BaseTask) Base() *BaseTask

func (BaseTask) DotID

func (t BaseTask) DotID() string

func (BaseTask) ID added in v0.10.9

func (t BaseTask) ID() int

func (BaseTask) Inputs added in v0.10.9

func (t BaseTask) Inputs() []TaskDependency

func (BaseTask) OutputIndex

func (t BaseTask) OutputIndex() int32

func (BaseTask) Outputs added in v0.10.9

func (t BaseTask) Outputs() []Task

func (BaseTask) TaskMaxBackoff added in v1.0.0

func (t BaseTask) TaskMaxBackoff() time.Duration

func (BaseTask) TaskMinBackoff added in v1.0.0

func (t BaseTask) TaskMinBackoff() time.Duration

func (BaseTask) TaskRetries added in v1.0.0

func (t BaseTask) TaskRetries() uint32

func (BaseTask) TaskTimeout added in v0.9.7

func (t BaseTask) TaskTimeout() (time.Duration, bool)

type BoolParam added in v0.10.8

type BoolParam bool

func (*BoolParam) UnmarshalPipelineParam added in v0.10.8

func (b *BoolParam) UnmarshalPipelineParam(val interface{}) error

type BridgeTask

type BridgeTask struct {
	BaseTask `mapstructure:",squash"`

	Name              string `json:"name"`
	RequestData       string `json:"requestData"`
	IncludeInputAtKey string `json:"includeInputAtKey"`
	Async             string `json:"async"`
	// contains filtered or unexported fields
}

Return types:

string

func (*BridgeTask) Run

func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*BridgeTask) Type

func (t *BridgeTask) Type() TaskType

type BytesParam added in v0.10.8

type BytesParam []byte

func (*BytesParam) UnmarshalPipelineParam added in v0.10.8

func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error

type CBORParseTask added in v0.10.9

type CBORParseTask struct {
	BaseTask `mapstructure:",squash"`
	Data     string `json:"data"`
	Mode     string `json:"mode"`
}

Return types:

map[string]interface{} with potential value types:
    float64
    string
    bool
    map[string]interface{}
    []interface{}
    nil

func (*CBORParseTask) Run added in v0.10.9

func (t *CBORParseTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*CBORParseTask) Type added in v0.10.9

func (t *CBORParseTask) Type() TaskType

type ConditionalTask added in v1.5.0

type ConditionalTask struct {
	BaseTask `mapstructure:",squash"`
	Data     string `json:"data"`
}

ConditionalTask checks if data is false. For now this is all we need, but in the future we can expand this to handle more general conditional statements.

func (*ConditionalTask) Run added in v1.5.0

func (t *ConditionalTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ConditionalTask) Type added in v1.5.0

func (t *ConditionalTask) Type() TaskType

type Config

type Config interface {
	BridgeResponseURL() *url.URL
	DatabaseURL() url.URL
	DefaultHTTPLimit() int64
	DefaultHTTPTimeout() models.Duration
	TriggerFallbackDBPollInterval() time.Duration
	JobPipelineMaxRunDuration() time.Duration
	JobPipelineReaperInterval() time.Duration
	JobPipelineReaperThreshold() time.Duration
}

type DecimalParam added in v0.10.8

type DecimalParam decimal.Decimal

func (DecimalParam) Decimal added in v0.10.8

func (d DecimalParam) Decimal() decimal.Decimal

func (*DecimalParam) UnmarshalPipelineParam added in v0.10.8

func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error

type DecimalSliceParam added in v0.10.8

type DecimalSliceParam []decimal.Decimal

func (*DecimalSliceParam) UnmarshalPipelineParam added in v0.10.8

func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error

type DivideTask added in v0.10.9

type DivideTask struct {
	BaseTask  `mapstructure:",squash"`
	Input     string `json:"input"`
	Divisor   string `json:"divisor"`
	Precision string `json:"precision"`
}

Return types:

*decimal.Decimal

func (*DivideTask) Run added in v0.10.9

func (t *DivideTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*DivideTask) Type added in v0.10.9

func (t *DivideTask) Type() TaskType

type ETHABIDecodeLogTask added in v0.10.9

type ETHABIDecodeLogTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
	Topics   string `json:"topics"`
}

Return types:

map[string]interface{} with any geth/abigen value type

func (*ETHABIDecodeLogTask) Run added in v0.10.9

func (t *ETHABIDecodeLogTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHABIDecodeLogTask) Type added in v0.10.9

func (t *ETHABIDecodeLogTask) Type() TaskType

type ETHABIDecodeTask added in v0.10.9

type ETHABIDecodeTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

map[string]interface{} with any geth/abigen value type

func (*ETHABIDecodeTask) Run added in v0.10.9

func (t *ETHABIDecodeTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHABIDecodeTask) Type added in v0.10.9

func (t *ETHABIDecodeTask) Type() TaskType

type ETHABIEncodeTask added in v0.10.9

type ETHABIEncodeTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

[]byte

func (*ETHABIEncodeTask) Run added in v0.10.9

func (t *ETHABIEncodeTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHABIEncodeTask) Type added in v0.10.9

func (t *ETHABIEncodeTask) Type() TaskType

type ETHABIEncodeTask2 added in v1.1.0

type ETHABIEncodeTask2 struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

[]byte

func (*ETHABIEncodeTask2) Run added in v1.1.0

func (t *ETHABIEncodeTask2) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (Result, RunInfo)

func (*ETHABIEncodeTask2) Type added in v1.1.0

func (t *ETHABIEncodeTask2) Type() TaskType

type ETHCallTask added in v0.10.9

type ETHCallTask struct {
	BaseTask            `mapstructure:",squash"`
	Contract            string `json:"contract"`
	From                string `json:"from"`
	Data                string `json:"data"`
	Gas                 string `json:"gas"`
	GasPrice            string `json:"gasPrice"`
	GasTipCap           string `json:"gasTipCap"`
	GasFeeCap           string `json:"gasFeeCap"`
	ExtractRevertReason bool   `json:"extractRevertReason"`
	EVMChainID          string `json:"evmChainID" mapstructure:"evmChainID"`
	// contains filtered or unexported fields
}

Return types:

[]byte

func (*ETHCallTask) Run added in v0.10.9

func (t *ETHCallTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHCallTask) Type added in v0.10.9

func (t *ETHCallTask) Type() TaskType

type ETHKeyStore added in v0.10.11

type ETHKeyStore interface {
	GetRoundRobinAddress(chainID *big.Int, addrs ...common.Address) (common.Address, error)
}

type ETHTxTask added in v0.10.9

type ETHTxTask struct {
	BaseTask         `mapstructure:",squash"`
	From             string `json:"from"`
	To               string `json:"to"`
	Data             string `json:"data"`
	GasLimit         string `json:"gasLimit"`
	TxMeta           string `json:"txMeta"`
	MinConfirmations string `json:"minConfirmations"`
	// FailOnRevert, if set, will error the task if the transaction reverted on-chain
	// If unset, the receipt will be passed as output
	// It has no effect if minConfirmations == 0
	FailOnRevert    string `json:"failOnRevert"`
	EVMChainID      string `json:"evmChainID" mapstructure:"evmChainID"`
	TransmitChecker string `json:"transmitChecker"`
	// contains filtered or unexported fields
}

Return types:

nil

func (*ETHTxTask) Run added in v0.10.9

func (t *ETHTxTask) Run(_ context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHTxTask) Type added in v0.10.9

func (t *ETHTxTask) Type() TaskType

type ErrRunPanicked added in v0.10.4

type ErrRunPanicked struct {
	// contains filtered or unexported fields
}

When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.

func (ErrRunPanicked) Error added in v0.10.9

func (err ErrRunPanicked) Error() string

type EstimateGasLimitTask added in v1.0.0

type EstimateGasLimitTask struct {
	BaseTask   `mapstructure:",squash"`
	Input      string `json:"input"`
	From       string `json:"from"`
	To         string `json:"to"`
	Multiplier string `json:"multiplier"`
	Data       string `json:"data"`
	EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"`
	// contains filtered or unexported fields
}

Return types:

uint64

func (*EstimateGasLimitTask) Run added in v1.0.0

func (t *EstimateGasLimitTask) Run(_ context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*EstimateGasLimitTask) Type added in v1.0.0

func (t *EstimateGasLimitTask) Type() TaskType

type FailTask added in v1.1.0

type FailTask struct {
	BaseTask `mapstructure:",squash"`
	Msg      string
}

FailTask is like the Panic task but without all the drama and stack unwinding of a panic

func (*FailTask) Run added in v1.1.0

func (t *FailTask) Run(_ context.Context, _ logger.Logger, vars Vars, _ []Result) (Result, RunInfo)

func (*FailTask) Type added in v1.1.0

func (t *FailTask) Type() TaskType

type FinalResult added in v0.9.10

type FinalResult struct {
	Values      []interface{}
	AllErrors   []error
	FatalErrors []error
}

FinalResult is the result of a Run

func (FinalResult) HasErrors added in v0.9.10

func (result FinalResult) HasErrors() bool

HasErrors returns true if the final result has any errors

func (FinalResult) HasFatalErrors added in v1.1.0

func (result FinalResult) HasFatalErrors() bool

HasFatalErrors returns true if the final result has any fatal errors

func (FinalResult) SingularResult added in v0.9.10

func (result FinalResult) SingularResult() (Result, error)

SingularResult returns a single result if the FinalResult only has one set of outputs/errors

type GasEstimator added in v1.0.0

type GasEstimator interface {
	EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint64, error)
}

type GetterFunc added in v0.10.8

type GetterFunc func() (interface{}, error)

GetterFunc is a function that either returns a value or an error.

func From added in v0.10.8

func From(getters ...interface{}) []GetterFunc

From creates []GetterFunc from a mix of getters or bare values.

func Input added in v0.10.8

func Input(inputs []Result, index int) GetterFunc

Input creates a getter returning the value of inputs[index], or an error if index is out of range.

func Inputs added in v0.10.8

func Inputs(inputs []Result) GetterFunc

Inputs creates a getter returning array of Result.Value (or Result.Error where not nil).

func JSONWithVarExprs added in v0.10.8

func JSONWithVarExprs(jsExpr string, vars Vars, allowErrors bool) GetterFunc

JSONWithVarExprs creates a getter that unmarshals the jsExpr string as JSON and interpolates all variable expressions found in jsExpr from Vars. The getter returns the unmarshalled object with the expressions interpolated from Vars. The allowErrors flag indicates whether values interpolated from Vars may be errors. jsExpr example: {"requestId": $(decode_log.requestId), "payment": $(decode_log.payment)}

func NonemptyString added in v0.10.8

func NonemptyString(s string) GetterFunc

NonemptyString creates a getter to ensure the string is non-empty.

func VarExpr added in v0.10.8

func VarExpr(expr string, vars Vars) GetterFunc

VarExpr creates a getter interpolating expr value using the given Vars. The expression allows whitespace on both ends that will be trimmed. Expr examples: $(foo.bar), $(arr.1), $(bar)

type Graph added in v0.10.9

type Graph struct {
	*simple.DirectedGraph
}

Graph fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.

func NewGraph added in v0.10.9

func NewGraph() *Graph

func (*Graph) AddImplicitDependenciesAsEdges added in v1.2.0

func (g *Graph) AddImplicitDependenciesAsEdges()

Looks at node attributes and searches for implicit dependencies on other nodes expressed as attribute values. Adds those dependencies as implicit edges in the graph.

func (*Graph) IsImplicitEdge added in v1.2.0

func (g *Graph) IsImplicitEdge(uid, vid int64) bool

Indicates whether there's an implicit edge from uid -> vid. Implicit edges are ones that weren't added via the TOML spec, but via the pipeline parsing code.

func (*Graph) NewEdge added in v1.2.0

func (g *Graph) NewEdge(from, to graph.Node) graph.Edge

func (*Graph) NewNode added in v0.10.9

func (g *Graph) NewNode() graph.Node

func (*Graph) UnmarshalText added in v0.10.9

func (g *Graph) UnmarshalText(bs []byte) (err error)

type GraphEdge added in v1.2.0

type GraphEdge struct {
	graph.Edge
	// contains filtered or unexported fields
}

func (*GraphEdge) IsImplicit added in v1.2.0

func (e *GraphEdge) IsImplicit() bool

func (*GraphEdge) SetIsImplicit added in v1.2.0

func (e *GraphEdge) SetIsImplicit(isImplicit bool)

type GraphNode added in v0.10.9

type GraphNode struct {
	graph.Node
	// contains filtered or unexported fields
}

func (*GraphNode) Attributes added in v0.10.9

func (n *GraphNode) Attributes() []encoding.Attribute

func (*GraphNode) DOTID added in v0.10.9

func (n *GraphNode) DOTID() string

func (*GraphNode) SetAttribute added in v0.10.9

func (n *GraphNode) SetAttribute(attr encoding.Attribute) error

func (*GraphNode) SetDOTID added in v0.10.9

func (n *GraphNode) SetDOTID(id string)

func (*GraphNode) String added in v0.10.9

func (n *GraphNode) String() string

type HTTPTask

type HTTPTask struct {
	BaseTask                       `mapstructure:",squash"`
	Method                         string
	URL                            string
	RequestData                    string `json:"requestData"`
	AllowUnrestrictedNetworkAccess string
	Headers                        string
	// contains filtered or unexported fields
}

Return types:

string

func (*HTTPTask) Run

func (t *HTTPTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*HTTPTask) Type

func (t *HTTPTask) Type() TaskType

type HashSliceParam added in v0.10.9

type HashSliceParam []common.Hash

func (*HashSliceParam) UnmarshalPipelineParam added in v0.10.9

func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error

type JSONParseTask

type JSONParseTask struct {
	BaseTask  `mapstructure:",squash"`
	Path      string `json:"path"`
	Separator string `json:"separator"`
	Data      string `json:"data"`
	// Lax when disabled will return an error if the path does not exist
	// Lax when enabled will return nil with no error if the path does not exist
	Lax string
}

Return types:

float64
string
bool
map[string]interface{}
[]interface{}
nil

func (*JSONParseTask) Run

func (t *JSONParseTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*JSONParseTask) Type

func (t *JSONParseTask) Type() TaskType

type JSONPathParam added in v0.10.9

type JSONPathParam []string

func NewJSONPathParam added in v1.4.0

func NewJSONPathParam(sep string) JSONPathParam

NewJSONPathParam returns a new JSONPathParam using the given separator, or the default if empty.

func (*JSONPathParam) UnmarshalPipelineParam added in v0.10.9

func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error

UnmarshalPipelineParam unmarshals a slice of strings from val. If val is a string or []byte, it is split on a separator. The default separator is ',' but can be overridden by initializing via NewJSONPathParam.

type JSONSerializable

type JSONSerializable struct {
	Val   interface{}
	Valid bool
}

func (*JSONSerializable) Empty added in v0.10.10

func (js *JSONSerializable) Empty() bool

func (JSONSerializable) MarshalJSON

func (js JSONSerializable) MarshalJSON() ([]byte, error)

MarshalJSON implements custom marshaling logic

func (*JSONSerializable) Scan

func (js *JSONSerializable) Scan(value interface{}) error

func (*JSONSerializable) UnmarshalJSON

func (js *JSONSerializable) UnmarshalJSON(bs []byte) error

UnmarshalJSON implements custom unmarshaling logic

func (JSONSerializable) Value

func (js JSONSerializable) Value() (driver.Value, error)

type Keypath added in v0.10.8

type Keypath struct {
	NumParts int    // can be 0, 1 or 2
	Part0    string // can be empty string if NumParts is 0
	Part1    string // can be empty string if NumParts is 0 or 1
}

Keypath contains keypath parsed by NewKeypathFromString.

func NewKeypathFromString added in v1.4.0

func NewKeypathFromString(keypathStr string) (Keypath, error)

NewKeypathFromString creates a new Keypath from the given string. Returns error if it fails to parse the given keypath string.

type LowercaseTask added in v1.2.0

type LowercaseTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
}

Return types:

string

func (*LowercaseTask) Run added in v1.2.0

func (t *LowercaseTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*LowercaseTask) Type added in v1.2.0

func (t *LowercaseTask) Type() TaskType

type MapParam added in v0.10.8

type MapParam map[string]interface{}

MapParam accepts maps or JSON-encoded strings

func (MapParam) Map added in v1.1.0

func (m MapParam) Map() map[string]interface{}

func (*MapParam) UnmarshalPipelineParam added in v0.10.8

func (m *MapParam) UnmarshalPipelineParam(val interface{}) error

type MaybeBigIntParam added in v1.1.0

type MaybeBigIntParam struct {
	// contains filtered or unexported fields
}

func NewMaybeBigIntParam added in v1.4.0

func NewMaybeBigIntParam(n *big.Int) MaybeBigIntParam

NewMaybeBigIntParam creates a new instance of MaybeBigIntParam

func (MaybeBigIntParam) BigInt added in v1.1.0

func (p MaybeBigIntParam) BigInt() *big.Int

func (*MaybeBigIntParam) UnmarshalPipelineParam added in v1.1.0

func (p *MaybeBigIntParam) UnmarshalPipelineParam(val interface{}) error

type MaybeInt32Param added in v0.10.9

type MaybeInt32Param struct {
	// contains filtered or unexported fields
}

func NewMaybeInt32Param added in v1.4.0

func NewMaybeInt32Param(n int32, isSet bool) MaybeInt32Param

NewMaybeInt32Param creates a new instance of MaybeInt32Param

func (MaybeInt32Param) Int32 added in v0.10.9

func (p MaybeInt32Param) Int32() (int32, bool)

func (*MaybeInt32Param) UnmarshalPipelineParam added in v0.10.9

func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error

type MaybeUint64Param added in v0.10.8

type MaybeUint64Param struct {
	// contains filtered or unexported fields
}

func NewMaybeUint64Param added in v1.4.0

func NewMaybeUint64Param(n uint64, isSet bool) MaybeUint64Param

NewMaybeUint64Param creates a new instance of MaybeUint64Param

func (MaybeUint64Param) Uint64 added in v0.10.8

func (p MaybeUint64Param) Uint64() (uint64, bool)

func (*MaybeUint64Param) UnmarshalPipelineParam added in v0.10.8

func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error

type MeanTask added in v0.10.9

type MeanTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
	Precision     string `json:"precision"`
}

Return types:

*decimal.Decimal

func (*MeanTask) Run added in v0.10.9

func (t *MeanTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MeanTask) Type added in v0.10.9

func (t *MeanTask) Type() TaskType

type MedianTask

type MedianTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

*decimal.Decimal

func (*MedianTask) Run

func (t *MedianTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MedianTask) Type

func (t *MedianTask) Type() TaskType

type MemoTask added in v1.1.0

type MemoTask struct {
	BaseTask `mapstructure:",squash"`
	Value    string `json:"value"`
}

func (*MemoTask) Run added in v1.1.0

func (t *MemoTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (Result, RunInfo)

func (*MemoTask) Type added in v1.1.0

func (t *MemoTask) Type() TaskType

type MergeTask added in v1.1.0

type MergeTask struct {
	BaseTask `mapstructure:",squash"`
	Left     string `json:"left"`
	Right    string `json:"right"`
}

Return types:

map[string]interface{}

func (*MergeTask) Run added in v1.1.0

func (t *MergeTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MergeTask) Type added in v1.1.0

func (t *MergeTask) Type() TaskType

type Method added in v1.1.0

type Method struct {
	Name   string
	Inputs abi.Arguments
}

go-ethereum's abi.Method doesn't implement json.Marshal for Type but would otherwise have worked fine; in any case, we only care about these fields.

type ModeTask added in v0.10.9

type ModeTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

map[string]interface{}{
    "results": []interface{} containing any type that other pipeline tasks can return
    "occurrences": (int64)
}

func (*ModeTask) Run added in v0.10.9

func (t *ModeTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ModeTask) Type added in v0.10.9

func (t *ModeTask) Type() TaskType

type MultiplyTask

type MultiplyTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
	Times    string `json:"times"`
}

Return types:

*decimal.Decimal

func (*MultiplyTask) Run

func (t *MultiplyTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MultiplyTask) Type

func (t *MultiplyTask) Type() TaskType

type ORM

type ORM interface {
	CreateSpec(pipeline Pipeline, maxTaskTimeout models.Interval, qopts ...pg.QOpt) (int32, error)
	CreateRun(run *Run, qopts ...pg.QOpt) (err error)
	InsertRun(run *Run, qopts ...pg.QOpt) error
	DeleteRun(id int64) error
	StoreRun(run *Run, qopts ...pg.QOpt) (restart bool, err error)
	UpdateTaskRunResult(taskID uuid.UUID, result Result) (run Run, start bool, err error)
	InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error)

	// InsertFinishedRuns inserts all the given runs into the database.
	// If saveSuccessfulTaskRuns is false, only errored runs are saved.
	InsertFinishedRuns(run []*Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) (err error)

	DeleteRunsOlderThan(context.Context, time.Duration) error
	FindRun(id int64) (Run, error)
	GetAllRuns() ([]Run, error)
	GetUnfinishedRuns(context.Context, time.Time, func(run Run) error) error
	GetQ() pg.Q
}

type ObjectParam added in v1.1.0

type ObjectParam struct {
	Type         ObjectType
	BoolValue    BoolParam
	DecimalValue DecimalParam
	StringValue  StringParam
	SliceValue   SliceParam
	MapValue     MapParam
}

ObjectParam represents a kind of any type that could be used by the memo task

func (ObjectParam) Marshal added in v1.1.0

func (o ObjectParam) Marshal() (string, error)

func (ObjectParam) MarshalJSON added in v1.1.0

func (o ObjectParam) MarshalJSON() ([]byte, error)

func (ObjectParam) String added in v1.1.0

func (o ObjectParam) String() string

func (*ObjectParam) UnmarshalPipelineParam added in v1.1.0

func (o *ObjectParam) UnmarshalPipelineParam(val interface{}) error

type ObjectType added in v1.1.0

type ObjectType int
const (
	NilType ObjectType = iota
	BoolType
	DecimalType
	StringType
	SliceType
	MapType
)

type PanicTask added in v0.10.4

type PanicTask struct {
	BaseTask `mapstructure:",squash"`
	Msg      string
}

func (*PanicTask) Run added in v0.10.4

func (t *PanicTask) Run(_ context.Context, _ logger.Logger, vars Vars, _ []Result) (result Result, runInfo RunInfo)

func (*PanicTask) Type added in v0.10.4

func (t *PanicTask) Type() TaskType

type Pipeline added in v0.10.9

type Pipeline struct {
	Tasks []Task

	Source string
	// contains filtered or unexported fields
}

func Parse added in v0.10.9

func Parse(text string) (*Pipeline, error)

func (*Pipeline) ByDotID added in v0.10.10

func (p *Pipeline) ByDotID(id string) Task

func (*Pipeline) MinTimeout added in v0.10.9

func (p *Pipeline) MinTimeout() (time.Duration, bool, error)

func (*Pipeline) RequiresPreInsert added in v1.0.0

func (p *Pipeline) RequiresPreInsert() bool

func (*Pipeline) UnmarshalText added in v0.10.9

func (p *Pipeline) UnmarshalText(bs []byte) (err error)

type PipelineParamUnmarshaler added in v0.10.8

type PipelineParamUnmarshaler interface {
	UnmarshalPipelineParam(val interface{}) error
}

type PossibleErrorResponses

type PossibleErrorResponses struct {
	Error        string `json:"error"`
	ErrorMessage string `json:"errorMessage"`
}

type Result

type Result struct {
	Value interface{}
	Error error
}

Result is the result of a TaskRun

func (Result) ErrorDB added in v0.9.10

func (result Result) ErrorDB() null.String

ErrorDB dumps a single result error for a pipeline_task_run

func (Result) OutputDB added in v0.9.10

func (result Result) OutputDB() JSONSerializable

OutputDB dumps a single result output for a pipeline_run or pipeline_task_run

type ResumeRequest added in v1.1.0

type ResumeRequest struct {
	Error null.String     `json:"error"`
	Value json.RawMessage `json:"value"`
}

func (ResumeRequest) ToResult added in v1.1.0

func (rr ResumeRequest) ToResult() (Result, error)

type Run

type Run struct {
	ID             int64            `json:"-"`
	PipelineSpecID int32            `json:"-"`
	PipelineSpec   Spec             `json:"pipelineSpec"`
	Meta           JSONSerializable `json:"meta"`
	// The errors are only ever strings
	// DB example: [null, null, "my error"]
	AllErrors   RunErrors        `json:"all_errors"`
	FatalErrors RunErrors        `json:"fatal_errors"`
	Inputs      JSONSerializable `json:"inputs"`
	// It's expected that Outputs.Val is of type []interface{}.
	// DB example: [1234, {"a": 10}, null]
	Outputs          JSONSerializable `json:"outputs"`
	CreatedAt        time.Time        `json:"createdAt"`
	FinishedAt       null.Time        `json:"finishedAt"`
	PipelineTaskRuns []TaskRun        `json:"taskRuns"`
	State            RunStatus        `json:"state"`

	Pending bool
	// FailSilently is used to signal that a task with the failEarly flag has failed, and we want to not put this in the db
	FailSilently bool
}

func NewRun added in v0.10.3

func NewRun(spec Spec, vars Vars) Run

func (*Run) ByDotID added in v0.10.10

func (r *Run) ByDotID(id string) *TaskRun

func (Run) GetID added in v0.9.6

func (r Run) GetID() string

func (Run) HasErrors added in v0.9.10

func (r Run) HasErrors() bool

func (Run) HasFatalErrors added in v1.1.0

func (r Run) HasFatalErrors() bool

func (*Run) SetID added in v0.9.6

func (r *Run) SetID(value string) error

func (*Run) Status added in v0.10.3

func (r *Run) Status() RunStatus

Status determines the status of the run.

func (*Run) StringAllErrors added in v1.1.0

func (r *Run) StringAllErrors() []*string

func (*Run) StringFatalErrors added in v1.1.0

func (r *Run) StringFatalErrors() []*string

func (*Run) StringOutputs added in v1.1.0

func (r *Run) StringOutputs() ([]*string, error)

type RunErrors added in v0.10.4

type RunErrors []null.String

func (RunErrors) HasError added in v0.10.4

func (re RunErrors) HasError() bool

func (*RunErrors) Scan added in v0.10.4

func (re *RunErrors) Scan(value interface{}) error

func (RunErrors) ToError added in v1.2.0

func (re RunErrors) ToError() error

ToError coalesces all non-nil errors into a single error object. This is useful for logging.

func (RunErrors) Value added in v0.10.4

func (re RunErrors) Value() (driver.Value, error)

type RunInfo added in v1.1.0

type RunInfo struct {
	IsRetryable bool
	IsPending   bool
}

RunInfo contains additional information about the finished TaskRun

type RunStatus added in v0.10.3

type RunStatus string

RunStatus represents the status of a run

const (
	// RunStatusUnknown is used when the run status cannot be determined.
	RunStatusUnknown RunStatus = "unknown"
	// RunStatusRunning is used for when a run is actively being executed.
	RunStatusRunning RunStatus = "running"
	// RunStatusSuspended is used when a run is paused and awaiting further results.
	RunStatusSuspended RunStatus = "suspended"
	// RunStatusErrored is used for when a run has errored and will not complete.
	RunStatusErrored RunStatus = "errored"
	// RunStatusCompleted is used for when a run has successfully completed execution.
	RunStatusCompleted RunStatus = "completed"
)

func (RunStatus) Completed added in v0.10.3

func (s RunStatus) Completed() bool

Completed returns true if the status is RunStatusCompleted.

func (RunStatus) Errored added in v0.10.3

func (s RunStatus) Errored() bool

Errored returns true if the status is RunStatusErrored.

func (RunStatus) Finished added in v0.10.3

func (s RunStatus) Finished() bool

Finished returns true if the status is final and can't be changed.

type Runner

type Runner interface {
	services.ServiceCtx

	// Run is a blocking call that will execute the run until no further progress can be made.
	// If `incomplete` is true, the run is only partially complete and is suspended, waiting to be resumed when more data comes in.
	// Note that `saveSuccessfulTaskRuns` value is ignored if the run contains async tasks.
	Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx pg.Queryer) error) (incomplete bool, err error)
	ResumeRun(taskID uuid.UUID, value interface{}, err error) error

	// We expect spec.JobID and spec.JobName to be set for logging/prometheus.
	// ExecuteRun executes a new run in-memory according to a spec and returns the results.
	ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run Run, trrs TaskRunResults, err error)
	// InsertFinishedRun saves the run results in the database.
	InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error
	InsertFinishedRuns(runs []*Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error

	// ExecuteAndInsertFinishedRun executes a new run in-memory according to a spec, persists and saves the results.
	// It is a combination of ExecuteRun and InsertFinishedRun.
	// Note that the spec MUST have a DOT graph for this to work.
	ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error)

	OnRunFinished(func(*Run))
}

type SliceParam added in v0.10.8

type SliceParam []interface{}

func (SliceParam) FilterErrors added in v0.10.8

func (s SliceParam) FilterErrors() (SliceParam, int)

func (*SliceParam) UnmarshalPipelineParam added in v0.10.8

func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error

type Spec

type Spec struct {
	ID              int32
	DotDagSource    string          `json:"dotDagSource"`
	CreatedAt       time.Time       `json:"-"`
	MaxTaskDuration models.Interval `json:"-"`

	JobID   int32  `json:"-"`
	JobName string `json:"-"`
	JobType string `json:"-"`
}

func (Spec) Pipeline added in v0.10.9

func (s Spec) Pipeline() (*Pipeline, error)

type StringParam added in v0.10.8

type StringParam string

func (*StringParam) String added in v1.5.0

func (s *StringParam) String() string

func (*StringParam) UnmarshalPipelineParam added in v0.10.8

func (s *StringParam) UnmarshalPipelineParam(val interface{}) error

type StringSliceParam added in v0.10.8

type StringSliceParam []string

func (*StringSliceParam) UnmarshalPipelineParam added in v0.10.8

func (s *StringSliceParam) UnmarshalPipelineParam(val interface{}) error

type SumTask added in v0.10.9

type SumTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

*decimal.Decimal

func (*SumTask) Run added in v0.10.9

func (t *SumTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*SumTask) Type added in v0.10.9

func (t *SumTask) Type() TaskType

type Task

type Task interface {
	Type() TaskType
	ID() int
	DotID() string
	Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo)
	Base() *BaseTask
	Outputs() []Task
	Inputs() []TaskDependency
	OutputIndex() int32
	TaskTimeout() (time.Duration, bool)
	TaskRetries() uint32
	TaskMinBackoff() time.Duration
	TaskMaxBackoff() time.Duration
}

func UnmarshalTaskFromMap

func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID string) (_ Task, err error)

type TaskDependency added in v1.2.0

type TaskDependency struct {
	PropagateResult bool
	InputTask       Task
}

TaskDependency wraps the input Task for the given dependent task along with a bool variable PropagateResult, which indicates whether the result of InputTask should be propagated to its dependent task. If the edge between these tasks was an implicit edge, then results are not propagated. This is because some tasks cannot handle an input from an edge which wasn't specified in the spec.

type TaskRun

type TaskRun struct {
	ID            uuid.UUID        `json:"id"`
	Type          TaskType         `json:"type"`
	PipelineRun   Run              `json:"-"`
	PipelineRunID int64            `json:"-"`
	Output        JSONSerializable `json:"output"`
	Error         null.String      `json:"error"`
	CreatedAt     time.Time        `json:"createdAt"`
	FinishedAt    null.Time        `json:"finishedAt"`
	Index         int32            `json:"index"`
	DotID         string           `json:"dotId"`
	// contains filtered or unexported fields
}

func (TaskRun) GetDotID added in v0.10.4

func (tr TaskRun) GetDotID() string

func (TaskRun) GetID added in v0.9.6

func (tr TaskRun) GetID() string

func (*TaskRun) IsPending added in v0.10.10

func (tr *TaskRun) IsPending() bool

func (TaskRun) Result

func (tr TaskRun) Result() Result

func (*TaskRun) SetID added in v0.9.6

func (tr *TaskRun) SetID(value string) error

type TaskRunResult added in v0.9.10

type TaskRunResult struct {
	ID         uuid.UUID
	Task       Task
	TaskRun    TaskRun
	Result     Result
	Attempts   uint
	CreatedAt  time.Time
	FinishedAt null.Time
	// contains filtered or unexported fields
}

TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet. TaskSpecID will always be non-zero.

func (*TaskRunResult) IsPending added in v0.10.10

func (result *TaskRunResult) IsPending() bool

func (*TaskRunResult) IsTerminal added in v0.9.10

func (result *TaskRunResult) IsTerminal() bool

type TaskRunResults added in v0.9.10

type TaskRunResults []TaskRunResult

TaskRunResults represents a collection of results for all task runs for one pipeline run

func (TaskRunResults) FinalResult added in v0.9.10

func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult

FinalResult pulls the FinalResult for the pipeline_run from the task runs. It needs to respect the output index of each task.

type TaskType

type TaskType string
const (
	TaskTypeHTTP             TaskType = "http"
	TaskTypeBridge           TaskType = "bridge"
	TaskTypeMean             TaskType = "mean"
	TaskTypeMedian           TaskType = "median"
	TaskTypeMode             TaskType = "mode"
	TaskTypeSum              TaskType = "sum"
	TaskTypeMultiply         TaskType = "multiply"
	TaskTypeDivide           TaskType = "divide"
	TaskTypeJSONParse        TaskType = "jsonparse"
	TaskTypeCBORParse        TaskType = "cborparse"
	TaskTypeAny              TaskType = "any"
	TaskTypeVRF              TaskType = "vrf"
	TaskTypeVRFV2            TaskType = "vrfv2"
	TaskTypeEstimateGasLimit TaskType = "estimategaslimit"
	TaskTypeETHCall          TaskType = "ethcall"
	TaskTypeETHTx            TaskType = "ethtx"
	TaskTypeETHABIEncode     TaskType = "ethabiencode"
	TaskTypeETHABIEncode2    TaskType = "ethabiencode2"
	TaskTypeETHABIDecode     TaskType = "ethabidecode"
	TaskTypeETHABIDecodeLog  TaskType = "ethabidecodelog"
	TaskTypeMerge            TaskType = "merge"
	TaskTypeLowercase        TaskType = "lowercase"
	TaskTypeUppercase        TaskType = "uppercase"
	TaskTypeConditional      TaskType = "conditional"

	// Testing only.
	TaskTypePanic TaskType = "panic"
	TaskTypeMemo  TaskType = "memo"
	TaskTypeFail  TaskType = "fail"
)

func (TaskType) String added in v0.10.6

func (t TaskType) String() string

type URLParam added in v0.10.8

type URLParam url.URL

func (*URLParam) String added in v0.10.8

func (u *URLParam) String() string

func (*URLParam) UnmarshalPipelineParam added in v0.10.8

func (u *URLParam) UnmarshalPipelineParam(val interface{}) error

type Uint64Param added in v0.10.8

type Uint64Param uint64

func (*Uint64Param) UnmarshalPipelineParam added in v0.10.8

func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error

type UppercaseTask added in v1.2.0

type UppercaseTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
}

Return types:

string

func (*UppercaseTask) Run added in v1.2.0

func (t *UppercaseTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*UppercaseTask) Type added in v1.2.0

func (t *UppercaseTask) Type() TaskType

type VRFKeyStore added in v0.10.11

type VRFKeyStore interface {
	GenerateProof(id string, seed *big.Int) (vrfkey.Proof, error)
}

type VRFTask added in v0.10.8

type VRFTask struct {
	BaseTask           `mapstructure:",squash"`
	PublicKey          string `json:"publicKey"`
	RequestBlockHash   string `json:"requestBlockHash"`
	RequestBlockNumber string `json:"requestBlockNumber"`
	Topics             string `json:"topics"`
	// contains filtered or unexported fields
}

func (*VRFTask) Run added in v0.10.8

func (t *VRFTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*VRFTask) Type added in v0.10.8

func (t *VRFTask) Type() TaskType

type VRFTaskV2 added in v1.0.0

type VRFTaskV2 struct {
	BaseTask           `mapstructure:",squash"`
	PublicKey          string `json:"publicKey"`
	RequestBlockHash   string `json:"requestBlockHash"`
	RequestBlockNumber string `json:"requestBlockNumber"`
	Topics             string `json:"topics"`
	// contains filtered or unexported fields
}

func (*VRFTaskV2) Run added in v1.0.0

func (t *VRFTaskV2) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*VRFTaskV2) Type added in v1.0.0

func (t *VRFTaskV2) Type() TaskType

type Vars added in v0.10.8

type Vars struct {
	// contains filtered or unexported fields
}

func NewVarsFrom added in v0.10.8

func NewVarsFrom(m map[string]interface{}) Vars

NewVarsFrom creates new Vars from the given map. If the map is nil, a new map instance will be created.

func (Vars) Copy added in v0.10.9

func (vars Vars) Copy() Vars

Copy makes a copy of Vars by copying the underlying map. Used by scheduler for new tasks to avoid data races.

func (Vars) Get added in v0.10.8

func (vars Vars) Get(keypathStr string) (interface{}, error)

Get returns the value for the given keypath or error. The keypath can consist of one or two parts, e.g. "foo" or "foo.bar". The second part of the keypath can be an index of a slice.

func (Vars) Set added in v0.10.8

func (vars Vars) Set(dotID string, value interface{}) error

Set sets a top-level variable specified by dotID. Returns error if either dotID is empty or it is a compound keypath.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL