fse-temporal

module
v0.8.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2024 License: BSD-3-Clause

README

FSE for Temporal

This is a DSL for a finite state engine (FSE) with an interpreter for running it on https://www.temporal.io/

To discover FSE workflow types, we also provide a particular temporal workflow function with a well-known WorkflowID that acts as an installation-wide singleton storing ans providing workflow descriptions by name. There is the cmd registry-worker hosting only this workflow. The sample website in cmd/fse also hosts the registry workflow.

Status

The status of this repository is "incubating". I.e. we discourage production use currently, and there will be braking changes to the API and to the DSL itself. We expect to release a v1 version in Q4 2020.

Temporal v1 was released in Q4 2020, wich is reflected in temporal-latest.yaml

There is a Breaking Change in the DSL syntax compared to the August 2020 version: Descriptions need to be enclosed within <-- Description --> instead of three dashes --- at start and end. With this change, the parser is better at syncing after an unintended spurious end of description token within a description, like in the syntactically wrong --- Hi there --- General K. ---.

DSL Features

Example DSL Text

Here is a simple example:

workflow version v100 other-workflow

state initial
{
    after 5s goto s1;

    event close
        # close wil be available in all sub states of initial
        goto final
    ;

    final-event only-in-initial
        # only-in-initial will not be inherited by any substate like s1 and s2
        goto s2
    ;

    state s1 {
        after 5s goto s2;

        # inherits event close from initial
    }

    state s2 {
        after 10s goto final;

        event back
            goto s1
        ;

        # inherits event close from initial
    }
}

state final {
    initialize terminate;
}

For a longer FSE grammar example see sample-workflow-v100.fse. It is of no practical use, but it shows all available syntax elements.

DSL Prosa

For a detailed syntax description have a look at FSE.g4 containing the language neutral antlr4 description.

An FSE workflow DSL has a Header naming the workflow and a non empty list of named states. Each state can have an initialization, a time-out, a list of events and a list of substates which are normal states. Substates inherit events, but not time-outs from their containing state. Each initialization, time-out and event can have a list of consequences described in the table below.

Workflows, states and events are named, i.e. they have a mandatory key with a restricted set of allowed characters, namely only a..z, A..Z, 0..9 and inner single dashes or underscores. Furthermore they have an option string, a display name and a rolenames list. An example for a full named object would be:

I-am-Number-9:"option: A B C; hidden;"
as "This is my display name: I am #9"
for "aliens" "mortals" "immortals"

The workflow interpreter keeps a global map of strings indexed by a so called variables, which are strings starting with a $ and the rest is constraind exactly as keys of named objects are. This map is called the environment of this interpreter instance. If you ask for a variable's value before it is written to, it returns the empty string, otherwise the string that was written to last.

Consequence Example Description
assignment $x = "abc" sets the interpreter's environment variable $x to abc
assignment $x = $y sets the interpreter's environment variable $x to the value of $y
assignment with funcall $x = action("abc") dispatches action with parameter abc and sets the interpreter's environment variable $x the result of action(abc) where action is either a temporal activity with the name action or sth the code dispatches at will
map assignment $$ = mapaction("abc" $r) dispatches mapaction with parameter abc and the value of $r and sets interpreter environment variables according to the map[string]string result of mapaction(abc $r) where mapaction is either a temporal activity with the name mapaction or sth the code dispatches at will
funcall action($x "abc") dispatches action with the interpreter's environment variable $x and the literal abc as parameters. No return value is assumed
terminate terminate stops consequence evaluation and ends the workflow
halt halt stops consequence evaluation (can be prefixed by an if)
goto goto s1 or goto $x go to state s1 or to the state named in $x and initialize it, stop consequence evaluation (can be prefixed by an if)
ifgoto if $x goto/halt check if $x is not the empty string, if true execute goto or halt like described above
ifgoto if $x == $y ... check if $x equals $y, if true execute goto or halt like described above
ifgoto if $x != $y ... check if $x doesn't equal $y, if true execute goto or halt like described above
ifgoto if $x == "abc" ... check if $x equals abc, if true execute goto or halt like described above
ifgoto if $x != "abc" ... check if $x doesn't equal $y, if true execute goto or halt like described above

So an if always has a variable on the left, then an optional == or a != plus a literal or another variable on the right and it is followed by a goto or a halt which always stop consequence evaluation.

A funcall always blocks.

Goto by Environment Variable

Some common application is to add some "Go Back" event to a container state with a final consequence of goto $PREVIOUS which is inherited to its inner states. This way the workflow gets a quasi-automatic one step backward feature.

A goto $x does this:

  • lookup $x in the environment
  • if the value is empty, *new* or the environment var $x was never set, the goto is silently ignored
  • if the value, e.g. s1, is not empty and the state is known by the workflow it is treated like a plain literal goto s1 in the example
  • if the value is not a known state, the workflow errors out

Auto Environment Vars

The following table lists environment variables that the interpreter takes care of itself.

Environment Variable Description Special Case
$CURRENT Name of the state the workflow entered none
$CURRENTDISPLAY Displayname of the state the workflow entered none
$EVENT Name of the event that was signaled last For timer events this value is *timer*
$EVENTDISPLAY Displayname of the event that was signaled last For timer events this value is after <duration>
$PREVIOUS Name of the state the signal was received. It is kept even if multiple states are hoped over On workflow initialization the value is *new*
$PREVIOUSDISPLAY Displayname of the state the signal was received. It is kept even if multiple states are hoped over On workflow initialization the value is empty

Note: For easier reading it is recommended to restrict user defined environment variables to lowercase letters and digits.

Usage

In this section we describe the intended use cases.

To Parse an FSE DSL within a Program
import (
    ...
    "gitlab.com/lercher/fse-temporal/syntax"
)

var r io.Reader // open a reader from somewhere

// parse the reader's content, note that the whole byte stream must fit into memory
wf, diag, err := syntax.Parse(r, "") // or a file's path insted of "" if available
if err != nil {
    for i := range diag {
        // if an error ocurred there can be many diagnostic lines
        log.Println(diag[i])
    }
    log.Fatalln(err)
}

// wf loaded successfully, inspect some properties
log.Printf("%s %q loaded successfully", wf.Name, wf.Display)
log.Println(wf.Description)
log.Printf("%v total states, %v top level states", len(wf.AllStates), len(wf.States))

Have a look at the grammar checker source code for a more complete example.

To Use the Interpreter with a Temporal Worker
import (
    ...
    "gitlab.com/lercher/fse-temporal/exec"
    "gitlab.com/lercher/fse-temporal/syntax"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
    "go.temporal.io/sdk/workflow"
)

In your workflow worker code wf, _, _ := syntax.Parse(r) the reader somewhere, decide upon which temporal TaskQueue(s) your workflows and activities listen on (taskQueue, actionsTaskQueue) and then register a generic workflow worker function by calling a setup method like this one:

func setup(wf *syntax.Workflow, taskQueue, actionsTaskQueue string) error {
    // if err != nil return err is omited for brevity
    serviceClient, err := client.NewClient(client.Options{})
    defer serviceClient.Close()
    fseWorker := worker.New(serviceClient, taskQueue, worker.Options{})

    fseWorker.RegisterWorkflowWithOptions(makeWorkflow(wf, actionsTaskQueue), workflow.RegisterOptions{
        Name: string(wf.VersionedName()), // use the FSE workflow's versioned name as name for the temporal workflow
    })
    err = fseWorker.Start()
}

// makeWorkflow captures the wf definition and the actionsTaskQueue in a closure
func makeWorkflow(wf *syntax.Workflow, actionsTaskQueue string) func(ctx workflow.Context) error {
    return func(ctx workflow.Context) error {
        // nil means hear, that all called FSE activities are temporal activities
        // otherwise you could plug-in a dispatcher function maping FSE names to general function calls
        in := exec.New(ctx, wf, nil)
        in.ActivityOptions.TaskQueue = actionsTaskQueue
        // "in" is now an interpreter for wf bound to this workflow.Context
        // with its own environment listening for events and delays, dispatching
        // activities called by consequences to temporal activities listening at actionsTaskQueue
        return in.Run() // returns nil on termination or an error if sth went wrong
    }
}

For a more complete example, have a look at the web server's source at cmd/fse.

To Use the File Based Cache

We provide a cache for FSE DSLs stored within files, where the cache invalidates a stored *syntax.Workflow when the modification timestamp of the underlying file changes. Instead of syntax.Parse() create a new cache struct and call its Workflow() method with the same return signature:

import (
    "gitlab.com/lercher/fse-temporal/cache"
)

c := cache.New() // Has a variant NewWith() for providing your own version of os.Open and os.Stat
fn := "my-workflow.fse"

wf, diag, err := c.Workflow(fn)
if err != nil {
    for i := range diag {
        log.Println(diag[i].String()) // diag[i].String() already contains the file name
    }
    panic(err) // err includes fn
}

To Run the Grammar checker

The grammar checker is located under cmd/grammar. It checks an FSE grammer file for syntax and semantic errors and it has an option to convert a grammar file to a JSON representation to enable use cases beside using the packages provided by this repository. To build it, cd cmd/grammar and go build it. There is a sample.fse file and the JSON for that file sample.json.

To check the sample file for errors:

C:\...\fse-temporal\cmd\grammar>grammar.exe sample.fse
2020/08/04 19:32:51 Sample-Workflow "Display Name" loaded successfully
2020/08/04 19:32:51 Triple dash delimited Explanation of what this workflow does. Can contain
any characters, markdown is an option. This text is to be shown
as in info box on a user interface for this workflow.
2020/08/04 19:32:51 6 total states, 4 top level states
2020/08/04 19:32:51 9.0302ms total processing time

C:\...\fse-temporal\cmd\grammar>

To convert the sample file to json add the -j option:

C:\...\fse-temporal\cmd\grammar>grammar.exe -j sample.fse > sample.json
2020/08/04 19:34:40 Sample-Workflow "Display Name" loaded successfully
2020/08/04 19:34:40 Triple dash delimited Explanation of what this workflow does. Can contain
any characters, markdown is an option. This text is to be shown
as in info box on a user interface for this workflow.
2020/08/04 19:34:40 6 total states, 4 top level states
2020/08/04 19:34:40 4.9881ms total processing time

C:\...\fse-temporal\cmd\grammar>type sample.json
{
  "name": "Sample-Workflow",
  "option": "for: STH;",
  "display": "Display Name",
  "description": "Triple dash delimited Explanation of what this workflow does. Can contain\r\nany characters, markdown is an option. This text is to be shown\r\nas in info box on a user interface for this workflow.",
  "roles": [
    "me",
    "and",
    "you"
  ],
  "states": [
...

To Run the Example Website

As a preparation for running the sample website, you need a running installation of a temporal server v1 or greater. Follow their instruction using docker-compose or have a look at the description below on using podman play.

Then build and start the server in the cmd/fse subdirectory. Assuming temporal runs locally and you want to examine the sample workflows we provide that's:

cd cmd/fse
go run .

or under a Windows cmd shell:

C:\...\fse-temporal>cd cmd\fse
C:\...\fse-temporal\cmd\fse>go build && fse.exe

You are probably asked by the Windows firewall to allow traffic for port 9001. Then open http://localhost:9001/ with your favourite browser.

To Run the API Server

The API server's published API is described in detail in cmd/wfs/readme.md. It provides lists for startable and already running workflow instances for particular business objects represented by an option string and and ID. Furthermore operations to create, inspect, signal and terminate instances. To run it:

cd cmd/wfs
go run .
API Server UI

The API server also listens on requests to / and displays a UI of the workflow types it hosts. Query Parameters:

Paramname Description
name Name of the workflow to be displayed. If empty or missing show a list of all available workflow types.
version Use this particular version of the workflow. Only relavant if name is not empty. If missing, an arbitrary version is shown.
embed Optional. If not empty, only a header- and footer-less html fragment is returned for embedding into a differnt web site.

Please note that the UI format is subject to arbitrary changes and enhancements and thus it may probably not be suitable for particular needs. It's probably better to parse an own syntax graph and render it yourself instead of embedding or referencing to the API server's UI.

However, if there is no need for a particular view, it's handy to have a UI to get a quick overview over what workflow types are available on this API server and to inspect their definitions.

To Access the API Server from a Go Program

To access the wfs server's http API, import the wfsclient package, create a Client with the base URL and use its methods.

Hosting the Registry Workflow (stand-alone)

C:\...\fse-temporal\cmd\registry-worker>go build && registry-worker.exe -temporal temporal-server:7233
2020/08/04 17:10:13 INFO  No logger configured for temporal client. Created default one.
2020/08/04 17:10:13 INFO  No activities registered. Skipping activity worker start Namespace default TaskQueue _registry WorkerID 20792@LPOD@
2020/08/04 17:10:13 INFO  Started Worker Namespace default TaskQueue _registry WorkerID 20792@LPOD@
2020/08/04 17:10:13 hosting the Registry workflow, press ^C to stop
[--- ^C pressed ---]
2020/08/04 17:10:22 Stopping worker ...
2020/08/04 17:10:22 INFO  Stopped Worker Namespace default TaskQueue _registry WorkerID 20792@LPOD@

C:\...\fse-temporal\cmd\registry-worker>

Using temporal with Podman

This is quite a straight-forward conversion of the provided docker-compose.yml file to a k8s pod specification to be used via podman play kube.

The only important difference is, that the pod internal communication goes over localhost and not a docker internal DNS name. So we need to specify that the temporal server binds to 127.0.0.1 instead of the host's proper IP address, which is the default.

Creating and Starting the services as a single pod
podman play kube temporal-latest.yaml
Starting/Stopping the pod
podman pod start/stop temporal-latest
Storing Cassandra Data locally

The cassandra container stores its data files in /var/lib/cassandra, so interactively you would give podman the option

podman ... -v /my/own/datadir:/var/lib/cassandra ...

However, I guess, we should to do this in the k8s yaml file. But I can't find docs on that one, only issues. So we probably stick with storing data within the cassandra container.

It's a different story with prod deployments, of course. But there we're going to run persistent storage on a different server anyway.

Inspecting the Server
podman exec -it temporal-srv sh

E.g. listing open TCP ports:

etc/temporal/config # netstat -tulpn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:9042            0.0.0.0:*               LISTEN      -
tcp        0      0 :::8088                 :::*                    LISTEN      -
tcp        0      0 10.0.2.100:7233         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:7234         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:7235         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:7239         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:6933         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:6934         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:6935         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:6939         0.0.0.0:*               LISTEN      117/temporal-server
tcp        0      0 10.0.2.100:7000         0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:8081          0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:46653         0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:7199          0.0.0.0:*               LISTEN      -
/etc/temporal/config #

License BSD 3-clause

This project is licensed under the BSD 3-clause "New" or "Revised" License.

Breaking Changes

We try to avoid breaking changes, but sometimes it's not possible.

v0.8.15

With the upgrade of temporal.io from v1.7 to 1.16.1 we noticed a serious bug regarding taskqueue names and multiple wokers within our single process approach. So we decided to change the API:

  • wfs.Serveroptions no more has ActionqueueNameOverride and ActionqueueName(). TaskqueueName is now used for temporal actions, too.
  • func (so ServerOptions) InitializeServer/2 has a new mandatory parameter: a callback to add activities to the single internal worker.

Directories

Path Synopsis
Package cache provides parsed FSE DSLs on Open and Stat from a filesystem
Package cache provides parsed FSE DSLs on Open and Stat from a filesystem
cmd
fse
wfs
Package exec implements an interpreter of the FSE grammar on top of temporal.io
Package exec implements an interpreter of the FSE grammar on top of temporal.io
Package listener adds semantic error handling to a standard antlr.DiagnosticListener and conforms to the go error handling infrastructure
Package listener adds semantic error handling to a standard antlr.DiagnosticListener and conforms to the go error handling infrastructure
Package reg implements a temporal.io workflow singleton worker and query functions to keep a global registry of all available fse workflow types
Package reg implements a temporal.io workflow singleton worker and query functions to keep a global registry of all available fse workflow types
Package wfsclient establishes the communication between client program and the FSE workflow API server wfs via http.
Package wfsclient establishes the communication between client program and the FSE workflow API server wfs via http.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL