Durable workflows using Go
Borrows heavily from Temporal (and since it's a fork also Cadence) as well as DTFx.
See also:
On Go support: the current version of the library requires Go 1.18 or later. There is a version that doesn't require generics and relies more on interface{}
instead, but I think the improved type safety is worth not supporting a version of Go before 1.18 for now.
Simple example
Workflow
Workflows are written in Go code. The only exception is they must not use any of Go's non-deterministic features (select
, iteration over a map
, etc.). Inputs and outputs for workflows and activities have to be serializable:
func Workflow1(ctx workflow.Context, input string) error {
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println("A1 result:", r1)
r2, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println("A2 result:", r2)
return nil
}
Activities
Activities can have side-effects and don't have to be deterministic. They will be executed only once and the result is persisted:
func Activity1(ctx context.Context, a, b int) (int, error) {
return a + b, nil
}
func Activity2(ctx context.Context) (int, error) {
return 12, nil
}
Worker
The worker is responsible for executing Workflows
and Activities
, both need to be registered with it.
func runWorker(ctx context.Context, mb backend.Backend) {
w := worker.New(mb, nil)
w.RegisterWorkflow(Workflow1)
w.RegisterActivity(Activity1)
w.RegisterActivity(Activity2)
if err := w.Start(ctx); err != nil {
panic("could not start worker")
}
}
Backend
The backend is responsible for persisting the workflow events. Currently there is an in-memory backend implementation for testing, one using SQLite, one using MySql, and one using Redis.
b := sqlite.NewSqliteBackend("simple.sqlite")
Putting it all together
We can start workflows from the same process the worker runs in -- or they can be separate. Here we use the SQLite backend, spawn a single worker (which then executes both Workflows
and Activities
), and then start a single instance of our workflow
func main() {
ctx := context.Background()
b := sqlite.NewSqliteBackend("simple.sqlite")
go runWorker(ctx, b)
c := client.New(b)
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: uuid.NewString(),
}, Workflow1, "input-for-workflow")
if err != nil {
panic("could not start workflow")
}
c2 := make(chan os.Signal, 1)
signal.Notify(c2, os.Interrupt)
<-c2
}
Architecture (WIP)
The high-level architecture follows the same model as Azure's DurableTask library. The "persistence store" or "providers" mentioned there are implementations of backend.Backend
for go-workflows
. There is no intermediate server, clients which create new workflow instances and retrieve their results, as well as worker processes (which could also be the same as clients), talk directly to the backend. For the two implemented backends so far, that also means directly to the database.
Execution model
While there are implementations for other lanuages in the context of Azure Durable Functions, the general purpose version of Durable Task was only implemented for C#.
The execution model for go-workflows
follows closely the one created for Uber's Cadence and which was then forked by the original creators for Temporal.io.
See https://cschleiden.dev/blog/2022-05-02-go-workflows-part2/ for some more details.
Supported backends
For all backends, for now the initial schema is applied upon first usage. In the future this might move to something more powerful to migrate between versions, but in this early stage, there is no upgrade.
Sqlite
The Sqlite backend implementation supports two different modes, in-memory and on-disk:
By default the schema is automatically created/migrations are automatically applied. Use WithApplyMigrations(false)
to disable this behavior.
MySql
b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "simple")
By default the schema is automatically created/migrations are automatically applied. Use WithApplyMigrations(false)
to disable this behavior.
Redis
redisClient := ...
b, err := redis.NewRedisBackend(redisClient)
if err != nil {
panic(err)
}
Guide
Registering workflows
Workflows need to accept workflow.Context
as their first parameter, and any number of inputs parameters afterwards. Parameters need to be serializable (e.g., no chan
s etc.). Workflows need to return an error
and optionally one additional result, which again needs to be serializable.
func Workflow1(ctx workflow.Context) error {
}
Workflows needs to be registered with the worker before they can be started:
var b backend.Backend
w := worker.New(b)
w.RegisterWorkflow(Workflow1)
Registering activities
Similar to workflows, activities need to be registered with the worker before they can be started. They also need to accept context.Context
as their first parameter, and any number of inputs parameters afterwards. Parameters need to be serializable (e.g., no chan
s etc.). Activities need to return an error
and optionally one additional result, which again needs to be serializable.
Activites can be registered as plain func
s or as methods on a struct. The latter is useful if you want to provide some shared state to activities, for example, a database connection.
func Activity1(ctx context.Context, a, b int) (int, error) {
return a + b, nil
}
var b backend.Backend
w := worker.New(b)
w.RegisterActivity(Activity1)
And using a struct
:
type act struct {
SharedState int
}
func (a *act) Activity1(ctx context.Context, a, b int) (int, error) {
return a + b + act.SharedState, nil
}
func (a *act) Activity2(ctx context.Context, a int) (int, error) {
return a * act.SharedState, nil
}
var b backend.Backend
w := worker.New(b)
w.RegisterActivity(&act{SharedState: 12})
to call activities registered on a struct from a workflow:
// ...
var a *act
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a.Activity1, 35, 12).Get(ctx)
if err != nil {
// handle error
}
// Output r1 = 47 + 12 (from the worker registration) = 59
Starting workflows
CreateWorkflowInstance
on a client instance will start a new workflow instance. Pass options, a workflow to run, and any inputs.
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: uuid.NewString(),
}, Workflow1, "input-for-workflow")
if err != nil {
// ...
}
Removing workflow instances
RemoveWorkflowInstance
on a client instance will remove that workflow instance including all history data from the backend. A workflow instance needs to be in the finished state before calling this, otherwise an error will be returned.
err = c.RemoveWorkflowInstance(ctx, workflowInstance)
if err != nil {
// ...
}
Automatically expiring finished workflow instances
For now this is only supported for the Redis backend. When an AutoExpiration
is passed to the backend, finished workflow instances will be automatically removed after the specified duration. This works by setting a TTL on the Redis keys for finished workflow instances. If AutoExpiration
is set to 0
(the default), no TTL will be set.
b, err := redis.NewRedisBackend(redisClient, redis.WithAutoExpiration(time.Hour * 48))
// ...
Canceling workflows
Create a Client
instance then then call CancelWorkflow
to cancel a workflow. When a workflow is canceled, it's workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Any activities already running when a workflow is canceled will still run to completion and their result will be available.
Sub-workflows will be canceled if their parent workflow is canceled.
var c *client.Client
err = c.CancelWorkflowInstance(context.Background(), workflowInstance)
if err != nil {
panic("could not cancel workflow")
}
If you need to run any activities or make calls using workflow.Context
you need to create a new context with workflow.NewDisconnectedContext
, since the original context is canceled at this point.
func Workflow2(ctx workflow.Context, msg string) (string, error) {
defer func() {
if errors.Is(ctx.Err(), workflow.Canceled) {
// Workflow was canceled. Get new context to perform any cleanup activities
ctx := workflow.NewDisconnectedContext(ctx)
// Execute the cleanup activity
if err := workflow.ExecuteActivity(ctx, ActivityCleanup).Get(ctx, nil); err != nil {
return errors.Wrap(err, "could not perform cleanup")
}
}
}()
r1, err := workflow.ExecuteActivity[int](ctx, ActivityCancel, 1, 2).Get(ctx)
if err != nil { // <---- Workflow is canceled while this activity is running
return errors.Wrap(err, "could not get ActivityCancel result")
}
// r1 will contain the result of ActivityCancel
// ⬇ ActivitySkip will be skipped immediately
r2, err := workflow.ExecuteActivity(ctx, ActivitySkip, 1, 2).Get(ctx)
if err != nil {
return errors.Wrap(err, "could not get ActivitySkip result")
}
return "some result", nil
}
Running activities
From a workflow, call workflow.ExecuteActivity
to execute an activity. The call returns a Future[T]
you can await to get the result or any error it might return.
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12, nil, "test").Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println(r1)
Canceling activities
Canceling activities is not supported at this time.
Timers
You can schedule timers to fire at any point in the future by calling workflow.ScheduleTimer
. It returns a Future
you can await to wait for the timer to fire.
t := workflow.ScheduleTimer(ctx, 2*time.Second)
err := t.Get(ctx, nil)
Canceling timers
There is no explicit API to cancel timers. You can cancel a timer by creating a cancelable context, and canceling that:
tctx, cancel := workflow.WithCancel(ctx)
t := workflow.ScheduleTimer(tctx, 2*time.Second)
// Cancel the timer
cancel()
Signals
Signals are a way to send a message to a workflow. You can send a signal to a workflow by calling workflow.Signal
and listen to them by creating a SignalChannel
via NewSignalChannel
:
// From outside the workflow:
c.SignalWorkflow(ctx, "<instance-id>", "signal-name", "value")
func Workflow(ctx workflow.Context) error {
// ...
signalCh := workflow.NewSignalChannel[string](ctx, "signal-name")
// Pause workflow until signal is received
workflow.Select(ctx,
workflow.Receive(signalCh, func(ctx workflow.Context, r string, ok bool) {
logger.Debug("Received signal:", r)
}),
)
// Continue execution
}
Signaling workflows from within workflows
func Workflow(ctx workflow.Context) error {
if _, err := workflow.SignalWorkflow(ctx, "sub-instance-id", "signal-name", "value").Get(ctx); err != nil {
// Handle error
}
}
Executing side effects
Sometimes scheduling an activity is too much overhead for a simple side effect. For those scenarios you can use workflow.SideEffect
. You can pass a func which will be executed only once inline with its result being recorded in the history. Subsequent executions of the workflow will return the previously recorded result.
id, _ := workflow.SideEffect[string](ctx, func(ctx workflow.Context) string) {
return uuid.NewString()
}).Get(ctx)
Running sub-workflows
Call workflow.CreateSubWorkflowInstance
to start a sub-workflow. The returned Future
will resolve once the sub-workflow has finished.
func Workflow1(ctx workflow.Context, msg string) error {
result, err := workflow.CreateSubWorkflowInstance[int]
ctx, workflow.SubWorkflowInstanceOptions{}, SubWorkflow, "some input").Get(ctx)
if err != nil {
return errors.Wrap(err, "could not get sub workflow result")
}
logger.Debug("Sub workflow result:", "result", result)
return nil
}
func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12).Get(ctx)
if err != nil {
return "", errors.Wrap(err, "could not get activity result")
}
logger.Debug("A1 result:", "r1", r1)
return r1, nil
}
Canceling sub-workflows
Similar to timer cancellation, you can pass a cancelable context to CreateSubWorkflowInstance
and cancel the sub-workflow that way. Reacting to the cancellation is the same as canceling a workflow via the Client
. See Canceling workflows for more details.
Error handling
Custom errors
Errors returned from activities and subworkflows need to be marshalled/unmarshalled by the library so they are wrapped in a workflow.Error
. You can access the original type via the err.Type
field. If a stacktrace was captured, you can access it via err.Stack()
. Example (see also samples/errors
):
func handleError(ctx workflow.Context, logger log.Logger, err error) {
var werr *workflow.Error
if errors.As(err, &werr) {
switch werr.Type {
case "CustomError": // This was a `type CustomError struct...` returned by an activity/subworkflow
logger.Error("Custom error", "err", werr)
return
}
logger.Error("Generic workflow error", "err", werr, "stack", werr.Stack())
return
}
var perr *workflow.PanicError
if errors.As(err, &perr) {
// Activity/subworkflow ran into a panic
logger.Error("Panic", "err", perr, "stack", perr.Stack())
return
}
logger.Error("Generic error", "err", err)
}
Panics
A panic in an activity will be captured by the library and made available as a workflow.PanicError
in the calling workflow. Example:
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
var perr *workflow.PanicError
if errors.As(err, &perr) {
logger.Error("Panic", "err", perr, "stack", perr.Stack())
return
}
Retries
With the default DefaultActivityOptions
, Activities are retried up to three times when they return an error. If you want to keep automatic retries, but want to avoid them when hitting certain error types, you can wrap an error with workflow.NewPermanentError
:
Workflow:
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println(r1)
Activity:
func Activity1(ctx context.Context, name string) (int, error) {
if name == "test" {
// No need to retry in this case, the activity will aways fail with the given inputs
return 0, workflow.NewPermanentError(errors.New("test is not a valid name"))
}
return http.Do("POST", "https://example.com", name)
}
ContinueAsNew
ContinueAsNew
allows you to restart workflow execution with different inputs. The purpose is to keep the history size small enough to avoid hitting size limits, running out of memory and impacting performance. It works by returning a special error
from your workflow that contains the new inputs:
wf := func(ctx workflow.Context, run int) (int, error) {
run = run + 1
if run < 3 {
return run, workflow.ContinueAsNew(ctx, run)
}
return run, nil
}
Here the workflow is going to be restarted when workflow.ContinueAsNew
is returned. Internally the new execution starts with a fresh history. It uses the same InstanceID
but a different ExecutionID
.
If a sub-workflow is restarted, the caller doesn't notice this, only once it ends without being restarted the caller will get the result and control will be passed back.
select
Due its non-deterministic behavior you must not use a select
statement in workflows. Instead you can use the provided workflow.Select
function. It blocks until one of the provided cases is ready. Cases are evaluated in the order passed to `Select.
var f1 workflow.Future[int]
var c workflow.Channel[int]
value := 42
workflow.Select(
ctx,
workflow.Await(f1, func (ctx workflow.Context, f Future[int]) {
r, err := f.Get(ctx)
// ...
}),
workflow.Receive(c, func (ctx workflow.Context, v int, ok bool) {
// use v
}),
workflow.Send(c, &value, func (ctx workflow.Context) {
// value has been sent to the channel
}),
workflow.Default(ctx, func (ctx workflow.Context) {
// ...
})
)
Waiting for a Future
Await
adds a case to wait for a Future to have a value
var f1, f2 workflow.Future[int]
workflow.Select(
ctx,
workflow.Await(f1, func (ctx workflow.Context, f Future[int]) {
r, err := f.Get(ctx)
// ...
}),
workflow.Await(f2, func (ctx workflow.Context, f Future[int]) {
r, err := f.Get(ctx)
// ...
}),
)
Waiting to receive from a Channel
Receive
adds a case to receive from a given channel
var c workflow.Channel[int]
workflow.Select(
ctx,
workflow.Receive(c, func (ctx workflow.Context, v int, ok bool) {
// ...
}),
)
Default/Non-blocking
A Default
case is executed if no previous case is ready and selected:
var f1 workflow.Future[int]
workflow.Select(
ctx,
workflow.Await(f1, func (ctx workflow.Context, f Future[int]) {
r, err := f.Get(ctx, &r)
// ...
}),
workflow.Default(ctx, func (ctx workflow.Context) {
// ...
})
)
Unit testing
Workflows
go-workflows includes support for testing workflows, a simple example using mocked activities:
func TestWorkflow(t *testing.T) {
tester := tester.NewWorkflowTester[int](Workflow1)
// Mock two activities
tester.OnActivity(Activity1, mock.Anything, 35, 12).Return(47, nil)
tester.OnActivity(Activity2, mock.Anything, mock.Anything, mock.Anything).Return(12, nil)
// Run workflow with inputs
tester.Execute("Hello world")
// Workflows always run to completion, or time-out
require.True(t, tester.WorkflowFinished())
wr, werr := tester.WorkflowResult()
require.Equal(t, 59, wr)
require.Empty(t, werr)
// Ensure any expectations set for activity or sub-workflow mocks were met
tester.AssertExpectations(t)
}
- Timers are automatically fired by advancing a mock workflow clock that is used for testing workflows
- You can register callbacks to fire at specific times (in mock-clock time). Callbacks can send signals, cancel workflows etc.
Activities
Activities can be tested like any other function. If you make use of the activity context, for example, to retrieve a logger, you can use activitytester.WithActivityTestState
to provide a test activity context. If you don't specify a logger, the default logger implementation will be used.
func Activity(ctx context.Context, a int, b int) (int, error) {
activity.Logger(ctx).Debug("Activity is called", "a", a)
return a + b, nil
}
func TestActivity(t *testing.T) {
ctx := activitytester.WithActivityTestState(context.Background(), "activityID", "instanceID", nil)
r, err := Activity(ctx, 35, 12)
require.Equal(t, 47, r)
require.NoError(t, err)
}
Logging
For logging, you can pass a type to the backend via the WithLogger
option to set a custom logger. The type has to implement this simple interface:
type Logger interface {
Debug(msg string, fields ...interface{})
Warn(msg string, fields ...interface{})
Error(msg string, fields ...interface{})
Panic(msg string, fields ...interface{})
With(fields ...interface{}) Logger
}
If you don't pass a logger, a very simple, unoptimized default logger is used. For production use it is strongly recommended to pass another logger.
Workflows
For logging in workflows, you can get a logger using
logger := workflow.Logger(ctx)
The returned logger
implements the Logger
interface, and already has the workflow instance set as default field.
Activities
For logging in activities, you can get a logger using
logger := activity.Logger(ctx)
The returned logger
implements the Logger
interface, and already has the id of the activity, and the workflow instance set as default field.
Tracing
The library supports tracing via OpenTelemetry. When you pass a TracerProvider
when creating a backend instance, workflow execution will be traced. You can also add additional spans for both activities and workflows.
Note: the support is considered experimental right now, if you decide to use it, please leave feedback
Activities
The context.Context
passed into activities is set up with the correct current span. If you create additional spans, they'll show up under the ActivityTaskExecution
:
func Activity1(ctx context.Context, a, b int) (int, error) {
ctx, span := otel.Tracer("activity1").Start(ctx, "Custom Activity1 span")
defer span.End()
// Do something
}
Workflows
For workflows the usage is a bit different, the tracer needs to be aware of whether the workflow is being replayed or not:
func Workflow(ctx workflow.Context) error {
ctx, span := workflow.Tracer(ctx).Start(ctx, "Workflow1 span", trace.WithAttributes(
// Add additional
attribute.String("msg", "hello world"),
))
// Do something
span.End()
Context Propagation
In go programs it is common to use context.Context
to pass around request-scoped data. This library supports context propagation between activities and workflows. When you create a workflow, you can pass a ContextPropagator
to the backend to propagate context values. The interface is:
type ContextPropagator interface {
Inject(context.Context, *Metadata) error
Extract(context.Context, *Metadata) (context.Context, error)
InjectFromWorkflow(Context, *Metadata) error
ExtractToWorkflow(Context, *Metadata) (Context, error)
}
The context-propagation
sample shows an example of how to use this.
Analyzer
/analyzer
contains a simple golangci-lint based analyzer to spot common issues in workflow code.
Diagnostics Web UI
For investigating workflows, the package includes a simple diagnostic web UI. You can serve it via:
m := http.NewServeMux()
m.Handle("/diag/", http.StripPrefix("/diag", diag.NewServeMux(b)))
go http.ListenAndServe(":3000", m)
It provides a simple paginated list of workflow instances:
And a way to inspect the history of a workflow instance:
FAQ
How are releases versioned?
For now this library is in a pre-release state. There are no guarantees given regarding breaking changes between (pre)-releases.
Workflow versioning
For now, I've intentionally left out versioning. Cadence, Temporal, and DTFx all support the concept of versions for workflows as well as activities. This is mostly required when you make changes to workflows and need to keep backwards compatibility with workflows that are being executed at the time of the upgrade.
Example: when you change a workflow from:
func Workflow1(ctx workflow.Context) {
r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
log.Println("A1 result:", r1)
r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
log.Println("A2 result:", r2)
}
to:
func Workflow1(ctx workflow.Context) {
var r1 int
r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
log.Println("A1 result:", r1)
r3, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity3).Get(ctx)
log.Println("A3 result:", r3)
r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
log.Println("A2 result:", r2)
}
and you replay a workflow history that contains:
ActivitySchedule
- Activity1
ActivityCompleted
- Activity1
ActivitySchedule
- Activity2
ActivityCompleted
- Activity2
the workflow will encounter an attempt to execute Activity3
in-between event 2 and 3, for which there is no matching event. This is a non-recoverable error. The usual approach to solve is to version the workflows and every time you make a change to a workflow, you have to check that logic. For this example this could look like:
func Workflow1(ctx workflow.Context) {
r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
log.Println("A1 result:", r1)
if workflow.Version(ctx) >= 2 {
r3, _ := workflow.ExecuteActivity(ctx, workflow.DefaultActivityOptions, Activity3).Get(ctx)
log.Println("A3 result:", r3)
}
r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
log.Println("A2 result:", r2)
}
and only if a workflow instance was created with a version of >= 2
will Activity3
be executed. Older workflows are persisted with a version < 2
and will not execute Activity3
.
This kind of check is understandable for simple changes, but it becomes hard and a source of bugs for more complicated workflows. Therefore for now versioning is not supported and the guidance is to rely on side-by-side deployments. See also Azure's Durable Functions documentation for the same topic.