Documentation ¶
Overview ¶
Package compiler provides compiler services for kozmo workflows. It performs static analysis on the Workflow and produces CompileErrors for any detected issue. A kozmo workflow should only be considered valid for execution if it has passed through the compiler first.

The intended usage for the compiler is as follows:
1) Call GetRequirements(...) and load/retrieve all tasks/workflows referenced in the response.
2) Call CompileWorkflow(...) and make sure it reports no errors.
3) Use one of the transformer packages (e.g. transformers/k8s) to build the final executable workflow.
                     +-------------------+
                     | start(StartNode)  |
                     +-------------------+
                       |
                       | wf_input
                       v
+--------+           +-------------------+
| static | -->       | node_1(TaskNode)  |
+--------+           +-------------------+
  |                    |
  |                    | x
  |                    v
  |                  +-------------------+
  +---------->       | node_2(TaskNode)  |
                     +-------------------+
                       |
                       | n2_output
                       v
                     +-------------------+
                     |   end(EndNode)    |
                     +-------------------+
                     +-------------------+
                     | Workflow Id: repo |
                     +-------------------+
Index ¶
- func CompileTask(task *core.TaskTemplate) (*core.CompiledTask, error)
- func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.WorkflowTemplate, ...) (*core.CompiledWorkflowClosure, error)
- type LaunchPlanInterfaceProvider
- type LaunchPlanRefIdentifier
- type TaskIdentifier
- type WorkflowExecutionRequirements
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CompileTask ¶
func CompileTask(task *core.TaskTemplate) (*core.CompiledTask, error)
CompileTask compiles a given Task into an executable Task. It validates all required parameters and ensures a Task is well-formed.
func CompileWorkflow ¶
func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.WorkflowTemplate, tasks []*core.CompiledTask, launchPlans []c.InterfaceProvider) (*core.CompiledWorkflowClosure, error)
CompileWorkflow compiles a kozmo workflow and all of its dependencies into a single executable Workflow. Refer to GetRequirements() to obtain the list of launch plan and Task ids to load/compile first. It returns an executable Workflow if no errors are found, or a list of errors that must be addressed before the Workflow can be executed. Cast the error to errors.CompileErrors to inspect individual errors.
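The casting step mentioned above can be sketched in isolation. The block below is a minimal, self-contained illustration and does not use the real kozmo packages: `CompileError`, `CompileErrors`, `compile`, and `errorCodes` are hypothetical stand-ins, and the actual shape of `errors.CompileErrors` may differ. It only shows the general Go pattern of recovering a collected-errors value from a plain `error`.

```go
package main

import (
	"errors"
	"fmt"
)

// CompileError is a hypothetical stand-in for a single compiler finding.
type CompileError struct {
	Code        string
	Description string
}

// CompileErrors is a hypothetical stand-in for errors.CompileErrors: an
// error value that also exposes the individual findings it collected.
type CompileErrors struct {
	Items []CompileError
}

// Error summarizes the collection so *CompileErrors satisfies error.
func (c *CompileErrors) Error() string {
	return fmt.Sprintf("Collected Errors: %d", len(c.Items))
}

// compile is a placeholder for CompileWorkflow. It reports one finding for
// every referenced task name that is missing from the available set.
func compile(referenced []string, available map[string]bool) error {
	errs := &CompileErrors{}
	for _, name := range referenced {
		if !available[name] {
			errs.Items = append(errs.Items, CompileError{
				Code:        "TaskReferenceNotFound",
				Description: fmt.Sprintf("Referenced Task [%s] not found.", name),
			})
		}
	}
	if len(errs.Items) == 0 {
		// Return an untyped nil so callers can compare against nil safely.
		return nil
	}
	return errs
}

// errorCodes casts err to *CompileErrors and returns the code of each
// finding, or nil when err is not a collected-errors value.
func errorCodes(err error) []string {
	var ce *CompileErrors
	if !errors.As(err, &ce) {
		return nil
	}
	codes := make([]string, 0, len(ce.Items))
	for _, item := range ce.Items {
		codes = append(codes, item.Code)
	}
	return codes
}

func main() {
	err := compile([]string{"task_123"}, map[string]bool{})
	for _, code := range errorCodes(err) {
		fmt.Println("compile error code:", code)
	}
}
```

Using `errors.As` instead of a direct type assertion keeps the lookup working even if a caller wraps the error with additional context before returning it.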
Example (Basic) ¶
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs:  createEmptyVariableMap(),
		Outputs: createEmptyVariableMap(),
	},
	Nodes: []*core.Node{
		{
			Id: "FirstNode",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{
					Reference: &core.TaskNode_ReferenceId{
						ReferenceId: &core.Identifier{Name: "task_123"},
					},
				},
			},
		},
	},
}

// Detect what other workflows/tasks does this coreWorkflow reference
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
workflows := make([]common.InterfaceProvider, 0)
tasks := []*core.TaskTemplate{
	{
		Id: &core.Identifier{Name: "task_123"},
		Interface: &core.TypedInterface{
			Inputs:  createEmptyVariableMap(),
			Outputs: createEmptyVariableMap(),
		},
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image:   "image://",
				Command: []string{"cmd"},
				Args:    []string{"args"},
			},
		},
	},
}

compiledTasks := make([]*core.CompiledTask, 0, len(tasks))
for _, task := range tasks {
	compiledTask, err := CompileTask(task)
	if err != nil {
		fmt.Printf("failed to compile task [%v]. Error: %v", task.Id, err)
		return
	}

	compiledTasks = append(compiledTasks, compiledTask)
}

output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, workflows)
fmt.Printf("Compiled Workflow in GraphViz: %v\n", visualize.ToGraphViz(output.Primary))
fmt.Printf("Compile Errors: %v\n", errs)

Output:

Needed Tasks: [task_123], Needed Workflows []
Compiled Workflow in GraphViz: digraph G {rankdir=TB;workflow[label="Workflow Id: name:"repo""];node[style=filled];"start-node(start)" [shape=Msquare];"start-node(start)" -> "FirstNode()" [label="execution",style="dashed"];"FirstNode()" -> "end-node(end)" [label="execution",style="dashed"];}
Compile Errors: <nil>
Example (CompileErrors) ¶
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs:  createEmptyVariableMap(),
		Outputs: createEmptyVariableMap(),
	},
	Nodes: []*core.Node{
		{
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{
					Reference: &core.TaskNode_ReferenceId{
						ReferenceId: &core.Identifier{Name: "task_123"},
					},
				},
			},
		},
	},
}

// Detect what other workflows/tasks does this coreWorkflow reference
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("Failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
workflows := make([]common.InterfaceProvider, 0)
_, errs := CompileWorkflow(inputWorkflow, subWorkflows, []*core.CompiledTask{}, workflows)
fmt.Printf("Compile Errors: %v\n", errs)

Output:

Needed Tasks: [task_123], Needed Workflows []
Compile Errors: Collected Errors: 1
	Error 0: Code: TaskReferenceNotFound, Node Id: start-node, Description: Referenced Task [name:"task_123"] not found.
Example (InputsOutputsBinding) ¶
inputWorkflow := &core.WorkflowTemplate{
	Id: &core.Identifier{Name: "repo"},
	Interface: &core.TypedInterface{
		Inputs: createVariableMap(map[string]*core.Variable{
			"wf_input": {
				Type: getIntegerLiteralType(),
			},
		}),
		Outputs: createVariableMap(map[string]*core.Variable{
			"wf_output": {
				Type: getIntegerLiteralType(),
			},
		}),
	},
	Nodes: []*core.Node{
		{
			Id: "node_1",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{Reference: &core.TaskNode_ReferenceId{ReferenceId: &core.Identifier{Name: "task_123"}}},
			},
			Inputs: []*core.Binding{
				newVarBinding("", "wf_input", "x"),
				newIntegerBinding(124, "y"),
			},
		},
		{
			Id: "node_2",
			Target: &core.Node_TaskNode{
				TaskNode: &core.TaskNode{Reference: &core.TaskNode_ReferenceId{ReferenceId: &core.Identifier{Name: "task_123"}}},
			},
			Inputs: []*core.Binding{
				newIntegerBinding(124, "y"),
				newVarBinding("node_1", "x", "x"),
			},
			OutputAliases: []*core.Alias{{Var: "x", Alias: "n2_output"}},
		},
	},
	Outputs: []*core.Binding{newVarBinding("node_2", "n2_output", "wf_output")},
}

// Detect what other graphs/tasks does this coreWorkflow reference
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
	fmt.Printf("Failed to get requirements. Error: %v", err)
	return
}

fmt.Printf("Needed Tasks: [%v], Needed Graphs [%v]\n",
	strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
	strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))

// Replace with logic to satisfy the requirements
graphs := make([]common.InterfaceProvider, 0)
inputTasks := []*core.TaskTemplate{
	{
		Id:       &core.Identifier{Name: "task_123"},
		Metadata: &core.TaskMetadata{},
		Interface: &core.TypedInterface{
			Inputs: createVariableMap(map[string]*core.Variable{
				"x": {
					Type: getIntegerLiteralType(),
				},
				"y": {
					Type: getIntegerLiteralType(),
				},
			}),
			Outputs: createVariableMap(map[string]*core.Variable{
				"x": {
					Type: getIntegerLiteralType(),
				},
			}),
		},
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image:   "image://",
				Command: []string{"cmd"},
				Args:    []string{"args"},
			},
		},
	},
}

// Compile all tasks before proceeding with Workflow
compiledTasks := make([]*core.CompiledTask, 0, len(inputTasks))
for _, task := range inputTasks {
	compiledTask, err := CompileTask(task)
	if err != nil {
		fmt.Printf("Failed to compile task [%v]. Error: %v", task.Id, err)
		return
	}

	compiledTasks = append(compiledTasks, compiledTask)
}

output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, graphs)
if errs != nil {
	fmt.Printf("Compile Errors: %v\n", errs)
} else {
	fmt.Printf("Compiled Workflow in GraphViz: %v\n", visualize.ToGraphViz(output.Primary))
}

Output:

Needed Tasks: [task_123], Needed Graphs []
Compiled Workflow in GraphViz: digraph G {rankdir=TB;workflow[label="Workflow Id: name:"repo""];node[style=filled];"start-node(start)" [shape=Msquare];"start-node(start)" -> "node_1()" [label="wf_input",style="solid"];"node_1()" -> "node_2()" [label="x",style="solid"];"static" -> "node_1()" [label=""];"node_2()" -> "end-node(end)" [label="n2_output",style="solid"];"static" -> "node_2()" [label=""];}
Types ¶
type LaunchPlanInterfaceProvider ¶
type LaunchPlanInterfaceProvider struct {
// contains filtered or unexported fields
}
LaunchPlanInterfaceProvider is meant to satisfy github.com/kozmoai/kozmopropeller/pkg/compiler/common.InterfaceProvider. This file is largely copied from Admin (the link is deliberately obfuscated, because a real link made go mod import admin): github-dot-com/kozmoai/kozmoadmin/blob/1acce744b8c7839ab77a0eb1ed922905af15baa5/pkg/workflowengine/impl/interface_provider.go. That implementation, however, relies on the internal Admin Gorm model. We should consider deprecating it in favor of this one, as Admin already has a dependency on the Propeller compiler.
func NewLaunchPlanInterfaceProvider ¶
func NewLaunchPlanInterfaceProvider(launchPlan admin.LaunchPlan) *LaunchPlanInterfaceProvider
func (*LaunchPlanInterfaceProvider) GetExpectedInputs ¶
func (p *LaunchPlanInterfaceProvider) GetExpectedInputs() *core.ParameterMap
func (*LaunchPlanInterfaceProvider) GetExpectedOutputs ¶
func (p *LaunchPlanInterfaceProvider) GetExpectedOutputs() *core.VariableMap
func (*LaunchPlanInterfaceProvider) GetID ¶
func (p *LaunchPlanInterfaceProvider) GetID() *core.Identifier
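To make the relationship between the provider and the interface it satisfies more concrete, here is a minimal, self-contained sketch. It does not import the real packages: `Identifier`, `ParameterMap`, `VariableMap`, `LaunchPlan`, and `launchPlanProvider` are simplified, hypothetical stand-ins, and the `InterfaceProvider` interface below is only inferred from the three getters listed above. The real `common.InterfaceProvider` may declare more or different methods.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the core types used by the getters.
type Identifier struct{ Name string }
type ParameterMap struct{ Parameters map[string]string } // input name -> type name
type VariableMap struct{ Variables map[string]string }   // output name -> type name

// InterfaceProvider is an assumed shape, inferred from the getters above.
type InterfaceProvider interface {
	GetID() *Identifier
	GetExpectedInputs() *ParameterMap
	GetExpectedOutputs() *VariableMap
}

// LaunchPlan is a hypothetical, flattened launch plan record.
type LaunchPlan struct {
	ID      *Identifier
	Inputs  *ParameterMap
	Outputs *VariableMap
}

// launchPlanProvider wraps a launch plan and exposes its interface.
type launchPlanProvider struct {
	lp LaunchPlan
}

// Compile-time check that the wrapper satisfies the interface.
var _ InterfaceProvider = (*launchPlanProvider)(nil)

func newLaunchPlanProvider(lp LaunchPlan) *launchPlanProvider {
	return &launchPlanProvider{lp: lp}
}

func (p *launchPlanProvider) GetID() *Identifier               { return p.lp.ID }
func (p *launchPlanProvider) GetExpectedInputs() *ParameterMap { return p.lp.Inputs }
func (p *launchPlanProvider) GetExpectedOutputs() *VariableMap { return p.lp.Outputs }

func main() {
	provider := newLaunchPlanProvider(LaunchPlan{
		ID:      &Identifier{Name: "lp_1"},
		Inputs:  &ParameterMap{Parameters: map[string]string{"x": "integer"}},
		Outputs: &VariableMap{Variables: map[string]string{"y": "integer"}},
	})

	// A compiler-like consumer only needs the interface, not the concrete type.
	var ip InterfaceProvider = provider
	fmt.Println("launch plan:", ip.GetID().Name)
	fmt.Println("inputs:", len(ip.GetExpectedInputs().Parameters))
	fmt.Println("outputs:", len(ip.GetExpectedOutputs().Variables))
}
```

Keeping the wrapper independent of any storage model is the point made in the description above: the consumer depends only on the three getters.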
type LaunchPlanRefIdentifier ¶
type LaunchPlanRefIdentifier = common.Identifier
type TaskIdentifier ¶
type TaskIdentifier = common.Identifier
type WorkflowExecutionRequirements ¶
type WorkflowExecutionRequirements struct {
// contains filtered or unexported fields
}
WorkflowExecutionRequirements represents the set of required resources for a given Workflow's execution. All the resources should be loaded beforehand and passed to the compiler.
func GetRequirements ¶
func GetRequirements(fg *core.WorkflowTemplate, subWfs []*core.WorkflowTemplate) (reqs WorkflowExecutionRequirements, err error)
GetRequirements computes requirements for a given Workflow.
func (WorkflowExecutionRequirements) GetRequiredLaunchPlanIds ¶
func (g WorkflowExecutionRequirements) GetRequiredLaunchPlanIds() []LaunchPlanRefIdentifier
GetRequiredLaunchPlanIds gets a slice of required launch plan ids to load.
func (WorkflowExecutionRequirements) GetRequiredTaskIds ¶
func (g WorkflowExecutionRequirements) GetRequiredTaskIds() []TaskIdentifier
GetRequiredTaskIds gets a slice of required Task ids to load.
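As a rough illustration of what computing requirements involves, the sketch below walks the nodes of a primary workflow and its sub-workflows and collects the distinct task and launch plan names they reference. Everything in it is hypothetical: `Node`, `Workflow`, `Requirements`, and `getRequirements` are simplified stand-ins, and the real GetRequirements works on core.WorkflowTemplate values and may handle more node kinds than shown here.

```go
package main

import (
	"fmt"
	"sort"
)

// Node is a hypothetical, simplified node that references at most one task
// and at most one launch plan by name.
type Node struct {
	ID            string
	TaskRef       string
	LaunchPlanRef string
}

// Workflow is a hypothetical, simplified workflow template.
type Workflow struct {
	Name  string
	Nodes []Node
}

// Requirements holds the distinct ids a workflow needs before compilation.
type Requirements struct {
	taskIDs       map[string]struct{}
	launchPlanIDs map[string]struct{}
}

// sortedKeys returns the set's members in a stable, sorted order.
func sortedKeys(set map[string]struct{}) []string {
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func (r Requirements) RequiredTaskIDs() []string       { return sortedKeys(r.taskIDs) }
func (r Requirements) RequiredLaunchPlanIDs() []string { return sortedKeys(r.launchPlanIDs) }

// getRequirements visits every node of the primary workflow and of each
// sub-workflow, recording each referenced name once.
func getRequirements(primary Workflow, subWfs []Workflow) Requirements {
	reqs := Requirements{
		taskIDs:       map[string]struct{}{},
		launchPlanIDs: map[string]struct{}{},
	}
	all := append([]Workflow{primary}, subWfs...)
	for _, wf := range all {
		for _, n := range wf.Nodes {
			if n.TaskRef != "" {
				reqs.taskIDs[n.TaskRef] = struct{}{}
			}
			if n.LaunchPlanRef != "" {
				reqs.launchPlanIDs[n.LaunchPlanRef] = struct{}{}
			}
		}
	}
	return reqs
}

func main() {
	wf := Workflow{
		Name: "repo",
		Nodes: []Node{
			{ID: "node_1", TaskRef: "task_123"},
			{ID: "node_2", TaskRef: "task_123"},
		},
	}
	reqs := getRequirements(wf, nil)
	fmt.Println("Needed Tasks:", reqs.RequiredTaskIDs())
	fmt.Println("Needed Launch Plans:", reqs.RequiredLaunchPlanIDs())
}
```

Storing the ids in sets means a task referenced by several nodes, as in the InputsOutputsBinding example, only has to be loaded and compiled once.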
Directories ¶
Path | Synopsis |
---|---|
 | This package defines the intermediate layer that the compiler builds and transformers accept. |
 | This package is a central repository of all compile errors that can be reported. |
transformers | |
transformers/k8s | This package converts the output of the compiler into a K8s resource for propeller to execute. |
 | This package contains validators for all elements of the workflow spec (node, task, branch, interface, bindings... |