Documentation ¶
Index ¶
- Variables
- func ActivityName(ctx context.Context) string
- func ActivityToken(ctx context.Context) string
- func Config(ctx context.Context, key string) (interface{}, bool)
- func ConfigInt(ctx context.Context, key string) (int, bool)
- func ConfigString(ctx context.Context, key string) (string, bool)
- func IsNonRestorable(ctx context.Context) bool
- func IsWorkflowRestore(ctx context.Context) bool
- func Logger(ctx context.Context) *slog.Logger
- func NopActivity(ctx context.Context, input []byte) ([]byte, error)
- func ParentWorkflowID(ctx context.Context) string
- func RouteTo(nodeKey string) error
- func SignalWorkflowRestore(ctx context.Context) context.Context
- func WithNewWorkflowID(ctx context.Context) context.Context
- func WithNonRestorable(ctx context.Context) context.Context
- func WithParentWorkflowID(ctx context.Context, id string) context.Context
- func WithWorkflowID(ctx context.Context, id string) context.Context
- func WorkflowID(ctx context.Context) string
- type Activity
- type DynamicRoute
- type Edge
- type Entrypoint
- type Middleware
- type Node
- func (n *Node) ActivityStartTime(ctx context.Context) time.Time
- func (n *Node) ConfigString(key string) (string, bool)
- func (n *Node) GetConfigIntArray(key string) ([]int, bool)
- func (n *Node) GetConfigStringArray(key string) ([]string, bool)
- func (n *Node) IsRetryableError(err error) bool
- func (n *Node) IsTrigger() bool
- func (n *Node) RetryAttempts(ctx context.Context) int
- func (n *Node) RetryPolicyOrDefault(d *RetryPolicy) *RetryPolicy
- type NodeLink
- type NodeMetadata
- type NodeOption
- type NodeType
- type Orchestrator
- func (o *Orchestrator) GetActivity(node *Node) (Activity, bool)
- func (o *Orchestrator) GetNode(graphNode graph.Node) (*Node, bool)
- func (o *Orchestrator) GetReducer(node *Node) (Reducer, bool)
- func (o *Orchestrator) LoadWorkflow(w *Workflow) error
- func (o *Orchestrator) RegisterActivity(name string, activity Activity)
- func (o *Orchestrator) RegisterReducer(name string, reducer Reducer)
- func (o *Orchestrator) RestorableWorkflows(ctx context.Context) (RestorableWorkflows, error)
- func (o *Orchestrator) RestoreWorkflowsAsync(ctx context.Context, withChildWorkflows bool) error
- func (o *Orchestrator) RunTriggers(ctx context.Context, input []byte) ([]byte, error)
- func (o *Orchestrator) Start(ctx context.Context, data []byte) (output []byte, err error)
- func (o *Orchestrator) StartAsync(ctx context.Context, data []byte) (output []byte, err error)
- func (o *Orchestrator) Use(mw Middleware)
- type OrchestratorOption
- func WithDefaultRetryPolicy(policy *RetryPolicy) OrchestratorOption
- func WithDisableActivityPanicRecovery() OrchestratorOption
- func WithFailWorkflowOnActivityPanic() OrchestratorOption
- func WithForcedTransitions() OrchestratorOption
- func WithLogger(logger *slog.Logger) OrchestratorOption
- func WithPersistence(persister persistence.Persister) OrchestratorOption
- type OrchestratorRegistry
- type Reducer
- type RestorableWorkflowState
- type RestorableWorkflows
- type RetryPolicy
- type Snapshotter
- type Workflow
- func (wf *Workflow) AddNewNode(activity string, options ...NodeOption) *Workflow
- func (wf *Workflow) AddNode(node *Node) *Workflow
- func (wf *Workflow) Codegen() (string, error)
- func (wf *Workflow) Export() ([]byte, error)
- func (wf *Workflow) ExportMermaid(indent string, nodeToChildWorkflows map[string]*Workflow, ...) []byte
- func (wf *Workflow) ExportMermaidHTML(indent string, optionalChildWorkflows map[string]*Workflow, ...) ([]byte, error)
- func (wf *Workflow) ExportMermaidToFile(filename string, optionalChildWorkflows map[string]*Workflow, ...) error
- func (wf *Workflow) Import(workflow []byte) error
- func (wf *Workflow) Link(from, to string) *Workflow
- func (wf *Workflow) LinkWithLabel(from, to string, label string) *Workflow
- func (wf *Workflow) Then(node *Node) *Workflow
- func (wf *Workflow) ThenNewNode(activity string, options ...NodeOption) *Workflow
- func (wf *Workflow) WithLabel(label string) *Workflow
Constants ¶
This section is empty.
Variables ¶
var ( ErrNodeNotFound = fmt.Errorf("node not found") ErrWorkflowNodeAlreadyExists = fmt.Errorf("node already exists") ErrWorkflowInvalid = fmt.Errorf("invalid, it has no trigger or no starting node") ErrWorkflowHasNoTriggers = fmt.Errorf("no triggers found") ErrWorkflowHasNoNodes = fmt.Errorf("workflow has no nodes") ErrOrchestratorActivityNotFound = fmt.Errorf("activity not found") ErrOrchestratorHasNoPersister = fmt.Errorf("no persister set") ErrNoWorkflowID = fmt.Errorf("no execution ID set") )
var ErrPanic = errors.New("panic in activity")
var ErrRestorable = fmt.Errorf("workflow is restorable")
Functions ¶
func ActivityName ¶
func ActivityToken ¶
func IsNonRestorable ¶
func IsWorkflowRestore ¶ added in v0.5.0
func NopActivity ¶ added in v0.6.0
func ParentWorkflowID ¶ added in v0.5.0
func SignalWorkflowRestore ¶ added in v0.5.0
func WithNewWorkflowID ¶ added in v0.5.0
func WithParentWorkflowID ¶ added in v0.5.0
func WorkflowID ¶
Types ¶
type DynamicRoute ¶
func (*DynamicRoute) Error ¶
func (e *DynamicRoute) Error() string
func (*DynamicRoute) MarshalJSON ¶
func (e *DynamicRoute) MarshalJSON() ([]byte, error)
func (*DynamicRoute) UnmarshalJSON ¶
func (e *DynamicRoute) UnmarshalJSON(data []byte) error
func (*DynamicRoute) Unwrap ¶
func (e *DynamicRoute) Unwrap() error
type Middleware ¶
type Node ¶
type Node struct { ID int64 `json:"id"` ActivityName string `json:"activity"` Config map[string]interface{} `json:"config,omitempty"` Type NodeType `json:"type,omitempty"` RetryPolicy *RetryPolicy `json:"retry,omitempty"` EditLink *string `json:"link,omitempty"` DisableSequentialFlow bool `json:"disable_sequential_flow,omitempty"` }
func NewNode ¶
func NewNode(activity string, options ...NodeOption) *Node
func (*Node) GetConfigIntArray ¶ added in v0.7.0
func (*Node) GetConfigStringArray ¶ added in v0.7.0
func (*Node) IsRetryableError ¶
func (*Node) RetryPolicyOrDefault ¶
func (n *Node) RetryPolicyOrDefault(d *RetryPolicy) *RetryPolicy
type NodeMetadata ¶ added in v0.7.0
type NodeMetadata struct { Description string // Node description, which may include line breaks Links []NodeLink // List of links to display within the node Standalone bool // Flag indicating if the node should be rendered as standalone }
NodeMetadata holds optional display metadata for a node: a description, a list of links, and a flag indicating whether the node should be rendered as standalone.
type NodeOption ¶
type NodeOption func(*Node)
func WithNodeConfig ¶
func WithNodeConfig(config map[string]interface{}) NodeOption
func WithNodeDisableSequentialFlow ¶ added in v0.6.0
func WithNodeDisableSequentialFlow() NodeOption
func WithNodeEditLink ¶ added in v0.3.0
func WithNodeEditLink(link string) NodeOption
func WithNodeID ¶
func WithNodeID(id int64) NodeOption
func WithNodeRetryPolicy ¶
func WithNodeRetryPolicy(policy *RetryPolicy) NodeOption
func WithNodeType ¶
func WithNodeType(t NodeType) NodeOption
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
func NewOrchestrator ¶
func NewOrchestrator(options ...OrchestratorOption) *Orchestrator
func (*Orchestrator) GetActivity ¶
func (o *Orchestrator) GetActivity(node *Node) (Activity, bool)
func (*Orchestrator) GetReducer ¶
func (o *Orchestrator) GetReducer(node *Node) (Reducer, bool)
func (*Orchestrator) LoadWorkflow ¶
func (o *Orchestrator) LoadWorkflow(w *Workflow) error
func (*Orchestrator) RegisterActivity ¶
func (o *Orchestrator) RegisterActivity(name string, activity Activity)
func (*Orchestrator) RegisterReducer ¶
func (o *Orchestrator) RegisterReducer(name string, reducer Reducer)
func (*Orchestrator) RestorableWorkflows ¶
func (o *Orchestrator) RestorableWorkflows(ctx context.Context) (RestorableWorkflows, error)
func (*Orchestrator) RestoreWorkflowsAsync ¶
func (o *Orchestrator) RestoreWorkflowsAsync(ctx context.Context, withChildWorkflows bool) error
func (*Orchestrator) RunTriggers ¶
func (*Orchestrator) StartAsync ¶
func (*Orchestrator) Use ¶
func (o *Orchestrator) Use(mw Middleware)
type OrchestratorOption ¶
type OrchestratorOption func(*Orchestrator)
func WithDefaultRetryPolicy ¶ added in v0.2.0
func WithDefaultRetryPolicy(policy *RetryPolicy) OrchestratorOption
func WithDisableActivityPanicRecovery ¶ added in v0.7.0
func WithDisableActivityPanicRecovery() OrchestratorOption
func WithFailWorkflowOnActivityPanic ¶ added in v0.5.0
func WithFailWorkflowOnActivityPanic() OrchestratorOption
func WithForcedTransitions ¶ added in v0.6.0
func WithForcedTransitions() OrchestratorOption
func WithLogger ¶
func WithLogger(logger *slog.Logger) OrchestratorOption
func WithPersistence ¶
func WithPersistence(persister persistence.Persister) OrchestratorOption
type OrchestratorRegistry ¶ added in v0.5.0
type OrchestratorRegistry map[string]*Orchestrator
func NewOrchestratorRegistry ¶ added in v0.5.0
func NewOrchestratorRegistry() OrchestratorRegistry
func (OrchestratorRegistry) Get ¶ added in v0.5.0
func (r OrchestratorRegistry) Get(name string) (*Orchestrator, error)
func (OrchestratorRegistry) Set ¶ added in v0.5.0
func (r OrchestratorRegistry) Set(name string, o *Orchestrator)
type RestorableWorkflowState ¶ added in v0.5.0
type RestorableWorkflowState struct { Status *persistence.WorkflowStatus Entrypoint Entrypoint }
type RestorableWorkflows ¶
type RestorableWorkflows map[string]map[string]RestorableWorkflowState
RestorableWorkflows is a map of workflow name to a map of workflow ID to a RestorableWorkflowState (status and entrypoint) that allows the client to restore that workflow.
type RetryPolicy ¶
type Snapshotter ¶ added in v0.6.0
type Snapshotter struct {
// contains filtered or unexported fields
}
func NewAttributeShapshotter ¶ added in v0.6.0
func NewAttributeShapshotter(fields []string) *Snapshotter
NewAttributeShapshotter can be used for both full and partial snapshotting.
func NewSnapshotter ¶ added in v0.6.0
func NewSnapshotter() *Snapshotter
NewSnapshotter initializes a new checkpoint activity with data storage.
func (*Snapshotter) Delete ¶ added in v0.6.0
Delete removes checkpoint data for a given workflow ID.
func (*Snapshotter) Save ¶ added in v0.6.0
Save stores data on first invocation, returns stored data on subsequent calls.
func (*Snapshotter) SaveAttributes ¶ added in v0.6.0
type Workflow ¶
type Workflow struct { Name string `json:"name"` Nodes map[string]*Node `json:"nodes"` Edges []*Edge `json:"edges"` // contains filtered or unexported fields }
func ImportMermaid ¶ added in v0.8.0
func NewWorkflow ¶
func (*Workflow) AddNewNode ¶ added in v0.5.0
func (wf *Workflow) AddNewNode(activity string, options ...NodeOption) *Workflow
func (*Workflow) ExportMermaid ¶
func (wf *Workflow) ExportMermaid(indent string, nodeToChildWorkflows map[string]*Workflow, nodeToMetadata map[string]NodeMetadata) []byte
ExportMermaid generates the Mermaid representation of the workflow. Optionally, it can include child workflows as subgraphs; this requires a map of node names to the child workflows they spawn. It also accepts an optional nodeToMetadata parameter to add descriptions and links to nodes.
func (*Workflow) ExportMermaidHTML ¶ added in v0.3.0
func (*Workflow) ExportMermaidToFile ¶
func (*Workflow) LinkWithLabel ¶ added in v0.8.0
func (*Workflow) ThenNewNode ¶ added in v0.5.0
func (wf *Workflow) ThenNewNode(activity string, options ...NodeOption) *Workflow