Documentation ¶
Index ¶
- Constants
- Variables
- func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error)
- func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm
- func NewRunner(orm ORM, config Config) *runner
- func WrapResultIfError(result *Result, msg string, args ...interface{})
- type AnyTask
- type BaseTask
- type BridgeTask
- func (t BridgeTask) ExportedEquals(otherTask Task) bool
- func (t *BridgeTask) HelperSetConfigAndTxDB(config Config, txdb *gorm.DB)
- func (t *BridgeTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) (result Result)
- func (t *BridgeTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
- func (t *BridgeTask) Type() TaskType
- type Config
- type FinalErrors
- type FinalResult
- type HTTPTask
- func (t HTTPTask) ExportedEquals(otherTask Task) bool
- func (t *HTTPTask) HelperSetConfig(config Config)
- func (t *HTTPTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) Result
- func (t *HTTPTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
- func (t *HTTPTask) Type() TaskType
- type HttpRequestData
- type JSONParseTask
- type JSONPath
- type JSONSerializable
- type MaybeBool
- type MedianTask
- type MultiplyTask
- type ORM
- type PossibleErrorResponses
- type ProcessRunFunc
- type Result
- type ResultTask
- type Run
- type Runner
- type Spec
- type Task
- type TaskDAG
- type TaskRun
- type TaskRunResult
- type TaskRunResults
- type TaskSpec
- type TaskType
Constants ¶
const ( MaybeBoolTrue = MaybeBool("true") MaybeBoolFalse = MaybeBool("false") MaybeBoolNull = MaybeBool("") )
const (
DotStr = `` /* 623-byte string literal not displayed */
)
const ResultTaskDotID = "__result__"
Variables ¶
var ( ErrWrongInputCardinality = errors.New("wrong number of task inputs") ErrBadInput = errors.New("bad input for task") )
Functions ¶
func FindBridge ¶
FindBridge find a bridge using the given database
func NewORM ¶
func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm
func WrapResultIfError ¶
Types ¶
type AnyTask ¶ added in v0.10.0
type AnyTask struct {
BaseTask `mapstructure:",squash"`
}
AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.
func (*AnyTask) SetDefaults ¶ added in v0.10.0
type BaseTask ¶
type BaseTask struct { Index int32 `mapstructure:"index" json:"-" ` Timeout time.Duration `mapstructure:"timeout"` // contains filtered or unexported fields }
func (BaseTask) OutputIndex ¶
func (BaseTask) OutputTask ¶
func (*BaseTask) SetOutputTask ¶
type BridgeTask ¶
type BridgeTask struct { BaseTask `mapstructure:",squash"` Name string `json:"name"` RequestData HttpRequestData `json:"requestData"` // contains filtered or unexported fields }
func (BridgeTask) ExportedEquals ¶ added in v0.9.9
func (t BridgeTask) ExportedEquals(otherTask Task) bool
func (*BridgeTask) HelperSetConfigAndTxDB ¶ added in v0.9.9
func (t *BridgeTask) HelperSetConfigAndTxDB(config Config, txdb *gorm.DB)
func (*BridgeTask) SetDefaults ¶ added in v0.9.9
func (t *BridgeTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
func (*BridgeTask) Type ¶
func (t *BridgeTask) Type() TaskType
type Config ¶
type Config interface { BridgeResponseURL() *url.URL DatabaseMaximumTxDuration() time.Duration DatabaseURL() string DefaultHTTPLimit() int64 DefaultHTTPTimeout() models.Duration DefaultMaxHTTPAttempts() uint DefaultHTTPAllowUnrestrictedNetworkAccess() bool TriggerFallbackDBPollInterval() time.Duration JobPipelineMaxTaskDuration() time.Duration JobPipelineMaxRunDuration() time.Duration JobPipelineParallelism() uint8 JobPipelineReaperInterval() time.Duration JobPipelineReaperThreshold() time.Duration }
type FinalErrors ¶
type FinalErrors []null.String
FIXME: This error/FinalErrors conflation exists solely because of the __result__ task. It is confusing and needs to go, making this note to remove it along with the special __result__ task. https://www.pivotaltracker.com/story/show/176557536
func (FinalErrors) Error ¶
func (fe FinalErrors) Error() string
func (FinalErrors) HasErrors ¶ added in v0.9.7
func (fe FinalErrors) HasErrors() bool
func (*FinalErrors) Scan ¶
func (fe *FinalErrors) Scan(value interface{}) error
type FinalResult ¶ added in v0.9.10
type FinalResult struct { Values []interface{} Errors []error }
FinalResult is the result of a Run TODO: Get rid of FinalErrors and use FinalResult instead https://www.pivotaltracker.com/story/show/176557536
func (FinalResult) ErrorsDB ¶ added in v0.9.10
func (result FinalResult) ErrorsDB() JSONSerializable
ErrorsDB dumps a result error for a pipeline_run
func (FinalResult) HasErrors ¶ added in v0.9.10
func (result FinalResult) HasErrors() bool
HasErrors returns true if the final result has any errors
func (FinalResult) OutputsDB ¶ added in v0.9.10
func (result FinalResult) OutputsDB() JSONSerializable
OutputsDB dumps a result output for a pipeline_run
func (FinalResult) SingularResult ¶ added in v0.9.10
func (result FinalResult) SingularResult() (Result, error)
SingularResult returns a single result if the FinalResult only has one set of outputs/errors
type HTTPTask ¶
type HTTPTask struct { BaseTask `mapstructure:",squash"` Method string URL models.WebURL RequestData HttpRequestData `json:"requestData"` AllowUnrestrictedNetworkAccess MaybeBool // contains filtered or unexported fields }
func (HTTPTask) ExportedEquals ¶ added in v0.9.9
func (*HTTPTask) HelperSetConfig ¶ added in v0.9.9
func (*HTTPTask) SetDefaults ¶ added in v0.9.9
type HttpRequestData ¶
type HttpRequestData map[string]interface{}
func (HttpRequestData) AsMap ¶
func (h HttpRequestData) AsMap() map[string]interface{}
func (*HttpRequestData) Scan ¶
func (h *HttpRequestData) Scan(value interface{}) error
type JSONParseTask ¶
type JSONParseTask struct { BaseTask `mapstructure:",squash"` Path JSONPath `json:"path"` // Lax when disabled will return an error if the path does not exist // Lax when enabled will return nil with no error if the path does not exist Lax bool }
func (JSONParseTask) ExportedEquals ¶ added in v0.9.9
func (t JSONParseTask) ExportedEquals(otherTask Task) bool
func (*JSONParseTask) SetDefaults ¶ added in v0.9.9
func (t *JSONParseTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
func (*JSONParseTask) Type ¶
func (t *JSONParseTask) Type() TaskType
type JSONSerializable ¶
type JSONSerializable struct { Val interface{} Null bool }
func (JSONSerializable) MarshalJSON ¶
func (js JSONSerializable) MarshalJSON() ([]byte, error)
func (*JSONSerializable) Scan ¶
func (js *JSONSerializable) Scan(value interface{}) error
func (*JSONSerializable) UnmarshalJSON ¶
func (js *JSONSerializable) UnmarshalJSON(bs []byte) error
type MedianTask ¶
type MedianTask struct { BaseTask `mapstructure:",squash"` AllowedFaults uint64 `json:"allowedFaults"` }
func (MedianTask) ExportedEquals ¶ added in v0.9.9
func (t MedianTask) ExportedEquals(otherTask Task) bool
func (*MedianTask) SetDefaults ¶ added in v0.9.9
func (t *MedianTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
func (*MedianTask) Type ¶
func (t *MedianTask) Type() TaskType
type MultiplyTask ¶
func (MultiplyTask) ExportedEquals ¶ added in v0.9.9
func (t MultiplyTask) ExportedEquals(otherTask Task) bool
func (*MultiplyTask) SetDefaults ¶ added in v0.9.9
func (t *MultiplyTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
func (*MultiplyTask) Type ¶
func (t *MultiplyTask) Type() TaskType
type ORM ¶
type ORM interface { CreateSpec(ctx context.Context, db *gorm.DB, taskDAG TaskDAG, maxTaskTimeout models.Interval) (int32, error) CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error) ProcessNextUnfinishedRun(ctx context.Context, fn ProcessRunFunc) (bool, error) ListenForNewRuns() (postgres.Subscription, error) InsertFinishedRunWithResults(ctx context.Context, run Run, trrs []TaskRunResult) (runID int64, err error) AwaitRun(ctx context.Context, runID int64) error RunFinished(runID int64) (bool, error) ResultsForRun(ctx context.Context, runID int64) ([]Result, error) DeleteRunsOlderThan(threshold time.Duration) error FindBridge(name models.TaskType) (models.BridgeType, error) DB() *gorm.DB }
type PossibleErrorResponses ¶
type ProcessRunFunc ¶ added in v0.9.10
type ProcessRunFunc func(ctx context.Context, txdb *gorm.DB, pRun Run, l logger.Logger) (TaskRunResults, error)
TODO: Remove generation of special "result" task TODO: Remove the unique index on successor_id https://www.pivotaltracker.com/story/show/176557536
type Result ¶
type Result struct { Value interface{} Error error }
Result is the result of a TaskRun
func (Result) ErrorDB ¶ added in v0.9.10
func (result Result) ErrorDB() null.String
ErrorDB dumps a single result error for a pipeline_task_run
func (Result) OutputDB ¶ added in v0.9.10
func (result Result) OutputDB() JSONSerializable
OutputDB dumps a single result output for a pipeline_run or pipeline_task_run
type ResultTask ¶
type ResultTask struct {
BaseTask `mapstructure:",squash"`
}
ResultTask exists solely as a Postgres performance optimization. It's added automatically to the end of every pipeline, and it receives the outputs of all tasks that have no successor tasks. This allows the pipeline runner to detect when it has reached the end a given pipeline simply by checking the `successor_id` field, rather than having to try to SELECT all of the pipeline run's task runs, (which must be done from inside of a transaction, and causes lock contention and serialization anomaly issues).
func (ResultTask) ExportedEquals ¶ added in v0.9.9
func (t ResultTask) ExportedEquals(otherTask Task) bool
func (*ResultTask) SetDefaults ¶ added in v0.9.9
func (t *ResultTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
func (*ResultTask) Type ¶
func (t *ResultTask) Type() TaskType
type Run ¶
type Run struct { ID int64 `json:"-" gorm:"primary_key"` PipelineSpecID int32 `json:"-"` PipelineSpec Spec `json:"pipelineSpec"` Meta JSONSerializable `json:"meta"` Errors JSONSerializable `json:"errors" gorm:"type:jsonb"` Outputs JSONSerializable `json:"outputs" gorm:"type:jsonb"` CreatedAt time.Time `json:"createdAt"` FinishedAt *time.Time `json:"finishedAt"` PipelineTaskRuns []TaskRun `json:"taskRuns" gorm:"foreignkey:PipelineRunID;->"` }
func (Run) FinalErrors ¶ added in v0.9.10
func (r Run) FinalErrors() (f FinalErrors)
type Runner ¶
type Runner interface { Start() error Close() error CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (runID int64, err error) ExecuteRun(ctx context.Context, run Run, l logger.Logger) (trrs TaskRunResults, err error) ExecuteAndInsertNewRun(ctx context.Context, spec Spec, l logger.Logger) (finalResult FinalResult, err error) AwaitRun(ctx context.Context, runID int64) error ResultsForRun(ctx context.Context, runID int64) ([]Result, error) }
Runner checks the DB for incomplete TaskRuns and runs them. For a TaskRun to be eligible to be run, its parent/input tasks must already all be complete.
type Spec ¶
type Task ¶
type TaskDAG ¶
type TaskDAG struct { *simple.DirectedGraph DOTSource string }
TaskDAG fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it. Once unmarshalled, calling `TaskDAG#TasksInDependencyOrder()` will return the unmarshaled tasks.
func NewTaskDAG ¶
func NewTaskDAG() *TaskDAG
func (TaskDAG) MinTimeout ¶ added in v0.9.10
func (TaskDAG) TasksInDependencyOrder ¶
Returns a slice of Tasks starting at the outputs of the DAG and ending at the inputs. As you iterate through this slice, you can expect that any individual Task's outputs will already have been traversed.
func (*TaskDAG) UnmarshalText ¶
type TaskRun ¶
type TaskRun struct { ID int64 `json:"-" gorm:"primary_key"` Type TaskType `json:"type"` PipelineRun Run `json:"-"` PipelineRunID int64 `json:"-"` Output *JSONSerializable `json:"output" gorm:"type:jsonb"` Error null.String `json:"error"` PipelineTaskSpecID int32 `json:"-"` PipelineTaskSpec TaskSpec `json:"taskSpec" gorm:"foreignkey:PipelineTaskSpecID;->"` CreatedAt time.Time `json:"createdAt"` FinishedAt *time.Time `json:"finishedAt"` }
type TaskRunResult ¶ added in v0.9.10
type TaskRunResult struct { ID int64 TaskSpecID int32 Result Result FinishedAt time.Time IsTerminal bool }
TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet TaskSpecID will always be non-zero
type TaskRunResults ¶ added in v0.9.10
type TaskRunResults []TaskRunResult
TaskRunResults represents a collection of results for all task runs for one pipeline run
func (TaskRunResults) FinalResult ¶ added in v0.9.10
func (trrs TaskRunResults) FinalResult() (result FinalResult)
FinalResult pulls the FinalResult for the pipeline_run from the task runs
type TaskSpec ¶
type TaskSpec struct { ID int32 `json:"-" gorm:"primary_key"` DotID string `json:"dotId"` PipelineSpecID int32 `json:"-"` PipelineSpec Spec `json:"-"` Type TaskType `json:"-"` JSON JSONSerializable `json:"-" gorm:"type:jsonb"` Index int32 `json:"-"` SuccessorID null.Int `json:"-"` CreatedAt time.Time `json:"-"` }