Documentation ¶
Index ¶
- Constants
- Variables
- func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error)
- func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm
- func NewRunner(orm ORM, config Config) *runner
- func WrapResultIfError(result *Result, msg string, args ...interface{})
- type BaseTask
- type BridgeTask
- type Config
- type FinalErrors
- type HTTPTask
- type HttpRequestData
- type JSONParseTask
- type JSONPath
- type JSONSerializable
- type MedianTask
- type MultiplyTask
- type ORM
- type PossibleErrorResponses
- type ProcessTaskRunFunc
- type Result
- type ResultTask
- type Run
- type Runner
- type Spec
- type Task
- type TaskDAG
- type TaskRun
- type TaskSpec
- type TaskType
Constants ¶
const ResultTaskDotID = "__result__"
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") )
Functions ¶
func FindBridge ¶
FindBridge find a bridge using the given database
func NewORM ¶
func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm
func WrapResultIfError ¶
Types ¶
type BaseTask ¶
type BaseTask struct { Index int32 `mapstructure:"index" json:"-" ` // contains filtered or unexported fields }
func (BaseTask) OutputIndex ¶
func (BaseTask) OutputTask ¶
func (*BaseTask) SetOutputTask ¶
type BridgeTask ¶
type BridgeTask struct { BaseTask `mapstructure:",squash"` Name string `json:"name"` RequestData HttpRequestData `json:"requestData"` // contains filtered or unexported fields }
func (*BridgeTask) Type ¶
func (t *BridgeTask) Type() TaskType
type Config ¶
type Config interface { BridgeResponseURL() *url.URL DatabaseMaximumTxDuration() time.Duration DatabaseURL() string DefaultHTTPLimit() int64 DefaultHTTPTimeout() models.Duration DefaultMaxHTTPAttempts() uint DefaultHTTPAllowUnrestrictedNetworkAccess() bool JobPipelineDBPollInterval() time.Duration JobPipelineMaxTaskDuration() time.Duration JobPipelineParallelism() uint8 JobPipelineReaperInterval() time.Duration JobPipelineReaperThreshold() time.Duration }
type FinalErrors ¶
type FinalErrors []null.String
func (FinalErrors) Error ¶
func (fe FinalErrors) Error() string
func (*FinalErrors) Scan ¶
func (fe *FinalErrors) Scan(value interface{}) error
type HTTPTask ¶
type HTTPTask struct { BaseTask `mapstructure:",squash"` Method string URL models.WebURL RequestData HttpRequestData `json:"requestData"` // contains filtered or unexported fields }
type HttpRequestData ¶
type HttpRequestData map[string]interface{}
func (HttpRequestData) AsMap ¶
func (h HttpRequestData) AsMap() map[string]interface{}
func (*HttpRequestData) Scan ¶
func (h *HttpRequestData) Scan(value interface{}) error
type JSONParseTask ¶
type JSONParseTask struct { BaseTask `mapstructure:",squash"` Path JSONPath `json:"path"` // Lax when disabled will return an error if the path does not exist // Lax when enabled will return nil with no error if the path does not exist Lax bool }
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONSerializable ¶
type JSONSerializable struct {
Val interface{}
}
func (JSONSerializable) MarshalJSON ¶
func (js JSONSerializable) MarshalJSON() ([]byte, error)
func (*JSONSerializable) Scan ¶
func (js *JSONSerializable) Scan(value interface{}) error
func (*JSONSerializable) UnmarshalJSON ¶
func (js *JSONSerializable) UnmarshalJSON(bs []byte) error
type MedianTask ¶
type MedianTask struct {
BaseTask `mapstructure:",squash"`
}
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type MultiplyTask ¶
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ORM ¶
type ORM interface { CreateSpec(ctx context.Context, taskDAG TaskDAG) (int32, error) CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error) ProcessNextUnclaimedTaskRun(ctx context.Context, fn ProcessTaskRunFunc) (bool, error) ListenForNewRuns() (postgres.Subscription, error) AwaitRun(ctx context.Context, runID int64) error RunFinished(runID int64) (bool, error) ResultsForRun(ctx context.Context, runID int64) ([]Result, error) DeleteRunsOlderThan(threshold time.Duration) error FindBridge(name models.TaskType) (models.BridgeType, error) }
type PossibleErrorResponses ¶
type ProcessTaskRunFunc ¶
type ResultTask ¶
type ResultTask struct {
BaseTask `mapstructure:",squash"`
}
ResultTask exists solely as a Postgres performance optimization. It's added automatically to the end of every pipeline, and it receives the outputs of all tasks that have no successor tasks. This allows the pipeline runner to detect when it has reached the end a given pipeline simply by checking the `successor_id` field, rather than having to try to SELECT all of the pipeline run's task runs, (which must be done from inside of a transaction, and causes lock contention and serialization anomaly issues).
func (*ResultTask) Type ¶
func (t *ResultTask) Type() TaskType
type Run ¶
type Runner ¶
type Runner interface { Start() Stop() CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error) AwaitRun(ctx context.Context, runID int64) error ResultsForRun(ctx context.Context, runID int64) ([]Result, error) }
Runner checks the DB for incomplete TaskRuns and runs them. For a TaskRun to be eligible to be run, its parent/input tasks must already all be complete.
type Task ¶
type TaskDAG ¶
type TaskDAG struct { *simple.DirectedGraph DOTSource string }
TaskDAG fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it. Once unmarshalled, calling `TaskDAG#TasksInDependencyOrder()` will return the unmarshaled tasks.
func NewTaskDAG ¶
func NewTaskDAG() *TaskDAG
func (TaskDAG) TasksInDependencyOrder ¶
Returns a slice of Tasks starting at the outputs of the DAG and ending at the inputs. As you iterate through this slice, you can expect that any individual Task's outputs will already have been traversed.