execution

package
v1.0.0-beta.202410020835 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2024 License: AGPL-3.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Event values. Each variable below is a zero-value instance of the
// corresponding type from the event package.
var (
	// Pipeline-level events.
	PipelineQueuedEvent   = event.PipelineQueued{}
	PipelineStartedEvent  = event.PipelineStarted{}
	PipelineResumedEvent  = event.PipelineResumed{}
	PipelinePlannedEvent  = event.PipelinePlanned{}
	PipelineCanceledEvent = event.PipelineCanceled{}
	PipelinePausedEvent   = event.PipelinePaused{}
	PipelineFinishedEvent = event.PipelineFinished{}
	PipelineFailedEvent   = event.PipelineFailed{}
	PipelineLoadedEvent   = event.PipelineLoaded{}

	// Step-level events.
	StepQueuedEvent          = event.StepQueued{}
	StepFinishedEvent        = event.StepFinished{} // this is the generic step finish event that is fired by the command.step_start command
	StepForEachPlannedEvent  = event.StepForEachPlanned{}
	StepPipelineStartedEvent = event.StepPipelineStarted{} // this event is fired for a specific step type: pipeline step (step that launches a pipeline)

)

Events

View Source
// Command values. Each variable below is a zero-value instance of the
// corresponding type from the event package.
var (
	// Pipeline-level commands.
	PipelineCancelCommand = event.PipelineCancel{}
	PipelinePlanCommand   = event.PipelinePlan{}
	PipelineFinishCommand = event.PipelineFinish{}
	PipelineFailCommand   = event.PipelineFail{}
	PipelineLoadCommand   = event.PipelineLoad{}
	PipelinePauseCommand  = event.PipelinePause{}
	PipelineQueueCommand  = event.PipelineQueue{}
	PipelineResumeCommand = event.PipelineResume{}
	PipelineStartCommand  = event.PipelineStart{}

	// Step-level commands.
	StepQueueCommand = event.StepQueue{}
	StepStartCommand = event.StepStart{}

	StepPipelineFinishCommand = event.StepPipelineFinish{} // this command is fired when a child pipeline has finished. This is to inform the parent pipeline to continue the execution
)

Commands

View Source
// ExecutionMode is a package-level string variable.
// NOTE(review): the set of valid values and where it is assigned are not
// visible in this listing — confirm before relying on it.
var ExecutionMode string

Functions

func AddEachForEach

func AddEachForEach(stepForEach *modconfig.StepForEach, evalContext *hcl.EvalContext) *hcl.EvalContext

This function mutates evalContext

func AddLoop

func AddLoop(stepLoop *modconfig.StepLoop, evalContext *hcl.EvalContext) *hcl.EvalContext

This function mutates evalContext

func AddStepCalculatedOutputAsResults

func AddStepCalculatedOutputAsResults(stepName string, stepOutput map[string]interface{}, stepInput *modconfig.Input, evalContext *hcl.EvalContext) (*hcl.EvalContext, error)

This function *mutates* the evalContext passed in

func AddStepPrimitiveOutputAsResults

func AddStepPrimitiveOutputAsResults(stepName string, output *modconfig.Output, evalContext *hcl.EvalContext) (*hcl.EvalContext, error)

func BuildSingleStepExecutionOutput

func BuildSingleStepExecutionOutput(lastStepExecution *StepExecution, stepName string) (map[string]cty.Value, error)

func Category

func Category(category string) map[string]interface{}

func CompletePipelineExecutionStepSemaphore added in v0.3.0

func CompletePipelineExecutionStepSemaphore(pipelineExecutionID string)

func GetPipelineExecutionStepSemaphore added in v0.3.0

func GetPipelineExecutionStepSemaphore(pipelineExecutionID string, stepDefn modconfig.PipelineStep, evalContext *hcl.EvalContext) error

func GetPipelineSemaphore added in v0.3.0

func GetPipelineSemaphore(pipelineDefn *modconfig.Pipeline) error

func GetStepTypeSemaphore added in v0.3.0

func GetStepTypeSemaphore(stepType string) error

func InitGlobalStepSemaphores added in v0.3.0

func InitGlobalStepSemaphores()

func LogEventMessageToFile added in v0.6.0

func LogEventMessageToFile(ctx context.Context, logEntry event.EventLogImpl) error

func ReleasePipelineExecutionStepSemaphore added in v0.3.0

func ReleasePipelineExecutionStepSemaphore(pipelineExecutionID string, stepDefn modconfig.PipelineStep) error

func ReleasePipelineSemaphore added in v0.3.0

func ReleasePipelineSemaphore(pipelineDefn *modconfig.Pipeline) error

func ReleaseStepTypeSemaphore added in v0.3.0

func ReleaseStepTypeSemaphore(stepType string)

func SaveEventToSQLite added in v0.3.0

func SaveEventToSQLite(db *sql.DB, executionID string, event event.EventLogImpl) error

Types

type Execution

// Execution represents the current state of an execution. A single execution
// is tied to a trigger (webhook, cronjob, etc) and may result in multiple
// pipelines being executed.
type Execution struct {
	// Unique identifier for this execution.
	ID string `json:"id"`

	// Pipelines triggered by the execution. Even if the pipelines are nested,
	// we maintain a flat list of all pipelines for easy lookup and querying.
	// NOTE(review): the map key is presumably the pipeline execution ID — confirm.
	PipelineExecutions map[string]*PipelineExecution `json:"pipeline_executions"`

	// Lock is a mutex pointer; the "-" tag keeps it out of the JSON output.
	// A creator may supply it through the WithLock option.
	// NOTE(review): presumably serializes access to this execution — confirm.
	Lock *sync.Mutex `json:"-"`
}

Execution represents the current state of an execution. A single execution is tied to a trigger (webhook, cronjob, etc) and may result in multiple pipelines being executed.

func NewExecution

func NewExecution(ctx context.Context, opts ...ExecutionOption) (*Execution, error)

func (*Execution) AddConnectionsToEvalContext added in v1.0.0

func (ex *Execution) AddConnectionsToEvalContext(evalContext *hcl.EvalContext, stepDefn modconfig.PipelineStep, pipelineDefn *modconfig.Pipeline) (*hcl.EvalContext, error)

func (*Execution) AddCredentialsToEvalContext

func (ex *Execution) AddCredentialsToEvalContext(evalContext *hcl.EvalContext, stepDefn modconfig.PipelineStep) (*hcl.EvalContext, error)

This function mutates evalContext

func (*Execution) AppendEventLogEntry

func (ex *Execution) AppendEventLogEntry(logEntry event.EventLogImpl) error

func (*Execution) BuildEvalContext

func (ex *Execution) BuildEvalContext(pipelineDefn *modconfig.Pipeline, pe *PipelineExecution) (*hcl.EvalContext, error)

func (*Execution) LoadProcessDB added in v0.3.0

func (ex *Execution) LoadProcessDB(e *event.Event) ([]event.EventLogImpl, error)

func (*Execution) ParentStepExecution

func (ex *Execution) ParentStepExecution(pipelineExecutionID string) (*StepExecution, error)

ParentStepExecution returns the parent step execution for the given pipeline execution ID.

func (*Execution) PipelineData

func (ex *Execution) PipelineData(pipelineExecutionID string) (map[string]interface{}, error)

func (*Execution) PipelineDefinition

func (ex *Execution) PipelineDefinition(pipelineExecutionID string) (*modconfig.Pipeline, error)

func (*Execution) PipelineStepExecutions

func (ex *Execution) PipelineStepExecutions(pipelineExecutionID, stepName string) []StepExecution

PipelineStepExecutions returns a list of step executions for the given pipeline execution ID and step name.

func (*Execution) PipelineStepOutputs

func (ex *Execution) PipelineStepOutputs(pipelineExecutionID string) (map[string]interface{}, error)

PipelineStepOutputs returns a single map of all outputs from all steps in the given pipeline execution. The map is keyed by the step name. If a step has a ForTemplate then the result is an array of outputs.

func (*Execution) Snapshot

func (ex *Execution) Snapshot(pipelineExecutionID string) (*Snapshot, error)

func (*Execution) StepDefinition

func (ex *Execution) StepDefinition(pipelineExecutionID, stepExecutionID string) (modconfig.PipelineStep, error)

StepDefinition returns the step definition for the given step execution ID.

func (*Execution) StepExecutionNodeRow

func (ex *Execution) StepExecutionNodeRow(panelName string, sd modconfig.PipelineStep, se *StepExecution) SnapshotPanelDataRow

func (*Execution) StepExecutionSnapshotPanels

func (ex *Execution) StepExecutionSnapshotPanels(pipelineExecutionID string, stepFullyQualifiedName string) (map[string]SnapshotPanel, error)

StepExecutionSnapshotPanels will build and return a set of panels to represent the step execution in a dashboard. The panels include both nodes and edges, depending on the execution of the step - for example, if the step has a for loop then it will be a start node, fan out to loop items and collapse back to an end node.

type ExecutionInMemory added in v0.2.0

// ExecutionInMemory embeds Execution and adds a list of event log entries
// together with an index into that processing sequence.
type ExecutionInMemory struct {
	// Embedded Execution: its fields and methods are promoted onto this type.
	Execution

	// Events is the list of event log entries, serialized under "events".
	// LastProcessedEventIndex has no json tag, so it is serialized under its
	// Go field name.
	// NOTE(review): presumably the index into Events of the last entry that
	// ProcessEvents handled — confirm.
	Events                  []event.EventLogImpl `json:"events"`
	LastProcessedEventIndex int
}

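ExecutionInMemory is an Execution that additionally keeps its event log entries and the index of the last processed event. Like Execution, it represents the current state of an execution. A single execution is tied to a trigger (webhook, cronjob, etc) and may result in multiple pipelines being executed.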

func GetExecution added in v0.2.0

func GetExecution(executionID string) (*ExecutionInMemory, error)

func GetPipelineDefnFromExecution added in v0.2.0

func GetPipelineDefnFromExecution(executionID, pipelineExecutionID string) (*ExecutionInMemory, *modconfig.Pipeline, error)

func LoadExecutionFromProcessDB added in v1.0.0

func LoadExecutionFromProcessDB(e *event.Event) (*ExecutionInMemory, error)

func (*ExecutionInMemory) AddConnectionsToEvalContextFromPipeline added in v1.0.0

func (ex *ExecutionInMemory) AddConnectionsToEvalContextFromPipeline(evalContext *hcl.EvalContext, pipelineDefn *modconfig.Pipeline) (*hcl.EvalContext, error)

func (*ExecutionInMemory) AddCredentialsToEvalContext added in v0.2.0

func (ex *ExecutionInMemory) AddCredentialsToEvalContext(evalContext *hcl.EvalContext, stepDefn modconfig.PipelineStep) (*hcl.EvalContext, error)

This function mutates evalContext

func (*ExecutionInMemory) AddCredentialsToEvalContextFromPipeline added in v0.3.0

func (ex *ExecutionInMemory) AddCredentialsToEvalContextFromPipeline(evalContext *hcl.EvalContext, pipelineDefn *modconfig.Pipeline) (*hcl.EvalContext, error)

func (*ExecutionInMemory) AddEvent added in v0.2.0

func (ex *ExecutionInMemory) AddEvent(evt event.EventLogImpl) error

func (*ExecutionInMemory) AppendEventLogEntry added in v0.2.0

func (ex *ExecutionInMemory) AppendEventLogEntry(logEntry event.EventLogImpl) error

func (*ExecutionInMemory) AppendSerialisedEventLogEntry added in v1.0.0

func (ex *ExecutionInMemory) AppendSerialisedEventLogEntry(logEntry event.EventLogImpl) error

func (*ExecutionInMemory) BuildEvalContext added in v0.2.0

func (ex *ExecutionInMemory) BuildEvalContext(pipelineDefn *modconfig.Pipeline, pe *PipelineExecution) (*hcl.EvalContext, error)

func (*ExecutionInMemory) EndExecution added in v0.3.0

func (ex *ExecutionInMemory) EndExecution() error

func (*ExecutionInMemory) IsPaused added in v1.0.0

func (ex *ExecutionInMemory) IsPaused() bool

func (*ExecutionInMemory) ParentStepExecution added in v0.2.0

func (ex *ExecutionInMemory) ParentStepExecution(pipelineExecutionID string) (*StepExecution, error)

ParentStepExecution returns the parent step execution for the given pipeline execution ID.

func (*ExecutionInMemory) PipelineData added in v0.2.0

func (ex *ExecutionInMemory) PipelineData(pipelineExecutionID string) (map[string]interface{}, error)

func (*ExecutionInMemory) PipelineDefinition added in v0.2.0

func (ex *ExecutionInMemory) PipelineDefinition(pipelineExecutionID string) (*modconfig.Pipeline, error)

func (*ExecutionInMemory) PipelineStepExecutions added in v0.2.0

func (ex *ExecutionInMemory) PipelineStepExecutions(pipelineExecutionID, stepName string) []StepExecution

func (*ExecutionInMemory) PipelineStepOutputs added in v0.2.0

func (ex *ExecutionInMemory) PipelineStepOutputs(pipelineExecutionID string) (map[string]interface{}, error)

PipelineStepOutputs returns a single map of all outputs from all steps in the given pipeline execution. The map is keyed by the step name. If a step has a ForTemplate then the result is an array of outputs.

func (*ExecutionInMemory) ProcessEvents added in v0.2.0

func (ex *ExecutionInMemory) ProcessEvents() error

func (*ExecutionInMemory) StepDefinition added in v0.2.0

func (ex *ExecutionInMemory) StepDefinition(pipelineExecutionID, stepExecutionID string) (modconfig.PipelineStep, error)

StepDefinition returns the step definition for the given step execution ID.

type ExecutionOption

// ExecutionOption is a function that modifies an Execution instance and
// returns an error if the modification cannot be applied. NewExecution accepts
// a variadic list of these options.
type ExecutionOption func(*Execution) error

ExecutionOption is a function that modifies an Execution instance.

func WithEvent

func WithEvent(e *event.Event) ExecutionOption

func WithID

func WithID(id string) ExecutionOption

func WithLock

func WithLock(lock *sync.Mutex) ExecutionOption

There are only two use cases for the creator of an Execution to provide the lock: 1) the pipeline planner, and 2) the step for_each planner.

In any other use case we should let the execution object acquire its own lock.

NOTE: ensure that WithLock is called *before* WithEvent is called

type ExecutionStepOutputs

// ExecutionStepOutputs groups step outputs in two levels: the outer map is
// keyed by step type and the inner map by step name. The inner value can be a
// StepOutput or a slice of StepOutput.
type ExecutionStepOutputs map[string]map[string]interface{}
{
  "http" = {
     "http_1": {},
     "http_2": {},
  }
}

The first level groups the output by step type. The next level groups the output by step name. The value can be a StepOutput OR a slice of StepOutput.

type PipelineExecution

// PipelineExecution represents the execution of a single pipeline.
type PipelineExecution struct {
	// Unique identifier for this pipeline execution
	ID string `json:"id"`
	// The name of the pipeline
	Name string `json:"name"`
	// The input to the pipeline
	Args modconfig.Input `json:"args,omitempty"`

	// The output of the pipeline
	PipelineOutput map[string]interface{} `json:"pipeline_output,omitempty"`

	// The status of the pipeline execution: queued, planned, started, completed, failed
	Status string `json:"status"`

	// Timestamp serialized as "resumed_at".
	// NOTE(review): presumably set when a paused pipeline is resumed — confirm.
	// NOTE(review): omitempty has no effect on time.Time (a struct value), so
	// the zero time is still written to JSON.
	ResumedAt time.Time `json:"resumed_at,omitempty"`

	// Status of each step on a per-step index basis. Used to determine if dependencies
	// have been met etc. Note that each step may have multiple executions, the status
	// of which are not tracked here.
	//
	// The Step Status used to be per-step, however the addition of for_each means that we now need to expand this
	// tracking to include the "index" of the step
	//
	// for_each have 2 type of results: list or map, however in Flowpipe they are both treated as a map,
	// the list is simply a map that the key happens to be a string of "0", "1", "2"
	//
	/*
		The data structure of StepStatus is as follow:
		{
			"echo.echo": {
				"0": {
					xyz
				},
				"1": {
					xyz
				}
			},
			"http.one": {
				"foo": {
					zzz
				},
				"bar": {
					yyy
				}
			}
		}

		echo.echo has a for_each which is a list, so the key is the index of the list

		http.one has a for_each which is a map, so the key is the key of the map

		LOOP

		Loop will be recorded in StepStatus.StepExecution, it's an array
		**/
	StepStatus map[string]map[string]*StepStatus `json:"step_status,omitempty"`

	// If this is a child pipeline, then track its parent: the ID of the parent
	// step execution and the ID of the parent execution.
	ParentStepExecutionID string `json:"parent_step_execution_id,omitempty"`
	ParentExecutionID     string `json:"parent_execution_id,omitempty"`

	// All errors from the step execution + any errors that can be added to the pipeline execution manually
	Errors []modconfig.StepError `json:"errors,omitempty"`

	// Steps triggered by pipelines in the execution. Excluded from JSON (tag "-").
	// NOTE(review): the map key is presumably the step execution ID — confirm.
	StepExecutions map[string]*StepExecution `json:"-"`

	// Start and end timestamps of the pipeline execution.
	// NOTE(review): omitempty has no effect on time.Time, so zero values are
	// still serialized.
	StartTime time.Time `json:"start_time,omitempty"`
	EndTime   time.Time `json:"end_time,omitempty"`
}

PipelineExecution represents the execution of a single pipeline.

func (*PipelineExecution) Fail

func (pe *PipelineExecution) Fail(stepName string, stepError ...modconfig.StepError)

TODO: this is where we collect the failures so the "ShouldFail" test works .. not sure if this is the correct place?

func (*PipelineExecution) FailStep

func (pe *PipelineExecution) FailStep(stepFullyQualifiedName, key, seID string, loopHold, errorHold bool)

func (*PipelineExecution) FinishStep

func (pe *PipelineExecution) FinishStep(stepFullyQualifiedName, key, seID string, loopHold, errorHold bool)

FinishStep marks the given step execution as finished.

func (*PipelineExecution) GetExecutionVariables

func (pe *PipelineExecution) GetExecutionVariables() (map[string]cty.Value, error)

GetExecutionVariables arranges the step outputs so that they can be used for HCL expression evaluation.

The expressions look something like: step.echo.text_1.text

So we need to arrange the output as such:

"step": {
	"echo": {
		"text_1": {
			"text": "hello world" <-- this is the output from the step
		},
		"text_2": {
			"text": "hello world" <-- this is the output from the step
		},
	},
	"http": {
		"my_http": {
			"response_body": "hello world" <-- this is the output from the step
		},
	},
},
"param": {
	"my_param": "hello world" <-- this is set by the calling function, but maybe we should do it here?
}

func (*PipelineExecution) InitializeStep

func (pe *PipelineExecution) InitializeStep(stepName string)

InitializeStep initializes the step status for the given step.

func (*PipelineExecution) IsCanceled

func (pe *PipelineExecution) IsCanceled() bool

IsCanceled returns true if the pipeline has been canceled

func (*PipelineExecution) IsComplete

func (pe *PipelineExecution) IsComplete() bool

IsComplete returns true if all steps are complete.

func (*PipelineExecution) IsFail

func (pe *PipelineExecution) IsFail() bool

func (*PipelineExecution) IsFinished

func (pe *PipelineExecution) IsFinished() bool

func (*PipelineExecution) IsFinishing

func (pe *PipelineExecution) IsFinishing() bool

func (*PipelineExecution) IsPaused

func (pe *PipelineExecution) IsPaused() bool

IsPaused returns true if the pipeline has been paused

func (*PipelineExecution) IsStepComplete

func (pe *PipelineExecution) IsStepComplete(stepName string) bool

IsStepComplete returns true if all executions of the step are finished.

func (*PipelineExecution) IsStepFail

func (pe *PipelineExecution) IsStepFail(stepName string) bool

func (*PipelineExecution) IsStepInitialized

func (pe *PipelineExecution) IsStepInitialized(stepName string) bool

IsStepInitialized returns true if the step has been initialized.

func (*PipelineExecution) IsStepQueued

func (pe *PipelineExecution) IsStepQueued(stepName string) bool

TODO: this doesn't work for step execution retry, it assumes that the entire step must be retried.

func (*PipelineExecution) QueueStep

func (pe *PipelineExecution) QueueStep(stepFullyQualifiedName, key, seID string)

QueueStep marks the given step execution as queued.

func (*PipelineExecution) ShouldFail

func (pe *PipelineExecution) ShouldFail() bool

func (*PipelineExecution) StartStep

func (pe *PipelineExecution) StartStep(stepFullyQualifiedName, key, seID string)

StartStep marks the given step execution as started.

type Snapshot

// Snapshot is the JSON-serializable value returned by Execution.Snapshot for
// a pipeline execution. It carries a schema version, start/end times, a
// layout tree and a set of panels.
type Snapshot struct {
	// NOTE(review): StartTime and EndTime are plain strings; the timestamp
	// format is not visible here — confirm against the code that fills them.
	// NOTE(review): Panels is presumably keyed by panel name — confirm.
	SchemaVersion string                   `json:"schema_version"`
	StartTime     string                   `json:"start_time"`
	EndTime       string                   `json:"end_time"`
	Layout        SnapshotLayout           `json:"layout"`
	Panels        map[string]SnapshotPanel `json:"panels"`
}

type SnapshotLayout

// SnapshotLayout is a recursive layout node: it has a name, a panel type and
// an optional list of child layout nodes ("children" is omitted from JSON
// when empty).
type SnapshotLayout struct {
	Name      string           `json:"name"`
	PanelType string           `json:"panel_type"`
	Children  []SnapshotLayout `json:"children,omitempty"`
}

type SnapshotPanel

// SnapshotPanel describes a single panel of a snapshot. Dashboard, Name,
// PanelType and Status are always serialized; Title, DisplayType, Width and
// Properties are omitted from JSON when empty.
// NOTE(review): Data is a struct value, so its omitempty tag has no effect
// and "data" is always emitted.
type SnapshotPanel struct {
	Dashboard   string                 `json:"dashboard"`
	Name        string                 `json:"name"`
	PanelType   string                 `json:"panel_type"`
	Status      string                 `json:"status"`
	Title       string                 `json:"title,omitempty"`
	DisplayType string                 `json:"display_type,omitempty"`
	Width       int                    `json:"width,omitempty"`
	Data        SnapshotPanelData      `json:"data,omitempty"`
	Properties  map[string]interface{} `json:"properties,omitempty"`
}

type SnapshotPanelData

// SnapshotPanelData is the tabular payload of a panel: a list of column
// descriptors and a list of rows. Both are omitted from JSON when empty.
type SnapshotPanelData struct {
	Columns []SnapshotPanelDataColumn `json:"columns,omitempty"`
	Rows    []SnapshotPanelDataRow    `json:"rows,omitempty"`
}

type SnapshotPanelDataColumn

// SnapshotPanelDataColumn describes one column of SnapshotPanelData by name
// and data type.
// NOTE(review): the set of valid DataType strings is not visible here — confirm.
type SnapshotPanelDataColumn struct {
	Name     string `json:"name"`
	DataType string `json:"data_type"`
}

type SnapshotPanelDataRow

// SnapshotPanelDataRow is one row of SnapshotPanelData, stored as a map of
// arbitrary values.
// NOTE(review): keys presumably match SnapshotPanelDataColumn.Name — confirm.
type SnapshotPanelDataRow map[string]interface{}

type StepExecution

// StepExecution represents the execution of a single step in a pipeline. A
// given step definition may be executed multiple times.
type StepExecution struct {
	// PipelineExecutionID is serialized as "pipeline_execution_id"
	// (presumably the pipeline execution this step execution belongs to —
	// confirm). ID is the unique identifier for this step execution.
	PipelineExecutionID string `json:"pipeline_execution_id"`
	ID                  string `json:"id"`

	// The name of the step in the pipeline definition
	Name string `json:"name"`

	// The status of the step execution: "started", "finished", "failed", "skipped"
	Status string `json:"status"`

	// Input to the step
	Input modconfig.Input `json:"input"`

	// for_each, loop and retry controls for this step execution. Each is an
	// optional pointer and is omitted from JSON when nil.
	StepForEach *modconfig.StepForEach `json:"step_for_each,omitempty"`
	StepLoop    *modconfig.StepLoop    `json:"step_loop,omitempty"`
	StepRetry   *modconfig.StepRetry   `json:"step_retry,omitempty"`

	// NOTE(review): the meaning of the possible NextStepAction values is
	// defined in modconfig and is not visible here — confirm.
	NextStepAction modconfig.NextStepAction `json:"next_step_action,omitempty"`

	// Native/primitive output of the step
	Output *modconfig.Output `json:"output,omitempty"`

	// The output from the Step's output block:
	// output "foo" {
	//    value = <xxx>
	//	}
	//
	StepOutput map[string]interface{} `json:"step_output,omitempty"`

	// Start and end timestamps of the step execution.
	// NOTE(review): omitempty has no effect on time.Time (a struct value), so
	// zero values are still serialized.
	StartTime time.Time `json:"start_time,omitempty"`
	EndTime   time.Time `json:"end_time,omitempty"`
}

StepExecution represents the execution of a single step in a pipeline. A given step definition may be executed multiple times.

func (*StepExecution) Key

func (se *StepExecution) Key() *string

type StepStatus

// StepStatus tracks the state of the executions of a step. The Queued,
// Started, Finished and Failed fields are maps because, with a for loop, each
// loop has a different step execution id.
type StepStatus struct {
	// When the step is initializing it doesn't yet have any executions.
	// We track it as initializing until the first execution is queued.
	// NOTE(review): "OverralState" / "overral_state" looks like a misspelling
	// of "overall"; both the field name and the JSON key are part of the
	// public contract, so they are left unchanged here. The set of valid
	// state strings is not visible in this listing — confirm.
	Initializing bool   `json:"initializing"`
	OverralState string `json:"overral_state"`

	//
	// Both LoopHold and ErrorHold must be resolved **before** the "finish" event is called, i.e. it needs to be calculated at the
	// end of "step start command" and "step pipeline finish" command.
	//
	// It can't be calculated at the "finish" event because it's already too late. If the planner sees that it has a finish
	// event without either a LoopHold or ErrorHold, it will mark the step as completed or failed
	//
	// Indicates that step is in a loop so we don't mark it as finished
	LoopHold bool `json:"loop_hold"`

	// Indicates that a step is in retry loop so we don't mark it as failed
	ErrorHold bool `json:"error_hold"`

	// Step executions that are queued.
	// NOTE(review): the keys of these four maps are presumably step execution IDs — confirm.
	Queued map[string]bool `json:"queued"`
	// Step executions that are started.
	Started map[string]bool `json:"started"`
	// Step executions that are finished.
	Finished map[string]bool `json:"finished"`
	// Step executions that are failed.
	Failed map[string]bool `json:"failed"`

	// There's the step execution in execution, this is the same but in a list for a given step status
	// The element in this slice should point to the same element in the StepExecutions map (in PipelineExecution)
	// NOTE(review): the slice holds StepExecution values, not pointers, so
	// elements are copies rather than aliases of the map entries — confirm
	// this is intended.
	StepExecutions []StepExecution `json:"step_executions"`
}

This needs to be a map because if we have a for loop, each loop will have a different step execution id

func (*StepStatus) Fail

func (s *StepStatus) Fail(seID string, loopHold, errorHold bool)

func (*StepStatus) FailCount

func (s *StepStatus) FailCount() int

func (*StepStatus) Finish

func (s *StepStatus) Finish(seID string, loopHold, errorHold bool)

Finish marks the given execution as finished.

func (*StepStatus) FinishCount

func (s *StepStatus) FinishCount() int

func (*StepStatus) IsComplete

func (s *StepStatus) IsComplete() bool

IsComplete returns true if all executions of the step are finished or failed.

func (*StepStatus) IsFail

func (s *StepStatus) IsFail() bool

IsFail returns true if any executions of the step failed.

func (*StepStatus) IsStarted

func (s *StepStatus) IsStarted() bool

func (*StepStatus) Progress

func (s *StepStatus) Progress() int

Progress returns the percentage of executions of the step that are complete.

func (*StepStatus) Queue

func (s *StepStatus) Queue(seID string)

Queue marks the given execution as queued.

func (*StepStatus) Start

func (s *StepStatus) Start(seID string)

Start marks the given execution as started.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL