compiler

package
v1.13.3

This package is not in the latest version of its module.

Published: Oct 23, 2024 License: Apache-2.0 Imports: 13 Imported by: 5

Documentation

Overview

Package compiler provides compiler services for Flyte workflows. It performs static analysis on the Workflow and produces CompileErrors for any detected issue. A Flyte workflow should only be considered valid for execution if it has passed through the compiler first. The intended usage for the compiler is as follows:

1) Call GetRequirements(...) and load/retrieve all tasks/workflows referenced in the response.
2) Call CompileWorkflow(...) and make sure it reports no errors.
3) Use one of the transformer packages (e.g. transformers/k8s) to build the final executable workflow.

               +-------------------+
               | start(StartNode)  |
               +-------------------+
                 |
                 | wf_input
                 v
+--------+     +-------------------+
| static | --> | node_1(TaskNode)  |
+--------+     +-------------------+
  |              |
  |              | x
  |              v
  |            +-------------------+
  +----------> | node_2(TaskNode)  |
               +-------------------+
                 |
                 | n2_output
                 v
               +-------------------+
               |   end(EndNode)    |
               +-------------------+
               +-------------------+
               | Workflow Id: repo |
               +-------------------+


Functions

func CompileTask

func CompileTask(task *core.TaskTemplate) (*core.CompiledTask, error)

CompileTask compiles a given Task into an executable Task. It validates all required parameters and ensures a Task is well-formed.

func CompileWorkflow

func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.WorkflowTemplate, tasks []*core.CompiledTask,
	launchPlans []c.InterfaceProvider) (*core.CompiledWorkflowClosure, error)

CompileWorkflow compiles a Flyte workflow and all of its dependencies into a single executable Workflow. Refer to GetRequirements() to obtain the list of launch plan and Task ids to load/compile first. It returns an executable Workflow if no errors are found, or a list of errors that must be addressed before the Workflow can be executed. Type-assert the returned error to errors.CompileErrors to inspect individual errors.
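
The error-inspection step can be illustrated with a minimal, standard-library-only sketch. The types compileError and compileErrors and the functions compile and errorCodes are hypothetical stand-ins invented for this illustration; they are not the API of the errors package, whose actual shape should be checked in its own documentation. The sketch only shows the general Go pattern of recovering a concrete error collection from a plain error value.

```go
package main

import (
	"errors"
	"fmt"
)

// compileError is a hypothetical stand-in for one reported issue.
type compileError struct {
	Code        string
	NodeID      string
	Description string
}

// compileErrors is a hypothetical stand-in for a collection of issues.
// It implements error, so it can be returned as a plain error value.
type compileErrors []compileError

func (c compileErrors) Error() string {
	return fmt.Sprintf("Collected Errors: %d", len(c))
}

// compile is a hypothetical stand-in for a compile call that fails.
func compile() error {
	return compileErrors{
		{Code: "TaskReferenceNotFound", NodeID: "start-node", Description: "Referenced Task not found."},
	}
}

// errorCodes recovers the concrete collection from a plain error and
// returns the code of each entry. It returns nil for any other error.
func errorCodes(err error) []string {
	var collected compileErrors
	if !errors.As(err, &collected) {
		return nil
	}
	codes := make([]string, 0, len(collected))
	for _, e := range collected {
		codes = append(codes, e.Code)
	}
	return codes
}

func main() {
	for _, code := range errorCodes(compile()) {
		fmt.Println(code)
	}
}
```

Using errors.As rather than a direct type assertion also works when the collection has been wrapped by another error along the way, which is an assumption about caller code and not something this package is known to do.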

Example (Basic)
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs:  createEmptyVariableMap(),
		Outputs: createEmptyVariableMap(),
	},
	Nodes: []*core.Node{
		{
			Id: "FirstNode",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{
					Reference: &core.TaskNode_ReferenceId{
						ReferenceId: &core.Identifier{Name: "task_123"},
					},
				},
			},
		},
	},
}

// Detect which other workflows/tasks this workflow references
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
workflows := make([]common.InterfaceProvider, 0)
tasks := []*core.TaskTemplate{
	{
		Id: &core.Identifier{Name: "task_123"},
		Interface: &core.TypedInterface{
			Inputs:  createEmptyVariableMap(),
			Outputs: createEmptyVariableMap(),
		},
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image:   "image://",
				Command: []string{"cmd"},
				Args:    []string{"args"},
			},
		},
	},
}

compiledTasks := make([]*core.CompiledTask, 0, len(tasks))
for _, task := range tasks {
	compiledTask, err := CompileTask(task)
	if err != nil {
		fmt.Printf("failed to compile task [%v]. Error: %v", task.Id, err)
		return
	}

	compiledTasks = append(compiledTasks, compiledTask)
}

output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, workflows)
fmt.Printf("Compiled Workflow in GraphViz: %v\n", visualize.ToGraphViz(output.Primary))
fmt.Printf("Compile Errors: %v\n", errs)
Output:

Needed Tasks: [task_123], Needed Workflows []
Compiled Workflow in GraphViz: digraph G {rankdir=TB;workflow[label="Workflow Id: name:"repo""];node[style=filled];"start-node(start)" [shape=Msquare];"start-node(start)" -> "FirstNode()" [label="execution",style="dashed"];"FirstNode()" -> "end-node(end)" [label="execution",style="dashed"];}
Compile Errors: <nil>
Example (CompileErrors)
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs:  createEmptyVariableMap(),
		Outputs: createEmptyVariableMap(),
	},
	Nodes: []*core.Node{
		{
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{
					Reference: &core.TaskNode_ReferenceId{
						ReferenceId: &core.Identifier{Name: "task_123"},
					},
				},
			},
		},
	},
}

// Detect which other workflows/tasks this workflow references
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("Failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
workflows := make([]common.InterfaceProvider, 0)
_, errs := CompileWorkflow(inputWorkflow, subWorkflows, []*core.CompiledTask{}, workflows)
fmt.Printf("Compile Errors: %v\n", errs)
Output:

Needed Tasks: [task_123], Needed Workflows []
Compile Errors: Collected Errors: 1
	Error 0: Code: TaskReferenceNotFound, Node Id: start-node, Description: Referenced Task [name:"task_123"] not found.
Example (InputsOutputsBinding)
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs: createVariableMap(map[string]*core.Variable{
			"wf_input": {
				Type: getIntegerLiteralType(),
			},
		}),
		Outputs: createVariableMap(map[string]*core.Variable{
			"wf_output": {
				Type: getIntegerLiteralType(),
			},
		}),
	},
	Nodes: []*core.Node{
		{
			Id: "node_1",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{Reference: &core.TaskNode_ReferenceId{ReferenceId: &core.Identifier{Name: "task_123"}}},
			},
			Inputs: []*core.Binding{
				newVarBinding("", "wf_input", "x"), newIntegerBinding(124, "y"),
			},
		},
		{
			Id: "node_2",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{Reference: &core.TaskNode_ReferenceId{ReferenceId: &core.Identifier{Name: "task_123"}}},
			},
			Inputs: []*core.Binding{
				newIntegerBinding(124, "y"), newVarBinding("node_1", "x", "x"),
			},
			OutputAliases: []*core.Alias{{Var: "x", Alias: "n2_output"}},
		},
	},
	Outputs: []*core.Binding{newVarBinding("node_2", "n2_output", "wf_output")},
}

// Detect which other graphs/tasks this workflow references
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("Failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Graphs [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
graphs := make([]common.InterfaceProvider, 0)
inputTasks := []*core.TaskTemplate{
	{
		Id:       &core.Identifier{Name: "task_123"},
		Metadata: &core.TaskMetadata{},
		Interface: &core.TypedInterface{
			Inputs: createVariableMap(map[string]*core.Variable{
				"x": {
					Type: getIntegerLiteralType(),
				},
				"y": {
					Type: getIntegerLiteralType(),
				},
			}),
			Outputs: createVariableMap(map[string]*core.Variable{
				"x": {
					Type: getIntegerLiteralType(),
				},
			}),
		},
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image:   "image://",
				Command: []string{"cmd"},
				Args:    []string{"args"},
			},
		},
	},
}

// Compile all tasks before proceeding with Workflow
compiledTasks := make([]*core.CompiledTask, 0, len(inputTasks))
for _, task := range inputTasks {
	compiledTask, err := CompileTask(task)
	if err != nil {
		fmt.Printf("Failed to compile task [%v]. Error: %v", task.Id, err)
		return
	}

	compiledTasks = append(compiledTasks, compiledTask)
}

output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, graphs)
if errs != nil {
	fmt.Printf("Compile Errors: %v\n", errs)
} else {
	fmt.Printf("Compiled Workflow in GraphViz: %v\n", visualize.ToGraphViz(output.Primary))
}
Output:

Needed Tasks: [task_123], Needed Graphs []
Compiled Workflow in GraphViz: digraph G {rankdir=TB;workflow[label="Workflow Id: name:"repo""];node[style=filled];"start-node(start)" [shape=Msquare];"start-node(start)" -> "node_1()" [label="wf_input",style="solid"];"node_1()" -> "node_2()" [label="x",style="solid"];"static" -> "node_1()" [label=""];"node_2()" -> "end-node(end)" [label="n2_output",style="solid"];"static" -> "node_2()" [label=""];}

Types

type LaunchPlanInterfaceProvider

type LaunchPlanInterfaceProvider struct {
	// contains filtered or unexported fields
}

LaunchPlanInterfaceProvider is meant to satisfy github.com/flyteorg/flytepropeller/pkg/compiler/common.InterfaceProvider. The implementation is largely copied from Admin: github-dot-com/flyteorg/flyteadmin/blob/1acce744b8c7839ab77a0eb1ed922905af15baa5/pkg/workflowengine/impl/interface_provider.go (the link is deliberately not a real one, because a real link made go mod import admin). The Admin implementation, however, relies on the internal Admin Gorm model. We should consider deprecating that one in favor of this one, as Admin already has a dependency on the Propeller compiler.
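
What "satisfying an interface" means here can be sketched with simplified stand-ins. Everything below is hypothetical: interfaceProvider, launchPlan, and launchPlanProvider only mirror the method names listed for this type (GetID, GetExpectedInputs, GetExpectedOutputs) and use plain strings and maps where the real types use protobuf messages. The point of the sketch is the compile-time assertion, a common Go idiom for keeping a wrapper in step with the interface it targets.

```go
package main

import "fmt"

// interfaceProvider is a hypothetical, simplified stand-in for
// common.InterfaceProvider; the real interface uses protobuf types.
type interfaceProvider interface {
	GetID() string
	GetExpectedInputs() map[string]string
	GetExpectedOutputs() map[string]string
}

// launchPlan is a hypothetical stand-in for admin.LaunchPlan.
type launchPlan struct {
	Name    string
	Inputs  map[string]string // variable name -> type name
	Outputs map[string]string // variable name -> type name
}

// launchPlanProvider wraps a launch plan and exposes its interface.
type launchPlanProvider struct {
	lp *launchPlan
}

// Compile-time check: the build fails if *launchPlanProvider ever stops
// satisfying interfaceProvider.
var _ interfaceProvider = (*launchPlanProvider)(nil)

func newLaunchPlanProvider(lp *launchPlan) *launchPlanProvider {
	return &launchPlanProvider{lp: lp}
}

func (p *launchPlanProvider) GetID() string                         { return p.lp.Name }
func (p *launchPlanProvider) GetExpectedInputs() map[string]string  { return p.lp.Inputs }
func (p *launchPlanProvider) GetExpectedOutputs() map[string]string { return p.lp.Outputs }

func main() {
	var provider interfaceProvider = newLaunchPlanProvider(&launchPlan{
		Name:    "lp_1",
		Inputs:  map[string]string{"x": "integer"},
		Outputs: map[string]string{"y": "integer"},
	})
	fmt.Println(provider.GetID(), len(provider.GetExpectedInputs()), len(provider.GetExpectedOutputs()))
}
```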

func NewLaunchPlanInterfaceProvider

func NewLaunchPlanInterfaceProvider(launchPlan *admin.LaunchPlan) *LaunchPlanInterfaceProvider

func (*LaunchPlanInterfaceProvider) GetExpectedInputs

func (p *LaunchPlanInterfaceProvider) GetExpectedInputs() *core.ParameterMap

func (*LaunchPlanInterfaceProvider) GetExpectedOutputs

func (p *LaunchPlanInterfaceProvider) GetExpectedOutputs() *core.VariableMap

func (*LaunchPlanInterfaceProvider) GetID

type LaunchPlanRefIdentifier

type LaunchPlanRefIdentifier = common.Identifier

type TaskIdentifier

type TaskIdentifier = common.Identifier

type WorkflowExecutionRequirements

type WorkflowExecutionRequirements struct {
	// contains filtered or unexported fields
}

WorkflowExecutionRequirements represents the set of required resources for a given Workflow's execution. All the resources should be loaded beforehand and passed to the compiler.

func GetRequirements

func GetRequirements(fg *core.WorkflowTemplate, subWfs []*core.WorkflowTemplate) (reqs WorkflowExecutionRequirements, err error)

GetRequirements computes requirements for a given Workflow.
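
As a rough mental model of what computing requirements involves, the sketch below walks the nodes of a primary workflow and its subworkflows and collects the distinct task and launch plan references. It is not the package's implementation: node, workflow, requirements, collectRequirements, and sortedKeys are hypothetical simplifications that use strings where the real code uses identifiers, and the sorted output order is a choice made here for determinism only.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical, simplified workflow node.
type node struct {
	ID            string
	TaskRef       string // non-empty when the node runs a task
	LaunchPlanRef string // non-empty when the node starts a launch plan
}

// workflow is a hypothetical, simplified workflow template.
type workflow struct {
	Name  string
	Nodes []node
}

// requirements holds the distinct references that must be loaded.
type requirements struct {
	taskIDs       []string
	launchPlanIDs []string
}

// collectRequirements gathers every task and launch plan referenced by the
// primary workflow or any subworkflow, without duplicates.
func collectRequirements(primary workflow, subWorkflows []workflow) requirements {
	tasks := map[string]struct{}{}
	launchPlans := map[string]struct{}{}
	for _, wf := range append([]workflow{primary}, subWorkflows...) {
		for _, n := range wf.Nodes {
			if n.TaskRef != "" {
				tasks[n.TaskRef] = struct{}{}
			}
			if n.LaunchPlanRef != "" {
				launchPlans[n.LaunchPlanRef] = struct{}{}
			}
		}
	}
	return requirements{taskIDs: sortedKeys(tasks), launchPlanIDs: sortedKeys(launchPlans)}
}

// sortedKeys returns the keys of a set in sorted order.
func sortedKeys(set map[string]struct{}) []string {
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	wf := workflow{Name: "repo", Nodes: []node{
		{ID: "node_1", TaskRef: "task_123"},
		{ID: "node_2", TaskRef: "task_123"},
	}}
	reqs := collectRequirements(wf, nil)
	fmt.Println(reqs.taskIDs, reqs.launchPlanIDs)
}
```

Two nodes referencing the same task yield a single required task id, which matches the "Needed Tasks: [task_123]" line in the InputsOutputsBinding example.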

func (WorkflowExecutionRequirements) GetRequiredLaunchPlanIds

func (g WorkflowExecutionRequirements) GetRequiredLaunchPlanIds() []LaunchPlanRefIdentifier

GetRequiredLaunchPlanIds gets a slice of required launch plan ids to load.

func (WorkflowExecutionRequirements) GetRequiredTaskIds

func (g WorkflowExecutionRequirements) GetRequiredTaskIds() []TaskIdentifier

GetRequiredTaskIds gets a slice of required Task ids to load.

Directories

Path Synopsis
This package defines the intermediate layer that the compiler builds and transformers accept.
This package is a central repository of all compile errors that can be reported.
transformers/k8s
This package converts the output of the compiler into a K8s resource for propeller to execute.
This package contains validators for all elements of the workflow spec (node, task, branch, interface, bindings...
