pipeline

package
v0.10.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 7, 2021 License: MIT Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
const (
	InputTaskKey = "input"
)

Variables

View Source
var (
	ErrWrongInputCardinality = errors.New("wrong number of task inputs")
	ErrBadInput              = errors.New("bad input for task")
	ErrInputTaskErrored      = errors.New("input task errored")
	ErrParameterEmpty        = errors.New("parameter is empty")
	ErrTooManyErrors         = errors.New("too many errors")
	ErrTimeout               = errors.New("timeout")
	ErrTaskRunFailed         = errors.New("task run failed")
)
View Source
var (
	// PromPipelineTaskExecutionTime reports how long each pipeline task took to execute
	// TODO: Make private again after
	// https://app.clubhouse.io/chainlinklabs/story/6065/hook-keeper-up-to-use-tasks-in-the-pipeline
	PromPipelineTaskExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pipeline_task_execution_time",
		Help: "How long each pipeline task took to execute",
	},
		[]string{"job_id", "job_name", "task_id", "task_type"},
	)
	PromPipelineRunErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "pipeline_run_errors",
		Help: "Number of errors for each pipeline spec",
	},
		[]string{"job_id", "job_name"},
	)
	PromPipelineRunTotalTimeToCompletion = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pipeline_run_total_time_to_completion",
		Help: "How long each pipeline run took to finish (from the moment it was created)",
	},
		[]string{"job_id", "job_name"},
	)
	PromPipelineTasksTotalFinished = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "pipeline_tasks_total_finished",
		Help: "The total number of pipeline tasks which have finished",
	},
		[]string{"job_id", "job_name", "task_id", "task_type", "status"},
	)
)
View Source
var (
	ErrKeypathNotFound = errors.New("keypath not found")
	ErrKeypathTooDeep  = errors.New("keypath too deep (maximum 2 keys)")
	ErrVarsRoot        = errors.New("cannot get/set the root of a pipeline.Vars")
)
View Source
var (
	ErrNoSuchBridge = errors.New("no such bridge exists")
)
View Source
var ErrOverflow = errors.New("overflow")
View Source
var ErrPending = errors.New("pending")

Functions

func CheckInputs added in v0.10.8

func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)

func NewORM

func NewORM(db *gorm.DB) *orm

func NewRunner

func NewRunner(orm ORM, config Config, ethClient eth.Client, ethks ETHKeyStore, vrfks VRFKeyStore, txManager TxManager) *runner

func ResolveParam added in v0.10.8

func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error

Types

type AddressParam added in v0.10.9

type AddressParam common.Address

func (*AddressParam) UnmarshalPipelineParam added in v0.10.9

func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error

type AddressSliceParam added in v0.10.9

type AddressSliceParam []common.Address

func (*AddressSliceParam) UnmarshalPipelineParam added in v0.10.9

func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error

type AnyTask added in v0.10.0

type AnyTask struct {
	BaseTask `mapstructure:",squash"`
}

AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.

func (*AnyTask) Run added in v0.10.0

func (t *AnyTask) Run(_ context.Context, _ Vars, inputs []Result) (result Result)

func (*AnyTask) Type added in v0.10.0

func (t *AnyTask) Type() TaskType

type BaseTask

type BaseTask struct {
	Index     int32         `mapstructure:"index" json:"-" `
	Timeout   time.Duration `mapstructure:"timeout"`
	FailEarly string        `mapstructure:"failEarly"`
	// contains filtered or unexported fields
}

func NewBaseTask added in v0.9.9

func NewBaseTask(id int, dotID string, inputs, outputs []Task, index int32) BaseTask

func (*BaseTask) Base added in v0.10.9

func (t *BaseTask) Base() *BaseTask

func (BaseTask) DotID

func (t BaseTask) DotID() string

func (BaseTask) ID added in v0.10.9

func (t BaseTask) ID() int

func (BaseTask) Inputs added in v0.10.9

func (t BaseTask) Inputs() []Task

func (BaseTask) OutputIndex

func (t BaseTask) OutputIndex() int32

func (BaseTask) Outputs added in v0.10.9

func (t BaseTask) Outputs() []Task

func (BaseTask) TaskTimeout added in v0.9.7

func (t BaseTask) TaskTimeout() (time.Duration, bool)

type BoolParam added in v0.10.8

type BoolParam bool

func (*BoolParam) UnmarshalPipelineParam added in v0.10.8

func (b *BoolParam) UnmarshalPipelineParam(val interface{}) error

type BridgeTask

type BridgeTask struct {
	BaseTask `mapstructure:",squash"`

	Name              string `json:"name"`
	RequestData       string `json:"requestData"`
	IncludeInputAtKey string `json:"includeInputAtKey"`
	Async             string `json:"async"`
	// contains filtered or unexported fields
}

Return types:

string

func (*BridgeTask) Run

func (t *BridgeTask) Run(ctx context.Context, vars Vars, inputs []Result) Result

func (*BridgeTask) Type

func (t *BridgeTask) Type() TaskType

type BytesParam added in v0.10.8

type BytesParam []byte

func (*BytesParam) UnmarshalPipelineParam added in v0.10.8

func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error

type CBORParseTask added in v0.10.9

type CBORParseTask struct {
	BaseTask `mapstructure:",squash"`
	Data     string `json:"data"`
}

Return types:

map[string]interface{} with potential value types:
    float64
    string
    bool
    map[string]interface{}
    []interface{}
    nil

func (*CBORParseTask) Run added in v0.10.9

func (t *CBORParseTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*CBORParseTask) Type added in v0.10.9

func (t *CBORParseTask) Type() TaskType

type Config

type Config interface {
	BridgeResponseURL() *url.URL
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() url.URL
	DefaultHTTPLimit() int64
	DefaultHTTPTimeout() models.Duration
	DefaultMaxHTTPAttempts() uint
	DefaultHTTPAllowUnrestrictedNetworkAccess() bool
	EthGasLimitDefault() uint64
	EthMaxQueuedTransactions() uint64
	TriggerFallbackDBPollInterval() time.Duration
	JobPipelineMaxRunDuration() time.Duration
	JobPipelineReaperInterval() time.Duration
	JobPipelineReaperThreshold() time.Duration
}

type DecimalParam added in v0.10.8

type DecimalParam decimal.Decimal

func (DecimalParam) Decimal added in v0.10.8

func (d DecimalParam) Decimal() decimal.Decimal

func (*DecimalParam) UnmarshalPipelineParam added in v0.10.8

func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error

type DecimalSliceParam added in v0.10.8

type DecimalSliceParam []decimal.Decimal

func (*DecimalSliceParam) UnmarshalPipelineParam added in v0.10.8

func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error

type DivideTask added in v0.10.9

type DivideTask struct {
	BaseTask  `mapstructure:",squash"`
	Input     string `json:"input"`
	Divisor   string `json:"divisor"`
	Precision string `json:"precision"`
}

Return types:

*decimal.Decimal

func (*DivideTask) Run added in v0.10.9

func (t *DivideTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*DivideTask) Type added in v0.10.9

func (t *DivideTask) Type() TaskType

type ETHABIDecodeLogTask added in v0.10.9

type ETHABIDecodeLogTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
	Topics   string `json:"topics"`
}

Return types:

map[string]interface{} with any geth/abigen value type

func (*ETHABIDecodeLogTask) Run added in v0.10.9

func (t *ETHABIDecodeLogTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*ETHABIDecodeLogTask) Type added in v0.10.9

func (t *ETHABIDecodeLogTask) Type() TaskType

type ETHABIDecodeTask added in v0.10.9

type ETHABIDecodeTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

map[string]interface{} with any geth/abigen value type

func (*ETHABIDecodeTask) Run added in v0.10.9

func (t *ETHABIDecodeTask) Run(_ context.Context, vars Vars, inputs []Result) Result

func (*ETHABIDecodeTask) Type added in v0.10.9

func (t *ETHABIDecodeTask) Type() TaskType

type ETHABIEncodeTask added in v0.10.9

type ETHABIEncodeTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

[]byte

func (*ETHABIEncodeTask) Run added in v0.10.9

func (t *ETHABIEncodeTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*ETHABIEncodeTask) Type added in v0.10.9

func (t *ETHABIEncodeTask) Type() TaskType

type ETHCallTask added in v0.10.9

type ETHCallTask struct {
	BaseTask `mapstructure:",squash"`
	Contract string `json:"contract"`
	Data     string `json:"data"`
	// contains filtered or unexported fields
}

Return types:

[]byte

func (*ETHCallTask) Run added in v0.10.9

func (t *ETHCallTask) Run(ctx context.Context, vars Vars, inputs []Result) (result Result)

func (*ETHCallTask) Type added in v0.10.9

func (t *ETHCallTask) Type() TaskType

type ETHKeyStore added in v0.10.11

type ETHKeyStore interface {
	GetRoundRobinAddress(addrs ...common.Address) (common.Address, error)
}

type ETHTxTask added in v0.10.9

type ETHTxTask struct {
	BaseTask `mapstructure:",squash"`
	From     string `json:"from"`
	To       string `json:"to"`
	Data     string `json:"data"`
	GasLimit string `json:"gasLimit"`
	TxMeta   string `json:"txMeta"`
	// contains filtered or unexported fields
}

Return types:

nil

func (*ETHTxTask) Run added in v0.10.9

func (t *ETHTxTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*ETHTxTask) Type added in v0.10.9

func (t *ETHTxTask) Type() TaskType

type ErrRunPanicked added in v0.10.4

type ErrRunPanicked struct {
	// contains filtered or unexported fields
}

When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.

func (ErrRunPanicked) Error added in v0.10.9

func (err ErrRunPanicked) Error() string

type FinalResult added in v0.9.10

type FinalResult struct {
	Values []interface{}
	Errors []error
}

FinalResult is the result of a Run

func (FinalResult) HasErrors added in v0.9.10

func (result FinalResult) HasErrors() bool

HasErrors returns true if the final result has any errors

func (FinalResult) SingularResult added in v0.9.10

func (result FinalResult) SingularResult() (Result, error)

SingularResult returns a single result if the FinalResult only has one set of outputs/errors

type GetterFunc added in v0.10.8

type GetterFunc func() (interface{}, error)

func From added in v0.10.8

func From(getters ...interface{}) []GetterFunc

func Input added in v0.10.8

func Input(inputs []Result, index int) GetterFunc

func Inputs added in v0.10.8

func Inputs(inputs []Result) GetterFunc

func JSONWithVarExprs added in v0.10.8

func JSONWithVarExprs(s string, vars Vars, allowErrors bool) GetterFunc

func NonemptyString added in v0.10.8

func NonemptyString(s string) GetterFunc

func VarExpr added in v0.10.8

func VarExpr(s string, vars Vars) GetterFunc

type Graph added in v0.10.9

type Graph struct {
	*simple.DirectedGraph
}

tree fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.

func NewGraph added in v0.10.9

func NewGraph() *Graph

func (*Graph) NewNode added in v0.10.9

func (g *Graph) NewNode() graph.Node

func (*Graph) UnmarshalText added in v0.10.9

func (g *Graph) UnmarshalText(bs []byte) (err error)

type GraphNode added in v0.10.9

type GraphNode struct {
	graph.Node
	// contains filtered or unexported fields
}

func NewGraphNode added in v0.10.9

func NewGraphNode(n graph.Node, dotID string, attrs map[string]string) *GraphNode

func (*GraphNode) Attributes added in v0.10.9

func (n *GraphNode) Attributes() []encoding.Attribute

func (*GraphNode) DOTID added in v0.10.9

func (n *GraphNode) DOTID() string

func (*GraphNode) SetAttribute added in v0.10.9

func (n *GraphNode) SetAttribute(attr encoding.Attribute) error

func (*GraphNode) SetDOTID added in v0.10.9

func (n *GraphNode) SetDOTID(id string)

func (*GraphNode) String added in v0.10.9

func (n *GraphNode) String() string

type HTTPTask

type HTTPTask struct {
	BaseTask                       `mapstructure:",squash"`
	Method                         string
	URL                            string
	RequestData                    string `json:"requestData"`
	AllowUnrestrictedNetworkAccess string
	// contains filtered or unexported fields
}

Return types:

string

func (*HTTPTask) Run

func (t *HTTPTask) Run(ctx context.Context, vars Vars, inputs []Result) Result

func (*HTTPTask) Type

func (t *HTTPTask) Type() TaskType

type HashSliceParam added in v0.10.9

type HashSliceParam []common.Hash

func (*HashSliceParam) UnmarshalPipelineParam added in v0.10.9

func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error

type JSONParseTask

type JSONParseTask struct {
	BaseTask `mapstructure:",squash"`
	Path     string `json:"path"`
	Data     string `json:"data"`
	// Lax when disabled will return an error if the path does not exist
	// Lax when enabled will return nil with no error if the path does not exist
	Lax string
}

Return types:

float64
string
bool
map[string]interface{}
[]interface{}
nil

func (*JSONParseTask) Run

func (t *JSONParseTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*JSONParseTask) Type

func (t *JSONParseTask) Type() TaskType

type JSONPathParam added in v0.10.9

type JSONPathParam []string

func (*JSONPathParam) UnmarshalPipelineParam added in v0.10.9

func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error

type JSONSerializable

type JSONSerializable struct {
	Val  interface{}
	Null bool
}

func (*JSONSerializable) Empty added in v0.10.10

func (js *JSONSerializable) Empty() bool

func (JSONSerializable) MarshalJSON

func (js JSONSerializable) MarshalJSON() ([]byte, error)

func (*JSONSerializable) Scan

func (js *JSONSerializable) Scan(value interface{}) error

func (*JSONSerializable) UnmarshalJSON

func (js *JSONSerializable) UnmarshalJSON(bs []byte) error

func (JSONSerializable) Value

func (js JSONSerializable) Value() (driver.Value, error)

type Keypath added in v0.10.8

type Keypath [2][]byte

func (Keypath) NumParts added in v0.10.8

func (keypath Keypath) NumParts() int

func (Keypath) String added in v0.10.8

func (keypath Keypath) String() string

type MapParam added in v0.10.8

type MapParam map[string]interface{}

func (*MapParam) UnmarshalPipelineParam added in v0.10.8

func (m *MapParam) UnmarshalPipelineParam(val interface{}) error

type MaybeInt32Param added in v0.10.9

type MaybeInt32Param struct {
	// contains filtered or unexported fields
}

func (MaybeInt32Param) Int32 added in v0.10.9

func (p MaybeInt32Param) Int32() (int32, bool)

func (*MaybeInt32Param) UnmarshalPipelineParam added in v0.10.9

func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error

type MaybeUint64Param added in v0.10.8

type MaybeUint64Param struct {
	// contains filtered or unexported fields
}

func (MaybeUint64Param) Uint64 added in v0.10.8

func (p MaybeUint64Param) Uint64() (uint64, bool)

func (*MaybeUint64Param) UnmarshalPipelineParam added in v0.10.8

func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error

type MeanTask added in v0.10.9

type MeanTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
	Precision     string `json:"precision"`
}

Return types:

*decimal.Decimal

func (*MeanTask) Run added in v0.10.9

func (t *MeanTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*MeanTask) Type added in v0.10.9

func (t *MeanTask) Type() TaskType

type MedianTask

type MedianTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

*decimal.Decimal

func (*MedianTask) Run

func (t *MedianTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*MedianTask) Type

func (t *MedianTask) Type() TaskType

type ModeTask added in v0.10.9

type ModeTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

map[string]interface{}{
    "results": []interface{} containing any other type other pipeline tasks can return
    "occurrences": (int64)
}

func (*ModeTask) Run added in v0.10.9

func (t *ModeTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*ModeTask) Type added in v0.10.9

func (t *ModeTask) Type() TaskType

type MultiplyTask

type MultiplyTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
	Times    string `json:"times"`
}

Return types:

*decimal.Decimal

func (*MultiplyTask) Run

func (t *MultiplyTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*MultiplyTask) Type

func (t *MultiplyTask) Type() TaskType

type ORM

type ORM interface {
	CreateSpec(ctx context.Context, tx *gorm.DB, pipeline Pipeline, maxTaskTimeout models.Interval) (int32, error)
	CreateRun(db *gorm.DB, run *Run) (err error)
	StoreRun(db *sql.DB, run *Run) (restart bool, err error)
	UpdateTaskRunResult(db *sql.DB, taskID uuid.UUID, result interface{}) (run Run, start bool, err error)
	InsertFinishedRun(db *gorm.DB, run Run, trrs []TaskRunResult, saveSuccessfulTaskRuns bool) (runID int64, err error)
	DeleteRunsOlderThan(threshold time.Duration) error
	FindRun(id int64) (Run, error)
	GetAllRuns() ([]Run, error)
	GetUnfinishedRuns(now time.Time, fn func(run Run) error) error
	DB() *gorm.DB
}

type PanicTask added in v0.10.4

type PanicTask struct {
	BaseTask `mapstructure:",squash"`
	Msg      string
}

func (*PanicTask) Run added in v0.10.4

func (t *PanicTask) Run(_ context.Context, vars Vars, _ []Result) (result Result)

func (*PanicTask) Type added in v0.10.4

func (t *PanicTask) Type() TaskType

type Pipeline added in v0.10.9

type Pipeline struct {
	Tasks []Task

	Source string
	// contains filtered or unexported fields
}

func Parse added in v0.10.9

func Parse(text string) (*Pipeline, error)

func (*Pipeline) ByDotID added in v0.10.10

func (p *Pipeline) ByDotID(id string) Task

func (*Pipeline) HasAsync added in v0.10.10

func (p *Pipeline) HasAsync() bool

func (*Pipeline) MinTimeout added in v0.10.9

func (p *Pipeline) MinTimeout() (time.Duration, bool, error)

func (*Pipeline) UnmarshalText added in v0.10.9

func (p *Pipeline) UnmarshalText(bs []byte) (err error)

type PipelineParamUnmarshaler added in v0.10.8

type PipelineParamUnmarshaler interface {
	UnmarshalPipelineParam(val interface{}) error
}

type PossibleErrorResponses

type PossibleErrorResponses struct {
	Error        string `json:"error"`
	ErrorMessage string `json:"errorMessage"`
}

type Result

type Result struct {
	Value interface{}
	Error error
}

Result is the result of a TaskRun

func (Result) ErrorDB added in v0.9.10

func (result Result) ErrorDB() null.String

ErrorDB dumps a single result error for a pipeline_task_run

func (Result) OutputDB added in v0.9.10

func (result Result) OutputDB() JSONSerializable

OutputDB dumps a single result output for a pipeline_run or pipeline_task_run

type Run

type Run struct {
	ID             int64            `json:"-" gorm:"primary_key"`
	PipelineSpecID int32            `json:"-"`
	PipelineSpec   Spec             `json:"pipelineSpec"`
	Meta           JSONSerializable `json:"meta"`
	// The errors are only ever strings
	// DB example: [null, null, "my error"]
	Errors RunErrors        `json:"errors" gorm:"type:jsonb"`
	Inputs JSONSerializable `json:"inputs" gorm:"type:jsonb"`
	// Its expected that Output.Val is of type []interface{}.
	// DB example: [1234, {"a": 10}, null]
	Outputs          JSONSerializable `json:"outputs" gorm:"type:jsonb"`
	CreatedAt        time.Time        `json:"createdAt"`
	FinishedAt       null.Time        `json:"finishedAt"`
	PipelineTaskRuns []TaskRun        `json:"taskRuns" gorm:"foreignkey:PipelineRunID;->"`
	State            RunStatus        `json:"state"`

	Async     bool `gorm:"-"`
	Pending   bool `gorm:"-"`
	FailEarly bool `gorm:"-"`
}

func NewRun added in v0.10.3

func NewRun(spec Spec, vars Vars) Run

func (*Run) ByDotID added in v0.10.10

func (r *Run) ByDotID(id string) *TaskRun

func (Run) GetID added in v0.9.6

func (r Run) GetID() string

func (Run) HasErrors added in v0.9.10

func (r Run) HasErrors() bool

func (*Run) SetID added in v0.9.6

func (r *Run) SetID(value string) error

func (*Run) Status added in v0.10.3

func (r *Run) Status() RunStatus

Status determines the status of the run.

func (Run) TableName

func (Run) TableName() string

type RunErrors added in v0.10.4

type RunErrors []null.String

func (RunErrors) HasError added in v0.10.4

func (re RunErrors) HasError() bool

func (*RunErrors) Scan added in v0.10.4

func (re *RunErrors) Scan(value interface{}) error

func (RunErrors) Value added in v0.10.4

func (re RunErrors) Value() (driver.Value, error)

type RunStatus added in v0.10.3

type RunStatus string

RunStatus represents the status of a run

const (
	// RunStatusUnknown is the when the run status cannot be determined.
	RunStatusUnknown RunStatus = "unknown"
	// RunStatusRunning is used for when a run is actively being executed.
	RunStatusRunning RunStatus = "running"
	// RunStatusSuspended is used when a run is paused and awaiting further results.
	RunStatusSuspended RunStatus = "suspended"
	// RunStatusErrored is used for when a run has errored and will not complete.
	RunStatusErrored RunStatus = "errored"
	// RunStatusCompleted is used for when a run has successfully completed execution.
	RunStatusCompleted RunStatus = "completed"
)

func (RunStatus) Completed added in v0.10.3

func (s RunStatus) Completed() bool

Completed returns true if the status is RunStatusCompleted.

func (RunStatus) Errored added in v0.10.3

func (s RunStatus) Errored() bool

Errored returns true if the status is RunStatusErrored.

func (RunStatus) Finished added in v0.10.3

func (s RunStatus) Finished() bool

Finished returns true if the status is final and can't be changed.

type RunWithResults added in v0.10.3

type RunWithResults struct {
	Run            Run
	TaskRunResults TaskRunResults
}

type Runner

type Runner interface {
	service.Service

	// Run is a blocking call that will execute the run until no further progress can be made.
	// If `incomplete` is true, the run is only partially complete and is suspended, awaiting to be resumed when more data comes in.
	// Note that `saveSuccessfulTaskRuns` value is ignored if the run contains async tasks.
	Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool) (incomplete bool, err error)

	// We expect spec.JobID and spec.JobName to be set for logging/prometheus.
	// ExecuteRun executes a new run in-memory according to a spec and returns the results.
	ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run Run, trrs TaskRunResults, err error)
	// InsertFinishedRun saves the run results in the database.
	InsertFinishedRun(db *gorm.DB, run Run, trrs TaskRunResults, saveSuccessfulTaskRuns bool) (int64, error)

	// ExecuteAndInsertNewRun executes a new run in-memory according to a spec, persists and saves the results.
	// It is a combination of ExecuteRun and InsertFinishedRun.
	// Note that the spec MUST have a DOT graph for this to work.
	ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error)

	// Test method for inserting completed non-pipeline job runs
	TestInsertFinishedRun(db *gorm.DB, jobID int32, jobName string, jobType string, specID int32) (int64, error)
}

type SliceParam added in v0.10.8

type SliceParam []interface{}

func (SliceParam) FilterErrors added in v0.10.8

func (s SliceParam) FilterErrors() (SliceParam, int)

func (*SliceParam) UnmarshalPipelineParam added in v0.10.8

func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error

type Spec

type Spec struct {
	ID              int32           `gorm:"primary_key"`
	DotDagSource    string          `json:"dotDagSource"`
	CreatedAt       time.Time       `json:"-"`
	MaxTaskDuration models.Interval `json:"-"`

	JobID   int32  `gorm:"-" json:"-"`
	JobName string `gorm:"-" json:"-"`
}

func (Spec) Pipeline added in v0.10.9

func (s Spec) Pipeline() (*Pipeline, error)

func (Spec) TableName

func (Spec) TableName() string

type StringParam added in v0.10.8

type StringParam string

func (*StringParam) UnmarshalPipelineParam added in v0.10.8

func (s *StringParam) UnmarshalPipelineParam(val interface{}) error

type SumTask added in v0.10.9

type SumTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

*decimal.Decimal

func (*SumTask) Run added in v0.10.9

func (t *SumTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*SumTask) Type added in v0.10.9

func (t *SumTask) Type() TaskType

type Task

type Task interface {
	Type() TaskType
	ID() int
	DotID() string
	Run(ctx context.Context, vars Vars, inputs []Result) Result
	Base() *BaseTask
	Outputs() []Task
	Inputs() []Task
	OutputIndex() int32
	TaskTimeout() (time.Duration, bool)
}

func UnmarshalTaskFromMap

func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID string) (_ Task, err error)

type TaskRun

type TaskRun struct {
	ID            uuid.UUID         `json:"id" gorm:"primary_key"`
	Type          TaskType          `json:"type"`
	PipelineRun   Run               `json:"-"`
	PipelineRunID int64             `json:"-"`
	Output        *JSONSerializable `json:"output" gorm:"type:jsonb"`
	Error         null.String       `json:"error"`
	CreatedAt     time.Time         `json:"createdAt"`
	FinishedAt    null.Time         `json:"finishedAt"`
	Index         int32             `json:"index"`
	DotID         string            `json:"dotId"`
	// contains filtered or unexported fields
}

func (TaskRun) GetDotID added in v0.10.4

func (tr TaskRun) GetDotID() string

func (TaskRun) GetID added in v0.9.6

func (tr TaskRun) GetID() string

func (*TaskRun) IsPending added in v0.10.10

func (tr *TaskRun) IsPending() bool

func (TaskRun) Result

func (tr TaskRun) Result() Result

func (*TaskRun) SetID added in v0.9.6

func (tr *TaskRun) SetID(value string) error

func (TaskRun) TableName

func (TaskRun) TableName() string

type TaskRunResult added in v0.9.10

type TaskRunResult struct {
	ID         uuid.UUID
	Task       Task
	TaskRun    TaskRun
	Result     Result
	CreatedAt  time.Time
	FinishedAt null.Time
}

TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet TaskSpecID will always be non-zero

func (*TaskRunResult) IsPending added in v0.10.10

func (result *TaskRunResult) IsPending() bool

func (*TaskRunResult) IsTerminal added in v0.9.10

func (result *TaskRunResult) IsTerminal() bool

type TaskRunResults added in v0.9.10

type TaskRunResults []TaskRunResult

TaskRunResults represents a collection of results for all task runs for one pipeline run

func (TaskRunResults) FinalResult added in v0.9.10

func (trrs TaskRunResults) FinalResult() FinalResult

FinalResult pulls the FinalResult for the pipeline_run from the task runs It needs to respect the output index of each task

type TaskType

type TaskType string
const (
	TaskTypeHTTP            TaskType = "http"
	TaskTypeBridge          TaskType = "bridge"
	TaskTypeMean            TaskType = "mean"
	TaskTypeMedian          TaskType = "median"
	TaskTypeMode            TaskType = "mode"
	TaskTypeSum             TaskType = "sum"
	TaskTypeMultiply        TaskType = "multiply"
	TaskTypeDivide          TaskType = "divide"
	TaskTypeJSONParse       TaskType = "jsonparse"
	TaskTypeCBORParse       TaskType = "cborparse"
	TaskTypeAny             TaskType = "any"
	TaskTypeVRF             TaskType = "vrf"
	TaskTypeETHCall         TaskType = "ethcall"
	TaskTypeETHTx           TaskType = "ethtx"
	TaskTypeETHABIEncode    TaskType = "ethabiencode"
	TaskTypeETHABIDecode    TaskType = "ethabidecode"
	TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog"

	// Testing only.
	TaskTypePanic TaskType = "panic"
)

func (TaskType) String added in v0.10.6

func (t TaskType) String() string

type TxManager added in v0.10.9

type TxManager interface {
	CreateEthTransaction(db *gorm.DB, fromAddress, toAddress common.Address, payload []byte, gasLimit uint64, meta interface{}, strategy bulletprooftxmanager.TxStrategy) (etx bulletprooftxmanager.EthTx, err error)
}

type URLParam added in v0.10.8

type URLParam url.URL

func (*URLParam) String added in v0.10.8

func (u *URLParam) String() string

func (*URLParam) UnmarshalPipelineParam added in v0.10.8

func (u *URLParam) UnmarshalPipelineParam(val interface{}) error

type Uint64Param added in v0.10.8

type Uint64Param uint64

func (*Uint64Param) UnmarshalPipelineParam added in v0.10.8

func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error

type VRFKeyStore added in v0.10.11

type VRFKeyStore interface {
	GenerateProof(k secp256k1.PublicKey, seed *big.Int) (vrfkey.Proof, error)
}

type VRFTask added in v0.10.8

type VRFTask struct {
	BaseTask           `mapstructure:",squash"`
	PublicKey          string `json:"publicKey"`
	RequestBlockHash   string `json:"requestBlockHash"`
	RequestBlockNumber string `json:"requestBlockNumber"`
	Topics             string `json:"topics"`
	// contains filtered or unexported fields
}

func (*VRFTask) Run added in v0.10.8

func (t *VRFTask) Run(_ context.Context, vars Vars, inputs []Result) (result Result)

func (*VRFTask) Type added in v0.10.8

func (t *VRFTask) Type() TaskType

type Vars added in v0.10.8

type Vars struct {
	// contains filtered or unexported fields
}

func NewVarsFrom added in v0.10.8

func NewVarsFrom(m map[string]interface{}) Vars

func (Vars) Copy added in v0.10.9

func (vars Vars) Copy() Vars

func (Vars) Get added in v0.10.8

func (vars Vars) Get(keypathStr string) (interface{}, error)

func (Vars) Set added in v0.10.8

func (vars Vars) Set(dotID string, value interface{})

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL