asyncmachine-go
asyncmachine-go
is a minimal implementation of AsyncMachine in
Golang using channels and context. It aims at simplicity and speed.
It can be used as a lightweight in-memory Temporal alternative, as a worker for
Asynq, or to write simple consensus engines, stateful firewalls, telemetry, bots,
etc.
AsyncMachine is a relational state machine which never blocks.
package main
import (
"context"
"fmt"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
)
func main() {
ctx := context.Background()
m := am.New(ctx, am.States{
"Foo": {
Add: am.S{"Bar"}
},
"Bar": {},
}, nil)
m.Add(am.S{"Foo"}, nil)
fmt.Printf("%s", m) // (Foo:1 Bar:1)
}
Examples
// State definitions for an expense workflow. The inline comments name the
// activity that each group of states corresponds to.
// NOTE(review): the exact semantics of Auto, Require and Remove are defined
// by the machine package and are not visible here — confirm against its docs.
am.States{
// CreateExpenseActivity
// CreatingExpense and ExpenseCreated list each other in Remove.
"CreatingExpense": {
Remove: am.S{"ExpenseCreated"},
},
"ExpenseCreated": {
Remove: am.S{"CreatingExpense"},
},
// WaitForDecisionActivity
// Both states require ExpenseCreated and list each other in Remove.
"WaitingForApproval": {
Auto: true,
Require: am.S{"ExpenseCreated"},
Remove: am.S{"ApprovalGranted"},
},
"ApprovalGranted": {
Require: am.S{"ExpenseCreated"},
Remove: am.S{"WaitingForApproval"},
},
// PaymentActivity
// PaymentInProgress requires ApprovalGranted; PaymentCompleted requires both
// ExpenseCreated and ApprovalGranted. The two list each other in Remove.
"PaymentInProgress": {
Auto: true,
Require: am.S{"ApprovalGranted"},
Remove: am.S{"PaymentCompleted"},
},
"PaymentCompleted": {
Require: am.S{"ExpenseCreated", "ApprovalGranted"},
Remove: am.S{"PaymentInProgress"},
},
}
// State definitions for a file-processing workflow (download, process,
// upload). The inline comments name the activity that each pair of states
// corresponds to; within each pair the two states list each other in Remove.
// NOTE(review): the exact semantics of Auto, Require and Remove are defined
// by the machine package and are not visible here — confirm against its docs.
am.States{
// DownloadFileActivity
"DownloadingFile": {
Remove: am.S{"FileDownloaded"},
},
"FileDownloaded": {
Remove: am.S{"DownloadingFile"},
},
// ProcessFileActivity
// ProcessingFile is marked Auto and requires FileDownloaded.
"ProcessingFile": {
Auto: true,
Require: am.S{"FileDownloaded"},
Remove: am.S{"FileProcessed"},
},
"FileProcessed": {
Remove: am.S{"ProcessingFile"},
},
// UploadFileActivity
// UploadingFile is marked Auto and requires FileProcessed.
"UploadingFile": {
Auto: true,
Require: am.S{"FileProcessed"},
Remove: am.S{"FileUploaded"},
},
"FileUploaded": {
Remove: am.S{"UploadingFile"},
},
}
// HandleFileProcessingTask handles an Asynq task by decoding its payload,
// running the file-processing workflow for the named file, and writing the
// resulting machine's string representation as the task result.
//
// It returns an error wrapping asynq.SkipRetry when the payload cannot be
// decoded, the workflow's error unchanged when the workflow fails, and a
// wrapped write error when the result cannot be stored.
func HandleFileProcessingTask(ctx context.Context, t *asynq.Task) error {
	var p FileProcessingPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		// The decode error is formatted with %v; only asynq.SkipRetry is wrapped.
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	log.Printf("Processing file %s", p.Filename)

	// use the FileProcessing workflow ported from Temporal
	machine, err := processor.FileProcessingFlow(ctx, log.Printf, p.Filename)
	if err != nil {
		return err
	}

	// save the machine state as the result
	ret := machine.String()
	if _, err := t.ResultWriter().Write([]byte(ret)); err != nil {
		// Wrap with %w so callers can inspect the underlying write error
		// via errors.Is / errors.As.
		return fmt.Errorf("failed to write task result: %w", err)
	}
	return nil
}
Documentation
Status
Beta - although the ideas behind AsyncMachine have been proven to work, the Go implementation is fairly fresh
and may suffer from bugs, memory leaks, race conditions, and even panics.
Please report bugs.
TODO
FUT issues.