Documentation ¶
Index ¶
- Constants
- Variables
- func LoadWorkflowState(ctx context.Context, actorRuntime actors.Actors, actorID string, ...) (*workflowState, error)
- func NewActivityActor(scheduler activityScheduler, backendConfig actorsBackendConfig, ...) actors.InternalActorFactory
- func NewActorBackend(md wfbe.Metadata, _ logger.Logger) (backend.Backend, error)
- func NewActorsBackendConfig(appID string) actorsBackendConfig
- func NewDurableTimer(bytes []byte, generation uint64) durableTimer
- func NewWorkflowActor(scheduler workflowScheduler, config actorsBackendConfig, ...) actors.InternalActorFactory
- func NewWorkflowState(config actorsBackendConfig) *workflowState
- type ActivityRequest
- type ActorBackend
- func (*ActorBackend) AbandonActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error
- func (*ActorBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error
- func (abe *ActorBackend) AddNewOrchestrationEvent(ctx context.Context, id api.InstanceID, e *backend.HistoryEvent) error
- func (*ActorBackend) CompleteActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error
- func (*ActorBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error
- func (abe *ActorBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, ...) error
- func (*ActorBackend) CreateTaskHub(context.Context) error
- func (*ActorBackend) DeleteTaskHub(context.Context) error
- func (abe *ActorBackend) DisableActorCaching(disable bool)
- func (abe *ActorBackend) GetActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error)
- func (abe *ActorBackend) GetInternalActorsMap() map[string]actors.InternalActorFactory
- func (abe *ActorBackend) GetOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
- func (abe *ActorBackend) GetOrchestrationRuntimeState(ctx context.Context, owi *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error)
- func (abe *ActorBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error)
- func (abe *ActorBackend) PurgeOrchestrationState(ctx context.Context, id api.InstanceID) error
- func (abe *ActorBackend) RegisterActor(ctx context.Context) error
- func (abe *ActorBackend) SetActivityTimeout(timeout time.Duration)
- func (abe *ActorBackend) SetActorReminderInterval(interval time.Duration)
- func (abe *ActorBackend) SetActorRuntime(ctx context.Context, actorRuntime actors.ActorRuntime)
- func (abe *ActorBackend) SetWorkflowTimeout(timeout time.Duration)
- func (abe *ActorBackend) Start(ctx context.Context) error
- func (*ActorBackend) Stop(context.Context) error
- func (abe *ActorBackend) String() string
- func (abe *ActorBackend) WaitForActorsReady(ctx context.Context)
- type CreateWorkflowInstanceRequest
Constants ¶
const ( WorkflowNameLabelKey = "workflow" ActivityNameLabelKey = "activity" )
const ( CallbackChannelProperty = "dapr.callback" CreateWorkflowInstanceMethod = "CreateWorkflowInstance" GetWorkflowMetadataMethod = "GetWorkflowMetadata" AddWorkflowEventMethod = "AddWorkflowEvent" PurgeWorkflowStateMethod = "PurgeWorkflowState" GetWorkflowStateMethod = "GetWorkflowState" )
Variables ¶
var ErrDuplicateInvocation = errors.New("duplicate invocation")
Functions ¶
func LoadWorkflowState ¶
func NewActivityActor ¶
func NewActivityActor(scheduler activityScheduler, backendConfig actorsBackendConfig, opts *activityActorOpts) actors.InternalActorFactory
NewActivityActor creates an internal activity actor for executing workflow activity logic.
func NewActorBackend ¶
func NewActorsBackendConfig ¶
func NewActorsBackendConfig(appID string) actorsBackendConfig
NewActorsBackendConfig creates a new workflow engine configuration.
func NewDurableTimer ¶
func NewWorkflowActor ¶
func NewWorkflowActor(scheduler workflowScheduler, config actorsBackendConfig, opts *workflowActorOpts) actors.InternalActorFactory
func NewWorkflowState ¶
func NewWorkflowState(config actorsBackendConfig) *workflowState
Types ¶
type ActivityRequest ¶
type ActivityRequest struct {
HistoryEvent []byte
}
ActivityRequest represents a request by a workflow to invoke an activity.
type ActorBackend ¶
type ActorBackend struct {
// contains filtered or unexported fields
}
func (*ActorBackend) AbandonActivityWorkItem ¶
func (*ActorBackend) AbandonActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error
AbandonActivityWorkItem implements backend.Backend. It gets called by durabletask-go when there is an unexpected failure in the workflow activity execution pipeline.
func (*ActorBackend) AbandonOrchestrationWorkItem ¶
func (*ActorBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error
AbandonOrchestrationWorkItem implements backend.Backend. It gets called by durabletask-go when there is an unexpected failure in the workflow orchestration execution pipeline.
func (*ActorBackend) AddNewOrchestrationEvent ¶
func (abe *ActorBackend) AddNewOrchestrationEvent(ctx context.Context, id api.InstanceID, e *backend.HistoryEvent) error
AddNewOrchestrationEvent implements backend.Backend and sends the event e to the workflow actor identified by id.
func (*ActorBackend) CompleteActivityWorkItem ¶
func (*ActorBackend) CompleteActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error
CompleteActivityWorkItem implements backend.Backend
func (*ActorBackend) CompleteOrchestrationWorkItem ¶
func (*ActorBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error
CompleteOrchestrationWorkItem implements backend.Backend
func (*ActorBackend) CreateOrchestrationInstance ¶
func (abe *ActorBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error
CreateOrchestrationInstance implements backend.Backend and creates a new workflow instance.
Internally, creating a workflow instance also creates a new actor with the same ID. The create request is saved into the actor's "inbox" and then executed via a reminder thread. If the app is scaled out across multiple replicas, the actor might get assigned to a replica other than this one.
func (*ActorBackend) CreateTaskHub ¶
func (*ActorBackend) CreateTaskHub(context.Context) error
CreateTaskHub implements backend.Backend
func (*ActorBackend) DeleteTaskHub ¶
func (*ActorBackend) DeleteTaskHub(context.Context) error
DeleteTaskHub implements backend.Backend
func (*ActorBackend) DisableActorCaching ¶
func (abe *ActorBackend) DisableActorCaching(disable bool)
DisableActorCaching turns off the default caching done by the workflow and activity actors. This method is primarily intended to be used for testing to ensure correct behavior when actors are newly activated on nodes, but without requiring the actor to actually go through activation.
func (*ActorBackend) GetActivityWorkItem ¶
func (abe *ActorBackend) GetActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error)
GetActivityWorkItem implements backend.Backend
func (*ActorBackend) GetInternalActorsMap ¶
func (abe *ActorBackend) GetInternalActorsMap() map[string]actors.InternalActorFactory
GetInternalActorsMap returns a map of internal actors that are used to implement workflows.
func (*ActorBackend) GetOrchestrationMetadata ¶
func (abe *ActorBackend) GetOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
GetOrchestrationMetadata implements backend.Backend
func (*ActorBackend) GetOrchestrationRuntimeState ¶
func (abe *ActorBackend) GetOrchestrationRuntimeState(ctx context.Context, owi *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error)
GetOrchestrationRuntimeState implements backend.Backend
func (*ActorBackend) GetOrchestrationWorkItem ¶
func (abe *ActorBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error)
GetOrchestrationWorkItem implements backend.Backend
func (*ActorBackend) PurgeOrchestrationState ¶
func (abe *ActorBackend) PurgeOrchestrationState(ctx context.Context, id api.InstanceID) error
PurgeOrchestrationState deletes all saved state for the specific orchestration instance.
func (*ActorBackend) RegisterActor ¶
func (abe *ActorBackend) RegisterActor(ctx context.Context) error
func (*ActorBackend) SetActivityTimeout ¶
func (abe *ActorBackend) SetActivityTimeout(timeout time.Duration)
SetActivityTimeout allows configuring a default timeout for activity executions. If the timeout is exceeded, the activity execution will be abandoned and retried.
func (*ActorBackend) SetActorReminderInterval ¶
func (abe *ActorBackend) SetActorReminderInterval(interval time.Duration)
SetActorReminderInterval sets the amount of delay between internal retries for workflow and activity actors. This impacts how long it takes for an operation to restart itself after a timeout or a process failure is encountered while running.
func (*ActorBackend) SetActorRuntime ¶
func (abe *ActorBackend) SetActorRuntime(ctx context.Context, actorRuntime actors.ActorRuntime)
func (*ActorBackend) SetWorkflowTimeout ¶
func (abe *ActorBackend) SetWorkflowTimeout(timeout time.Duration)
SetWorkflowTimeout allows configuring a default timeout for workflow execution steps. If the timeout is exceeded, the workflow execution step will be abandoned and retried. Note that this timeout is for a non-blocking step in the workflow (which is expected to always complete almost immediately) and not for the end-to-end workflow execution.
func (*ActorBackend) Start ¶
func (abe *ActorBackend) Start(ctx context.Context) error
Start implements backend.Backend
func (*ActorBackend) Stop ¶
func (*ActorBackend) Stop(context.Context) error
Stop implements backend.Backend
func (*ActorBackend) String ¶
func (abe *ActorBackend) String() string
String displays the type information
func (*ActorBackend) WaitForActorsReady ¶
func (abe *ActorBackend) WaitForActorsReady(ctx context.Context)
WaitForActorsReady blocks until the actor runtime is set in the object (or until the context is canceled).
type CreateWorkflowInstanceRequest ¶
type CreateWorkflowInstanceRequest struct { Policy *api.OrchestrationIdReusePolicy `json:"policy"` StartEventBytes []byte `json:"startEventBytes"` }