Documentation ¶
Index ¶
- Variables
- func IsDurableTaskGrpcRequest(fullMethodName string) bool
- func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
- func WithOnGetWorkItemsConnectionCallback(callback func(context.Context) error) grpcExecutorOptions
- func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions
- type ActivityExecutor
- type ActivityWorkItem
- type Backend
- type ExecutionResults
- type Executor
- type HistoryEvent
- type Logger
- type NewTaskWorkerOptions
- type OrchestrationRuntimeState
- func (s *OrchestrationRuntimeState) AddEvent(e *HistoryEvent) error
- func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction, currentTraceContext *protos.TraceContext) (bool, error)
- func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
- func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) FailureDetails() (*TaskFailureDetails, error)
- func (s *OrchestrationRuntimeState) Input() (string, error)
- func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
- func (s *OrchestrationRuntimeState) IsCompleted() bool
- func (s *OrchestrationRuntimeState) IsValid() bool
- func (s *OrchestrationRuntimeState) LastUpdatedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) Name() (string, error)
- func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent
- func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent
- func (s *OrchestrationRuntimeState) Output() (string, error)
- func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
- func (s *OrchestrationRuntimeState) PendingTasks() []*HistoryEvent
- func (s *OrchestrationRuntimeState) PendingTimers() []*HistoryEvent
- func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
- func (s *OrchestrationRuntimeState) String() string
- type OrchestrationWorkItem
- type OrchestratorExecutor
- type OrchestratorMessage
- type TaskFailureDetails
- type TaskHubClient
- type TaskHubWorker
- type TaskProcessor
- type TaskWorker
- func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, ...) TaskWorker
- func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, ...) TaskWorker
- func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
- type WorkItem
- type WorkerOptions
Constants ¶
This section is empty.
Variables ¶
var ( ErrTaskHubExists = errors.New("task hub already exists") ErrTaskHubNotFound = errors.New("task hub not found") ErrNotInitialized = errors.New("backend not initialized") ErrWorkItemLockLost = errors.New("lock on work-item was lost") ErrBackendAlreadyStarted = errors.New("backend is already started") )
var ErrDuplicateEvent = errors.New("duplicate event")
var ErrNoWorkItems = errors.New("no work items were found")
Functions ¶
func IsDurableTaskGrpcRequest ¶ added in v0.1.1
IsDurableTaskGrpcRequest returns true if the specified gRPC method name represents an operation that is compatible with the gRPC executor.
func MarshalHistoryEvent ¶ added in v0.1.1
func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
MarshalHistoryEvent serializes the HistoryEvent into a protobuf byte array.
func WithOnGetWorkItemsConnectionCallback ¶ added in v0.1.1
WithOnGetWorkItemsConnectionCallback allows the caller to get a notification when an external process connects over gRPC and invokes the GetWorkItems operation. This can be useful for doing things like lazily auto-starting the task hub worker only when necessary.
func WithStreamShutdownChannel ¶ added in v0.1.2
func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions
Types ¶
type ActivityExecutor ¶
type ActivityExecutor interface {
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}
type ActivityWorkItem ¶
type ActivityWorkItem struct { SequenceNumber int64 InstanceID api.InstanceID NewEvent *HistoryEvent Result *HistoryEvent LockedBy string Properties map[string]interface{} }
func (*ActivityWorkItem) Description ¶
func (wi *ActivityWorkItem) Description() string
Description implements core.WorkItem
type Backend ¶
type Backend interface { // CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent. // // If the task hub for this backend already exists, an error of type [ErrTaskHubExists] is returned. CreateTaskHub(context.Context) error // DeleteTaskHub deletes an existing task hub configured for the current backend. It's up to the backend // implementation to determine how the task hub data is deleted. // // If the task hub for this backend doesn't exist, an error of type [ErrTaskHubNotFound] is returned. DeleteTaskHub(context.Context) error // Start starts any background processing done by this backend. Start(context.Context) error // Stop stops any background processing done by this backend. Stop(context.Context) error // CreateOrchestrationInstance creates a new orchestration instance with a history event that // wraps a ExecutionStarted event. CreateOrchestrationInstance(context.Context, *HistoryEvent) error // AddNewEvent adds a new orchestration event to the specified orchestration instance. AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error // GetOrchestrationWorkItem gets a pending work item from the task hub or returns [ErrNoOrchWorkItems] // if there are no pending work items. GetOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error) // GetOrchestrationRuntimeState gets the runtime state of an orchestration instance. GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error) // GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID. // // Returns [api.ErrInstanceNotFound] if the orchestration instance doesn't exist. GetOrchestrationMetadata(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error) // CompleteOrchestrationWorkItem completes a work item by saving the updated runtime state to durable storage. // // Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain). CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error // AbandonOrchestrationWorkItem undoes any state changes and returns the work item to the work item queue. // // This is called if an internal failure happens in the processing of an orchestration work item. It is // not called if the orchestration work item is processed successfully (note that an orchestration that // completes with a failure is still considered a successfully processed work item). AbandonOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error // GetActivityWorkItem gets a pending activity work item from the task hub or returns [ErrNoWorkItems] // if there are no pending activity work items. GetActivityWorkItem(context.Context) (*ActivityWorkItem, error) // CompleteActivityWorkItem sends a message to the parent orchestration indicating activity completion. // // Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain). CompleteActivityWorkItem(context.Context, *ActivityWorkItem) error // AbandonActivityWorkItem returns the work-item back to the queue without committing any other chances. // // This is called when an internal failure occurs during activity work-item processing. AbandonActivityWorkItem(context.Context, *ActivityWorkItem) error }
type ExecutionResults ¶
type ExecutionResults struct { Response *protos.OrchestratorResponse // contains filtered or unexported fields }
type Executor ¶
type Executor interface { ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error) ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error) }
type HistoryEvent ¶ added in v0.1.1
type HistoryEvent = protos.HistoryEvent
func UnmarshalHistoryEvent ¶ added in v0.1.1
func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error)
UnmarshalHistoryEvent deserializes a HistoryEvent from a protobuf byte array.
type Logger ¶ added in v0.1.1
type Logger interface { // Debug logs a message at level Debug. Debug(v ...any) // Debugf logs a message at level Debug. Debugf(format string, v ...any) // Info logs a message at level Info. Info(v ...any) // Infof logs a message at level Info. Infof(format string, v ...any) // Warn logs a message at level Warn. Warn(v ...any) // Warnf logs a message at level Warn. Warnf(format string, v ...any) // Error logs a message at level Error. Error(v ...any) // Errorf logs a message at level Error. Errorf(format string, v ...any) }
func DefaultLogger ¶ added in v0.1.1
func DefaultLogger() Logger
type NewTaskWorkerOptions ¶ added in v0.1.1
type NewTaskWorkerOptions func(*WorkerOptions)
func WithMaxParallelism ¶ added in v0.1.1
func WithMaxParallelism(n int32) NewTaskWorkerOptions
type OrchestrationRuntimeState ¶
type OrchestrationRuntimeState struct { CustomStatus *wrapperspb.StringValue // contains filtered or unexported fields }
func NewOrchestrationRuntimeState ¶
func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []*HistoryEvent) *OrchestrationRuntimeState
func (*OrchestrationRuntimeState) AddEvent ¶
func (s *OrchestrationRuntimeState) AddEvent(e *HistoryEvent) error
AddEvent appends a new history event to the orchestration history
func (*OrchestrationRuntimeState) ApplyActions ¶
func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction, currentTraceContext *protos.TraceContext) (bool, error)
ApplyActions takes a set of actions and updates its internal state, including populating the outbox.
func (*OrchestrationRuntimeState) CompletedTime ¶
func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
func (*OrchestrationRuntimeState) ContinuedAsNew ¶
func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
func (*OrchestrationRuntimeState) CreatedTime ¶
func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
func (*OrchestrationRuntimeState) FailureDetails ¶
func (s *OrchestrationRuntimeState) FailureDetails() (*TaskFailureDetails, error)
func (*OrchestrationRuntimeState) Input ¶
func (s *OrchestrationRuntimeState) Input() (string, error)
func (*OrchestrationRuntimeState) InstanceID ¶
func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
func (*OrchestrationRuntimeState) IsCompleted ¶
func (s *OrchestrationRuntimeState) IsCompleted() bool
func (*OrchestrationRuntimeState) IsValid ¶
func (s *OrchestrationRuntimeState) IsValid() bool
func (*OrchestrationRuntimeState) LastUpdatedTime ¶ added in v0.1.1
func (s *OrchestrationRuntimeState) LastUpdatedTime() (time.Time, error)
func (*OrchestrationRuntimeState) Name ¶
func (s *OrchestrationRuntimeState) Name() (string, error)
func (*OrchestrationRuntimeState) NewEvents ¶
func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent
func (*OrchestrationRuntimeState) OldEvents ¶
func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent
func (*OrchestrationRuntimeState) Output ¶
func (s *OrchestrationRuntimeState) Output() (string, error)
func (*OrchestrationRuntimeState) PendingMessages ¶
func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
func (*OrchestrationRuntimeState) PendingTasks ¶
func (s *OrchestrationRuntimeState) PendingTasks() []*HistoryEvent
func (*OrchestrationRuntimeState) PendingTimers ¶
func (s *OrchestrationRuntimeState) PendingTimers() []*HistoryEvent
func (*OrchestrationRuntimeState) RuntimeStatus ¶
func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
func (*OrchestrationRuntimeState) String ¶
func (s *OrchestrationRuntimeState) String() string
type OrchestrationWorkItem ¶
type OrchestrationWorkItem struct { InstanceID api.InstanceID NewEvents []*HistoryEvent LockedBy string RetryCount int32 State *OrchestrationRuntimeState Properties map[string]interface{} }
func (*OrchestrationWorkItem) Description ¶
func (wi *OrchestrationWorkItem) Description() string
func (*OrchestrationWorkItem) GetAbandonDelay ¶
func (wi *OrchestrationWorkItem) GetAbandonDelay() time.Duration
type OrchestratorExecutor ¶
type OrchestratorExecutor interface { ExecuteOrchestrator( ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error) }
type OrchestratorMessage ¶
type OrchestratorMessage struct { HistoryEvent *HistoryEvent TargetInstanceID string }
type TaskFailureDetails ¶ added in v0.1.1
type TaskFailureDetails = protos.TaskFailureDetails
type TaskHubClient ¶ added in v0.1.1
type TaskHubClient interface { ScheduleNewOrchestration(ctx context.Context, orchestrator interface{}, opts ...api.NewOrchestrationOptions) (api.InstanceID, error) FetchOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) WaitForOrchestrationStart(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) TerminateOrchestration(ctx context.Context, id api.InstanceID, reason string) error RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, data any) error }
func NewTaskHubClient ¶ added in v0.1.1
func NewTaskHubClient(be Backend) TaskHubClient
type TaskHubWorker ¶
type TaskHubWorker interface { // Start starts the backend and the configured internal workers. Start(context.Context) error // Shutdown stops the backend and all internal workers. Shutdown(context.Context) error }
func NewTaskHubWorker ¶
func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker TaskWorker, logger Logger) TaskHubWorker
type TaskProcessor ¶
type TaskWorker ¶
type TaskWorker interface { // Start starts background polling for the activity work items. Start(context.Context) // ProcessNext attempts to fetch and process a work item. This method returns // true if a work item was found and processing started; false otherwise. An // error is returned if the context is cancelled. ProcessNext(context.Context) (bool, error) // StopAndDrain stops the worker and waits for all outstanding work items to finish. StopAndDrain() }
func NewActivityTaskWorker ¶
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
func NewOrchestrationWorker ¶
func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
func NewTaskWorker ¶
func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
type WorkerOptions ¶ added in v0.1.1
type WorkerOptions struct {
MaxParallelWorkItems int32
}
func NewWorkerOptions ¶ added in v0.1.1
func NewWorkerOptions() *WorkerOptions