pipeline

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0, MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	InputTaskKey = "input"
)

Variables

View Source
var (
	ErrWrongInputCardinality = errors.New("wrong number of task inputs")
	ErrBadInput              = errors.New("bad input for task")
	ErrInputTaskErrored      = errors.New("input task errored")
	ErrParameterEmpty        = errors.New("parameter is empty")
	ErrTooManyErrors         = errors.New("too many errors")
	ErrTimeout               = errors.New("timeout")
	ErrTaskRunFailed         = errors.New("task run failed")
	ErrCancelled             = errors.New("task run cancelled (fail early)")
)
View Source
var (
	ErrKeypathNotFound = errors.New("keypath not found")
	ErrKeypathTooDeep  = errors.New("keypath too deep (maximum 2 keys)")
	ErrVarsRoot        = errors.New("cannot get/set the root of a pipeline.Vars")
)
View Source
var ErrDisallowedIP = errors.New("disallowed IP")
View Source
var ErrOverflow = errors.New("overflow")

Functions

func CheckInputs

func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)

func ConnectWebSocket

func ConnectWebSocket(ctx context.Context, websocketUrl, urlHeader string, maxRetries int) (conn *websocket.Conn, err error)

func NewRunner

func NewRunner(lggr log.Logger) *runner

func ParseETHABIArgsString

func ParseETHABIArgsString(theABI []byte, isLog bool) (args abi.Arguments, indexedArgs abi.Arguments, _ error)

func ResolveParam

func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error

func ToDecimal

func ToDecimal(input interface{}) (decimal.Decimal, error)

ToDecimal converts an input to a decimal

func WrapRecoverHandle

func WrapRecoverHandle(lggr log.Logger, fn func(), onPanic func(interface{}))

Types

type AddressParam

type AddressParam common.Address

func (*AddressParam) UnmarshalPipelineParam

func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error

type AddressSliceParam

type AddressSliceParam []common.Address

func (*AddressSliceParam) UnmarshalPipelineParam

func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error

type AnyTask

type AnyTask struct {
	BaseTask `mapstructure:",squash"`
}

AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.

func (*AnyTask) Run

func (t *AnyTask) Run(_ context.Context, _ log.Logger, _ Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*AnyTask) Type

func (t *AnyTask) Type() TaskType

type BaseTask

type BaseTask struct {
	Index     int32          `mapstructure:"index" json:"-" `
	Timeout   *time.Duration `mapstructure:"timeout"`
	FailEarly bool           `mapstructure:"failEarly"`

	Retries    null.Uint32   `mapstructure:"retries"`
	MinBackoff time.Duration `mapstructure:"minBackoff"`
	MaxBackoff time.Duration `mapstructure:"maxBackoff"`
	// contains filtered or unexported fields
}

func NewBaseTask

func NewBaseTask(id int, dotID string, inputs []TaskDependency, outputs []Task, index int32) BaseTask

func (*BaseTask) Base

func (t *BaseTask) Base() *BaseTask

func (BaseTask) DotID

func (t BaseTask) DotID() string

func (BaseTask) ID

func (t BaseTask) ID() int

func (BaseTask) Inputs

func (t BaseTask) Inputs() []TaskDependency

func (BaseTask) OutputIndex

func (t BaseTask) OutputIndex() int32

func (BaseTask) Outputs

func (t BaseTask) Outputs() []Task

func (BaseTask) TaskMaxBackoff

func (t BaseTask) TaskMaxBackoff() time.Duration

func (BaseTask) TaskMinBackoff

func (t BaseTask) TaskMinBackoff() time.Duration

func (BaseTask) TaskRetries

func (t BaseTask) TaskRetries() uint32

func (BaseTask) TaskTimeout

func (t BaseTask) TaskTimeout() (time.Duration, bool)

type BoolParam

type BoolParam bool

func (*BoolParam) UnmarshalPipelineParam

func (b *BoolParam) UnmarshalPipelineParam(val interface{}) error

type BytesParam

type BytesParam []byte

func (*BytesParam) UnmarshalPipelineParam

func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error

type DecimalParam

type DecimalParam decimal.Decimal

func (DecimalParam) Decimal

func (d DecimalParam) Decimal() decimal.Decimal

func (*DecimalParam) UnmarshalPipelineParam

func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error

type DecimalSliceParam

type DecimalSliceParam []decimal.Decimal

func (*DecimalSliceParam) UnmarshalPipelineParam

func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error

type DivideTask

type DivideTask struct {
	BaseTask  `mapstructure:",squash"`
	Input     string `json:"input"`
	Divisor   string `json:"divisor"`
	Precision string `json:"precision"`
}

Return types:

*decimal.Decimal

func (*DivideTask) Run

func (t *DivideTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*DivideTask) Type

func (t *DivideTask) Type() TaskType

type ETHABIDecodeLogTask

type ETHABIDecodeLogTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
	Topics   string `json:"topics"`
}

Return types:

map[string]interface{} with any geth/abigen value type

func (*ETHABIDecodeLogTask) Run

func (t *ETHABIDecodeLogTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHABIDecodeLogTask) Type

func (t *ETHABIDecodeLogTask) Type() TaskType

type ETHABIDecodeTask

type ETHABIDecodeTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

map[string]interface{} with any geth/abigen value type

func (*ETHABIDecodeTask) Run

func (t *ETHABIDecodeTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHABIDecodeTask) Type

func (t *ETHABIDecodeTask) Type() TaskType

type ETHABIEncodeTask

type ETHABIEncodeTask struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

[]byte

func (*ETHABIEncodeTask) Run

func (t *ETHABIEncodeTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ETHABIEncodeTask) Type

func (t *ETHABIEncodeTask) Type() TaskType

type ETHABIEncodeTask2

type ETHABIEncodeTask2 struct {
	BaseTask `mapstructure:",squash"`
	ABI      string `json:"abi"`
	Data     string `json:"data"`
}

Return types:

[]byte

func (*ETHABIEncodeTask2) Run

func (t *ETHABIEncodeTask2) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (Result, RunInfo)

func (*ETHABIEncodeTask2) Type

func (t *ETHABIEncodeTask2) Type() TaskType

type ErrRunPanicked

type ErrRunPanicked struct {
	// contains filtered or unexported fields
}

When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.

func (ErrRunPanicked) Error

func (err ErrRunPanicked) Error() string

type FailTask

type FailTask struct {
	BaseTask `mapstructure:",squash"`
	Msg      string
}

FailTask is like the Panic task but without all the drama and stack unwinding of a panic

func (*FailTask) Run

func (t *FailTask) Run(_ context.Context, _ log.Logger, vars Vars, _ []Result) (Result, RunInfo)

func (*FailTask) Type

func (t *FailTask) Type() TaskType

type FinalResult

type FinalResult struct {
	Values      []interface{}
	AllErrors   []error
	FatalErrors []error
}

FinalResult is the result of a Run

func (FinalResult) HasErrors

func (result FinalResult) HasErrors() bool

HasErrors returns true if the final result has any errors

func (FinalResult) HasFatalErrors

func (result FinalResult) HasFatalErrors() bool

HasFatalErrors returns true if the final result has any fatal errors.

func (FinalResult) SingularResult

func (result FinalResult) SingularResult() (Result, error)

SingularResult returns a single result if the FinalResult only has one set of outputs/errors

type GetterFunc

type GetterFunc func() (interface{}, error)

func From

func From(getters ...interface{}) []GetterFunc

func Input

func Input(inputs []Result, index int) GetterFunc

func Inputs

func Inputs(inputs []Result) GetterFunc

func JSONWithVarExprs

func JSONWithVarExprs(s string, vars Vars, allowErrors bool) GetterFunc

func NonemptyString

func NonemptyString(s string) GetterFunc

func VarExpr

func VarExpr(s string, vars Vars) GetterFunc

type Graph

type Graph struct {
	*simple.DirectedGraph
}

Graph fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.

func NewGraph

func NewGraph() *Graph

func (*Graph) AddImplicitDependenciesAsEdges

func (g *Graph) AddImplicitDependenciesAsEdges()

Looks at node attributes and searches for implicit dependencies on other nodes expressed as attribute values. Adds those dependencies as implicit edges in the graph.

func (*Graph) IsImplicitEdge

func (g *Graph) IsImplicitEdge(uid, vid int64) bool

Indicates whether there's an implicit edge from uid -> vid. Implicit edges are ones that weren't added via the TOML spec, but via the pipeline parsing code.

func (*Graph) NewEdge

func (g *Graph) NewEdge(from, to graph.Node) graph.Edge

func (*Graph) NewNode

func (g *Graph) NewNode() graph.Node

func (*Graph) UnmarshalText

func (g *Graph) UnmarshalText(bs []byte) (err error)

type GraphEdge

type GraphEdge struct {
	graph.Edge
	// contains filtered or unexported fields
}

func (*GraphEdge) IsImplicit

func (e *GraphEdge) IsImplicit() bool

func (*GraphEdge) SetIsImplicit

func (e *GraphEdge) SetIsImplicit(isImplicit bool)

type GraphNode

type GraphNode struct {
	graph.Node
	// contains filtered or unexported fields
}

func (*GraphNode) Attributes

func (n *GraphNode) Attributes() []encoding.Attribute

func (*GraphNode) DOTID

func (n *GraphNode) DOTID() string

func (*GraphNode) SetAttribute

func (n *GraphNode) SetAttribute(attr encoding.Attribute) error

func (*GraphNode) SetDOTID

func (n *GraphNode) SetDOTID(id string)

func (*GraphNode) String

func (n *GraphNode) String() string

type HTTPRequest

type HTTPRequest struct {
	Request *http.Request
	Logger  log.Logger
}

HTTPRequest holds the request and logger for an HTTP request.

func (*HTTPRequest) SendRequest

func (h *HTTPRequest) SendRequest() (responseBody []byte, statusCode int, headers http.Header, err error)

SendRequest sends an HTTPRequest and returns the response body, status code, headers, and error.

type HTTPTask

type HTTPTask struct {
	BaseTask    `mapstructure:",squash"`
	Method      string
	URL         string
	RequestData string `json:"requestData"`
	HeaderMap   string `json:"headerMap"`
}

Return types:

string

func (*HTTPTask) Run

func (t *HTTPTask) Run(ctx context.Context, lggr log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*HTTPTask) Type

func (t *HTTPTask) Type() TaskType

type HashSliceParam

type HashSliceParam []common.Hash

func (*HashSliceParam) UnmarshalPipelineParam

func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error

type JSONParseTask

type JSONParseTask struct {
	BaseTask `mapstructure:",squash"`
	Path     string `json:"path"`
	Data     string `json:"data"`
	// Lax when disabled will return an error if the path does not exist
	// Lax when enabled will return nil with no error if the path does not exist
	Lax string
}

Return types:

float64
string
bool
map[string]interface{}
[]interface{}
nil

func (*JSONParseTask) Run

func (t *JSONParseTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*JSONParseTask) Type

func (t *JSONParseTask) Type() TaskType

type JSONPathParam

type JSONPathParam []string

func (*JSONPathParam) UnmarshalPipelineParam

func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error

type JSONSerializable

type JSONSerializable struct {
	Val   interface{}
	Valid bool
}

func (*JSONSerializable) Empty

func (js *JSONSerializable) Empty() bool

func (JSONSerializable) MarshalJSON

func (js JSONSerializable) MarshalJSON() ([]byte, error)

MarshalJSON implements custom marshaling logic

func (*JSONSerializable) Scan

func (js *JSONSerializable) Scan(value interface{}) error

func (*JSONSerializable) UnmarshalJSON

func (js *JSONSerializable) UnmarshalJSON(bs []byte) error

UnmarshalJSON implements custom unmarshaling logic

func (JSONSerializable) Value

func (js JSONSerializable) Value() (driver.Value, error)

type Keypath

type Keypath [2][]byte

func (Keypath) NumParts

func (keypath Keypath) NumParts() int

func (Keypath) String

func (keypath Keypath) String() string

type LowercaseTask

type LowercaseTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
}

Return types:

string

func (*LowercaseTask) Run

func (t *LowercaseTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*LowercaseTask) Type

func (t *LowercaseTask) Type() TaskType

type MapParam

type MapParam map[string]interface{}

MapParam accepts maps or JSON-encoded strings

func (MapParam) Map

func (m MapParam) Map() map[string]interface{}

func (*MapParam) UnmarshalPipelineParam

func (m *MapParam) UnmarshalPipelineParam(val interface{}) error

type MaybeBigIntParam

type MaybeBigIntParam struct {
	// contains filtered or unexported fields
}

func (MaybeBigIntParam) BigInt

func (p MaybeBigIntParam) BigInt() *big.Int

func (*MaybeBigIntParam) UnmarshalPipelineParam

func (p *MaybeBigIntParam) UnmarshalPipelineParam(val interface{}) error

type MaybeInt32Param

type MaybeInt32Param struct {
	// contains filtered or unexported fields
}

func (MaybeInt32Param) Int32

func (p MaybeInt32Param) Int32() (int32, bool)

func (*MaybeInt32Param) UnmarshalPipelineParam

func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error

type MaybeUint64Param

type MaybeUint64Param struct {
	// contains filtered or unexported fields
}

func (MaybeUint64Param) Uint64

func (p MaybeUint64Param) Uint64() (uint64, bool)

func (*MaybeUint64Param) UnmarshalPipelineParam

func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error

type MeanTask

type MeanTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
	Precision     string `json:"precision"`
}

Return types:

*decimal.Decimal

func (*MeanTask) Run

func (t *MeanTask) Run(ctx context.Context, lggr log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MeanTask) Type

func (t *MeanTask) Type() TaskType

type MedianTask

type MedianTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

*decimal.Decimal

func (*MedianTask) Run

func (t *MedianTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MedianTask) Type

func (t *MedianTask) Type() TaskType

type MemoTask

type MemoTask struct {
	BaseTask `mapstructure:",squash"`
	Value    string `json:"value"`
}

func (*MemoTask) Run

func (t *MemoTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (Result, RunInfo)

func (*MemoTask) Type

func (t *MemoTask) Type() TaskType

type MergeTask

type MergeTask struct {
	BaseTask `mapstructure:",squash"`
	Left     string `json:"left"`
	Right    string `json:"right"`
}

Return types:

map[string]interface{}

func (*MergeTask) Run

func (t *MergeTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MergeTask) Type

func (t *MergeTask) Type() TaskType

type Method

type Method struct {
	Name   string
	Inputs abi.Arguments
}

go-ethereum's abi.Method doesn't implement json.Marshal for Type, but otherwise would have worked fine. In any case, we only care about these fields (Name and Inputs).

type ModeTask

type ModeTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

map[string]interface{}{
    "results": []interface{} containing any other type other pipeline tasks can return
    "occurrences": (int64)
}

func (*ModeTask) Run

func (t *ModeTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*ModeTask) Type

func (t *ModeTask) Type() TaskType

type MultiplyTask

type MultiplyTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
	Times    string `json:"times"`
}

Return types:

*decimal.Decimal

func (*MultiplyTask) Run

func (t *MultiplyTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*MultiplyTask) Type

func (t *MultiplyTask) Type() TaskType

type ObjectParam

type ObjectParam struct {
	Type         ObjectType
	BoolValue    BoolParam
	DecimalValue DecimalParam
	StringValue  StringParam
	SliceValue   SliceParam
	MapValue     MapParam
}

ObjectParam represents a kind of any type that could be used by the memo task

func MustNewObjectParam

func MustNewObjectParam(val interface{}) *ObjectParam

func (ObjectParam) Marshal

func (o ObjectParam) Marshal() (string, error)

func (ObjectParam) MarshalJSON

func (o ObjectParam) MarshalJSON() ([]byte, error)

func (ObjectParam) String

func (o ObjectParam) String() string

func (*ObjectParam) UnmarshalPipelineParam

func (o *ObjectParam) UnmarshalPipelineParam(val interface{}) error

type ObjectType

type ObjectType int
const (
	NilType ObjectType = iota
	BoolType
	DecimalType
	StringType
	SliceType
	MapType
)

type PanicTask

type PanicTask struct {
	BaseTask `mapstructure:",squash"`
	Msg      string
}

func (*PanicTask) Run

func (t *PanicTask) Run(_ context.Context, _ log.Logger, vars Vars, _ []Result) (result Result, runInfo RunInfo)

func (*PanicTask) Type

func (t *PanicTask) Type() TaskType

type Pipeline

type Pipeline struct {
	Tasks []Task

	Source string
	// contains filtered or unexported fields
}

func Parse

func Parse(text string) (*Pipeline, error)

func (*Pipeline) ByDotID

func (p *Pipeline) ByDotID(id string) Task

func (*Pipeline) MinTimeout

func (p *Pipeline) MinTimeout() (time.Duration, bool, error)

func (*Pipeline) UnmarshalText

func (p *Pipeline) UnmarshalText(bs []byte) (err error)

type PipelineParamUnmarshaler

type PipelineParamUnmarshaler interface {
	UnmarshalPipelineParam(val interface{}) error
}

type PossibleErrorResponses

type PossibleErrorResponses struct {
	Error        string `json:"error"`
	ErrorMessage string `json:"errorMessage"`
}

type Result

type Result struct {
	Value interface{}
	Error error
}

Result is the result of a TaskRun

func (Result) ErrorDB

func (result Result) ErrorDB() null.String

ErrorDB dumps a single result error for a pipeline_task_run

func (Result) OutputDB

func (result Result) OutputDB() JSONSerializable

OutputDB dumps a single result output for a pipeline_run or pipeline_task_run

type ResumeRequest

type ResumeRequest struct {
	Error null.String     `json:"error"`
	Value json.RawMessage `json:"value"`
}

func (ResumeRequest) ToResult

func (rr ResumeRequest) ToResult() (Result, error)

type Run

type Run struct {
	ID             int64            `json:"-"`
	PipelineSpecID int32            `json:"-"`
	PipelineSpec   Spec             `json:"pipelineSpec"`
	Meta           JSONSerializable `json:"meta"`
	// The errors are only ever strings
	// DB example: [null, null, "my error"]
	AllErrors   RunErrors        `json:"all_errors"`
	FatalErrors RunErrors        `json:"fatal_errors"`
	Inputs      JSONSerializable `json:"inputs"`
	// It's expected that Outputs.Val is of type []interface{}.
	// DB example: [1234, {"a": 10}, null]
	Outputs          JSONSerializable `json:"outputs"`
	CreatedAt        time.Time        `json:"createdAt"`
	FinishedAt       null.Time        `json:"finishedAt"`
	PipelineTaskRuns []TaskRun        `json:"taskRuns"`
	State            RunStatus        `json:"state"`

	Pending   bool
	FailEarly bool
}

func NewRun

func NewRun(spec Spec, vars Vars) Run

func (*Run) ByDotID

func (r *Run) ByDotID(id string) *TaskRun

func (Run) GetID

func (r Run) GetID() string

func (Run) HasErrors

func (r Run) HasErrors() bool

func (Run) HasFatalErrors

func (r Run) HasFatalErrors() bool

func (*Run) SetID

func (r *Run) SetID(value string) error

func (*Run) Status

func (r *Run) Status() RunStatus

Status determines the status of the run.

func (*Run) StringAllErrors

func (r *Run) StringAllErrors() []*string

func (*Run) StringFatalErrors

func (r *Run) StringFatalErrors() []*string

func (*Run) StringOutputs

func (r *Run) StringOutputs() ([]*string, error)

type RunErrors

type RunErrors []null.String

func (RunErrors) HasError

func (re RunErrors) HasError() bool

func (*RunErrors) Scan

func (re *RunErrors) Scan(value interface{}) error

func (RunErrors) ToError

func (re RunErrors) ToError() error

ToError coalesces all non-nil errors into a single error object. This is useful for logging.

func (RunErrors) Value

func (re RunErrors) Value() (driver.Value, error)

type RunInfo

type RunInfo struct {
	IsRetryable bool
	IsPending   bool
}

RunInfo contains additional information about the finished TaskRun

type RunStatus

type RunStatus string

RunStatus represents the status of a run

const (
	// RunStatusUnknown is used when the run status cannot be determined.
	RunStatusUnknown RunStatus = "unknown"
	// RunStatusRunning is used for when a run is actively being executed.
	RunStatusRunning RunStatus = "running"
	// RunStatusSuspended is used when a run is paused and awaiting further results.
	RunStatusSuspended RunStatus = "suspended"
	// RunStatusErrored is used for when a run has errored and will not complete.
	RunStatusErrored RunStatus = "errored"
	// RunStatusCompleted is used for when a run has successfully completed execution.
	RunStatusCompleted RunStatus = "completed"
)

func (RunStatus) Completed

func (s RunStatus) Completed() bool

Completed returns true if the status is RunStatusCompleted.

func (RunStatus) Errored

func (s RunStatus) Errored() bool

Errored returns true if the status is RunStatusErrored.

func (RunStatus) Finished

func (s RunStatus) Finished() bool

Finished returns true if the status is final and can't be changed.

type Runner

type Runner interface {
	// ExecuteRun executes a new run in-memory according to a spec and returns the results.
	ExecuteRun(ctx context.Context, spec Spec, vars Vars, l log.Logger) (run Run, trrs TaskRunResults, err error)
}

type SliceParam

type SliceParam []interface{}

func (SliceParam) FilterErrors

func (s SliceParam) FilterErrors() (SliceParam, int)

func (*SliceParam) UnmarshalPipelineParam

func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error

type Spec

type Spec struct {
	ID           int32
	DotDagSource string    `json:"dotDagSource"`
	CreatedAt    time.Time `json:"-"`

	JobID   int32  `json:"-"`
	JobName string `json:"-"`
}

func (Spec) Pipeline

func (s Spec) Pipeline() (*Pipeline, error)

type StringParam

type StringParam string

func (*StringParam) UnmarshalPipelineParam

func (s *StringParam) UnmarshalPipelineParam(val interface{}) error

type SumTask

type SumTask struct {
	BaseTask      `mapstructure:",squash"`
	Values        string `json:"values"`
	AllowedFaults string `json:"allowedFaults"`
}

Return types:

*decimal.Decimal

func (*SumTask) Run

func (t *SumTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*SumTask) Type

func (t *SumTask) Type() TaskType

type Task

type Task interface {
	Type() TaskType
	ID() int
	DotID() string
	Run(ctx context.Context, lggr log.Logger, vars Vars, inputs []Result) (Result, RunInfo)
	Base() *BaseTask
	Outputs() []Task
	Inputs() []TaskDependency
	OutputIndex() int32
	TaskTimeout() (time.Duration, bool)
	TaskRetries() uint32
	TaskMinBackoff() time.Duration
	TaskMaxBackoff() time.Duration
}

func UnmarshalTaskFromMap

func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID string) (_ Task, err error)

type TaskDependency

type TaskDependency struct {
	PropagateResult bool
	InputTask       Task
}

Wraps the input Task for the given dependent task along with a bool variable, PropagateResult, which indicates whether the result of InputTask should be propagated to its dependent task. If the edge between these tasks was an implicit edge, then results are not propagated. This is because some tasks cannot handle an input from an edge which wasn't specified in the spec.

type TaskRun

type TaskRun struct {
	ID            uuid.UUID        `json:"id"`
	Type          TaskType         `json:"type"`
	PipelineRun   Run              `json:"-"`
	PipelineRunID int64            `json:"-"`
	Output        JSONSerializable `json:"output"`
	Error         null.String      `json:"error"`
	CreatedAt     time.Time        `json:"createdAt"`
	FinishedAt    null.Time        `json:"finishedAt"`
	Index         int32            `json:"index"`
	DotID         string           `json:"dotId"`
	// contains filtered or unexported fields
}

func (TaskRun) GetDotID

func (tr TaskRun) GetDotID() string

func (TaskRun) GetID

func (tr TaskRun) GetID() string

func (*TaskRun) IsPending

func (tr *TaskRun) IsPending() bool

func (TaskRun) Result

func (tr TaskRun) Result() Result

func (*TaskRun) SetID

func (tr *TaskRun) SetID(value string) error

type TaskRunResult

type TaskRunResult struct {
	ID         uuid.UUID
	Task       Task
	TaskRun    TaskRun
	Result     Result
	Attempts   uint
	CreatedAt  time.Time
	FinishedAt null.Time
	// contains filtered or unexported fields
}

TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet. TaskSpecID will always be non-zero.

func (*TaskRunResult) IsPending

func (result *TaskRunResult) IsPending() bool

func (*TaskRunResult) IsTerminal

func (result *TaskRunResult) IsTerminal() bool

type TaskRunResults

type TaskRunResults []TaskRunResult

TaskRunResults represents a collection of results for all task runs for one pipeline run

func (TaskRunResults) FinalResult

func (trrs TaskRunResults) FinalResult(l log.Logger) FinalResult

FinalResult pulls the FinalResult for the pipeline_run from the task runs. It needs to respect the output index of each task.

type TaskType

type TaskType string
const (
	TaskTypeHTTP            TaskType = "http"
	TaskTypeMean            TaskType = "mean"
	TaskTypeMedian          TaskType = "median"
	TaskTypeMode            TaskType = "mode"
	TaskTypeSum             TaskType = "sum"
	TaskTypeMultiply        TaskType = "multiply"
	TaskTypeDivide          TaskType = "divide"
	TaskTypeJSONParse       TaskType = "jsonparse"
	TaskTypeAny             TaskType = "any"
	TaskTypeETHABIEncode    TaskType = "ethabiencode"
	TaskTypeETHABIEncode2   TaskType = "ethabiencode2"
	TaskTypeETHABIDecode    TaskType = "ethabidecode"
	TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog"
	TaskTypeMerge           TaskType = "merge"
	TaskTypeLowercase       TaskType = "lowercase"
	TaskTypeUppercase       TaskType = "uppercase"

	// Testing only.
	TaskTypePanic TaskType = "panic"
	TaskTypeMemo  TaskType = "memo"
	TaskTypeFail  TaskType = "fail"
)

func (TaskType) String

func (t TaskType) String() string

type URLParam

type URLParam url.URL

func (*URLParam) String

func (u *URLParam) String() string

func (*URLParam) UnmarshalPipelineParam

func (u *URLParam) UnmarshalPipelineParam(val interface{}) error

type Uint64Param

type Uint64Param uint64

func (*Uint64Param) UnmarshalPipelineParam

func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error

type UppercaseTask

type UppercaseTask struct {
	BaseTask `mapstructure:",squash"`
	Input    string `json:"input"`
}

Return types:

string

func (*UppercaseTask) Run

func (t *UppercaseTask) Run(_ context.Context, _ log.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo)

func (*UppercaseTask) Type

func (t *UppercaseTask) Type() TaskType

type Vars

type Vars struct {
	// contains filtered or unexported fields
}

func NewVarsFrom

func NewVarsFrom(m map[string]interface{}) Vars

func (Vars) Copy

func (vars Vars) Copy() Vars

func (Vars) Get

func (vars Vars) Get(keypathStr string) (interface{}, error)

func (Vars) Set

func (vars Vars) Set(dotID string, value interface{})

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL