pipeline

package
v0.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2021 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaybeBoolTrue  = MaybeBool("true")
	MaybeBoolFalse = MaybeBool("false")
	MaybeBoolNull  = MaybeBool("")
)
View Source
const (
	DotStr = `` /* 623-byte string literal not displayed */

)
View Source
const ResultTaskDotID = "__result__"

Variables

View Source
var (
	ErrWrongInputCardinality = errors.New("wrong number of task inputs")
	ErrBadInput              = errors.New("bad input for task")
)

Functions

func FindBridge

func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error)

FindBridge find a bridge using the given database

func NewORM

func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm

func NewRunner

func NewRunner(orm ORM, config Config) *runner

func WrapResultIfError

func WrapResultIfError(result *Result, msg string, args ...interface{})

Types

type AnyTask added in v0.10.0

type AnyTask struct {
	BaseTask `mapstructure:",squash"`
}

AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.

func (*AnyTask) Run added in v0.10.0

func (t *AnyTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*AnyTask) SetDefaults added in v0.10.0

func (t *AnyTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*AnyTask) Type added in v0.10.0

func (t *AnyTask) Type() TaskType

type BaseTask

type BaseTask struct {
	Index   int32         `mapstructure:"index" json:"-" `
	Timeout time.Duration `mapstructure:"timeout"`
	// contains filtered or unexported fields
}

func NewBaseTask added in v0.9.9

func NewBaseTask(dotID string, t Task, index int32) BaseTask

func (BaseTask) DotID

func (t BaseTask) DotID() string

func (BaseTask) OutputIndex

func (t BaseTask) OutputIndex() int32

func (BaseTask) OutputTask

func (t BaseTask) OutputTask() Task

func (*BaseTask) SetOutputTask

func (t *BaseTask) SetOutputTask(outputTask Task)

func (BaseTask) TaskTimeout added in v0.9.7

func (t BaseTask) TaskTimeout() (time.Duration, bool)

type BridgeTask

type BridgeTask struct {
	BaseTask `mapstructure:",squash"`

	Name        string          `json:"name"`
	RequestData HttpRequestData `json:"requestData"`
	// contains filtered or unexported fields
}

func (BridgeTask) ExportedEquals added in v0.9.9

func (t BridgeTask) ExportedEquals(otherTask Task) bool

func (*BridgeTask) HelperSetConfigAndTxDB added in v0.9.9

func (t *BridgeTask) HelperSetConfigAndTxDB(config Config, txdb *gorm.DB)

func (*BridgeTask) Run

func (t *BridgeTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*BridgeTask) SetDefaults added in v0.9.9

func (t *BridgeTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*BridgeTask) Type

func (t *BridgeTask) Type() TaskType

type Config

type Config interface {
	BridgeResponseURL() *url.URL
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() string
	DefaultHTTPLimit() int64
	DefaultHTTPTimeout() models.Duration
	DefaultMaxHTTPAttempts() uint
	DefaultHTTPAllowUnrestrictedNetworkAccess() bool
	TriggerFallbackDBPollInterval() time.Duration
	JobPipelineMaxTaskDuration() time.Duration
	JobPipelineMaxRunDuration() time.Duration
	JobPipelineParallelism() uint8
	JobPipelineReaperInterval() time.Duration
	JobPipelineReaperThreshold() time.Duration
}

type FinalErrors

type FinalErrors []null.String

FIXME: This error/FinalErrors conflation exists solely because of the __result__ task. It is confusing and needs to go, making this note to remove it along with the special __result__ task. https://www.pivotaltracker.com/story/show/176557536

func (FinalErrors) Error

func (fe FinalErrors) Error() string

func (FinalErrors) HasErrors added in v0.9.7

func (fe FinalErrors) HasErrors() bool

func (*FinalErrors) Scan

func (fe *FinalErrors) Scan(value interface{}) error

func (FinalErrors) Value

func (fe FinalErrors) Value() (driver.Value, error)

type FinalResult added in v0.9.10

type FinalResult struct {
	Values []interface{}
	Errors []error
}

FinalResult is the result of a Run TODO: Get rid of FinalErrors and use FinalResult instead https://www.pivotaltracker.com/story/show/176557536

func (FinalResult) ErrorsDB added in v0.9.10

func (result FinalResult) ErrorsDB() JSONSerializable

ErrorsDB dumps a result error for a pipeline_run

func (FinalResult) HasErrors added in v0.9.10

func (result FinalResult) HasErrors() bool

HasErrors returns true if the final result has any errors

func (FinalResult) OutputsDB added in v0.9.10

func (result FinalResult) OutputsDB() JSONSerializable

OutputsDB dumps a result output for a pipeline_run

func (FinalResult) SingularResult added in v0.9.10

func (result FinalResult) SingularResult() (Result, error)

SingularResult returns a single result if the FinalResult only has one set of outputs/errors

type HTTPTask

type HTTPTask struct {
	BaseTask                       `mapstructure:",squash"`
	Method                         string
	URL                            models.WebURL
	RequestData                    HttpRequestData `json:"requestData"`
	AllowUnrestrictedNetworkAccess MaybeBool
	// contains filtered or unexported fields
}

func (HTTPTask) ExportedEquals added in v0.9.9

func (t HTTPTask) ExportedEquals(otherTask Task) bool

func (*HTTPTask) HelperSetConfig added in v0.9.9

func (t *HTTPTask) HelperSetConfig(config Config)

func (*HTTPTask) Run

func (t *HTTPTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) Result

func (*HTTPTask) SetDefaults added in v0.9.9

func (t *HTTPTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*HTTPTask) Type

func (t *HTTPTask) Type() TaskType

type HttpRequestData

type HttpRequestData map[string]interface{}

func (HttpRequestData) AsMap

func (h HttpRequestData) AsMap() map[string]interface{}

func (*HttpRequestData) Scan

func (h *HttpRequestData) Scan(value interface{}) error

func (HttpRequestData) Value

func (h HttpRequestData) Value() (driver.Value, error)

type JSONParseTask

type JSONParseTask struct {
	BaseTask `mapstructure:",squash"`
	Path     JSONPath `json:"path"`
	// Lax when disabled will return an error if the path does not exist
	// Lax when enabled will return nil with no error if the path does not exist
	Lax bool
}

func (JSONParseTask) ExportedEquals added in v0.9.9

func (t JSONParseTask) ExportedEquals(otherTask Task) bool

func (*JSONParseTask) Run

func (t *JSONParseTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*JSONParseTask) SetDefaults added in v0.9.9

func (t *JSONParseTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*JSONParseTask) Type

func (t *JSONParseTask) Type() TaskType

type JSONPath

type JSONPath []string

func (*JSONPath) Scan

func (p *JSONPath) Scan(value interface{}) error

func (*JSONPath) UnmarshalText

func (p *JSONPath) UnmarshalText(bs []byte) error

func (JSONPath) Value

func (p JSONPath) Value() (driver.Value, error)

type JSONSerializable

type JSONSerializable struct {
	Val  interface{}
	Null bool
}

func (JSONSerializable) MarshalJSON

func (js JSONSerializable) MarshalJSON() ([]byte, error)

func (*JSONSerializable) Scan

func (js *JSONSerializable) Scan(value interface{}) error

func (*JSONSerializable) UnmarshalJSON

func (js *JSONSerializable) UnmarshalJSON(bs []byte) error

func (JSONSerializable) Value

func (js JSONSerializable) Value() (driver.Value, error)

type MaybeBool added in v0.9.7

type MaybeBool string

func MaybeBoolFromString added in v0.9.7

func MaybeBoolFromString(s string) (MaybeBool, error)

func (MaybeBool) Bool added in v0.9.7

func (m MaybeBool) Bool() (b bool, isSet bool)

type MedianTask

type MedianTask struct {
	BaseTask      `mapstructure:",squash"`
	AllowedFaults uint64 `json:"allowedFaults"`
}

func (MedianTask) ExportedEquals added in v0.9.9

func (t MedianTask) ExportedEquals(otherTask Task) bool

func (*MedianTask) Run

func (t *MedianTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*MedianTask) SetDefaults added in v0.9.9

func (t *MedianTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*MedianTask) Type

func (t *MedianTask) Type() TaskType

type MultiplyTask

type MultiplyTask struct {
	BaseTask `mapstructure:",squash"`
	Times    decimal.Decimal `json:"times"`
}

func (MultiplyTask) ExportedEquals added in v0.9.9

func (t MultiplyTask) ExportedEquals(otherTask Task) bool

func (*MultiplyTask) Run

func (t *MultiplyTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*MultiplyTask) SetDefaults added in v0.9.9

func (t *MultiplyTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*MultiplyTask) Type

func (t *MultiplyTask) Type() TaskType

type ORM

type ORM interface {
	CreateSpec(ctx context.Context, db *gorm.DB, taskDAG TaskDAG, maxTaskTimeout models.Interval) (int32, error)
	CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error)
	ProcessNextUnfinishedRun(ctx context.Context, fn ProcessRunFunc) (bool, error)
	ListenForNewRuns() (postgres.Subscription, error)
	InsertFinishedRunWithResults(ctx context.Context, run Run, trrs []TaskRunResult) (runID int64, err error)
	AwaitRun(ctx context.Context, runID int64) error
	RunFinished(runID int64) (bool, error)
	ResultsForRun(ctx context.Context, runID int64) ([]Result, error)
	DeleteRunsOlderThan(threshold time.Duration) error

	FindBridge(name models.TaskType) (models.BridgeType, error)

	DB() *gorm.DB
}

type PossibleErrorResponses

type PossibleErrorResponses struct {
	Error        string `json:"error"`
	ErrorMessage string `json:"errorMessage"`
}

type ProcessRunFunc added in v0.9.10

type ProcessRunFunc func(ctx context.Context, txdb *gorm.DB, pRun Run, l logger.Logger) (TaskRunResults, error)

TODO: Remove generation of special "result" task TODO: Remove the unique index on successor_id https://www.pivotaltracker.com/story/show/176557536

type Result

type Result struct {
	Value interface{}
	Error error
}

Result is the result of a TaskRun

func (Result) ErrorDB added in v0.9.10

func (result Result) ErrorDB() null.String

ErrorDB dumps a single result error for a pipeline_task_run

func (Result) OutputDB added in v0.9.10

func (result Result) OutputDB() JSONSerializable

OutputDB dumps a single result output for a pipeline_run or pipeline_task_run

type ResultTask

type ResultTask struct {
	BaseTask `mapstructure:",squash"`
}

ResultTask exists solely as a Postgres performance optimization. It's added automatically to the end of every pipeline, and it receives the outputs of all tasks that have no successor tasks. This allows the pipeline runner to detect when it has reached the end a given pipeline simply by checking the `successor_id` field, rather than having to try to SELECT all of the pipeline run's task runs, (which must be done from inside of a transaction, and causes lock contention and serialization anomaly issues).

func (ResultTask) ExportedEquals added in v0.9.9

func (t ResultTask) ExportedEquals(otherTask Task) bool

func (*ResultTask) Run

func (t *ResultTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) Result

func (*ResultTask) SetDefaults added in v0.9.9

func (t *ResultTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*ResultTask) Type

func (t *ResultTask) Type() TaskType

type Run

type Run struct {
	ID               int64            `json:"-" gorm:"primary_key"`
	PipelineSpecID   int32            `json:"-"`
	PipelineSpec     Spec             `json:"pipelineSpec"`
	Meta             JSONSerializable `json:"meta"`
	Errors           JSONSerializable `json:"errors" gorm:"type:jsonb"`
	Outputs          JSONSerializable `json:"outputs" gorm:"type:jsonb"`
	CreatedAt        time.Time        `json:"createdAt"`
	FinishedAt       *time.Time       `json:"finishedAt"`
	PipelineTaskRuns []TaskRun        `json:"taskRuns" gorm:"foreignkey:PipelineRunID;->"`
}

func (Run) FinalErrors added in v0.9.10

func (r Run) FinalErrors() (f FinalErrors)

func (Run) GetID added in v0.9.6

func (r Run) GetID() string

func (Run) HasErrors added in v0.9.10

func (r Run) HasErrors() bool

func (*Run) SetID added in v0.9.6

func (r *Run) SetID(value string) error

func (Run) TableName

func (Run) TableName() string

type Runner

type Runner interface {
	Start() error
	Close() error
	CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (runID int64, err error)
	ExecuteRun(ctx context.Context, run Run, l logger.Logger) (trrs TaskRunResults, err error)
	ExecuteAndInsertNewRun(ctx context.Context, spec Spec, l logger.Logger) (finalResult FinalResult, err error)
	AwaitRun(ctx context.Context, runID int64) error
	ResultsForRun(ctx context.Context, runID int64) ([]Result, error)
}

Runner checks the DB for incomplete TaskRuns and runs them. For a TaskRun to be eligible to be run, its parent/input tasks must already all be complete.

type Spec

type Spec struct {
	ID                int32           `gorm:"primary_key"`
	DotDagSource      string          `json:"dotDagSource"`
	CreatedAt         time.Time       `json:"-"`
	MaxTaskDuration   models.Interval `json:"-"`
	PipelineTaskSpecs []TaskSpec      `json:"-" gorm:"foreignkey:PipelineSpecID;->"`
}

func (Spec) TableName

func (Spec) TableName() string

type Task

type Task interface {
	Type() TaskType
	DotID() string
	Run(ctx context.Context, taskRun TaskRun, inputs []Result) Result
	OutputTask() Task
	SetOutputTask(task Task)
	OutputIndex() int32
	TaskTimeout() (time.Duration, bool)
	SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
}

func UnmarshalTaskFromMap

func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, dotID string, config Config, txdb *gorm.DB, txdbMutex *sync.Mutex) (_ Task, err error)

type TaskDAG

type TaskDAG struct {
	*simple.DirectedGraph
	DOTSource string
}

TaskDAG fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it. Once unmarshalled, calling `TaskDAG#TasksInDependencyOrder()` will return the unmarshaled tasks.

func NewTaskDAG

func NewTaskDAG() *TaskDAG

func (*TaskDAG) HasCycles

func (g *TaskDAG) HasCycles() bool

func (TaskDAG) MinTimeout added in v0.9.10

func (g TaskDAG) MinTimeout() (time.Duration, bool, error)

func (*TaskDAG) NewNode

func (g *TaskDAG) NewNode() graph.Node

func (TaskDAG) TasksInDependencyOrder

func (g TaskDAG) TasksInDependencyOrder() ([]Task, error)

Returns a slice of Tasks starting at the outputs of the DAG and ending at the inputs. As you iterate through this slice, you can expect that any individual Task's outputs will already have been traversed.

func (*TaskDAG) UnmarshalText

func (g *TaskDAG) UnmarshalText(bs []byte) (err error)

type TaskRun

type TaskRun struct {
	ID                 int64             `json:"-" gorm:"primary_key"`
	Type               TaskType          `json:"type"`
	PipelineRun        Run               `json:"-"`
	PipelineRunID      int64             `json:"-"`
	Output             *JSONSerializable `json:"output" gorm:"type:jsonb"`
	Error              null.String       `json:"error"`
	PipelineTaskSpecID int32             `json:"-"`
	PipelineTaskSpec   TaskSpec          `json:"taskSpec" gorm:"foreignkey:PipelineTaskSpecID;->"`
	CreatedAt          time.Time         `json:"createdAt"`
	FinishedAt         *time.Time        `json:"finishedAt"`
}

func (TaskRun) DotID

func (tr TaskRun) DotID() string

func (TaskRun) GetID added in v0.9.6

func (tr TaskRun) GetID() string

func (TaskRun) Result

func (tr TaskRun) Result() Result

func (*TaskRun) SetID added in v0.9.6

func (tr *TaskRun) SetID(value string) error

func (TaskRun) TableName

func (TaskRun) TableName() string

type TaskRunResult added in v0.9.10

type TaskRunResult struct {
	ID         int64
	TaskSpecID int32
	Result     Result
	FinishedAt time.Time
	IsTerminal bool
}

TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet TaskSpecID will always be non-zero

type TaskRunResults added in v0.9.10

type TaskRunResults []TaskRunResult

TaskRunResults represents a collection of results for all task runs for one pipeline run

func (TaskRunResults) FinalResult added in v0.9.10

func (trrs TaskRunResults) FinalResult() (result FinalResult)

FinalResult pulls the FinalResult for the pipeline_run from the task runs

type TaskSpec

type TaskSpec struct {
	ID             int32            `json:"-" gorm:"primary_key"`
	DotID          string           `json:"dotId"`
	PipelineSpecID int32            `json:"-"`
	PipelineSpec   Spec             `json:"-"`
	Type           TaskType         `json:"-"`
	JSON           JSONSerializable `json:"-" gorm:"type:jsonb"`
	Index          int32            `json:"-"`
	SuccessorID    null.Int         `json:"-"`
	CreatedAt      time.Time        `json:"-"`
}

func (TaskSpec) IsFinalPipelineOutput

func (s TaskSpec) IsFinalPipelineOutput() bool

func (TaskSpec) TableName

func (TaskSpec) TableName() string

type TaskType

type TaskType string
const (
	TaskTypeHTTP      TaskType = "http"
	TaskTypeBridge    TaskType = "bridge"
	TaskTypeMedian    TaskType = "median"
	TaskTypeMultiply  TaskType = "multiply"
	TaskTypeJSONParse TaskType = "jsonparse"
	TaskTypeResult    TaskType = "result"
	TaskTypeAny       TaskType = "any"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL