Goflow
A workflow/DAG orchestrator written in Go for rapid prototyping of ETL/ML/AI pipelines. Goflow comes complete with a web UI for inspecting and triggering jobs.
Contents
- Quick start
- Use case
- Concepts and features
- Jobs and tasks
- Custom operators
- Retries
- Task dependencies
- Trigger rules
- The Goflow engine
Quick start
With Docker
docker run -p 8181:8181 ghcr.io/fieldryand/goflow-example:latest
Browse to localhost:8181 to explore the UI.
Without Docker
In a fresh project directory:
go mod init goflow-demo # create a new module (any module path works)
go get github.com/fieldryand/goflow # install dependencies
Create a file main.go with contents:
package main

import "github.com/fieldryand/goflow"

func main() {
    options := goflow.Options{
        AssetBasePath: "assets/",
        StreamJobRuns: true,
        ShowExamples:  true,
    }
    gf := goflow.New(options)
    gf.Use(goflow.DefaultLogger())
    gf.Run(":8181")
}
Download the front-end from the release page, untar it, and move it to the location specified in goflow.Options.AssetBasePath. Now run the application with go run main.go and see it in the browser at localhost:8181.
Use case
Goflow was built as a simple replacement for Apache Airflow to manage some small data pipeline projects. Airflow started to feel too heavyweight for these projects where all the computation was offloaded to independent services, but there was still a need for basic orchestration, concurrency, retries, visibility etc.
Goflow prioritizes ease of deployment over features and scalability. If you need distributed workers, backfilling over time slices, a durable database of job runs, etc., then Goflow is not for you. On the other hand, if you want to rapidly prototype some pipelines, then Goflow might be a good fit.
Concepts and features
- Job: A Goflow workflow is called a Job. Jobs can be scheduled using cron syntax.
- Task: Each job consists of one or more tasks organized into a dependency graph. A task can be run under certain conditions; by default, a task runs when all of its dependencies finish successfully.
- Concurrency: Jobs and tasks execute concurrently.
- Operator: An Operator defines the work done by a Task. Goflow comes with a handful of basic operators, and implementing your own Operator is straightforward.
- Retries: You can allow a Task a given number of retry attempts. Goflow comes with two retry strategies, ConstantDelay and ExponentialBackoff.
- Database: Goflow supports two database types, in-memory and BoltDB. BoltDB will persist your history of job runs, whereas in-memory means the history will be lost each time the Goflow server is stopped. The default is BoltDB.
- Streaming: Goflow uses server-sent events to stream the status of jobs and tasks to the UI in real time.
Jobs and tasks
Let's start by creating a function that returns a job called myJob. There is a single task in this job that sleeps for one second.
package main

import (
    "errors" // used by the custom operator defined later in this README

    "github.com/fieldryand/goflow"
)

func myJob() *goflow.Job {
    j := &goflow.Job{Name: "myJob", Schedule: "* * * * *", Active: true}
    j.Add(&goflow.Task{
        Name:     "sleepForOneSecond",
        Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
    })
    return j
}
By setting Active: true, we are telling Goflow to apply the provided cron schedule for this job when the application starts.
Job scheduling can be activated and deactivated from the UI.
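The schedule "* * * * *" runs the job every minute. Any standard five-field cron expression should work here; for example, a job meant to run daily at 8:00 AM might look like this (the job name is just an illustration):

j := &goflow.Job{Name: "dailyJob", Schedule: "0 8 * * *", Active: true} // fires at 08:00 every day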
Custom operators
A custom Operator needs to implement the Run method. Here's an example of an operator that adds two positive numbers.
type PositiveAddition struct{ a, b int }

func (o PositiveAddition) Run() (interface{}, error) {
    if o.a < 0 || o.b < 0 {
        return 0, errors.New("can't add negative numbers")
    }
    result := o.a + o.b
    return result, nil
}
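Because Run is an ordinary method, you can also call the operator directly, which is a quick way to sanity-check it outside of a job:

op := PositiveAddition{a: 2, b: 3}
result, err := op.Run() // result holds 5, err is nil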
Retries
Let's add a retry strategy to the sleepForOneSecond task:
func myJob() *goflow.Job {
    j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
    j.Add(&goflow.Task{
        Name:       "sleepForOneSecond",
        Operator:   goflow.Command{Cmd: "sleep", Args: []string{"1"}},
        Retries:    5,
        RetryDelay: goflow.ConstantDelay{Period: 1},
    })
    return j
}
Instead of ConstantDelay, we could also use ExponentialBackoff (see https://en.wikipedia.org/wiki/Exponential_backoff).
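As a sketch, the same task with exponential backoff might look like the following (this assumes ExponentialBackoff is constructed with its zero value; check the package documentation for any tunable fields):

j.Add(&goflow.Task{
    Name:       "sleepForOneSecond",
    Operator:   goflow.Command{Cmd: "sleep", Args: []string{"1"}},
    Retries:    5,
    RetryDelay: goflow.ExponentialBackoff{}, // assumed zero-value construction
})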
Task dependencies
A job can define a directed acyclic graph (DAG) of independent and dependent tasks. Let's use the SetDownstream method to define two tasks that are dependent on sleepForOneSecond. The tasks will use the PositiveAddition operator we defined earlier, as well as a new operator provided by Goflow, Get. Because Get takes an http.Client, you'll also need net/http in the import block.
func myJob() *goflow.Job {
    j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
    j.Add(&goflow.Task{
        Name:       "sleepForOneSecond",
        Operator:   goflow.Command{Cmd: "sleep", Args: []string{"1"}},
        Retries:    5,
        RetryDelay: goflow.ConstantDelay{Period: 1},
    })
    j.Add(&goflow.Task{
        Name:     "getGoogle",
        Operator: goflow.Get{Client: &http.Client{}, URL: "https://www.google.com"},
    })
    j.Add(&goflow.Task{
        Name:     "AddTwoPlusThree",
        Operator: PositiveAddition{a: 2, b: 3},
    })
    j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("getGoogle"))
    j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("AddTwoPlusThree"))
    return j
}
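With these dependencies in place, getGoogle and AddTwoPlusThree each wait for sleepForOneSecond to finish successfully and then, as noted in Concepts and features, run concurrently with each other.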
Trigger rules
By default, a task has the trigger rule allSuccessful, meaning the task starts executing when all the tasks directly upstream exit successfully. If any dependency exits with an error, all downstream tasks are skipped, and the job exits with an error.

Sometimes you want a downstream task to execute even if there are upstream failures. Often these are situations where you want to perform some cleanup task, such as shutting down a server. In such cases, you can give a task the trigger rule allDone.
Let's modify sleepForOneSecond to have the trigger rule allDone.
func myJob() *goflow.Job {
    // other stuff
    j.Add(&goflow.Task{
        Name:        "sleepForOneSecond",
        Operator:    goflow.Command{Cmd: "sleep", Args: []string{"1"}},
        Retries:     5,
        RetryDelay:  goflow.ConstantDelay{Period: 1},
        TriggerRule: "allDone",
    })
    // other stuff
}
The Goflow engine
Finally, let's create a Goflow engine, register our job, attach a logger, and run the application.
func main() {
    gf := goflow.New(goflow.Options{StreamJobRuns: true})
    gf.AddJob(myJob)
    gf.Use(goflow.DefaultLogger())
    gf.Run(":8181")
}
You can pass different options to the engine. Options currently supported:
- AssetBasePath: The path containing the UI assets, usually assets/.
- DBType: boltdb (default) or memory.
- BoltDBPath: The filepath of the Bolt database on disk.
- StreamJobRuns: Whether to stream updates to the UI.
- ShowExamples: Whether to show the example jobs.
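As an illustrative sketch, here is an engine configured with all of these options; the BoltDB filepath is a hypothetical value, and the DBType values are assumed to be the strings listed above:

gf := goflow.New(goflow.Options{
    AssetBasePath: "assets/",
    DBType:        "boltdb",    // or "memory"
    BoltDBPath:    "goflow.db", // hypothetical filepath
    StreamJobRuns: true,
    ShowExamples:  false,
})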
Goflow is built on the Gin framework, so you can pass any Gin handler to Use.
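For example, assuming Use accepts standard Gin middleware (a gin.HandlerFunc), a custom handler could be attached like this:

import "github.com/gin-gonic/gin"

gf.Use(func(c *gin.Context) {
    // do something before the request is handled
    c.Next() // pass control to the next handler
})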