dsl

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2022 License: MPL-2.0 Imports: 4 Imported by: 2

Documentation

Overview

Package dsl provides a simple and powerful Go API for defining and executing Temporal workflow and activities using a Domain-Specific Language (DSL).

As an example, this package is used by the Operations workflow of the sqlike specification, allowing clients such as the "tsql" CLI to define and execute SQL workflows based on flows defined by end-users.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewApplicationError

func NewApplicationError(message string, errType string, cause error, failedAt *FailedActivity) error

NewApplicationError is a wrapper to handle Temporal retryable errors consistently in activities of a DSL workflow. If activity is not part of a DSL workflow, you shall use errors.NewApplicationError instead.

func NewNonRetryableApplicationError

func NewNonRetryableApplicationError(message string, errType string, cause error, failedAt *FailedActivity) error

NewNonRetryableApplicationError is a wrapper to handle Temporal non-retryable errors consistently in activities of a DSL workflow. If activity is not part of a DSL workflow, you shall use errors.NewNonRetryableApplicationError instead.

Types

type ActivityToRollback

type ActivityToRollback struct {

	// Name is the name of the Temporal activity type to execute.
	Name string `json:"name"`

	// Input is the first argument of the Temporal activity to execute, transformed
	// as a map.
	Input map[string]any `json:"input"`

	// Step holds details about the current step important for the activity,
	// transformed as a map. It shall be an extract of Input.
	Step map[string]any `json:"step"`

	// Policy holds the activity policy to apply.
	Policy lifecycle.ActivityPolicy `json:"policy"`
	// contains filtered or unexported fields
}

ActivityToRollback is used to express invoking a Temporal activity, using it's "rollback" logic. In other words, when the parent Statement's state holding this ActivityToRollback is "rollingback".

func (ActivityToRollback) Execute

func (a ActivityToRollback) Execute(ctx workflow.Context, input map[string]any) (map[string]any, error)

Execute executes the activity to rollback.

type ActivityToRun

type ActivityToRun struct {

	// Name is the name of the Temporal activity type to execute.
	Name string `json:"name"`

	// Input is the first argument of the Temporal activity to execute, transformed
	// as a map.
	Input map[string]any `json:"input"`

	// Step holds details about the current step important for the activity,
	// transformed as a map. It shall be an extract of Input.
	Step map[string]any `json:"step"`

	// Policy holds the activity policy to apply.
	Policy lifecycle.ActivityPolicy `json:"policy"`
	// contains filtered or unexported fields
}

ActivityToRun is used to express invoking a Temporal activity, using it's "run" logic. In other words, when the parent Statement's state holding this ActivityToRun is "running".

func (ActivityToRun) Execute

func (a ActivityToRun) Execute(ctx workflow.Context, input map[string]any) (map[string]any, error)

Execute executes the activity to run.

type Executable

type Executable interface {
	Execute(ctx workflow.Context, input map[string]any) (map[string]any, error)
}

Executable is implemented by each buidling block of the DSL API: Statement, Sequence, Parallel, ActivityToRun, ActivityToRollback. Having a common interface allows to simply Execute a block, regardless its kind and position in the tree of Statement.

type FailedActivity

type FailedActivity struct {

	// Name is the name of the failed activity type.
	Name string `json:"name"`

	// Step holds details about the failed activity important for the activity,
	// transformed as a map. This is the same Step when defined in ActivityToRollback.
	Step map[string]any `json:"step"`
}

FailedActivity is returned in parent workflow results when an activity has failed.

type OnRollbackCompensation

type OnRollbackCompensation string

OnRollbackCompensation is a custom string type to be aware of the different compensation logic that are possible when an activity is "rollingback".

const RollbackToContinue OnRollbackCompensation = "continue"

RollbackToContinue indicates to continue the current workflow even if an error occured in the activity.

const RollbackToStop OnRollbackCompensation = "stop"

RollbackToStop indicates to stop the current workflow if an error occured in the activity.

type OnRollbackReaction

type OnRollbackReaction struct {

	// Timeout indicates what compensation to apply on the workflow when timeout
	// is exceeded.
	Timeout OnRollbackCompensation `json:"timeout,omitempty"`

	// Canceled indicates what compensation to apply on the workflow when it is
	// canceled.
	Canceled OnRollbackCompensation `json:"canceled,omitempty"`

	// Terminated indicates what compensation to apply on the workflow when it is
	// terminated.
	Terminated OnRollbackCompensation `json:"terminated,omitempty"`

	// Error indicates what compensation to apply on the workflow when an error
	// occured in one of the activity.
	Error OnRollbackCompensation `json:"error,omitempty"`
}

OnRollbackReaction indicates the compensation to apply if an error occured when an activity is "rollingback".

type OnRunCompensation

type OnRunCompensation string

OnRunCompensation is a custom string type to be aware of the different compensation logic that are possible when an activity is "running".

const RunToContinue OnRunCompensation = "continue"

RunToContinue indicates to continue the current workflow even if an error occured in the activity.

const RunToRollback OnRunCompensation = "rollback"

RunToRollback indicates to rollback the current workflow if an error occured in the activity.

const RunToStop OnRunCompensation = "stop"

RunToStop indicates to stop the current workflow if an error occured in the activity.

type OnRunReaction

type OnRunReaction struct {

	// Timeout indicates what compensation to apply on the workflow when timeout
	// is exceeded.
	Timeout OnRunCompensation `json:"timeout,omitempty"`

	// Canceled indicates what compensation to apply on the workflow when it is
	// canceled.
	Canceled OnRunCompensation `json:"canceled,omitempty"`

	// Terminated indicates what compensation to apply on the workflow when it is
	// terminated.
	Terminated OnRunCompensation `json:"terminated,omitempty"`

	// Error indicates what compensation to apply on the workflow when an error
	// occured in one of the activity.
	Error OnRunCompensation `json:"error,omitempty"`
}

OnRunReaction indicates the compensation to apply if an error occured when an activity is "running".

type Parallel

type Parallel struct {

	// Branches holds the Statements to execute.
	Branches []*Statement `json:"branches"`
}

Parallel consists of a collection of Statements that runs in parallel (async).

func (Parallel) Execute

func (p Parallel) Execute(ctx workflow.Context, input map[string]any) (map[string]any, error)

Execute executes the collection of Statements in parallel (async).

type Reaction

type Reaction struct {

	// Run determines the logic to apply when the parent Statement is "running".
	Run OnRunReaction `json:"run"`

	// Rollback determines the logic to apply when the parent Statement is
	// "rollingback".
	Rollback OnRollbackReaction `json:"rollback"`
}

Reaction holds the compensation logic to apply when a Statement is "running" or "rollingback".

type Sequence

type Sequence struct {

	// Elements holds the Statements to execute.
	Elements []*Statement `json:"elements"`
}

Sequence consists of a collection of Statements that runs in sequential (sync).

func (Sequence) Execute

func (s Sequence) Execute(ctx workflow.Context, input map[string]any) (map[string]any, error)

Execute executes the collection of Statements in sequential (sync).

type State

type State struct {

	// Status is the status of the Statement. It's one of "running", "rollingback".
	Status lifecycle.Status `json:"status"`

	// FailedAt holds the details of the failed activity, if applicable. If an error
	// occured in the workflow but is not related to an activity failing (such as
	// workflow canceled), this will be nil.
	FailedAt *FailedActivity `json:"failed_at,omitempty"`
}

State holds the current State of the parent Statement.

type Statement

type Statement struct {

	// On holds the compensation logic to apply when the Statement is "running" or
	// "rollingback".
	On Reaction `json:"on"`

	// Previous holds the previous Statement in the tree.
	Previous *Statement `json:"-"`

	// Next holds the next Statement in the tree.
	Next *Statement `json:"-"`

	// Run is the activity to execute when "running" the Statement.
	Run *ActivityToRun `json:"run,omitempty"`

	// Rollback is the activity to execute when "rollingback" the Statement.
	Rollback *ActivityToRollback `json:"rollback,omitempty"`

	// Sequence can hold Statements to execute in sequential.
	Sequence *Sequence `json:"sequence,omitempty"`

	// Parallel can hold Statements to execute in parallel.
	Parallel *Parallel `json:"parallel,omitempty"`
	// contains filtered or unexported fields
}

Statement is the building block of DSL workflow. A Statement can be an Activity (set by ActivityToRun for running it and ActivityToRollback for rolling it back) or it could be a Sequence (sync) or Parallel (async) of Statements to execute.

func Revert

func Revert(stmt *Statement, failedAt *FailedActivity) *Statement

Revert reverts a statement's tree so it can then be rolled back in a reversed order.

func (*Statement) Execute

func (stmt *Statement) Execute(ctx workflow.Context, input map[string]any) (map[string]any, error)

Execute executes the Statement.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL