backend

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: MIT Imports: 17 Imported by: 26

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDuplicateEvent = errors.New("duplicate event")
View Source
var ErrNoWorkItems = errors.New("no work items were found")
View Source
var ErrNotInitialized = errors.New("backend not initialized")
View Source
var ErrTaskHubExists = errors.New("task hub already exists")
View Source
var ErrTaskHubNotFound = errors.New("task hub not found")
View Source
var ErrWorkItemLockLost = errors.New("lock on work-item was lost")

Functions

func FromRuntimeStatusString

func FromRuntimeStatusString(status string) protos.OrchestrationStatus

func ToRuntimeStatusString

func ToRuntimeStatusString(status protos.OrchestrationStatus) string

Types

type ActivityExecutor

type ActivityExecutor interface {
	ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}

type ActivityWorkItem

type ActivityWorkItem struct {
	SequenceNumber int64
	InstanceID     api.InstanceID
	NewEvent       *protos.HistoryEvent
	Result         *protos.HistoryEvent
	LockedBy       string
}

func (*ActivityWorkItem) Description

func (wi *ActivityWorkItem) Description() string

Description implements core.WorkItem

type Backend

type Backend interface {
	// CreateTaskHub creates a new task hub for the current backend. It's up to the backend implementation
	// and its configuration to determine how it gets created.
	//
	// If the task hub for this backend already exists, an error of type ErrTaskHubExists is returned.
	CreateTaskHub(context.Context) error

	// DeleteTaskHub deletes an existing task hub configured for the current backend. It's up to the backend
	// implementation to determine how the task hub data is deleted.
	//
	// If the task hub for this backend doesn't exist, an error of type ErrTaskHubNotFound is returned.
	DeleteTaskHub(context.Context) error

	// Start starts any background processing done by this backend.
	Start(context.Context) error

	// Stop stops any background processing done by this backend.
	Stop(context.Context) error

	// CreateOrchestrationInstance creates a new orchestration instance with a history event that
	// wraps a ExecutionStarted event.
	CreateOrchestrationInstance(context.Context, *protos.HistoryEvent) error

	// AddNewEvent adds a new orchestration event to the specified orchestration instance.
	AddNewOrchestrationEvent(context.Context, api.InstanceID, *protos.HistoryEvent) error

	// GetOrchestrationWorkItem gets a pending work item from the task hub or returns ErrNoOrchWorkItems
	// if there are no pending work items.
	GetOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error)

	// GetOrchestrationRuntimeState gets the runtime state of an orchestration instance.
	GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error)

	// GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID.
	GetOrchestrationMetadata(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error)

	// CompleteOrchestrationWorkItem completes a work item by saving the updated runtime state to durable storage.
	//
	// Returns ErrWorkItemLockLost if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
	CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error

	// AbandonOrchestrationWorkItem undoes any state changes and returns the work item to the work item queue.
	//
	// This is called if an internal failure happens in the processing of an orchestration work item. It is
	// not called if the orchestration work item is processed successfully (note that an orchestration that
	// completes with a failure is still considered a successfully processed work item).
	AbandonOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error

	// GetActivityWorkItem gets a pending activity work item from the task hub or returns ErrNoWorkItems
	// if there are no pending activity work items.
	GetActivityWorkItem(context.Context) (*ActivityWorkItem, error)

	// CompleteActivityWorkItem sends a message to the parent orchestration indicating activity completion.
	//
	// Returns ErrWorkItemLockLost if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
	CompleteActivityWorkItem(context.Context, *ActivityWorkItem) error

	// AbandonActivityWorkItem returns the work-item back to the queue without committing any other chances.
	//
	// This is called when an internal failure occurs during activity work-item processing.
	AbandonActivityWorkItem(context.Context, *ActivityWorkItem) error
}

type ExecutionResults

type ExecutionResults struct {
	Response *protos.OrchestratorResponse
	// contains filtered or unexported fields
}

type Executor

type Executor interface {
	ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error)

	ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}

func NewGrpcExecutor

func NewGrpcExecutor(grpcServer *grpc.Server, be Backend) Executor

type OrchestrationRuntimeState

type OrchestrationRuntimeState struct {
	CustomStatus *wrapperspb.StringValue
	// contains filtered or unexported fields
}

func NewOrchestrationRuntimeState

func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []*protos.HistoryEvent) *OrchestrationRuntimeState

func (*OrchestrationRuntimeState) AddEvent

AddEvent appends a new history event to the orchestration history

func (*OrchestrationRuntimeState) ApplyActions

func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction) (bool, error)

ApplyActions takes a set of actions and updates its internal state, including populating the outbox.

func (*OrchestrationRuntimeState) CompletedTime

func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)

func (*OrchestrationRuntimeState) ContinuedAsNew

func (s *OrchestrationRuntimeState) ContinuedAsNew() bool

func (*OrchestrationRuntimeState) CreatedTime

func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)

func (*OrchestrationRuntimeState) FailureDetails

func (s *OrchestrationRuntimeState) FailureDetails() (*protos.TaskFailureDetails, error)

func (*OrchestrationRuntimeState) Input

func (s *OrchestrationRuntimeState) Input() (string, error)

func (*OrchestrationRuntimeState) InstanceID

func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID

func (*OrchestrationRuntimeState) IsCompleted

func (s *OrchestrationRuntimeState) IsCompleted() bool

func (*OrchestrationRuntimeState) IsValid

func (s *OrchestrationRuntimeState) IsValid() bool

func (*OrchestrationRuntimeState) Name

func (s *OrchestrationRuntimeState) Name() (string, error)

func (*OrchestrationRuntimeState) NewEvents

func (s *OrchestrationRuntimeState) NewEvents() []*protos.HistoryEvent

func (*OrchestrationRuntimeState) OldEvents

func (s *OrchestrationRuntimeState) OldEvents() []*protos.HistoryEvent

func (*OrchestrationRuntimeState) Output

func (s *OrchestrationRuntimeState) Output() (string, error)

func (*OrchestrationRuntimeState) PendingMessages

func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage

func (*OrchestrationRuntimeState) PendingTasks

func (s *OrchestrationRuntimeState) PendingTasks() []*protos.HistoryEvent

func (*OrchestrationRuntimeState) PendingTimers

func (s *OrchestrationRuntimeState) PendingTimers() []*protos.HistoryEvent

func (*OrchestrationRuntimeState) RuntimeStatus

func (*OrchestrationRuntimeState) String

func (s *OrchestrationRuntimeState) String() string

type OrchestrationWorkItem

type OrchestrationWorkItem struct {
	InstanceID api.InstanceID
	NewEvents  []*protos.HistoryEvent
	LockedBy   string
	RetryCount int32
	State      *OrchestrationRuntimeState
}

func (*OrchestrationWorkItem) Description

func (wi *OrchestrationWorkItem) Description() string

func (*OrchestrationWorkItem) GetAbandonDelay

func (wi *OrchestrationWorkItem) GetAbandonDelay() time.Duration

type OrchestratorExecutor

type OrchestratorExecutor interface {
	ExecuteOrchestrator(
		ctx context.Context,
		iid api.InstanceID,
		oldEvents []*protos.HistoryEvent,
		newEvents []*protos.HistoryEvent) (*ExecutionResults, error)
}

type OrchestratorMessage

type OrchestratorMessage struct {
	HistoryEvent     *protos.HistoryEvent
	TargetInstanceID string
}

type TaskHubWorker

type TaskHubWorker interface {
	// Start starts the backend and the configured internal workers.
	Start(context.Context) error

	// Shutdown stops the backend and all internal workers.
	Shutdown(context.Context) error
}

func NewTaskHubWorker

func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker TaskWorker) TaskHubWorker

type TaskProcessor

type TaskProcessor interface {
	Name() string
	FetchWorkItem(context.Context) (WorkItem, error)
	ProcessWorkItem(context.Context, WorkItem) error
	AbandonWorkItem(context.Context, WorkItem) error
	CompleteWorkItem(context.Context, WorkItem) error
}

type TaskWorker

type TaskWorker interface {
	// Start starts background polling for the activity work items.
	Start(context.Context)

	// ProcessNext attempts to fetch and process a work item. This method returns
	// true if a work item was found and processing started; false otherwise. An
	// error is returned if the context is cancelled.
	ProcessNext(context.Context) (bool, error)

	// StopAndDrain stops the worker and waits for all outstanding work items to finish.
	StopAndDrain()
}

func NewActivityTaskWorker

func NewActivityTaskWorker(be Backend, executor ActivityExecutor) TaskWorker

func NewOrchestrationWorker

func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor) TaskWorker

func NewTaskWorker

func NewTaskWorker(be Backend, p TaskProcessor) TaskWorker

type WorkItem

type WorkItem interface {
	Description() string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL