backend

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTaskHubExists         = errors.New("task hub already exists")
	ErrTaskHubNotFound       = errors.New("task hub not found")
	ErrNotInitialized        = errors.New("backend not initialized")
	ErrWorkItemLockLost      = errors.New("lock on work-item was lost")
	ErrBackendAlreadyStarted = errors.New("backend is already started")
)
View Source
var ErrDuplicateEvent = errors.New("duplicate event")
View Source
var ErrNoWorkItems = errors.New("no work items were found")

Functions

func IsDurableTaskGrpcRequest

func IsDurableTaskGrpcRequest(fullMethodName string) bool

IsDurableTaskGrpcRequest returns true if the specified gRPC method name represents an operation that is compatible with the gRPC executor.

func MarshalHistoryEvent

func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)

MarshalHistoryEvent serializes the HistoryEvent into a protobuf byte array.

func WithOnGetWorkItemsConnectionCallback

func WithOnGetWorkItemsConnectionCallback(callback func(context.Context) error) grpcExecutorOptions

WithOnGetWorkItemsConnectionCallback allows the caller to get a notification when an external process connects over gRPC and invokes the GetWorkItems operation. This can be useful for doing things like lazily auto-starting the task hub worker only when necessary.

func WithStreamShutdownChannel

func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions

Types

type ActivityExecutor

type ActivityExecutor interface {
	ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}

type ActivityWorkItem

type ActivityWorkItem struct {
	SequenceNumber int64
	InstanceID     api.InstanceID
	NewEvent       *HistoryEvent
	Result         *HistoryEvent
	LockedBy       string
	Properties     map[string]interface{}
}

func (ActivityWorkItem) IsWorkItem

func (wi ActivityWorkItem) IsWorkItem() bool

IsWorkItem implements core.WorkItem

func (ActivityWorkItem) String

func (wi ActivityWorkItem) String() string

String implements core.WorkItem and fmt.Stringer

type Backend

type Backend interface {
	// CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent.
	//
	// If the task hub for this backend already exists, an error of type [ErrTaskHubExists] is returned.
	CreateTaskHub(context.Context) error

	// DeleteTaskHub deletes an existing task hub configured for the current backend. It's up to the backend
	// implementation to determine how the task hub data is deleted.
	//
	// If the task hub for this backend doesn't exist, an error of type [ErrTaskHubNotFound] is returned.
	DeleteTaskHub(context.Context) error

	// Start starts any background processing done by this backend.
	Start(context.Context) error

	// Stop stops any background processing done by this backend.
	Stop(context.Context) error

	// CreateOrchestrationInstance creates a new orchestration instance with a history event that
	// wraps a ExecutionStarted event.
	CreateOrchestrationInstance(context.Context, *HistoryEvent, ...OrchestrationIdReusePolicyOptions) error

	// AddNewEvent adds a new orchestration event to the specified orchestration instance.
	AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error

	// GetOrchestrationWorkItem gets a pending work item from the task hub or returns [ErrNoOrchWorkItems]
	// if there are no pending work items.
	GetOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error)

	// GetOrchestrationRuntimeState gets the runtime state of an orchestration instance.
	GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error)

	// GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID.
	//
	// Returns [api.ErrInstanceNotFound] if the orchestration instance doesn't exist.
	GetOrchestrationMetadata(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error)

	// CompleteOrchestrationWorkItem completes a work item by saving the updated runtime state to durable storage.
	//
	// Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
	CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error

	// AbandonOrchestrationWorkItem undoes any state changes and returns the work item to the work item queue.
	//
	// This is called if an internal failure happens in the processing of an orchestration work item. It is
	// not called if the orchestration work item is processed successfully (note that an orchestration that
	// completes with a failure is still considered a successfully processed work item).
	AbandonOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error

	// GetActivityWorkItem gets a pending activity work item from the task hub or returns [ErrNoWorkItems]
	// if there are no pending activity work items.
	GetActivityWorkItem(context.Context) (*ActivityWorkItem, error)

	// CompleteActivityWorkItem sends a message to the parent orchestration indicating activity completion.
	//
	// Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
	CompleteActivityWorkItem(context.Context, *ActivityWorkItem) error

	// AbandonActivityWorkItem returns the work-item back to the queue without committing any other chances.
	//
	// This is called when an internal failure occurs during activity work-item processing.
	AbandonActivityWorkItem(context.Context, *ActivityWorkItem) error

	// PurgeOrchestrationState deletes all saved state for the specified orchestration instance.
	//
	// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
	// [api.ErrNotCompleted] is returned if the specified orchestration instance is still running.
	PurgeOrchestrationState(context.Context, api.InstanceID) error
}

type ExecutionResults

type ExecutionResults struct {
	Response *protos.OrchestratorResponse
	// contains filtered or unexported fields
}

type Executor

type Executor interface {
	ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error)
	ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
	Shutdown(ctx context.Context) error
}

func NewGrpcExecutor

func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar))

NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.

type HistoryEvent

type HistoryEvent = protos.HistoryEvent

func UnmarshalHistoryEvent

func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error)

UnmarshalHistoryEvent deserializes a HistoryEvent from a protobuf byte array.

type Logger

type Logger interface {
	// Debug logs a message at level Debug.
	Debug(v ...any)
	// Debugf logs a message at level Debug.
	Debugf(format string, v ...any)
	// Info logs a message at level Info.
	Info(v ...any)
	// Infof logs a message at level Info.
	Infof(format string, v ...any)
	// Warn logs a message at level Warn.
	Warn(v ...any)
	// Warnf logs a message at level Warn.
	Warnf(format string, v ...any)
	// Error logs a message at level Error.
	Error(v ...any)
	// Errorf logs a message at level Error.
	Errorf(format string, v ...any)
}

func DefaultLogger

func DefaultLogger() Logger

type NewTaskWorkerOptions

type NewTaskWorkerOptions func(*WorkerOptions)

func WithMaxParallelism

func WithMaxParallelism(n int32) NewTaskWorkerOptions

type OrchestrationRuntimeState

type OrchestrationRuntimeState struct {
	CustomStatus *wrapperspb.StringValue
	// contains filtered or unexported fields
}

func NewOrchestrationRuntimeState

func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []*HistoryEvent) *OrchestrationRuntimeState

func (*OrchestrationRuntimeState) AddEvent

AddEvent appends a new history event to the orchestration history

func (*OrchestrationRuntimeState) ApplyActions

func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction, currentTraceContext *protos.TraceContext) (bool, error)

ApplyActions takes a set of actions and updates its internal state, including populating the outbox.

func (*OrchestrationRuntimeState) CompletedTime

func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)

func (*OrchestrationRuntimeState) ContinuedAsNew

func (s *OrchestrationRuntimeState) ContinuedAsNew() bool

func (*OrchestrationRuntimeState) CreatedTime

func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)

func (*OrchestrationRuntimeState) FailureDetails

func (s *OrchestrationRuntimeState) FailureDetails() (*TaskFailureDetails, error)

func (*OrchestrationRuntimeState) Input

func (s *OrchestrationRuntimeState) Input() (string, error)

func (*OrchestrationRuntimeState) InstanceID

func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID

func (*OrchestrationRuntimeState) IsCompleted

func (s *OrchestrationRuntimeState) IsCompleted() bool

func (*OrchestrationRuntimeState) IsValid

func (s *OrchestrationRuntimeState) IsValid() bool

func (*OrchestrationRuntimeState) LastUpdatedTime

func (s *OrchestrationRuntimeState) LastUpdatedTime() (time.Time, error)

func (*OrchestrationRuntimeState) Name

func (s *OrchestrationRuntimeState) Name() (string, error)

func (*OrchestrationRuntimeState) NewEvents

func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent

func (*OrchestrationRuntimeState) OldEvents

func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent

func (*OrchestrationRuntimeState) Output

func (s *OrchestrationRuntimeState) Output() (string, error)

func (*OrchestrationRuntimeState) PendingMessages

func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage

func (*OrchestrationRuntimeState) PendingTasks

func (s *OrchestrationRuntimeState) PendingTasks() []*HistoryEvent

func (*OrchestrationRuntimeState) PendingTimers

func (s *OrchestrationRuntimeState) PendingTimers() []*HistoryEvent

func (*OrchestrationRuntimeState) RuntimeStatus

func (*OrchestrationRuntimeState) String

func (s *OrchestrationRuntimeState) String() string

type OrchestrationWorkItem

type OrchestrationWorkItem struct {
	InstanceID api.InstanceID
	NewEvents  []*HistoryEvent
	LockedBy   string
	RetryCount int32
	State      *OrchestrationRuntimeState
	Properties map[string]interface{}
}

func (*OrchestrationWorkItem) GetAbandonDelay

func (wi *OrchestrationWorkItem) GetAbandonDelay() time.Duration

func (OrchestrationWorkItem) IsWorkItem

func (wi OrchestrationWorkItem) IsWorkItem() bool

IsWorkItem implements core.WorkItem

func (OrchestrationWorkItem) String

func (wi OrchestrationWorkItem) String() string

String implements core.WorkItem and fmt.Stringer

type OrchestratorExecutor

type OrchestratorExecutor interface {
	ExecuteOrchestrator(
		ctx context.Context,
		iid api.InstanceID,
		oldEvents []*protos.HistoryEvent,
		newEvents []*protos.HistoryEvent) (*ExecutionResults, error)
}

type OrchestratorMessage

type OrchestratorMessage struct {
	HistoryEvent     *HistoryEvent
	TargetInstanceID string
}

type TaskFailureDetails

type TaskFailureDetails = protos.TaskFailureDetails

type TaskHubClient

type TaskHubClient interface {
	ScheduleNewOrchestration(ctx context.Context, orchestrator interface{}, opts ...api.NewOrchestrationOptions) (api.InstanceID, error)
	FetchOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
	WaitForOrchestrationStart(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
	WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
	TerminateOrchestration(ctx context.Context, id api.InstanceID, opts ...api.TerminateOptions) error
	RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, opts ...api.RaiseEventOptions) error
	SuspendOrchestration(ctx context.Context, id api.InstanceID, reason string) error
	ResumeOrchestration(ctx context.Context, id api.InstanceID, reason string) error
	PurgeOrchestrationState(ctx context.Context, id api.InstanceID, opts ...api.PurgeOptions) error
}

func NewTaskHubClient

func NewTaskHubClient(be Backend) TaskHubClient

type TaskHubWorker

type TaskHubWorker interface {
	// Start starts the backend and the configured internal workers.
	Start(context.Context) error

	// Shutdown stops the backend and all internal workers.
	Shutdown(context.Context) error
}

func NewTaskHubWorker

func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker TaskWorker, logger Logger) TaskHubWorker

type TaskProcessor

type TaskProcessor interface {
	Name() string
	FetchWorkItem(context.Context) (WorkItem, error)
	ProcessWorkItem(context.Context, WorkItem) error
	AbandonWorkItem(context.Context, WorkItem) error
	CompleteWorkItem(context.Context, WorkItem) error
}

type TaskWorker

type TaskWorker interface {
	// Start starts background polling for the activity work items.
	Start(context.Context)

	// ProcessNext attempts to fetch and process a work item. This method returns
	// true if a work item was found and processing started; false otherwise. An
	// error is returned if the context is cancelled.
	ProcessNext(context.Context) (bool, error)

	// StopAndDrain stops the worker and waits for all outstanding work items to finish.
	StopAndDrain()
}

func NewActivityTaskWorker

func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker

func NewOrchestrationWorker

func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker

func NewTaskWorker

func NewTaskWorker(p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker

type WorkItem

type WorkItem interface {
	fmt.Stringer
	IsWorkItem() bool
}

type WorkerOptions

type WorkerOptions struct {
	MaxParallelWorkItems int32
}

func NewWorkerOptions

func NewWorkerOptions() *WorkerOptions

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL