orchid

v0.8.0
Published: Dec 15, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

README

Orchid: Dynamic Dataflow Orchestration

Orchid is a lightweight Go framework for orchestrating data-driven workflows. It combines concepts from Flow-Based Programming (FBP) and workflow engines to provide a simple, fault-tolerant solution for managing data flows and task execution within applications.

Inspired by tools like Uber Cadence and Temporal.io, Orchid offers a minimalistic approach to workflow orchestration without the complexity and heavy dependencies often found in other solutions. It was created because simple executors are scarce: temporalite was one, but it is no longer maintained.

Orchid is designed with the following principles in mind:

  • Simplicity: Designed to be easy to understand and use, with minimal boilerplate. Orchid's core is fewer than 2k lines of code.
  • Data Passing: Facilitates data passing between nodes using byte arrays, aligning with flow-based programming paradigms.
  • Dynamic Routing: Supports dynamic routing based on data and error conditions, enabling flexible workflow logic.
  • Sequential Execution by Default: Executes workflows sequentially by default. When a node has multiple outgoing edges, it deterministically follows the first path unless parallelism is explicitly enabled.
  • Explicit Parallelism: Allows explicit parallel execution through the DisableSequentialFlow node option. By setting this option on a node, you enable it to execute multiple outgoing paths in parallel, making parallelism an intentional design choice.
  • Hybrid Execution: Allows both synchronous and asynchronous task execution.
  • Modular: Encourages modular code by encapsulating functionality within independent nodes.
  • Retry Policies: Lets you define a custom retry policy for each task.
  • Context Propagation: Utilizes Go's context package to pass metadata and cancellation signals throughout workflow execution.
  • Graph-Based Workflows: Workflows are directed graphs, making complex processes easier to visualize.
  • Fault Tolerance: Provides optional state persistence and workflow recovery.
  • JSON Workflow Definitions: Supports a human-readable DSL, a basic JSON format for defining workflows, in addition to code-based definitions.

Use Cases:

  • Data Pipelines: Orchestrate data processing tasks, including data ingestion, transformation, and analysis.
  • Automation Workflows: Manage automated tasks and interactions with machine or human agents.
  • Service Orchestration: Coordinate interactions between application services and manage distributed API workflows.
  • Human-in-the-Loop Processes: Handle workflows that require human intervention or approval.
  • Agent-Based Models: Manage the interactions and behaviors of agents in simulations.

Connections to Related Concepts:

  • Workflow Engines: Orchid shares foundational similarities with DAG-based workflow engines like Airflow and Prefect but focuses on simplicity and ease of use. Unlike a strict DAG, it allows loops for retries. It provides a streamlined configuration format, error-based routing, and supports both local and remote execution.
  • Flow-Based Programming: The serial execution and data passing between nodes resemble flow-based programming paradigms, but Orchid offers additional features like triggers, callbacks, and conditional branching for more complex orchestration.
  • Agent-Based Modeling (ABM): Orchid's ability to represent dependencies and trigger workflows based on events aligns well with the interaction patterns of agent-based models.
  • Large language model (LLM) agents: The flexibility to pass data and handle conditional branching can be applied in orchestrating tasks involving language models.

Trade-offs, Constraints, and Considerations

Orchid is much simpler than other workflow engines: it is designed to be easy to understand and to keep a small code base. This simplicity comes with trade-offs and constraints that users should be aware of:

  • Fixed Activity Interface: For simple fault tolerance, all activities must adhere to a simple interface with an input and output byte array. This design choice simplifies the implementation but may require additional client-side logic for complex data types.
  • Sequential Flow by Default: Orchid executes nodes sequentially, following a single path even if multiple outgoing edges are present. When a node has multiple outgoing edges, it deterministically picks the first next node based on the order nodes were added, ensuring consistent execution. This default behavior helps prevent unintended parallelism.
  • Enabling Parallelism: To enable parallel execution from a node, you must explicitly set the DisableSequentialFlow option on that node. By doing so, you allow the node to execute multiple outgoing paths in parallel. This explicit control ensures that parallelism occurs only where intended.
wf.AddNode(orchid.NewNode("ParallelNode", orchid.WithNodeDisableSequentialFlow()))
// other nodes...

A workflow ends with a single node, which can be a merge point where parallel branches converge. A merge point is a special node type "reducer" with the signature func([][]byte) []byte that combines the outputs of parallel branches. See the examples/parallel example for more details.
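
As a rough illustration of the reducer signature described above, the following self-contained sketch declares a local Reducer type with the documented shape func([][]byte) []byte and a hypothetical joinOutputs reducer that joins branch outputs with a newline. The order in which Orchid hands branch outputs to a reducer is not stated here, so the sketch simply preserves the order it receives.

```go
package main

import (
	"bytes"
	"fmt"
)

// Reducer mirrors the documented signature func([][]byte) []byte.
// It is declared locally so the sketch compiles without importing orchid.
type Reducer func([][]byte) []byte

// joinOutputs is a hypothetical reducer: it joins the outputs of all
// parallel branches with a newline separator.
func joinOutputs(outputs [][]byte) []byte {
	return bytes.Join(outputs, []byte("\n"))
}

func main() {
	var r Reducer = joinOutputs
	merged := r([][]byte{[]byte("branch-a"), []byte("branch-b")})
	fmt.Printf("%s\n", merged)
}
```

With the real package, a function like this would presumably be registered through RegisterReducer under the name of the merge node.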

  • Dynamic Parallelism via Activities: Orchid can support dynamic fan-out/fan-in or dynamic parallelism scenarios, where the number of parallel tasks is determined at runtime. This is achieved by using activities to spawn child workflows or goroutines. Note that panics in goroutines spawned on the client side MAY crash the program and cannot be recovered by Orchid. See the examples/dynamic-parallelism example for more details.
  • Activity Panic Recovery: Activity panics can be recovered in both top-level and child workflows. Persisted workflows can therefore continue after code fixes. This only works for panics in activities that don't occur in goroutines spawned by activities. The functional option WithFailWorkflowOnActivityPanic of the orchestrator can be used to terminate workflows and mark them non-restorable after activity panics. In situations where panics are expected, disable activity panic recovery with WithDisableActivityPanicRecovery.
  • Dynamic Routing Over Explicit Edges: When dynamic routing is used (e.g., orchid.RouteTo("nodeName")), the orchestrator can route dynamically to the named node.
  • Forced Transition Support: By default, orchid does not support forced transitions between nodes. This is to prevent unintended side effects and ensure that workflows are executed as designed. If forced transitions are required, enable this on the orchestrator with WithForcedTransitions.
  • Idempotency and Error Handling: Orchid ensures that workflows and activities have unique execution IDs to prevent duplicate processing. All activities are executed at-least-once, so they MUST BE idempotent to ensure that retries do not cause unintended side effects. Pass the ActivityToken to external systems to ensure idempotency and fault tolerance.
  • Atomicity and Side Effects: Activities SHALL perform atomic operations; non-atomic operations (like partial database updates) may leave the system in an inconsistent state if they fail midway.
  • Avoid shared mutable state in Activities: Activities SHOULD NOT share mutable state between them, as this can lead to race conditions and data corruption. Use the context package to pass data between activities, and use atomic datastores or synchronization primitives for shared state.
  • Execution and Recovery Trade-off: If an optional persister is provided, Orchid can recover the state of workflows after failures.
    • Note that passing objects in NodeConfig DOES NOT work for restoration. NodeConfig is included in the serialized form of the workflow when exporting or persisting the workflow, so it must be serializable into JSON along with the rest of the workflow data.
    • Also note that NodeConfig is currently stored as plaintext JSON in the persister.
  • Distributed Execution: Clients can utilize the Activity interface to implement an executor pattern for remote execution of tasks. Each activity executes with a stable ActivityToken that can be used to register callbacks for human-in-the-loop scenarios. The token will be restored in failure cases to ensure the workflow can continue.

Note on LangChain Comparison

Orchid supports workflows with modular task chaining, error handling, and context-passing—concepts similar to LangChain's approach to workflow chaining and agent-driven tasks. While language-agnostic, Orchid can be extended for LLMs and dynamic prompts, offering a Go-based alternative for structured workflows akin to LangChain.

Usage

Install the package:

go get github.com/kyodo-tech/orchid

Define a Workflow: Create a new workflow and add nodes (tasks) and edges (data routes) to define the processing steps and how data moves between them.

wf := orchid.NewWorkflow("example_workflow")
wf.AddNode(orchid.NewNode("task1"))
wf.AddNode(orchid.NewNode("task2"))
wf.Link("task1", "task2")

Or use the fluent API for linear use cases:

wf := orchid.NewWorkflow("example_workflow").
    AddNode(orchid.NewNode("task1")).
    Then(orchid.NewNode("task2"))

Implement an Activity: Define the logic for each task in the workflow.

func task1Handler(ctx context.Context, input []byte) ([]byte, error) {
    // Task logic here; this placeholder passes the input through unchanged.
    output := input
    return output, nil
}

Execute the Workflow: Initialize the orchestrator, register task handlers, and execute the workflow.

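o := orchid.NewOrchestrator()
o.RegisterActivity("task1", task1Handler)
o.RegisterActivity("task2", task2Handler)
if err := o.LoadWorkflow(wf); err != nil {
    log.Fatalf("Failed to load workflow: %v", err)
}
if _, err := o.RunTriggers(context.Background(), nil); err != nil {
    log.Fatalf("Workflow failed: %v", err)
}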

Context-based logging is available when the orchestrator is set up with an slog logger:

logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

o := orchid.NewOrchestrator(
    orchid.WithLogger(logger),
)

The logger is then available in activities:

orchid.Logger(ctx).Error("Error executing task", "task", taskName, "error", err)

Graph Visualization with Mermaid

Orchid supports importing and exporting workflows in Mermaid format, enabling clear visual representation of workflows and seamless integration with Mermaid-compatible tools. Labels on nodes and edges enhance readability and make the workflows easier to interpret.

Exporting to Mermaid

You can export an Orchid workflow to Mermaid syntax for visualization. Each node and edge label can be customized, supporting descriptions, links, and shapes (e.g., decision nodes). Exported workflows are ideal for documentation, debugging, or sharing with non-technical stakeholders.

mermaidData := wf.ExportMermaid("  ", nil, nil)
fmt.Println(string(mermaidData))

Importing from Mermaid

Orchid also supports importing workflows defined in Mermaid syntax. This allows you to define workflows graphically and load them directly into Orchid, enhancing collaboration and workflow iteration.

mermaidDefinition := `
    flowchart TD
        A[Start] --> B{Decision}
        B -- "Yes" --> C[Option 1]
        B -- "No" --> D[Option 2]
        C --> E[End]
        D --> E
`
wf, err := orchid.ImportMermaid("mermaid_workflow", mermaidDefinition)
if err != nil {
    log.Fatalf("Failed to import Mermaid workflow: %v", err)
}

With Mermaid import/export, you can visualize, define, and refine workflows, expanding the ways orchid can integrate into your development workflow.

Code Generation

Similar to Mermaid import/export, Orchid can generate Go code from workflow definitions. Going from Mermaid or JSON to Go code enables integration of visual workflows into Go code.

code, err := wf.Codegen()
if err != nil {
    log.Fatalf("Failed to generate code: %v", err)
}

fmt.Println(code) // Print or save the generated code

The generated code reflects the workflow structure, using AddNode, Then, and Link calls, and can be integrated into a new orchid Go project.

Persistence and Fault Tolerance

Orchid supports fault tolerance through state persistence and recovery mechanisms. By providing an optional persister, Orchid can recover the state of workflows after failures. The persistence layer MUST accurately reflect the state of the workflow to avoid incorrect restoration. Cyclic dependencies, disconnected graphs, or malformed workflows CAN cause the restoration to fail or behave unpredictably. Well-formed workflows recover at the last non-parallel node, replay successful node executions until the failure point, and retry the failed nodes.

To enable persistence, create a persister instance and pass it to the orchestrator:

persister, err := persistence.NewSQLitePersister("orchid.db")
if err != nil {
    // Handle error
}
defer persister.DB.Close()

o := orchid.NewOrchestrator(orchid.WithPersistence(persister))

We can also pass a default retry policy for all nodes if desired:

o := orchid.NewOrchestrator(
    orchid.WithDefaultRetryPolicy(&orchid.RetryPolicy{
        MaxRetries:         3,
        InitInterval:       2 * time.Second,
        MaxInterval:        30 * time.Second,
        BackoffCoefficient: 2.0,
    }),
    // ... other options ...
)

Middleware Support

Middlewares allow wrapping activities with additional functionality, such as logging, error handling, or custom serialization. We can apply cross-cutting concerns consistently across activities without modifying their core logic.

A middleware is a function that takes an Activity and returns a new Activity. This allows us to compose additional behavior.

type Middleware func(Activity) Activity

To apply a middleware, register it with the orchestrator before registering activities:

o := orchid.NewOrchestrator(
    orchid.WithLogger(logger),
    // Other options...
)

// Register global middlewares
o.Use(middleware.Logging)
// ...

// Register activities after adding middlewares
o.RegisterActivity("ActivityA", activityA)
o.RegisterActivity("ActivityB", activityB)

For example, a middleware can be defined as follows:

func SomeMiddleware(activity orchid.Activity) orchid.Activity {
    return func(ctx context.Context, input []byte) ([]byte, error) {
        // pre-process: runs before the wrapped activity
        output, outErr := activity(ctx, input)
        // post-process: runs after the wrapped activity returns
        return output, outErr
    }
}

Typed Activities

Activities use []byte for inputs and outputs to facilitate easy serialization and deserialization, which is important for simple state persistence and recovery. To work with custom types, you can use the TypedActivity helper, which handles JSON marshaling and unmarshaling. Define a custom, JSON serializable struct for your data:

type flow struct {
    Data   []byte
    Rating int
}

Then, use TypedActivity to wrap your activity functions:

func fnA(ctx context.Context, input *flow) (*flow, error) {
    fmt.Println("fnA input:", input)
    return &flow{
        Data:   []byte("A"),
        Rating: 60,
    }, nil
}

Register your activities using TypedActivity:

o.RegisterActivity("A", orchid.TypedActivity(fnA))

Caching

Orchid provides a snapshotter activity that can either save and replay the full payload or cache specific fields. This can be useful for caching data from external sources or expensive operations to avoid recomputation.

fieldsToCache := []string{"user.name", "user.email", "transaction.id"}

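// The constructor that accepts a field list is NewAttributeShapshotter (spelled this way in the API).
checkpoint := orchid.NewAttributeShapshotter(fieldsToCache)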
o.RegisterActivity("checkpoint", checkpoint.Save)

Snapshotting also works with typed activities, since they operate on JSON data.

See the ./examples directory for different usage scenarios.

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrNodeNotFound                 = fmt.Errorf("node not found")
	ErrWorkflowNodeAlreadyExists    = fmt.Errorf("node already exists")
	ErrWorkflowInvalid              = fmt.Errorf("invalid, it has no trigger or no starting node")
	ErrWorkflowHasNoTriggers        = fmt.Errorf("no triggers found")
	ErrWorkflowHasNoNodes           = fmt.Errorf("workflow has no nodes")
	ErrOrchestratorActivityNotFound = fmt.Errorf("activity not found")
	ErrOrchestratorHasNoPersister   = fmt.Errorf("no persister set")
	ErrNoWorkflowID                 = fmt.Errorf("no execution ID set")
)
var ErrPanic = errors.New("panic in activity")
var ErrRestorable = fmt.Errorf("workflow is restorable")

Functions

func ActivityName

func ActivityName(ctx context.Context) string

func ActivityToken

func ActivityToken(ctx context.Context) string

func Config

func Config(ctx context.Context, key string) (interface{}, bool)

func ConfigInt added in v0.5.0

func ConfigInt(ctx context.Context, key string) (int, bool)

func ConfigString

func ConfigString(ctx context.Context, key string) (string, bool)

func IsNonRestorable

func IsNonRestorable(ctx context.Context) bool

func IsWorkflowRestore added in v0.5.0

func IsWorkflowRestore(ctx context.Context) bool

func Logger

func Logger(ctx context.Context) *slog.Logger

func NopActivity added in v0.6.0

func NopActivity(ctx context.Context, input []byte) ([]byte, error)

func ParentWorkflowID added in v0.5.0

func ParentWorkflowID(ctx context.Context) string

func RouteTo

func RouteTo(nodeKey string) error

func SignalWorkflowRestore added in v0.5.0

func SignalWorkflowRestore(ctx context.Context) context.Context

func WithNewWorkflowID added in v0.5.0

func WithNewWorkflowID(ctx context.Context) context.Context

func WithNonRestorable

func WithNonRestorable(ctx context.Context) context.Context

func WithParentWorkflowID added in v0.5.0

func WithParentWorkflowID(ctx context.Context, id string) context.Context

func WithWorkflowID

func WithWorkflowID(ctx context.Context, id string) context.Context

func WorkflowID

func WorkflowID(ctx context.Context) string

Types

type Activity

type Activity func(ctx context.Context, input []byte) (output []byte, err error)

func AsyncExecutor

func AsyncExecutor(ctx context.Context) (Activity, bool)

func SyncExecutor

func SyncExecutor(ctx context.Context) (Activity, bool)

func TypedActivity

func TypedActivity[T any](activity func(ctx context.Context, input T) (T, error)) Activity

type DynamicRoute

type DynamicRoute struct {
	Key string `json:"key"`
	Err error  `json:"error"`
}

func (*DynamicRoute) Error

func (e *DynamicRoute) Error() string

func (*DynamicRoute) MarshalJSON

func (e *DynamicRoute) MarshalJSON() ([]byte, error)

func (*DynamicRoute) UnmarshalJSON

func (e *DynamicRoute) UnmarshalJSON(data []byte) error

func (*DynamicRoute) Unwrap

func (e *DynamicRoute) Unwrap() error

type Edge

type Edge struct {
	From  string  `json:"from"`
	To    string  `json:"to"`
	Label *string `json:"label,omitempty"`
}

type Entrypoint added in v0.5.0

type Entrypoint func(ctx context.Context) ([]byte, error)

type Middleware

type Middleware func(Activity) Activity

type Node

type Node struct {
	ID                    int64                  `json:"id"`
	ActivityName          string                 `json:"activity"`
	Config                map[string]interface{} `json:"config,omitempty"`
	Type                  NodeType               `json:"type,omitempty"`
	RetryPolicy           *RetryPolicy           `json:"retry,omitempty"`
	EditLink              *string                `json:"link,omitempty"`
	DisableSequentialFlow bool                   `json:"disable_sequential_flow,omitempty"`
}

func NewNode

func NewNode(activity string, options ...NodeOption) *Node

func (*Node) ActivityStartTime

func (n *Node) ActivityStartTime(ctx context.Context) time.Time

func (*Node) ConfigString added in v0.7.0

func (n *Node) ConfigString(key string) (string, bool)

func (*Node) GetConfigIntArray added in v0.7.0

func (n *Node) GetConfigIntArray(key string) ([]int, bool)

func (*Node) GetConfigStringArray added in v0.7.0

func (n *Node) GetConfigStringArray(key string) ([]string, bool)

func (*Node) IsRetryableError

func (n *Node) IsRetryableError(err error) bool

func (*Node) IsTrigger

func (n *Node) IsTrigger() bool

func (*Node) RetryAttempts

func (n *Node) RetryAttempts(ctx context.Context) int

func (*Node) RetryPolicyOrDefault

func (n *Node) RetryPolicyOrDefault(d *RetryPolicy) *RetryPolicy

type NodeLink

type NodeLink struct {
	Name string // Display text for the link
	URI  string // URL for the link
}

NodeLink holds the display name and URI of a single link.

type NodeMetadata added in v0.7.0

type NodeMetadata struct {
	Description string     // Node description, which may include line breaks
	Links       []NodeLink // List of links to display within the node
	Standalone  bool       // Flag indicating if the node should be rendered as standalone
}

NodeMetadata carries an optional description and links for a node.

type NodeOption

type NodeOption func(*Node)

func WithNodeConfig

func WithNodeConfig(config map[string]interface{}) NodeOption

func WithNodeDisableSequentialFlow added in v0.6.0

func WithNodeDisableSequentialFlow() NodeOption

func WithNodeEditLink

func WithNodeEditLink(link string) NodeOption

func WithNodeID

func WithNodeID(id int64) NodeOption

func WithNodeRetryPolicy

func WithNodeRetryPolicy(policy *RetryPolicy) NodeOption

func WithNodeType

func WithNodeType(t NodeType) NodeOption

type NodeType

type NodeType string
const (
	Trigger NodeType = "trigger"
	Action  NodeType = "action"
)

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

func NewOrchestrator

func NewOrchestrator(options ...OrchestratorOption) *Orchestrator

func (*Orchestrator) GetActivity

func (o *Orchestrator) GetActivity(node *Node) (Activity, bool)

func (*Orchestrator) GetNode

func (o *Orchestrator) GetNode(graphNode graph.Node) (*Node, bool)

func (*Orchestrator) GetReducer

func (o *Orchestrator) GetReducer(node *Node) (Reducer, bool)

func (*Orchestrator) LoadWorkflow

func (o *Orchestrator) LoadWorkflow(w *Workflow) error

func (*Orchestrator) RegisterActivity

func (o *Orchestrator) RegisterActivity(name string, activity Activity)

func (*Orchestrator) RegisterReducer

func (o *Orchestrator) RegisterReducer(name string, reducer Reducer)

func (*Orchestrator) RestorableWorkflows

func (o *Orchestrator) RestorableWorkflows(ctx context.Context) (RestorableWorkflows, error)

func (*Orchestrator) RestoreWorkflowsAsync

func (o *Orchestrator) RestoreWorkflowsAsync(ctx context.Context, withChildWorkflows bool) error

func (*Orchestrator) RunTriggers

func (o *Orchestrator) RunTriggers(ctx context.Context, input []byte) ([]byte, error)

func (*Orchestrator) Start

func (o *Orchestrator) Start(ctx context.Context, data []byte) (output []byte, err error)

func (*Orchestrator) StartAsync

func (o *Orchestrator) StartAsync(ctx context.Context, data []byte) (output []byte, err error)

func (*Orchestrator) Use

func (o *Orchestrator) Use(mw Middleware)

type OrchestratorOption

type OrchestratorOption func(*Orchestrator)

func WithDefaultRetryPolicy added in v0.2.0

func WithDefaultRetryPolicy(policy *RetryPolicy) OrchestratorOption

func WithDisableActivityPanicRecovery added in v0.7.0

func WithDisableActivityPanicRecovery() OrchestratorOption

func WithFailWorkflowOnActivityPanic added in v0.5.0

func WithFailWorkflowOnActivityPanic() OrchestratorOption

func WithForcedTransitions added in v0.6.0

func WithForcedTransitions() OrchestratorOption

func WithLogger

func WithLogger(logger *slog.Logger) OrchestratorOption

func WithPersistence

func WithPersistence(persister persistence.Persister) OrchestratorOption

type OrchestratorRegistry added in v0.5.0

type OrchestratorRegistry map[string]*Orchestrator

func NewOrchestratorRegistry added in v0.5.0

func NewOrchestratorRegistry() OrchestratorRegistry

func (OrchestratorRegistry) Get added in v0.5.0

func (OrchestratorRegistry) Set added in v0.5.0

func (r OrchestratorRegistry) Set(name string, o *Orchestrator)

type Reducer

type Reducer func([][]byte) []byte

type RestorableWorkflowState added in v0.5.0

type RestorableWorkflowState struct {
	Status     *persistence.WorkflowStatus
	Entrypoint Entrypoint
}

type RestorableWorkflows

type RestorableWorkflows map[string]map[string]RestorableWorkflowState

RestorableWorkflows maps a workflow name to a map of workflow ID to the RestorableWorkflowState that allows the client to restore that workflow.

type RetryPolicy

type RetryPolicy struct {
	MaxRetries               int           `json:"max_retries"`
	InitInterval             time.Duration `json:"init_interval"`
	MaxInterval              time.Duration `json:"max_interval"`
	BackoffCoefficient       float64       `json:"backoff_coefficient"`
	NonRetriableErrorReasons []string      `json:"non_retriable_error_reasons"`
}

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns the default retry policy.

type Snapshotter added in v0.6.0

type Snapshotter struct {
	// contains filtered or unexported fields
}

func NewAttributeShapshotter added in v0.6.0

func NewAttributeShapshotter(fields []string) *Snapshotter

NewAttributeShapshotter can be used for both full and partial snapshotting.

func NewSnapshotter added in v0.6.0

func NewSnapshotter() *Snapshotter

NewSnapshotter initializes a new checkpoint activity with data storage.

func (*Snapshotter) Delete added in v0.6.0

func (chp *Snapshotter) Delete(ctx context.Context, input []byte) ([]byte, error)

Delete removes checkpoint data for a given workflow ID.

func (*Snapshotter) Save added in v0.6.0

func (chp *Snapshotter) Save(ctx context.Context, input []byte) ([]byte, error)

Save stores data on first invocation, returns stored data on subsequent calls.

func (*Snapshotter) SaveAttributes added in v0.6.0

func (chp *Snapshotter) SaveAttributes(ctx context.Context, input []byte) ([]byte, error)

type Workflow

type Workflow struct {
	Name  string           `json:"name"`
	Nodes map[string]*Node `json:"nodes"`
	Edges []*Edge          `json:"edges"`
	// contains filtered or unexported fields
}

func ImportMermaid added in v0.8.0

func ImportMermaid(name, chart string) (*Workflow, error)

func NewWorkflow

func NewWorkflow(name string) *Workflow

func (*Workflow) AddNewNode added in v0.5.0

func (wf *Workflow) AddNewNode(activity string, options ...NodeOption) *Workflow

func (*Workflow) AddNode

func (wf *Workflow) AddNode(node *Node) *Workflow

AddNode adds a new task to the workflow.

func (*Workflow) Codegen added in v0.8.0

func (wf *Workflow) Codegen() (string, error)

func (*Workflow) Export

func (wf *Workflow) Export() ([]byte, error)

func (*Workflow) ExportMermaid

func (wf *Workflow) ExportMermaid(indent string, nodeToChildWorkflows map[string]*Workflow, nodeToMetadata map[string]NodeMetadata) []byte

ExportMermaid generates the Mermaid representation of the workflow. Optionally, it can include child workflows as subgraphs and requires a map of node names to corresponding child workflows they spawn. It also accepts an optional nodeToMetadata parameter to add descriptions and links to nodes.

func (*Workflow) ExportMermaidHTML added in v0.3.0

func (wf *Workflow) ExportMermaidHTML(indent string, optionalChildWorkflows map[string]*Workflow, nodeToMetadata map[string]NodeMetadata) ([]byte, error)

func (*Workflow) ExportMermaidToFile

func (wf *Workflow) ExportMermaidToFile(filename string, optionalChildWorkflows map[string]*Workflow, nodeToMetadata map[string]NodeMetadata) error

func (*Workflow) Import

func (wf *Workflow) Import(workflow []byte) error

func (*Workflow) Link

func (wf *Workflow) Link(from, to string) *Workflow

func (*Workflow) LinkWithLabel added in v0.8.0

func (wf *Workflow) LinkWithLabel(from, to string, label string) *Workflow

func (*Workflow) Then added in v0.2.0

func (wf *Workflow) Then(node *Node) *Workflow

func (*Workflow) ThenNewNode added in v0.5.0

func (wf *Workflow) ThenNewNode(activity string, options ...NodeOption) *Workflow

func (*Workflow) WithLabel added in v0.8.0

func (wf *Workflow) WithLabel(label string) *Workflow
