execution

package
v0.2.0-beta.202401080304 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2024 License: AGPL-3.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Event values. Each variable is a zero-value instance of the corresponding
// pipeline or step event type from the event package.
var (
	PipelineQueuedEvent   = event.PipelineQueued{}
	PipelineStartedEvent  = event.PipelineStarted{}
	PipelineResumedEvent  = event.PipelineResumed{}
	PipelinePlannedEvent  = event.PipelinePlanned{}
	PipelineCanceledEvent = event.PipelineCanceled{}
	PipelinePausedEvent   = event.PipelinePaused{}
	PipelineFinishedEvent = event.PipelineFinished{}
	PipelineFailedEvent   = event.PipelineFailed{}
	PipelineLoadedEvent   = event.PipelineLoaded{}

	StepQueuedEvent          = event.StepQueued{}
	StepFinishedEvent        = event.StepFinished{} // Generic step-finished event, fired by the command.step_start command.
	StepForEachPlannedEvent  = event.StepForEachPlanned{}
	StepPipelineStartedEvent = event.StepPipelineStarted{} // Fired only for the pipeline step type (a step that launches a pipeline).

)

Events

View Source
// Command values. Each variable is a zero-value instance of the corresponding
// pipeline or step command type from the event package.
var (
	PipelineCancelCommand = event.PipelineCancel{}
	PipelinePlanCommand   = event.PipelinePlan{}
	PipelineFinishCommand = event.PipelineFinish{}
	PipelineFailCommand   = event.PipelineFail{}
	PipelineLoadCommand   = event.PipelineLoad{}
	PipelinePauseCommand  = event.PipelinePause{}
	PipelineQueueCommand  = event.PipelineQueue{}
	PipelineResumeCommand = event.PipelineResume{}
	PipelineStartCommand  = event.PipelineStart{}

	StepQueueCommand = event.StepQueue{}
	StepStartCommand = event.StepStart{}

	StepPipelineFinishCommand = event.StepPipelineFinish{} // Fired when a child pipeline has finished, to tell the parent pipeline to continue its execution.
)

Commands

View Source
// ExecutionMode is a package-level, mutable string setting.
// NOTE(review): the accepted values and where it is assigned are not visible
// in this listing — confirm against the package source.
var ExecutionMode string

Functions

func AddEachForEach

func AddEachForEach(stepForEach *modconfig.StepForEach, evalContext *hcl.EvalContext) *hcl.EvalContext

This function mutates evalContext

func AddLoop

func AddLoop(stepLoop *modconfig.StepLoop, evalContext *hcl.EvalContext) *hcl.EvalContext

This function mutates evalContext

func AddStepCalculatedOutputAsResults

func AddStepCalculatedOutputAsResults(stepName string, stepOutput map[string]interface{}, stepInput *modconfig.Input, evalContext *hcl.EvalContext) (*hcl.EvalContext, error)

This function *mutates* the evalContext passed in

func AddStepPrimitiveOutputAsResults

func AddStepPrimitiveOutputAsResults(stepName string, output *modconfig.Output, evalContext *hcl.EvalContext) (*hcl.EvalContext, error)

func BuildSingleStepExecutionOutput

func BuildSingleStepExecutionOutput(lastStepExecution *StepExecution, stepName string) (map[string]cty.Value, error)

func Category

func Category(category string) map[string]interface{}

func CompleteExecution added in v0.2.0

func CompleteExecution(executionID string) error

func LoadEventStoreEntries

func LoadEventStoreEntries(executionID string) ([]types.EventLogEntry, error)

Types

type Execution

// Execution represents the current state of an execution. A single execution
// is tied to a trigger (webhook, cronjob, etc) and may result in multiple
// pipelines being executed.
type Execution struct {
	// Unique identifier for this execution.
	ID string `json:"id"`

	// Pipelines triggered by the execution. Even if the pipelines are nested,
	// we maintain a flat list of all pipelines for easy lookup and querying.
	// NOTE(review): presumably keyed by pipeline execution ID — confirm.
	PipelineExecutions map[string]*PipelineExecution `json:"pipeline_executions"`

	// Lock is excluded from JSON. A creator may supply it through WithLock;
	// otherwise the execution is expected to acquire its own lock.
	Lock *sync.Mutex `json:"-"`
}

Execution represents the current state of an execution. A single execution is tied to a trigger (webhook, cronjob, etc) and may result in multiple pipelines being executed.

func NewExecution

func NewExecution(ctx context.Context, opts ...ExecutionOption) (*Execution, error)

func (*Execution) AddCredentialsToEvalContext

func (ex *Execution) AddCredentialsToEvalContext(evalContext *hcl.EvalContext, stepDefn modconfig.PipelineStep) (*hcl.EvalContext, error)

This function mutates evalContext

func (*Execution) AppendEventLogEntry

func (ex *Execution) AppendEventLogEntry(logEntry types.EventLogEntry) error

func (*Execution) BuildEvalContext

func (ex *Execution) BuildEvalContext(pipelineDefn *modconfig.Pipeline, pe *PipelineExecution) (*hcl.EvalContext, error)

func (*Execution) LoadJSON

func (ex *Execution) LoadJSON(fileName string) error

LoadJSON loads an execution from a JSON file.

func (*Execution) LoadProcess

func (ex *Execution) LoadProcess(e *event.Event) error

LoadProcess loads the event log file (the .jsonl file) continuously and updates the ex.PipelineExecutions and ex.StepExecutions

func (*Execution) ParentStepExecution

func (ex *Execution) ParentStepExecution(pipelineExecutionID string) (*StepExecution, error)

ParentStepExecution returns the parent step execution for the given pipeline execution ID.

func (*Execution) PipelineData

func (ex *Execution) PipelineData(pipelineExecutionID string) (map[string]interface{}, error)

func (*Execution) PipelineDefinition

func (ex *Execution) PipelineDefinition(pipelineExecutionID string) (*modconfig.Pipeline, error)

func (*Execution) PipelineStepExecutions

func (ex *Execution) PipelineStepExecutions(pipelineExecutionID, stepName string) []StepExecution

PipelineStepExecutions returns a list of step executions for the given pipeline execution ID and step name.

func (*Execution) PipelineStepOutputs

func (ex *Execution) PipelineStepOutputs(pipelineExecutionID string) (map[string]interface{}, error)

PipelineStepOutputs returns a single map of all outputs from all steps in the given pipeline execution. The map is keyed by the step name. If a step has a ForTemplate then the result is an array of outputs.

func (*Execution) Snapshot

func (ex *Execution) Snapshot(pipelineExecutionID string) (*Snapshot, error)

func (*Execution) StepDefinition

func (ex *Execution) StepDefinition(pipelineExecutionID, stepExecutionID string) (modconfig.PipelineStep, error)

StepDefinition returns the step definition for the given step execution ID.

func (*Execution) StepExecutionNodeRow

func (ex *Execution) StepExecutionNodeRow(panelName string, sd modconfig.PipelineStep, se *StepExecution) SnapshotPanelDataRow

func (*Execution) StepExecutionSnapshotPanels

func (ex *Execution) StepExecutionSnapshotPanels(pipelineExecutionID string, stepFullyQualifiedName string) (map[string]SnapshotPanel, error)

StepExecutionSnapshotPanels will build and return a set of panels to represent the step execution in a dashboard. The panels include both nodes and edges, depending on the execution of the step - for example, if the step has a for loop then it will be a start node, fan out to loop items and collapse back to an end node.

type ExecutionInMemory added in v0.2.0

// ExecutionInMemory embeds Execution and additionally keeps a slice of event
// log entries alongside it.
type ExecutionInMemory struct {
	Execution

	// Events holds the event log entries for this execution.
	// NOTE(review): LastProcessedEventIndex is presumably the index into Events
	// of the last entry handled by ProcessEvents — confirm. It has no json tag,
	// so encoding/json uses the Go field name as its key.
	Events                  []event.EventLogEntry `json:"events"`
	LastProcessedEventIndex int
}

ExecutionInMemory represents the current state of an execution. A single execution is tied to a trigger (webhook, cronjob, etc) and may result in multiple pipelines being executed.

func GetExecution added in v0.2.0

func GetExecution(executionID string) (*ExecutionInMemory, error)

func GetPipelineDefnFromExecution added in v0.2.0

func GetPipelineDefnFromExecution(executionID, pipelineExecutionID string) (*ExecutionInMemory, *modconfig.Pipeline, error)

func (*ExecutionInMemory) AddCredentialsToEvalContext added in v0.2.0

func (ex *ExecutionInMemory) AddCredentialsToEvalContext(evalContext *hcl.EvalContext, stepDefn modconfig.PipelineStep) (*hcl.EvalContext, error)

This function mutates evalContext

func (*ExecutionInMemory) AddEvent added in v0.2.0

func (ex *ExecutionInMemory) AddEvent(evt event.EventLogEntry) error

func (*ExecutionInMemory) AppendEventLogEntry added in v0.2.0

func (ex *ExecutionInMemory) AppendEventLogEntry(logEntry event.EventLogEntry) error

func (*ExecutionInMemory) BuildEvalContext added in v0.2.0

func (ex *ExecutionInMemory) BuildEvalContext(pipelineDefn *modconfig.Pipeline, pe *PipelineExecution) (*hcl.EvalContext, error)

func (*ExecutionInMemory) LoadJSON added in v0.2.0

func (ex *ExecutionInMemory) LoadJSON(fileName string) error

LoadJSON loads an execution from a JSON file.

func (*ExecutionInMemory) ParentStepExecution added in v0.2.0

func (ex *ExecutionInMemory) ParentStepExecution(pipelineExecutionID string) (*StepExecution, error)

ParentStepExecution returns the parent step execution for the given pipeline execution ID.

func (*ExecutionInMemory) PipelineData added in v0.2.0

func (ex *ExecutionInMemory) PipelineData(pipelineExecutionID string) (map[string]interface{}, error)

func (*ExecutionInMemory) PipelineDefinition added in v0.2.0

func (ex *ExecutionInMemory) PipelineDefinition(pipelineExecutionID string) (*modconfig.Pipeline, error)

func (*ExecutionInMemory) PipelineStepExecutions added in v0.2.0

func (ex *ExecutionInMemory) PipelineStepExecutions(pipelineExecutionID, stepName string) []StepExecution

func (*ExecutionInMemory) PipelineStepOutputs added in v0.2.0

func (ex *ExecutionInMemory) PipelineStepOutputs(pipelineExecutionID string) (map[string]interface{}, error)

PipelineStepOutputs returns a single map of all outputs from all steps in the given pipeline execution. The map is keyed by the step name. If a step has a ForTemplate then the result is an array of outputs.

func (*ExecutionInMemory) ProcessEvents added in v0.2.0

func (ex *ExecutionInMemory) ProcessEvents() error

func (*ExecutionInMemory) SaveToFile added in v0.2.0

func (ex *ExecutionInMemory) SaveToFile() error

func (*ExecutionInMemory) StepDefinition added in v0.2.0

func (ex *ExecutionInMemory) StepDefinition(pipelineExecutionID, stepExecutionID string) (modconfig.PipelineStep, error)

StepDefinition returns the step definition for the given step execution ID.

type ExecutionOption

// ExecutionOption is a function that modifies an Execution instance and may
// return an error. NewExecution accepts these as its variadic opts.
type ExecutionOption func(*Execution) error

ExecutionOption is a function that modifies an Execution instance.

func WithEvent

func WithEvent(e *event.Event) ExecutionOption

func WithID

func WithID(id string) ExecutionOption

func WithLock

func WithLock(lock *sync.Mutex) ExecutionOption

There are only 2 use cases for creator of Execution to provide the lock: 1) pipeline planner, and 2) step for each planner

Any other use case we should let the execution object acquire its own lock

NOTE: ensure that WithLock is called *before* WithEvent is called

type ExecutionStepOutputs

// ExecutionStepOutputs is a two-level map of step outputs: the first level
// groups by step type, the second by step name. The value can be a StepOutput
// or a slice of StepOutput.
type ExecutionStepOutputs map[string]map[string]interface{}
{
  "http" = {
     "http_1": {},
     "http_2": {},
  }
}

The first level groups the output by the step type. The next level groups the output by the step name. The value can be a StepOutput or a slice of StepOutput.

type PipelineExecution

// PipelineExecution represents the execution of a single pipeline: its
// identity, arguments, output, status, per-step status, errors and timing.
type PipelineExecution struct {
	// Unique identifier for this pipeline execution.
	ID string `json:"id"`
	// The name of the pipeline.
	Name string `json:"name"`
	// The input to the pipeline.
	Args modconfig.Input `json:"args,omitempty"`

	// The output of the pipeline.
	PipelineOutput map[string]interface{} `json:"pipeline_output,omitempty"`

	// The status of the pipeline execution: queued, planned, started, completed, failed.
	Status string `json:"status"`

	// Status of each step on a per-step index basis. Used to determine if
	// dependencies have been met, etc. Note that each step may have multiple
	// executions, the status of which are not tracked here.
	//
	// The step status used to be per-step, however the addition of for_each
	// means that we now need to expand this tracking to include the "index" of
	// the step.
	//
	// for_each has 2 types of result: list or map. In Flowpipe they are both
	// treated as a map; a list is simply a map whose keys happen to be the
	// strings "0", "1", "2", ...
	//
	/*
		The data structure of StepStatus is as follows:
		{
			"echo.echo": {
				"0": {
					xyz
				},
				"1": {
					xyz
				}
			},
			"http.one": {
				"foo": {
					zzz
				},
				"bar": {
					yyy
				}
			}
		}

		echo.echo has a for_each which is a list, so the key is the index of the list

		http.one has a for_each which is a map, so the key is the key of the map

		LOOP

		Loop will be recorded in StepStatus.StepExecutions, it's an array
		**/
	StepStatus map[string]map[string]*StepStatus `json:"step_status,omitempty"`

	// If this is a child pipeline, then track its parent.
	ParentStepExecutionID string `json:"parent_step_execution_id,omitempty"`
	ParentExecutionID     string `json:"parent_execution_id,omitempty"`

	// All errors from the step execution + any errors that can be added to the pipeline execution manually.
	Errors []modconfig.StepError `json:"errors,omitempty"`

	// Steps triggered by pipelines in the execution. Excluded from JSON.
	// NOTE(review): presumably keyed by step execution ID — confirm.
	StepExecutions map[string]*StepExecution `json:"-"`

	// NOTE(review): time.Time is a struct, so encoding/json's omitempty never
	// omits these fields; a zero time is still written out.
	StartTime time.Time `json:"start_time,omitempty"`
	EndTime   time.Time `json:"end_time,omitempty"`
}

PipelineExecution represents the execution of a single pipeline.

func (*PipelineExecution) Fail

func (pe *PipelineExecution) Fail(stepName string, stepError ...modconfig.StepError)

TODO: this is where we collect the failures so the "ShouldFail" test works .. not sure if this is the correct place?

func (*PipelineExecution) FailStep

func (pe *PipelineExecution) FailStep(stepFullyQualifiedName, key, seID string)

func (*PipelineExecution) FinishStep

func (pe *PipelineExecution) FinishStep(stepFullyQualifiedName, key, seID string, loopHold, errorHold bool)

FinishStep marks the given step execution as finished.

func (*PipelineExecution) GetExecutionVariables

func (pe *PipelineExecution) GetExecutionVariables() (map[string]cty.Value, error)

*

Arrange the step outputs in a way that it can be used for HCL Expression evaluation

The expressions look something like: step.echo.text_1.text

So we need to arrange the output as such:

"step": {
	"echo": {
		"text_1": {
			"text": "hello world" <-- this is the output from the step
		},
		"text_2": {
			"text": "hello world" <-- this is the output from the step
		},
	},
	"http": {
		"my_http": {
			"response_body": "hello world" <-- this is the output from the step
		},
	},
},
"param": {
	"my_param": "hello world" <-- this is set by the calling function, but maybe we should do it here?
}

func (*PipelineExecution) InitializeStep

func (pe *PipelineExecution) InitializeStep(stepName string)

InitializeStep initializes the step status for the given step.

func (*PipelineExecution) IsCanceled

func (pe *PipelineExecution) IsCanceled() bool

IsCanceled returns true if the pipeline has been canceled

func (*PipelineExecution) IsComplete

func (pe *PipelineExecution) IsComplete() bool

IsComplete returns true if all steps are complete.

func (*PipelineExecution) IsFail

func (pe *PipelineExecution) IsFail() bool

func (*PipelineExecution) IsFinished

func (pe *PipelineExecution) IsFinished() bool

func (*PipelineExecution) IsFinishing

func (pe *PipelineExecution) IsFinishing() bool

func (*PipelineExecution) IsPaused

func (pe *PipelineExecution) IsPaused() bool

IsPaused returns true if the pipeline has been paused

func (*PipelineExecution) IsStepComplete

func (pe *PipelineExecution) IsStepComplete(stepName string) bool

IsStepComplete returns true if all executions of the step are finished.

func (*PipelineExecution) IsStepFail

func (pe *PipelineExecution) IsStepFail(stepName string) bool

func (*PipelineExecution) IsStepInLoopHold

func (pe *PipelineExecution) IsStepInLoopHold(stepName string) bool

func (*PipelineExecution) IsStepInitialized

func (pe *PipelineExecution) IsStepInitialized(stepName string) bool

IsStepInitialized returns true if the step has been initialized.

func (*PipelineExecution) IsStepQueued

func (pe *PipelineExecution) IsStepQueued(stepName string) bool

TODO: this doesn't work for step execution retry; it assumes that the entire step must be retried.

func (*PipelineExecution) QueueStep

func (pe *PipelineExecution) QueueStep(stepFullyQualifiedName, key, seID string)

QueueStep marks the given step execution as queued.

func (*PipelineExecution) ShouldFail

func (pe *PipelineExecution) ShouldFail() bool

func (*PipelineExecution) StartStep

func (pe *PipelineExecution) StartStep(stepFullyQualifiedName, key, seID string)

StartStep marks the given step execution as started.

type Snapshot

// Snapshot is the JSON document returned by Execution.Snapshot: a schema
// version, start and end times carried as strings, a layout tree and a map of
// panels.
// NOTE(review): Panels is presumably keyed by panel name — confirm.
type Snapshot struct {
	SchemaVersion string                   `json:"schema_version"`
	StartTime     string                   `json:"start_time"`
	EndTime       string                   `json:"end_time"`
	Layout        SnapshotLayout           `json:"layout"`
	Panels        map[string]SnapshotPanel `json:"panels"`
}

type SnapshotLayout

// SnapshotLayout is a node in a snapshot's layout tree. Children holds nested
// layout nodes and is omitted from JSON when empty.
type SnapshotLayout struct {
	Name      string           `json:"name"`
	PanelType string           `json:"panel_type"`
	Children  []SnapshotLayout `json:"children,omitempty"`
}

type SnapshotPanel

// SnapshotPanel describes a single panel in a snapshot: the dashboard it
// belongs to, its name, type and status, optional display settings, its
// tabular data and free-form properties.
//
// NOTE(review): Data is a struct, so encoding/json's omitempty never omits it;
// an empty SnapshotPanelData is written as "data": {}.
type SnapshotPanel struct {
	Dashboard   string                 `json:"dashboard"`
	Name        string                 `json:"name"`
	PanelType   string                 `json:"panel_type"`
	Status      string                 `json:"status"`
	Title       string                 `json:"title,omitempty"`
	DisplayType string                 `json:"display_type,omitempty"`
	Width       int                    `json:"width,omitempty"`
	Data        SnapshotPanelData      `json:"data,omitempty"`
	Properties  map[string]interface{} `json:"properties,omitempty"`
}
type SnapshotPanelData

// SnapshotPanelData is the tabular payload of a panel: column descriptors and
// rows. Both slices are omitted from JSON when empty.
type SnapshotPanelData struct {
	Columns []SnapshotPanelDataColumn `json:"columns,omitempty"`
	Rows    []SnapshotPanelDataRow    `json:"rows,omitempty"`
}

type SnapshotPanelDataColumn

// SnapshotPanelDataColumn describes one column of panel data by its name and
// data type, both carried as strings.
type SnapshotPanelDataColumn struct {
	Name     string `json:"name"`
	DataType string `json:"data_type"`
}

type SnapshotPanelDataRow

// SnapshotPanelDataRow is a single row of panel data, held as a map.
// NOTE(review): keys are presumably the column names declared in
// SnapshotPanelData.Columns — confirm.
type SnapshotPanelDataRow map[string]interface{}

type StepExecution

// StepExecution represents the execution of a single step in a pipeline. A
// given step definition may be executed multiple times.
type StepExecution struct {
	// ID of the pipeline execution this step execution belongs to, followed by
	// the unique identifier for this step execution.
	PipelineExecutionID string `json:"pipeline_execution_id"`
	ID                  string `json:"id"`

	// The name of the step in the pipeline definition
	Name string `json:"name"`

	// The status of the step execution: "started", "finished", "failed", "skipped"
	Status string `json:"status"`

	// Input to the step
	Input modconfig.Input `json:"input"`

	// for_each, loop and retry controls; each is omitted from JSON when nil.
	StepForEach *modconfig.StepForEach `json:"step_for_each,omitempty"`
	StepLoop    *modconfig.StepLoop    `json:"step_loop,omitempty"`
	StepRetry   *modconfig.StepRetry   `json:"step_retry,omitempty"`

	// NOTE(review): the meaning of NextStepAction values is defined in
	// modconfig and not visible here.
	NextStepAction modconfig.NextStepAction `json:"next_step_action,omitempty"`

	// Native/primitive output of the step
	Output *modconfig.Output `json:"output,omitempty"`

	// The output from the Step's output block:
	// output "foo" {
	//    value = <xxx>
	//	}
	//
	StepOutput map[string]interface{} `json:"step_output,omitempty"`

	// NOTE(review): time.Time is a struct, so encoding/json's omitempty never
	// omits these fields; a zero time is still written out.
	StartTime time.Time `json:"start_time,omitempty"`
	EndTime   time.Time `json:"end_time,omitempty"`
}

StepExecution represents the execution of a single step in a pipeline. A given step definition may be executed multiple times.

func (*StepExecution) Key

func (se *StepExecution) Key() *string

type StepStatus

// StepStatus tracks the state of one step (or one for_each key of a step):
// its hold flags, the sets of step execution IDs in each lifecycle state, and
// the list of its step executions.
type StepStatus struct {
	// When the step is initializing it doesn't yet have any executions.
	// We track it as initializing until the first execution is queued.
	Initializing bool   `json:"initializing"`
	// NOTE(review): "Overral" looks like a misspelling of "Overall". The field
	// name and JSON key are part of the public interface, so renaming would be
	// a breaking change.
	OverralState string `json:"overral_state"`

	//
	// Both LoopHold and ErrorHold must be resolved **before** the "finish" event is called, i.e. they need to be calculated at the
	// end of the "step start" command and the "step pipeline finish" command.
	//
	// They can't be calculated at the "finish" event because it's already too late. If the planner sees a finish
	// event without either a LoopHold or ErrorHold, it will mark the step as completed or failed.
	//
	// Indicates that the step is in a loop so we don't mark it as finished
	LoopHold bool `json:"loop_hold"`

	// Indicates that a step is in a retry loop so we don't mark it as failed
	ErrorHold bool `json:"error_hold"`

	// Step executions that are queued.
	// NOTE(review): these four maps are presumably keyed by step execution ID
	// (the seID passed to Queue/Start/Finish/Fail) — confirm.
	Queued map[string]bool `json:"queued"`
	// Step executions that are started.
	Started map[string]bool `json:"started"`
	// Step executions that are finished.
	Finished map[string]bool `json:"finished"`
	// Step executions that are failed.
	Failed map[string]bool `json:"failed"`

	// There's the step execution in execution, this is the same but in a list for a given step status.
	// The elements in this slice are meant to correspond to the entries in the StepExecutions map (in PipelineExecution).
	// NOTE(review): this slice holds StepExecution values while that map holds *StepExecution,
	// so the elements here are copies and do not alias the map's entries.
	StepExecutions []StepExecution `json:"step_executions"`
}

This needs to be a map because if we have a for loop, each loop will have a different step execution id

func (*StepStatus) Fail

func (s *StepStatus) Fail(seID string)

func (*StepStatus) FailCount

func (s *StepStatus) FailCount() int

func (*StepStatus) Finish

func (s *StepStatus) Finish(seID string, loopHold, errorHold bool)

Finish marks the given execution as finished.

func (*StepStatus) FinishCount

func (s *StepStatus) FinishCount() int

func (*StepStatus) IsComplete

func (s *StepStatus) IsComplete() bool

IsComplete returns true if all executions of the step are finished or failed.

func (*StepStatus) IsFail

func (s *StepStatus) IsFail() bool

IsFail returns true if any executions of the step failed.

func (*StepStatus) IsStarted

func (s *StepStatus) IsStarted() bool

func (*StepStatus) Progress

func (s *StepStatus) Progress() int

Progress returns the percentage of executions of the step that are complete.

func (*StepStatus) Queue

func (s *StepStatus) Queue(seID string)

Queue marks the given execution as queued.

func (*StepStatus) Start

func (s *StepStatus) Start(seID string)

Start marks the given execution as started.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL