Documentation ¶
Index ¶
- Variables
- func FromRuntimeStatusString(status string) protos.OrchestrationStatus
- func ToRuntimeStatusString(status protos.OrchestrationStatus) string
- type ActivityExecutor
- type ActivityWorkItem
- type Backend
- type ExecutionResults
- type Executor
- type OrchestrationRuntimeState
- func (s *OrchestrationRuntimeState) AddEvent(e *protos.HistoryEvent) error
- func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction) (bool, error)
- func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
- func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) FailureDetails() (*protos.TaskFailureDetails, error)
- func (s *OrchestrationRuntimeState) Input() (string, error)
- func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
- func (s *OrchestrationRuntimeState) IsCompleted() bool
- func (s *OrchestrationRuntimeState) IsValid() bool
- func (s *OrchestrationRuntimeState) Name() (string, error)
- func (s *OrchestrationRuntimeState) NewEvents() []*protos.HistoryEvent
- func (s *OrchestrationRuntimeState) OldEvents() []*protos.HistoryEvent
- func (s *OrchestrationRuntimeState) Output() (string, error)
- func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
- func (s *OrchestrationRuntimeState) PendingTasks() []*protos.HistoryEvent
- func (s *OrchestrationRuntimeState) PendingTimers() []*protos.HistoryEvent
- func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
- func (s *OrchestrationRuntimeState) String() string
- type OrchestrationWorkItem
- type OrchestratorExecutor
- type OrchestratorMessage
- type TaskHubWorker
- type TaskProcessor
- type TaskWorker
- type WorkItem
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrDuplicateEvent = errors.New("duplicate event")
View Source
var ErrNoWorkItems = errors.New("no work items were found")
View Source
var ErrNotInitialized = errors.New("backend not initialized")
View Source
var ErrTaskHubExists = errors.New("task hub already exists")
View Source
var ErrTaskHubNotFound = errors.New("task hub not found")
View Source
var ErrWorkItemLockLost = errors.New("lock on work-item was lost")
Functions ¶
func FromRuntimeStatusString ¶
func FromRuntimeStatusString(status string) protos.OrchestrationStatus
func ToRuntimeStatusString ¶
func ToRuntimeStatusString(status protos.OrchestrationStatus) string
Types ¶
type ActivityExecutor ¶
type ActivityExecutor interface {
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}
type ActivityWorkItem ¶
type ActivityWorkItem struct { SequenceNumber int64 InstanceID api.InstanceID NewEvent *protos.HistoryEvent Result *protos.HistoryEvent LockedBy string }
func (*ActivityWorkItem) Description ¶
func (wi *ActivityWorkItem) Description() string
Description implements core.WorkItem
type Backend ¶
type Backend interface { // CreateTaskHub creates a new task hub for the current backend. It's up to the backend implementation // and its configuration to determine how it gets created. // // If the task hub for this backend already exists, an error of type ErrTaskHubExists is returned. CreateTaskHub(context.Context) error // DeleteTaskHub deletes an existing task hub configured for the current backend. It's up to the backend // implementation to determine how the task hub data is deleted. // // If the task hub for this backend doesn't exist, an error of type ErrTaskHubNotFound is returned. DeleteTaskHub(context.Context) error // Start starts any background processing done by this backend. Start(context.Context) error // Stop stops any background processing done by this backend. Stop(context.Context) error // CreateOrchestrationInstance creates a new orchestration instance with a history event that // wraps a ExecutionStarted event. CreateOrchestrationInstance(context.Context, *protos.HistoryEvent) error // AddNewEvent adds a new orchestration event to the specified orchestration instance. AddNewOrchestrationEvent(context.Context, api.InstanceID, *protos.HistoryEvent) error // GetOrchestrationWorkItem gets a pending work item from the task hub or returns ErrNoOrchWorkItems // if there are no pending work items. GetOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error) // GetOrchestrationRuntimeState gets the runtime state of an orchestration instance. GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error) // GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID. GetOrchestrationMetadata(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error) // CompleteOrchestrationWorkItem completes a work item by saving the updated runtime state to durable storage. // // Returns ErrWorkItemLockLost if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain). CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error // AbandonOrchestrationWorkItem undoes any state changes and returns the work item to the work item queue. // // This is called if an internal failure happens in the processing of an orchestration work item. It is // not called if the orchestration work item is processed successfully (note that an orchestration that // completes with a failure is still considered a successfully processed work item). AbandonOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error // GetActivityWorkItem gets a pending activity work item from the task hub or returns ErrNoWorkItems // if there are no pending activity work items. GetActivityWorkItem(context.Context) (*ActivityWorkItem, error) // CompleteActivityWorkItem sends a message to the parent orchestration indicating activity completion. // // Returns ErrWorkItemLockLost if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain). CompleteActivityWorkItem(context.Context, *ActivityWorkItem) error // AbandonActivityWorkItem returns the work-item back to the queue without committing any other chances. // // This is called when an internal failure occurs during activity work-item processing. AbandonActivityWorkItem(context.Context, *ActivityWorkItem) error }
type ExecutionResults ¶
type ExecutionResults struct { Response *protos.OrchestratorResponse // contains filtered or unexported fields }
type Executor ¶
type Executor interface { ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error) ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error) }
type OrchestrationRuntimeState ¶
type OrchestrationRuntimeState struct { CustomStatus *wrapperspb.StringValue // contains filtered or unexported fields }
func NewOrchestrationRuntimeState ¶
func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []*protos.HistoryEvent) *OrchestrationRuntimeState
func (*OrchestrationRuntimeState) AddEvent ¶
func (s *OrchestrationRuntimeState) AddEvent(e *protos.HistoryEvent) error
AddEvent appends a new history event to the orchestration history
func (*OrchestrationRuntimeState) ApplyActions ¶
func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction) (bool, error)
ApplyActions takes a set of actions and updates its internal state, including populating the outbox.
func (*OrchestrationRuntimeState) CompletedTime ¶
func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
func (*OrchestrationRuntimeState) ContinuedAsNew ¶
func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
func (*OrchestrationRuntimeState) CreatedTime ¶
func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
func (*OrchestrationRuntimeState) FailureDetails ¶
func (s *OrchestrationRuntimeState) FailureDetails() (*protos.TaskFailureDetails, error)
func (*OrchestrationRuntimeState) Input ¶
func (s *OrchestrationRuntimeState) Input() (string, error)
func (*OrchestrationRuntimeState) InstanceID ¶
func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
func (*OrchestrationRuntimeState) IsCompleted ¶
func (s *OrchestrationRuntimeState) IsCompleted() bool
func (*OrchestrationRuntimeState) IsValid ¶
func (s *OrchestrationRuntimeState) IsValid() bool
func (*OrchestrationRuntimeState) Name ¶
func (s *OrchestrationRuntimeState) Name() (string, error)
func (*OrchestrationRuntimeState) NewEvents ¶
func (s *OrchestrationRuntimeState) NewEvents() []*protos.HistoryEvent
func (*OrchestrationRuntimeState) OldEvents ¶
func (s *OrchestrationRuntimeState) OldEvents() []*protos.HistoryEvent
func (*OrchestrationRuntimeState) Output ¶
func (s *OrchestrationRuntimeState) Output() (string, error)
func (*OrchestrationRuntimeState) PendingMessages ¶
func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
func (*OrchestrationRuntimeState) PendingTasks ¶
func (s *OrchestrationRuntimeState) PendingTasks() []*protos.HistoryEvent
func (*OrchestrationRuntimeState) PendingTimers ¶
func (s *OrchestrationRuntimeState) PendingTimers() []*protos.HistoryEvent
func (*OrchestrationRuntimeState) RuntimeStatus ¶
func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
func (*OrchestrationRuntimeState) String ¶
func (s *OrchestrationRuntimeState) String() string
type OrchestrationWorkItem ¶
type OrchestrationWorkItem struct { InstanceID api.InstanceID NewEvents []*protos.HistoryEvent LockedBy string RetryCount int32 State *OrchestrationRuntimeState }
func (*OrchestrationWorkItem) Description ¶
func (wi *OrchestrationWorkItem) Description() string
func (*OrchestrationWorkItem) GetAbandonDelay ¶
func (wi *OrchestrationWorkItem) GetAbandonDelay() time.Duration
type OrchestratorExecutor ¶
type OrchestratorExecutor interface { ExecuteOrchestrator( ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error) }
type OrchestratorMessage ¶
type OrchestratorMessage struct { HistoryEvent *protos.HistoryEvent TargetInstanceID string }
type TaskHubWorker ¶
type TaskHubWorker interface { // Start starts the backend and the configured internal workers. Start(context.Context) error // Shutdown stops the backend and all internal workers. Shutdown(context.Context) error }
func NewTaskHubWorker ¶
func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker TaskWorker) TaskHubWorker
type TaskProcessor ¶
type TaskWorker ¶
type TaskWorker interface { // Start starts background polling for the activity work items. Start(context.Context) // ProcessNext attempts to fetch and process a work item. This method returns // true if a work item was found and processing started; false otherwise. An // error is returned if the context is cancelled. ProcessNext(context.Context) (bool, error) // StopAndDrain stops the worker and waits for all outstanding work items to finish. StopAndDrain() }
func NewActivityTaskWorker ¶
func NewActivityTaskWorker(be Backend, executor ActivityExecutor) TaskWorker
func NewOrchestrationWorker ¶
func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor) TaskWorker
func NewTaskWorker ¶
func NewTaskWorker(be Backend, p TaskProcessor) TaskWorker
Source Files ¶
Click to show internal directories.
Click to hide internal directories.