FSE for Temporal
This is a DSL for a finite state engine (FSE)
with an interpreter for running it on
https://www.temporal.io/
To discover FSE workflow types, we also provide a dedicated
temporal workflow function with a well-known WorkflowID
that acts as an installation-wide singleton, storing and providing
workflow descriptions by name. The cmd registry-worker
hosts only this workflow. The sample website in cmd/fse
also hosts the registry workflow.
Status
The status of this repository is "incubating", i.e. we currently discourage
production use, and there will be breaking changes to the API
and to the DSL itself. We expect to release a v1 version in Q4 2020.
Temporal v1 was released in Q4 2020, which is reflected in
temporal-latest.yaml.
There is a breaking change in the DSL syntax compared to the
August 2020 version: descriptions need to be enclosed within
<-- Description -->
instead of three dashes (---) at start and end.
With this change, the parser is better at resynchronizing after an unintended, spurious
end-of-description token within a description, as in
the syntactically wrong --- Hi there --- General K. ---.
DSL Features
Example DSL Text
Here is a simple example:
workflow version v100 other-workflow
state initial
{
    after 5s goto s1;
    event close
        # close will be available in all sub states of initial
        goto final
    ;
    final-event only-in-initial
        # only-in-initial will not be inherited by any substate like s1 and s2
        goto s2
    ;
    state s1 {
        after 5s goto s2;
        # inherits event close from initial
    }
    state s2 {
        after 10s goto final;
        event back
            goto s1
        ;
        # inherits event close from initial
    }
}
state final {
    initialize terminate;
}
For a longer FSE grammar example see
sample-workflow-v100.fse.
It is of no practical use, but it shows all available syntax elements.
DSL Prose
For a detailed syntax description, have a look at FSE.g4, which contains
the language-neutral antlr4 description.
An FSE workflow DSL has a header naming the workflow and a non-empty list of named states.
Each state can have an initialization, a time-out, a list of events and a list of
substates, which are themselves normal states. Substates inherit events, but not time-outs, from their
containing state. Each initialization, time-out and event can have a list of consequences,
which are described in the table below.
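The inheritance rule can be pictured as a lookup that walks from the current state up through its containing states. The following Go sketch is only an illustration of that idea: the `State` type and the `findEvent` function are hypothetical and are not taken from the `syntax` or `exec` packages. It also assumes, based on the comments in the example DSL text, that a `final-event` is visible only in the state that declares it.

```go
package main

import "fmt"

// State is a hypothetical, simplified model of an FSE state.
type State struct {
	Name        string
	Parent      *State          // nil for top-level states
	Events      map[string]bool // events that substates inherit
	FinalEvents map[string]bool // final-events, visible only in this state itself
}

// findEvent returns the name of the state that declares the event
// as seen from the state current, and whether it was found at all.
func findEvent(current *State, event string) (string, bool) {
	// The current state sees both its own events and its own final-events.
	if current.Events[event] || current.FinalEvents[event] {
		return current.Name, true
	}
	// Containing states contribute only their inheritable events.
	for s := current.Parent; s != nil; s = s.Parent {
		if s.Events[event] {
			return s.Name, true
		}
	}
	return "", false
}

func main() {
	initial := &State{
		Name:        "initial",
		Events:      map[string]bool{"close": true},
		FinalEvents: map[string]bool{"only-in-initial": true},
	}
	s2 := &State{Name: "s2", Parent: initial, Events: map[string]bool{"back": true}}

	fmt.Println(findEvent(s2, "close"))           // inherited from initial
	fmt.Println(findEvent(s2, "only-in-initial")) // not inherited
}
```

Time-outs are deliberately absent from the lookup, since substates do not inherit them.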
Workflows, states and events are named, i.e. they have a
mandatory key with a restricted set of allowed characters, namely only a..z, A..Z, 0..9 and
inner single dashes or underscores. Furthermore they have an option string, a display name
and a rolenames list. An example for a full named object would be:
I-am-Number-9:"option: A B C; hidden;"
as "This is my display name: I am #9"
for "aliens" "mortals" "immortals"
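The key rule above can be approximated with a regular expression. This is a sketch under the assumption that "inner single dashes or underscores" means that a separator never appears at the start or end of a key and never twice in a row; the authoritative rule is the one in FSE.g4, and the function name `isValidKey` is made up for this illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// keyPattern is an assumption derived from the prose description of keys:
// runs of letters and digits, joined by single dashes or underscores.
var keyPattern = regexp.MustCompile(`^[A-Za-z0-9]+(?:[-_][A-Za-z0-9]+)*$`)

// isValidKey reports whether s looks like a key of a named object.
func isValidKey(s string) bool {
	return keyPattern.MatchString(s)
}

func main() {
	for _, k := range []string{"I-am-Number-9", "s1", "-leading", "double--dash", "trailing_"} {
		fmt.Printf("%-14s %v\n", k, isValidKey(k))
	}
}
```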
The workflow interpreter keeps a global map of strings indexed by so-called variables,
which are strings starting with a $ followed by a name that is constrained
exactly as the keys of named objects are.
This map is called the environment of this interpreter instance. If you ask for a variable's
value before it has been written to, you get the empty string; otherwise you get the string that was written last.
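A plain Go map already behaves the way the environment is described here, because reading a missing key yields the zero value of the element type. The `Env` type below is a hypothetical stand-in for illustration and is not the interpreter's actual data structure.

```go
package main

import "fmt"

// Env is a hypothetical model of the interpreter's environment.
type Env map[string]string

// Get returns the last value written to the variable,
// or the empty string if it was never written.
func (e Env) Get(name string) string { return e[name] }

// Set stores a value, replacing whatever was written before.
func (e Env) Set(name, value string) { e[name] = value }

func main() {
	env := Env{}
	fmt.Printf("%q\n", env.Get("$x")) // never written
	env.Set("$x", "abc")
	env.Set("$x", "def")
	fmt.Printf("%q\n", env.Get("$x")) // last write wins
}
```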
Consequence | Example | Description |
assignment | $x = "abc" | sets the interpreter's environment variable $x to abc |
assignment | $x = $y | sets the interpreter's environment variable $x to the value of $y |
assignment with funcall | $x = action("abc") | dispatches action with parameter abc and sets the interpreter's environment variable $x to the result of action(abc), where action is either a temporal activity with the name action or something the code dispatches at will |
map assignment | $$ = mapaction("abc" $r) | dispatches mapaction with parameter abc and the value of $r and sets interpreter environment variables according to the map[string]string result of mapaction(abc $r), where mapaction is either a temporal activity with the name mapaction or something the code dispatches at will |
funcall | action($x "abc") | dispatches action with the interpreter's environment variable $x and the literal abc as parameters. No return value is assumed |
terminate | terminate | stops consequence evaluation and ends the workflow |
halt | halt | stops consequence evaluation (can be prefixed by an if) |
goto | goto s1 or goto $x | goes to state s1 or to the state named in $x and initializes it; stops consequence evaluation (can be prefixed by an if) |
ifgoto | if $x goto/halt | checks whether $x is not the empty string; if true, executes the goto or halt as described above |
ifgoto | if $x == $y ... | checks whether $x equals $y; if true, executes the goto or halt as described above |
ifgoto | if $x != $y ... | checks whether $x doesn't equal $y; if true, executes the goto or halt as described above |
ifgoto | if $x == "abc" ... | checks whether $x equals abc; if true, executes the goto or halt as described above |
ifgoto | if $x != "abc" ... | checks whether $x doesn't equal abc; if true, executes the goto or halt as described above |
So an if always has a variable on the left, then optionally an == or a != plus a literal or another variable
on the right, and it is followed by a goto or a halt, which always stops consequence evaluation.
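The condition forms listed in the table can be evaluated with a few lines of Go. The `Cond` type and its fields are hypothetical names chosen for this sketch; the real interpreter may represent conditions differently.

```go
package main

import "fmt"

// Cond is a hypothetical representation of the condition of an if consequence.
type Cond struct {
	Left       string // variable name, e.g. "$x"
	Op         string // "", "==" or "!="
	Right      string // literal text or variable name; unused if Op is ""
	RightIsVar bool   // true if Right names a variable
}

// Eval evaluates the condition against an environment.
// Unset variables read as the empty string.
func (c Cond) Eval(env map[string]string) bool {
	left := env[c.Left]
	if c.Op == "" {
		return left != "" // bare "if $x": true when $x is not empty
	}
	right := c.Right
	if c.RightIsVar {
		right = env[c.Right]
	}
	if c.Op == "==" {
		return left == right
	}
	return left != right
}

func main() {
	env := map[string]string{"$x": "abc", "$y": "abc"}
	fmt.Println(Cond{Left: "$x"}.Eval(env))
	fmt.Println(Cond{Left: "$x", Op: "==", Right: "$y", RightIsVar: true}.Eval(env))
	fmt.Println(Cond{Left: "$x", Op: "!=", Right: "abc"}.Eval(env))
}
```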
A funcall always blocks.
Goto by Environment Variable
A common application is to add a "Go Back" event to a container state with a
final consequence of goto $PREVIOUS, which is inherited by its inner states. This way
the workflow gets a quasi-automatic one-step-backward feature.
A goto $x does this:
- look up $x in the environment
- if the value is empty or *new*, or the environment variable $x was never set, the goto is silently ignored
- if the value, e.g. s1, is not empty and the state is known by the workflow, it is treated like a plain literal goto, i.e. goto s1 in the example
- if the value is not a known state, the workflow errors out
Auto Environment Vars
The following table lists environment variables that the interpreter
takes care of itself.
Environment Variable | Description | Special Case |
$CURRENT | Name of the state the workflow entered | none |
$CURRENTDISPLAY | Display name of the state the workflow entered | none |
$EVENT | Name of the event that was signaled last | For timer events this value is *timer* |
$EVENTDISPLAY | Display name of the event that was signaled last | For timer events this value is after <duration> |
$PREVIOUS | Name of the state in which the signal was received. It is kept even if multiple states are hopped over | On workflow initialization the value is *new* |
$PREVIOUSDISPLAY | Display name of the state in which the signal was received. It is kept even if multiple states are hopped over | On workflow initialization the value is empty |
Note: For easier reading it is recommended to restrict user defined environment
variables to lowercase letters and digits.
Usage
In this section we describe the intended use cases.
To Parse an FSE DSL within a Program
import (
...
"gitlab.com/lercher/fse-temporal/syntax"
)
var r io.Reader // open a reader from somewhere
// parse the reader's content; note that the whole byte stream must fit into memory
wf, diag, err := syntax.Parse(r, "") // or a file's path instead of "" if available
if err != nil {
	for i := range diag {
		// if an error occurred, there can be many diagnostic lines
		log.Println(diag[i])
	}
	log.Fatalln(err)
}
// wf loaded successfully, inspect some properties
log.Printf("%s %q loaded successfully", wf.Name, wf.Display)
log.Println(wf.Description)
log.Printf("%v total states, %v top level states", len(wf.AllStates), len(wf.States))
Have a look at the grammar checker source code
for a more complete example.
To Use the Interpreter with a Temporal Worker
import (
...
"gitlab.com/lercher/fse-temporal/exec"
"gitlab.com/lercher/fse-temporal/syntax"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
In your workflow worker code, parse the reader somewhere with wf, _, _ := syntax.Parse(r, ""),
decide which temporal TaskQueue(s)
your workflows and activities listen on (taskQueue, actionsTaskQueue),
and then register a generic workflow worker function by calling a setup
method like this one:
func setup(wf *syntax.Workflow, taskQueue, actionsTaskQueue string) error {
	serviceClient, err := client.NewClient(client.Options{})
	if err != nil {
		return err
	}
	// caution: Start below returns immediately, so this deferred Close runs as soon as
	// setup returns; a long-running program should close the client on shutdown instead
	defer serviceClient.Close()
	fseWorker := worker.New(serviceClient, taskQueue, worker.Options{})
	fseWorker.RegisterWorkflowWithOptions(makeWorkflow(wf, actionsTaskQueue), workflow.RegisterOptions{
		Name: string(wf.VersionedName()), // use the FSE workflow's versioned name as name for the temporal workflow
	})
	return fseWorker.Start()
}

// makeWorkflow captures the wf definition and the actionsTaskQueue in a closure
func makeWorkflow(wf *syntax.Workflow, actionsTaskQueue string) func(ctx workflow.Context) error {
	return func(ctx workflow.Context) error {
		// nil means here that all called FSE activities are temporal activities;
		// otherwise you could plug in a dispatcher function mapping FSE names to general function calls
		in := exec.New(ctx, wf, nil)
		in.ActivityOptions.TaskQueue = actionsTaskQueue
		// "in" is now an interpreter for wf bound to this workflow.Context
		// with its own environment listening for events and delays, dispatching
		// activities called by consequences to temporal activities listening at actionsTaskQueue
		return in.Run() // returns nil on termination or an error if something went wrong
	}
}
For a more complete example, have a look at the web server's
source at cmd/fse.
To Use the File Based Cache
We provide a cache for FSE DSLs stored in files. The cache invalidates a
stored *syntax.Workflow when the modification timestamp of the underlying file
changes. Instead of calling syntax.Parse(), create a new cache struct and call its Workflow()
method, which has the same return signature:
import (
"gitlab.com/lercher/fse-temporal/cache"
)
c := cache.New() // Has a variant NewWith() for providing your own version of os.Open and os.Stat
fn := "my-workflow.fse"
wf, diag, err := c.Workflow(fn)
if err != nil {
	for i := range diag {
		log.Println(diag[i].String()) // diag[i].String() already contains the file name
	}
	panic(err) // err includes fn
}
To Run the Grammar Checker
The grammar checker is located under cmd/grammar.
It checks an FSE grammar file for syntax and semantic errors, and it
has an option to convert a grammar file to a JSON representation
to enable use cases besides using the packages provided by this
repository. To build it, cd cmd/grammar and run go build. There
is a sample.fse file and the JSON for that file, sample.json.
To check the sample file for errors:
C:\...\fse-temporal\cmd\grammar>grammar.exe sample.fse
2020/08/04 19:32:51 Sample-Workflow "Display Name" loaded successfully
2020/08/04 19:32:51 Triple dash delimited Explanation of what this workflow does. Can contain
any characters, markdown is an option. This text is to be shown
as in info box on a user interface for this workflow.
2020/08/04 19:32:51 6 total states, 4 top level states
2020/08/04 19:32:51 9.0302ms total processing time
C:\...\fse-temporal\cmd\grammar>
To convert the sample file to JSON, add the -j option:
C:\...\fse-temporal\cmd\grammar>grammar.exe -j sample.fse > sample.json
2020/08/04 19:34:40 Sample-Workflow "Display Name" loaded successfully
2020/08/04 19:34:40 Triple dash delimited Explanation of what this workflow does. Can contain
any characters, markdown is an option. This text is to be shown
as in info box on a user interface for this workflow.
2020/08/04 19:34:40 6 total states, 4 top level states
2020/08/04 19:34:40 4.9881ms total processing time
C:\...\fse-temporal\cmd\grammar>type sample.json
{
"name": "Sample-Workflow",
"option": "for: STH;",
"display": "Display Name",
"description": "Triple dash delimited Explanation of what this workflow does. Can contain\r\nany characters, markdown is an option. This text is to be shown\r\nas in info box on a user interface for this workflow.",
"roles": [
"me",
"and",
"you"
],
"states": [
...
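A program that does not use this repository's packages could read the top-level fields of such a JSON file as sketched below. The struct covers only the fields visible in the excerpt above; the layout of the "states" array is elided there and is therefore left out. `WorkflowHeader` and `decodeHeader` are names invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkflowHeader mirrors only the top-level fields shown in the sample output.
type WorkflowHeader struct {
	Name        string   `json:"name"`
	Option      string   `json:"option"`
	Display     string   `json:"display"`
	Description string   `json:"description"`
	Roles       []string `json:"roles"`
}

// decodeHeader decodes the header fields and ignores everything else, e.g. "states".
func decodeHeader(data []byte) (WorkflowHeader, error) {
	var h WorkflowHeader
	err := json.Unmarshal(data, &h)
	return h, err
}

func main() {
	data := []byte(`{
		"name": "Sample-Workflow",
		"option": "for: STH;",
		"display": "Display Name",
		"roles": ["me", "and", "you"],
		"states": []
	}`)
	h, err := decodeHeader(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %q roles=%v\n", h.Name, h.Display, h.Roles)
}
```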
To Run the Example Website
As a preparation for running the sample website, you need
a running installation of a temporal server v1 or greater.
Follow their instructions for using docker-compose, or have a look at the
description below on using podman play.
Then build and start the server in the cmd/fse subdirectory.
Assuming temporal runs locally and you want to examine
the sample workflows we provide, that's:
cd cmd/fse
go run .
or under a Windows cmd shell:
C:\...\fse-temporal>cd cmd\fse
C:\...\fse-temporal\cmd\fse>go build && fse.exe
You will probably be asked by the Windows firewall to allow traffic on port 9001.
Then open http://localhost:9001/ with your favourite browser.
To Run the API Server
The API server's published API is described in detail in
cmd/wfs/readme.md. It provides lists
of startable and already running workflow instances for
particular business objects represented by an option string and
an ID. Furthermore, it provides operations to create, inspect, signal and terminate
instances. To run it:
cd cmd/wfs
go run .
API Server UI
The API server also listens for requests to / and displays a UI
of the workflow types it hosts. Query parameters:
Parameter name | Description |
name | Name of the workflow to be displayed. If empty or missing, a list of all available workflow types is shown. |
version | Use this particular version of the workflow. Only relevant if name is not empty. If missing, an arbitrary version is shown. |
embed | Optional. If not empty, only a header- and footer-less HTML fragment is returned for embedding into a different web site. |
Please note that the UI format is subject to arbitrary changes and enhancements, and thus
it may not be suitable for particular needs. It's probably better to parse your own syntax
graph and render it yourself instead of embedding or linking to the API server's UI.
However, if there is no need for a particular view, it's handy to have a UI to get
a quick overview of the workflow types available on this API server and to inspect
their definitions.
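A link to the UI with the query parameters from the table can be assembled with net/url, as in the following sketch. The function name `uiURL` and the base URL used in main are assumptions; this document does not state the host or port the API server listens on.

```go
package main

import (
	"fmt"
	"net/url"
)

// uiURL builds a link to the API server's UI. The base URL is supplied by the caller.
func uiURL(base, name, version string, embed bool) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := url.Values{}
	if name != "" {
		q.Set("name", name)
		if version != "" {
			q.Set("version", version) // only relevant together with name
		}
	}
	if embed {
		q.Set("embed", "1") // any non-empty value requests the fragment
	}
	u.Path = "/"
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	// "http://wfs.example:8080" is a placeholder, not the server's real address.
	link, err := uiURL("http://wfs.example:8080", "Sample-Workflow", "v100", true)
	if err != nil {
		panic(err)
	}
	fmt.Println(link)
}
```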
To Access the API Server from a Go Program
To access the wfs server's HTTP API, import the wfsclient package, create a Client
with the base URL and use its methods.
Hosting the Registry Workflow (stand-alone)
C:\...\fse-temporal\cmd\registry-worker>go build && registry-worker.exe -temporal temporal-server:7233
2020/08/04 17:10:13 INFO No logger configured for temporal client. Created default one.
2020/08/04 17:10:13 INFO No activities registered. Skipping activity worker start Namespace default TaskQueue _registry WorkerID 20792@LPOD@
2020/08/04 17:10:13 INFO Started Worker Namespace default TaskQueue _registry WorkerID 20792@LPOD@
2020/08/04 17:10:13 hosting the Registry workflow, press ^C to stop
[--- ^C pressed ---]
2020/08/04 17:10:22 Stopping worker ...
2020/08/04 17:10:22 INFO Stopped Worker Namespace default TaskQueue _registry WorkerID 20792@LPOD@
C:\...\fse-temporal\cmd\registry-worker>
Using temporal with Podman
This is a fairly straightforward conversion of the provided
docker-compose.yml file to
a k8s pod specification to be used via podman play kube.
The only important difference is that the pod-internal
communication goes over localhost and not a
docker-internal DNS name. So we need to specify that
the temporal server binds to 127.0.0.1 instead of the
host's proper IP address, which is the default.
Creating and Starting the services as a single pod
podman play kube temporal-latest.yaml
Starting/Stopping the pod
podman pod start/stop temporal-latest
Storing Cassandra Data locally
The cassandra container stores its data files in
/var/lib/cassandra, so interactively you would
give podman the option
podman ... -v /my/own/datadir:/var/lib/cassandra ...
However, I guess we should do this in the k8s yaml file. But
I can't find docs on that, only issues. So we will probably
stick with storing data within the cassandra container.
It's a different story with prod deployments, of course. But
there we're going to run persistent storage on a different
server anyway.
Inspecting the Server
podman exec -it temporal-srv sh
E.g. listing open TCP ports:
/etc/temporal/config # netstat -tulpn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:9042 0.0.0.0:* LISTEN -
tcp 0 0 :::8088 :::* LISTEN -
tcp 0 0 10.0.2.100:7233 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:7234 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:7235 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:7239 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:6933 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:6934 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:6935 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:6939 0.0.0.0:* LISTEN 117/temporal-server
tcp 0 0 10.0.2.100:7000 0.0.0.0:* LISTEN -
tcp 0 0 127.0.0.1:8081 0.0.0.0:* LISTEN -
tcp 0 0 127.0.0.1:46653 0.0.0.0:* LISTEN -
tcp 0 0 127.0.0.1:7199 0.0.0.0:* LISTEN -
/etc/temporal/config #
License BSD 3-clause
This project is licensed under the BSD 3-clause "New" or "Revised" License.
Breaking Changes
We try to avoid breaking changes, but sometimes it's not possible.
v0.8.15
With the upgrade of temporal.io from v1.7 to 1.16.1 we noticed a serious bug
regarding task queue names and multiple workers within our single-process
approach. So we decided to change the API:
wfs.ServerOptions no longer has ActionqueueNameOverride and ActionqueueName(). TaskqueueName
is now used for temporal actions, too.
func (so ServerOptions) InitializeServer/2
has a new mandatory parameter: a callback to add activities to the single internal worker.