actors

package
v1.14.0-rc.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WorkflowNameLabelKey = "workflow"
	ActivityNameLabelKey = "activity"
)
View Source
const (
	CallbackChannelProperty = "dapr.callback"

	CreateWorkflowInstanceMethod = "CreateWorkflowInstance"
	GetWorkflowMetadataMethod    = "GetWorkflowMetadata"
	AddWorkflowEventMethod       = "AddWorkflowEvent"
	PurgeWorkflowStateMethod     = "PurgeWorkflowState"
	GetWorkflowStateMethod       = "GetWorkflowState"
)

Variables

View Source
var ErrDuplicateInvocation = errors.New("duplicate invocation")

Functions

func LoadWorkflowState

func LoadWorkflowState(ctx context.Context, actorRuntime actors.Actors, actorID string, config actorsBackendConfig) (*workflowState, error)

func NewActivityActor

func NewActivityActor(scheduler activityScheduler, backendConfig actorsBackendConfig, opts *activityActorOpts) actors.InternalActorFactory

NewActivityActor creates an internal activity actor for executing workflow activity logic.

func NewActorBackend

func NewActorBackend(md wfbe.Metadata, _ logger.Logger) (backend.Backend, error)

func NewActorsBackendConfig

func NewActorsBackendConfig(appID string) actorsBackendConfig

NewActorsBackendConfig creates a new workflow engine configuration.

func NewDurableTimer

func NewDurableTimer(bytes []byte, generation uint64) durableTimer

func NewWorkflowActor

func NewWorkflowActor(scheduler workflowScheduler, config actorsBackendConfig, opts *workflowActorOpts) actors.InternalActorFactory

func NewWorkflowState

func NewWorkflowState(config actorsBackendConfig) *workflowState

Types

type ActivityRequest

type ActivityRequest struct {
	HistoryEvent []byte
}

ActivityRequest represents a request by a workflow to invoke an activity.

type ActorBackend

type ActorBackend struct {
	// contains filtered or unexported fields
}

func (*ActorBackend) AbandonActivityWorkItem

func (*ActorBackend) AbandonActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error

AbandonActivityWorkItem implements backend.Backend. It gets called by durabletask-go when there is an unexpected failure in the workflow activity execution pipeline.

func (*ActorBackend) AbandonOrchestrationWorkItem

func (*ActorBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error

AbandonOrchestrationWorkItem implements backend.Backend. It gets called by durabletask-go when there is an unexpected failure in the workflow orchestration execution pipeline.

func (*ActorBackend) AddNewOrchestrationEvent

func (abe *ActorBackend) AddNewOrchestrationEvent(ctx context.Context, id api.InstanceID, e *backend.HistoryEvent) error

AddNewOrchestrationEvent implements backend.Backend and sends the event e to the workflow actor identified by id.

func (*ActorBackend) CompleteActivityWorkItem

func (*ActorBackend) CompleteActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error

CompleteActivityWorkItem implements backend.Backend

func (*ActorBackend) CompleteOrchestrationWorkItem

func (*ActorBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error

CompleteOrchestrationWorkItem implements backend.Backend

func (*ActorBackend) CreateOrchestrationInstance

func (abe *ActorBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error

CreateOrchestrationInstance implements backend.Backend and creates a new workflow instance.

Internally, creating a workflow instance also creates a new actor with the same ID. The create request is saved into the actor's "inbox" and then executed via a reminder thread. If the app is scaled out across multiple replicas, the actor might get assigned to a replica other than this one.

func (*ActorBackend) CreateTaskHub

func (*ActorBackend) CreateTaskHub(context.Context) error

CreateTaskHub implements backend.Backend

func (*ActorBackend) DeleteTaskHub

func (*ActorBackend) DeleteTaskHub(context.Context) error

DeleteTaskHub implements backend.Backend

func (*ActorBackend) DisableActorCaching

func (abe *ActorBackend) DisableActorCaching(disable bool)

DisableActorCaching turns off the default caching done by the workflow and activity actors. This method is primarily intended to be used for testing to ensure correct behavior when actors are newly activated on nodes, but without requiring the actor to actually go through activation.

func (*ActorBackend) GetActivityWorkItem

func (abe *ActorBackend) GetActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error)

GetActivityWorkItem implements backend.Backend

func (*ActorBackend) GetInternalActorsMap

func (abe *ActorBackend) GetInternalActorsMap() map[string]actors.InternalActorFactory

GetInternalActorsMap returns a map of internal actors that are used to implement workflows.

func (*ActorBackend) GetOrchestrationMetadata

func (abe *ActorBackend) GetOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)

GetOrchestrationMetadata implements backend.Backend

func (*ActorBackend) GetOrchestrationRuntimeState

func (abe *ActorBackend) GetOrchestrationRuntimeState(ctx context.Context, owi *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error)

GetOrchestrationRuntimeState implements backend.Backend

func (*ActorBackend) GetOrchestrationWorkItem

func (abe *ActorBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error)

GetOrchestrationWorkItem implements backend.Backend

func (*ActorBackend) PurgeOrchestrationState

func (abe *ActorBackend) PurgeOrchestrationState(ctx context.Context, id api.InstanceID) error

PurgeOrchestrationState deletes all saved state for the specific orchestration instance.

func (*ActorBackend) RegisterActor

func (abe *ActorBackend) RegisterActor(ctx context.Context) error

func (*ActorBackend) SetActivityTimeout

func (abe *ActorBackend) SetActivityTimeout(timeout time.Duration)

SetActivityTimeout allows configuring a default timeout for activity executions. If the timeout is exceeded, the activity execution will be abandoned and retried.

func (*ActorBackend) SetActorReminderInterval

func (abe *ActorBackend) SetActorReminderInterval(interval time.Duration)

SetActorReminderInterval sets the amount of delay between internal retries for workflow and activity actors. This impacts how long it takes for an operation to restart itself after a timeout or a process failure is encountered while running.

func (*ActorBackend) SetActorRuntime

func (abe *ActorBackend) SetActorRuntime(ctx context.Context, actorRuntime actors.ActorRuntime)

func (*ActorBackend) SetWorkflowTimeout

func (abe *ActorBackend) SetWorkflowTimeout(timeout time.Duration)

SetWorkflowTimeout allows configuring a default timeout for workflow execution steps. If the timeout is exceeded, the workflow execution step will be abandoned and retried. Note that this timeout is for a non-blocking step in the workflow (which is expected to always complete almost immediately) and not for the end-to-end workflow execution.

func (*ActorBackend) Start

func (abe *ActorBackend) Start(ctx context.Context) error

Start implements backend.Backend

func (*ActorBackend) Stop

Stop implements backend.Backend

func (*ActorBackend) String

func (abe *ActorBackend) String() string

String displays the type information

func (*ActorBackend) WaitForActorsReady

func (abe *ActorBackend) WaitForActorsReady(ctx context.Context)

WaitForActorsReady blocks until the actor runtime is set in the object (or until the context is canceled).

type CreateWorkflowInstanceRequest

type CreateWorkflowInstanceRequest struct {
	Policy          *api.OrchestrationIdReusePolicy `json:"policy"`
	StartEventBytes []byte                          `json:"startEventBytes"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL