Documentation ¶
Index ¶
- Constants
- Variables
- func CheckInputs(inputs []Result, minLen, maxLen, maxErrors int) ([]interface{}, error)
- func ConnectWebSocket(ctx context.Context, websocketUrl, urlHeader string, maxRetries int) (conn *websocket.Conn, err error)
- func NewRunner(lggr log.Logger) *runner
- func ParseETHABIArgsString(theABI []byte, isLog bool) (args abi.Arguments, indexedArgs abi.Arguments, _ error)
- func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
- func ToDecimal(input interface{}) (decimal.Decimal, error)
- func WrapRecoverHandle(lggr log.Logger, fn func(), onPanic func(interface{}))
- type AddressParam
- type AddressSliceParam
- type AnyTask
- type BaseTask
- func (t *BaseTask) Base() *BaseTask
- func (t BaseTask) DotID() string
- func (t BaseTask) ID() int
- func (t BaseTask) Inputs() []TaskDependency
- func (t BaseTask) OutputIndex() int32
- func (t BaseTask) Outputs() []Task
- func (t BaseTask) TaskMaxBackoff() time.Duration
- func (t BaseTask) TaskMinBackoff() time.Duration
- func (t BaseTask) TaskRetries() uint32
- func (t BaseTask) TaskTimeout() (time.Duration, bool)
- type BoolParam
- type BytesParam
- type DecimalParam
- type DecimalSliceParam
- type DivideTask
- type ETHABIDecodeLogTask
- type ETHABIDecodeTask
- type ETHABIEncodeTask
- type ETHABIEncodeTask2
- type ErrRunPanicked
- type FailTask
- type FinalResult
- type GetterFunc
- type Graph
- type GraphEdge
- type GraphNode
- type HTTPRequest
- type HTTPTask
- type HashSliceParam
- type JSONParseTask
- type JSONPathParam
- type JSONSerializable
- type Keypath
- type LowercaseTask
- type MapParam
- type MaybeBigIntParam
- type MaybeInt32Param
- type MaybeUint64Param
- type MeanTask
- type MedianTask
- type MemoTask
- type MergeTask
- type Method
- type ModeTask
- type MultiplyTask
- type ObjectParam
- type ObjectType
- type PanicTask
- type Pipeline
- type PipelineParamUnmarshaler
- type PossibleErrorResponses
- type Result
- type ResumeRequest
- type Run
- func (r *Run) ByDotID(id string) *TaskRun
- func (r Run) GetID() string
- func (r Run) HasErrors() bool
- func (r Run) HasFatalErrors() bool
- func (r *Run) SetID(value string) error
- func (r *Run) Status() RunStatus
- func (r *Run) StringAllErrors() []*string
- func (r *Run) StringFatalErrors() []*string
- func (r *Run) StringOutputs() ([]*string, error)
- type RunErrors
- type RunInfo
- type RunStatus
- type Runner
- type SliceParam
- type Spec
- type StringParam
- type SumTask
- type Task
- type TaskDependency
- type TaskRun
- type TaskRunResult
- type TaskRunResults
- type TaskType
- type URLParam
- type Uint64Param
- type UppercaseTask
- type Vars
Constants ¶
const (
InputTaskKey = "input"
)
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") ErrInputTaskErrored = errors.New("input task errored") ErrParameterEmpty = errors.New("parameter is empty") ErrTooManyErrors = errors.New("too many errors") ErrTimeout = errors.New("timeout") ErrTaskRunFailed = errors.New("task run failed") ErrCancelled = errors.New("task run cancelled (fail early)") )
var ( ErrKeypathNotFound = errors.New("keypath not found") ErrKeypathTooDeep = errors.New("keypath too deep (maximum 2 keys)") ErrVarsRoot = errors.New("cannot get/set the root of a pipeline.Vars") )
var ErrDisallowedIP = errors.New("disallowed IP")
var ErrOverflow = errors.New("overflow")
Functions ¶
func CheckInputs ¶
func ConnectWebSocket ¶
func ParseETHABIArgsString ¶
func ResolveParam ¶
func ResolveParam(out PipelineParamUnmarshaler, getters []GetterFunc) error
func WrapRecoverHandle ¶
Types ¶
type AddressParam ¶
func (*AddressParam) UnmarshalPipelineParam ¶
func (a *AddressParam) UnmarshalPipelineParam(val interface{}) error
type AddressSliceParam ¶
func (*AddressSliceParam) UnmarshalPipelineParam ¶
func (s *AddressSliceParam) UnmarshalPipelineParam(val interface{}) error
type AnyTask ¶
type AnyTask struct {
BaseTask `mapstructure:",squash"`
}
AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.
type BaseTask ¶
type BaseTask struct { Index int32 `mapstructure:"index" json:"-" ` Timeout *time.Duration `mapstructure:"timeout"` FailEarly bool `mapstructure:"failEarly"` Retries null.Uint32 `mapstructure:"retries"` MinBackoff time.Duration `mapstructure:"minBackoff"` MaxBackoff time.Duration `mapstructure:"maxBackoff"` // contains filtered or unexported fields }
func NewBaseTask ¶
func (BaseTask) Inputs ¶
func (t BaseTask) Inputs() []TaskDependency
func (BaseTask) OutputIndex ¶
func (BaseTask) TaskMaxBackoff ¶
func (BaseTask) TaskMinBackoff ¶
func (BaseTask) TaskRetries ¶
type BytesParam ¶
type BytesParam []byte
func (*BytesParam) UnmarshalPipelineParam ¶
func (b *BytesParam) UnmarshalPipelineParam(val interface{}) error
type DecimalParam ¶
func (DecimalParam) Decimal ¶
func (d DecimalParam) Decimal() decimal.Decimal
func (*DecimalParam) UnmarshalPipelineParam ¶
func (d *DecimalParam) UnmarshalPipelineParam(val interface{}) error
type DecimalSliceParam ¶
func (*DecimalSliceParam) UnmarshalPipelineParam ¶
func (s *DecimalSliceParam) UnmarshalPipelineParam(val interface{}) error
type DivideTask ¶
type DivideTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Divisor string `json:"divisor"` Precision string `json:"precision"` }
Return types:
*decimal.Decimal
func (*DivideTask) Type ¶
func (t *DivideTask) Type() TaskType
type ETHABIDecodeLogTask ¶
type ETHABIDecodeLogTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` Topics string `json:"topics"` }
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeLogTask) Type ¶
func (t *ETHABIDecodeLogTask) Type() TaskType
type ETHABIDecodeTask ¶
type ETHABIDecodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
map[string]interface{} with any geth/abigen value type
func (*ETHABIDecodeTask) Type ¶
func (t *ETHABIDecodeTask) Type() TaskType
type ETHABIEncodeTask ¶
type ETHABIEncodeTask struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
[]byte
func (*ETHABIEncodeTask) Type ¶
func (t *ETHABIEncodeTask) Type() TaskType
type ETHABIEncodeTask2 ¶
type ETHABIEncodeTask2 struct { BaseTask `mapstructure:",squash"` ABI string `json:"abi"` Data string `json:"data"` }
Return types:
[]byte
func (*ETHABIEncodeTask2) Type ¶
func (t *ETHABIEncodeTask2) Type() TaskType
type ErrRunPanicked ¶
type ErrRunPanicked struct {
// contains filtered or unexported fields
}
When a task panics, we catch the panic and wrap it in an error for reporting to the scheduler.
func (ErrRunPanicked) Error ¶
func (err ErrRunPanicked) Error() string
type FailTask ¶
FailTask is like the Panic task but without all the drama and stack unwinding of a panic
type FinalResult ¶
FinalResult is the result of a Run
func (FinalResult) HasErrors ¶
func (result FinalResult) HasErrors() bool
HasErrors returns true if the final result has any errors
func (FinalResult) HasFatalErrors ¶
func (result FinalResult) HasFatalErrors() bool
HasFatalErrors returns true if the final result has any errors
func (FinalResult) SingularResult ¶
func (result FinalResult) SingularResult() (Result, error)
SingularResult returns a single result if the FinalResult only has one set of outputs/errors
type GetterFunc ¶
type GetterFunc func() (interface{}, error)
func From ¶
func From(getters ...interface{}) []GetterFunc
func Input ¶
func Input(inputs []Result, index int) GetterFunc
func Inputs ¶
func Inputs(inputs []Result) GetterFunc
func JSONWithVarExprs ¶
func JSONWithVarExprs(s string, vars Vars, allowErrors bool) GetterFunc
func NonemptyString ¶
func NonemptyString(s string) GetterFunc
func VarExpr ¶
func VarExpr(s string, vars Vars) GetterFunc
type Graph ¶
type Graph struct {
*simple.DirectedGraph
}
tree fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it.
func (*Graph) AddImplicitDependenciesAsEdges ¶
func (g *Graph) AddImplicitDependenciesAsEdges()
Looks at node attributes and searches for implicit dependencies on other nodes expressed as attribute values. Adds those dependencies as implicit edges in the graph.
func (*Graph) IsImplicitEdge ¶
Indicates whether there's an implicit edge from uid -> vid. Implicit edged are ones that weren't added via the TOML spec, but via the pipeline parsing code
func (*Graph) UnmarshalText ¶
type GraphNode ¶
func (*GraphNode) Attributes ¶
type HTTPRequest ¶
HTTPRequest holds the request and config struct for a http request
func (*HTTPRequest) SendRequest ¶
func (h *HTTPRequest) SendRequest() (responseBody []byte, statusCode int, headers http.Header, err error)
SendRequest sends a HTTPRequest, returns a body, status code, and error.
type HTTPTask ¶
type HTTPTask struct { BaseTask `mapstructure:",squash"` Method string URL string RequestData string `json:"requestData"` HeaderMap string `json:"headerMap"` }
Return types:
string
type HashSliceParam ¶
func (*HashSliceParam) UnmarshalPipelineParam ¶
func (s *HashSliceParam) UnmarshalPipelineParam(val interface{}) error
type JSONParseTask ¶
type JSONParseTask struct { BaseTask `mapstructure:",squash"` Path string `json:"path"` Data string `json:"data"` // Lax when disabled will return an error if the path does not exist // Lax when enabled will return nil with no error if the path does not exist Lax string }
Return types:
float64 string bool map[string]interface{} []interface{} nil
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONPathParam ¶
type JSONPathParam []string
func (*JSONPathParam) UnmarshalPipelineParam ¶
func (p *JSONPathParam) UnmarshalPipelineParam(val interface{}) error
type JSONSerializable ¶
type JSONSerializable struct { Val interface{} Valid bool }
func (*JSONSerializable) Empty ¶
func (js *JSONSerializable) Empty() bool
func (JSONSerializable) MarshalJSON ¶
func (js JSONSerializable) MarshalJSON() ([]byte, error)
MarshalJSON implements custom marshaling logic
func (*JSONSerializable) Scan ¶
func (js *JSONSerializable) Scan(value interface{}) error
func (*JSONSerializable) UnmarshalJSON ¶
func (js *JSONSerializable) UnmarshalJSON(bs []byte) error
UnmarshalJSON implements custom unmarshaling logic
type LowercaseTask ¶
Return types:
string
func (*LowercaseTask) Type ¶
func (t *LowercaseTask) Type() TaskType
type MapParam ¶
type MapParam map[string]interface{}
MapParam accepts maps or JSON-encoded strings
func (*MapParam) UnmarshalPipelineParam ¶
type MaybeBigIntParam ¶
type MaybeBigIntParam struct {
// contains filtered or unexported fields
}
func (MaybeBigIntParam) BigInt ¶
func (p MaybeBigIntParam) BigInt() *big.Int
func (*MaybeBigIntParam) UnmarshalPipelineParam ¶
func (p *MaybeBigIntParam) UnmarshalPipelineParam(val interface{}) error
type MaybeInt32Param ¶
type MaybeInt32Param struct {
// contains filtered or unexported fields
}
func (MaybeInt32Param) Int32 ¶
func (p MaybeInt32Param) Int32() (int32, bool)
func (*MaybeInt32Param) UnmarshalPipelineParam ¶
func (p *MaybeInt32Param) UnmarshalPipelineParam(val interface{}) error
type MaybeUint64Param ¶
type MaybeUint64Param struct {
// contains filtered or unexported fields
}
func (MaybeUint64Param) Uint64 ¶
func (p MaybeUint64Param) Uint64() (uint64, bool)
func (*MaybeUint64Param) UnmarshalPipelineParam ¶
func (p *MaybeUint64Param) UnmarshalPipelineParam(val interface{}) error
type MeanTask ¶
type MeanTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` Precision string `json:"precision"` }
Return types:
*decimal.Decimal
type MedianTask ¶
type MedianTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
*decimal.Decimal
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type MergeTask ¶
type MergeTask struct { BaseTask `mapstructure:",squash"` Left string `json:"left"` Right string `json:"right"` }
Return types:
map[string]interface{}
type Method ¶
go-ethereum's abi.Method doesn't implement json.Marshal for Type, but otherwise would have worked fine, in any case we only care about these...
type ModeTask ¶
type ModeTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
map[string]interface{}{ "results": []interface{} containing any other type other pipeline tasks can return "occurrences": (int64) }
type MultiplyTask ¶
type MultiplyTask struct { BaseTask `mapstructure:",squash"` Input string `json:"input"` Times string `json:"times"` }
Return types:
*decimal.Decimal
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ObjectParam ¶
type ObjectParam struct { Type ObjectType BoolValue BoolParam DecimalValue DecimalParam StringValue StringParam SliceValue SliceParam MapValue MapParam }
ObjectParam represents a kind of any type that could be used by the memo task
func MustNewObjectParam ¶
func MustNewObjectParam(val interface{}) *ObjectParam
func (ObjectParam) Marshal ¶
func (o ObjectParam) Marshal() (string, error)
func (ObjectParam) MarshalJSON ¶
func (o ObjectParam) MarshalJSON() ([]byte, error)
func (ObjectParam) String ¶
func (o ObjectParam) String() string
func (*ObjectParam) UnmarshalPipelineParam ¶
func (o *ObjectParam) UnmarshalPipelineParam(val interface{}) error
type ObjectType ¶
type ObjectType int
const ( NilType ObjectType = iota BoolType DecimalType StringType SliceType MapType )
type Pipeline ¶
func (*Pipeline) UnmarshalText ¶
type PipelineParamUnmarshaler ¶
type PipelineParamUnmarshaler interface {
UnmarshalPipelineParam(val interface{}) error
}
type PossibleErrorResponses ¶
type Result ¶
type Result struct { Value interface{} Error error }
Result is the result of a TaskRun
func (Result) OutputDB ¶
func (result Result) OutputDB() JSONSerializable
OutputDB dumps a single result output for a pipeline_run or pipeline_task_run
type ResumeRequest ¶
type ResumeRequest struct { Error null.String `json:"error"` Value json.RawMessage `json:"value"` }
func (ResumeRequest) ToResult ¶
func (rr ResumeRequest) ToResult() (Result, error)
type Run ¶
type Run struct { ID int64 `json:"-"` PipelineSpecID int32 `json:"-"` PipelineSpec Spec `json:"pipelineSpec"` Meta JSONSerializable `json:"meta"` // The errors are only ever strings // DB example: [null, null, "my error"] AllErrors RunErrors `json:"all_errors"` FatalErrors RunErrors `json:"fatal_errors"` Inputs JSONSerializable `json:"inputs"` // Its expected that Output.Val is of type []interface{}. // DB example: [1234, {"a": 10}, null] Outputs JSONSerializable `json:"outputs"` CreatedAt time.Time `json:"createdAt"` FinishedAt null.Time `json:"finishedAt"` PipelineTaskRuns []TaskRun `json:"taskRuns"` State RunStatus `json:"state"` Pending bool FailEarly bool }
func (Run) HasFatalErrors ¶
func (*Run) StringAllErrors ¶
func (*Run) StringFatalErrors ¶
func (*Run) StringOutputs ¶
type RunErrors ¶
type RunStatus ¶
type RunStatus string
RunStatus represents the status of a run
const ( // RunStatusUnknown is the when the run status cannot be determined. RunStatusUnknown RunStatus = "unknown" // RunStatusRunning is used for when a run is actively being executed. RunStatusRunning RunStatus = "running" // RunStatusSuspended is used when a run is paused and awaiting further results. RunStatusSuspended RunStatus = "suspended" // RunStatusErrored is used for when a run has errored and will not complete. RunStatusErrored RunStatus = "errored" // RunStatusCompleted is used for when a run has successfully completed execution. RunStatusCompleted RunStatus = "completed" )
type SliceParam ¶
type SliceParam []interface{}
func (SliceParam) FilterErrors ¶
func (s SliceParam) FilterErrors() (SliceParam, int)
func (*SliceParam) UnmarshalPipelineParam ¶
func (s *SliceParam) UnmarshalPipelineParam(val interface{}) error
type Spec ¶
type StringParam ¶
type StringParam string
func (*StringParam) UnmarshalPipelineParam ¶
func (s *StringParam) UnmarshalPipelineParam(val interface{}) error
type SumTask ¶
type SumTask struct { BaseTask `mapstructure:",squash"` Values string `json:"values"` AllowedFaults string `json:"allowedFaults"` }
Return types:
*decimal.Decimal
type Task ¶
type Task interface { Type() TaskType ID() int DotID() string Run(ctx context.Context, lggr log.Logger, vars Vars, inputs []Result) (Result, RunInfo) Base() *BaseTask Outputs() []Task Inputs() []TaskDependency OutputIndex() int32 TaskTimeout() (time.Duration, bool) TaskRetries() uint32 TaskMinBackoff() time.Duration TaskMaxBackoff() time.Duration }
type TaskDependency ¶
Wraps the input Task for the given dependent task along with a bool variable PropagateResult, which Indicates whether result of InputTask should be propagated to its dependent task. If the edge between these tasks was an implicit edge, then results are not propagated. This is because some tasks cannot handle an input from an edge which wasn't specified in the spec.
type TaskRun ¶
type TaskRun struct { ID uuid.UUID `json:"id"` Type TaskType `json:"type"` PipelineRun Run `json:"-"` PipelineRunID int64 `json:"-"` Output JSONSerializable `json:"output"` Error null.String `json:"error"` CreatedAt time.Time `json:"createdAt"` FinishedAt null.Time `json:"finishedAt"` Index int32 `json:"index"` DotID string `json:"dotId"` // contains filtered or unexported fields }
type TaskRunResult ¶
type TaskRunResult struct { ID uuid.UUID Task Task TaskRun TaskRun Result Result Attempts uint CreatedAt time.Time FinishedAt null.Time // contains filtered or unexported fields }
TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet TaskSpecID will always be non-zero
func (*TaskRunResult) IsPending ¶
func (result *TaskRunResult) IsPending() bool
func (*TaskRunResult) IsTerminal ¶
func (result *TaskRunResult) IsTerminal() bool
type TaskRunResults ¶
type TaskRunResults []TaskRunResult
TaskRunResults represents a collection of results for all task runs for one pipeline run
func (TaskRunResults) FinalResult ¶
func (trrs TaskRunResults) FinalResult(l log.Logger) FinalResult
FinalResult pulls the FinalResult for the pipeline_run from the task runs It needs to respect the output index of each task
type TaskType ¶
type TaskType string
const ( TaskTypeHTTP TaskType = "http" TaskTypeMean TaskType = "mean" TaskTypeMedian TaskType = "median" TaskTypeMode TaskType = "mode" TaskTypeSum TaskType = "sum" TaskTypeMultiply TaskType = "multiply" TaskTypeDivide TaskType = "divide" TaskTypeJSONParse TaskType = "jsonparse" TaskTypeAny TaskType = "any" TaskTypeETHABIEncode TaskType = "ethabiencode" TaskTypeETHABIEncode2 TaskType = "ethabiencode2" TaskTypeETHABIDecode TaskType = "ethabidecode" TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog" TaskTypeMerge TaskType = "merge" TaskTypeLowercase TaskType = "lowercase" TaskTypeUppercase TaskType = "uppercase" // Testing only. TaskTypePanic TaskType = "panic" TaskTypeMemo TaskType = "memo" TaskTypeFail TaskType = "fail" )
type URLParam ¶
func (*URLParam) UnmarshalPipelineParam ¶
type Uint64Param ¶
type Uint64Param uint64
func (*Uint64Param) UnmarshalPipelineParam ¶
func (u *Uint64Param) UnmarshalPipelineParam(val interface{}) error
type UppercaseTask ¶
Return types:
string
func (*UppercaseTask) Type ¶
func (t *UppercaseTask) Type() TaskType
Source Files ¶
- common.go
- common_eth.go
- common_http.go
- graph.go
- http.go
- http_allowed_ips.go
- models.go
- runner.go
- scheduler.go
- task.any.go
- task.base.go
- task.divide.go
- task.eth_abi_decode.go
- task.eth_abi_decode_log.go
- task.eth_abi_encode.go
- task.eth_abi_encode_2.go
- task.fail.go
- task.http.go
- task.jsonparse.go
- task.lowercase.go
- task.mean.go
- task.median.go
- task.memo.go
- task.merge.go
- task.mode.go
- task.multiply.go
- task.panic.go
- task.sum.go
- task.uppercase.go
- task_object_params.go
- task_params.go
- variables.go
- ws.go