Documentation ¶
Index ¶
- Variables
- func IsDurableTaskGrpcRequest(fullMethodName string) bool
- func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
- func WithOnGetWorkItemsConnectionCallback(callback func(context.Context) error) grpcExecutorOptions
- func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions
- type ActivityExecutor
- type ActivityWorkItem
- type Backend
- type ExecutionResults
- type Executor
- type HistoryEvent
- type Logger
- type NewTaskWorkerOptions
- type OrchestrationIdReusePolicyOptions
- type OrchestrationRuntimeState
- func (s *OrchestrationRuntimeState) AddEvent(e *HistoryEvent) error
- func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction, currentTraceContext *protos.TraceContext) (bool, error)
- func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
- func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) FailureDetails() (*TaskFailureDetails, error)
- func (s *OrchestrationRuntimeState) Input() (string, error)
- func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
- func (s *OrchestrationRuntimeState) IsCompleted() bool
- func (s *OrchestrationRuntimeState) IsValid() bool
- func (s *OrchestrationRuntimeState) LastUpdatedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) Name() (string, error)
- func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent
- func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent
- func (s *OrchestrationRuntimeState) Output() (string, error)
- func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
- func (s *OrchestrationRuntimeState) PendingTasks() []*HistoryEvent
- func (s *OrchestrationRuntimeState) PendingTimers() []*HistoryEvent
- func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
- func (s *OrchestrationRuntimeState) String() string
- type OrchestrationWorkItem
- type OrchestratorExecutor
- type OrchestratorMessage
- type TaskFailureDetails
- type TaskHubClient
- type TaskHubWorker
- type TaskProcessor
- type TaskWorker
- type WorkItem
- type WorkerOptions
Constants ¶
This section is empty.
Variables ¶
var ( ErrTaskHubExists = errors.New("task hub already exists") ErrTaskHubNotFound = errors.New("task hub not found") ErrNotInitialized = errors.New("backend not initialized") ErrWorkItemLockLost = errors.New("lock on work-item was lost") ErrBackendAlreadyStarted = errors.New("backend is already started") )
var ErrDuplicateEvent = errors.New("duplicate event")
var ErrNoWorkItems = errors.New("no work items were found")
Functions ¶
func IsDurableTaskGrpcRequest ¶ added in v0.1.1
IsDurableTaskGrpcRequest returns true if the specified gRPC method name represents an operation that is compatible with the gRPC executor.
func MarshalHistoryEvent ¶ added in v0.1.1
func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
MarshalHistoryEvent serializes the HistoryEvent into a protobuf byte array.
func WithOnGetWorkItemsConnectionCallback ¶ added in v0.1.1
WithOnGetWorkItemsConnectionCallback allows the caller to get a notification when an external process connects over gRPC and invokes the GetWorkItems operation. This can be useful for doing things like lazily auto-starting the task hub worker only when necessary.
func WithStreamShutdownChannel ¶ added in v0.1.2
func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions
Types ¶
type ActivityExecutor ¶
type ActivityExecutor interface {
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}
type ActivityWorkItem ¶
type ActivityWorkItem struct { SequenceNumber int64 InstanceID api.InstanceID NewEvent *HistoryEvent Result *HistoryEvent LockedBy string Properties map[string]interface{} }
func (ActivityWorkItem) IsWorkItem ¶ added in v0.5.0
func (wi ActivityWorkItem) IsWorkItem() bool
IsWorkItem implements core.WorkItem
func (ActivityWorkItem) String ¶ added in v0.5.0
func (wi ActivityWorkItem) String() string
String implements core.WorkItem and fmt.Stringer
type Backend ¶
type Backend interface { // CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent. // // If the task hub for this backend already exists, an error of type [ErrTaskHubExists] is returned. CreateTaskHub(context.Context) error // DeleteTaskHub deletes an existing task hub configured for the current backend. It's up to the backend // implementation to determine how the task hub data is deleted. // // If the task hub for this backend doesn't exist, an error of type [ErrTaskHubNotFound] is returned. DeleteTaskHub(context.Context) error // Start starts any background processing done by this backend. Start(context.Context) error // Stop stops any background processing done by this backend. Stop(context.Context) error // CreateOrchestrationInstance creates a new orchestration instance with a history event that // wraps a ExecutionStarted event. CreateOrchestrationInstance(context.Context, *HistoryEvent, ...OrchestrationIdReusePolicyOptions) error // AddNewEvent adds a new orchestration event to the specified orchestration instance. AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error // GetOrchestrationWorkItem gets a pending work item from the task hub or returns [ErrNoOrchWorkItems] // if there are no pending work items. GetOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error) // GetOrchestrationRuntimeState gets the runtime state of an orchestration instance. GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error) // GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID. // // Returns [api.ErrInstanceNotFound] if the orchestration instance doesn't exist. GetOrchestrationMetadata(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error) // CompleteOrchestrationWorkItem completes a work item by saving the updated runtime state to durable storage. // // Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain). CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error // AbandonOrchestrationWorkItem undoes any state changes and returns the work item to the work item queue. // // This is called if an internal failure happens in the processing of an orchestration work item. It is // not called if the orchestration work item is processed successfully (note that an orchestration that // completes with a failure is still considered a successfully processed work item). AbandonOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error // GetActivityWorkItem gets a pending activity work item from the task hub or returns [ErrNoWorkItems] // if there are no pending activity work items. GetActivityWorkItem(context.Context) (*ActivityWorkItem, error) // CompleteActivityWorkItem sends a message to the parent orchestration indicating activity completion. // // Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain). CompleteActivityWorkItem(context.Context, *ActivityWorkItem) error // AbandonActivityWorkItem returns the work-item back to the queue without committing any other chances. // // This is called when an internal failure occurs during activity work-item processing. AbandonActivityWorkItem(context.Context, *ActivityWorkItem) error // PurgeOrchestrationState deletes all saved state for the specified orchestration instance. // // [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist. // [api.ErrNotCompleted] is returned if the specified orchestration instance is still running. PurgeOrchestrationState(context.Context, api.InstanceID) error }
type ExecutionResults ¶
type ExecutionResults struct { Response *protos.OrchestratorResponse // contains filtered or unexported fields }
type Executor ¶
type Executor interface { ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error) ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error) Shutdown(ctx context.Context) error }
func NewGrpcExecutor ¶
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar))
NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
type HistoryEvent ¶ added in v0.1.1
type HistoryEvent = protos.HistoryEvent
func UnmarshalHistoryEvent ¶ added in v0.1.1
func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error)
UnmarshalHistoryEvent deserializes a HistoryEvent from a protobuf byte array.
type Logger ¶ added in v0.1.1
type Logger interface { // Debug logs a message at level Debug. Debug(v ...any) // Debugf logs a message at level Debug. Debugf(format string, v ...any) // Info logs a message at level Info. Info(v ...any) // Infof logs a message at level Info. Infof(format string, v ...any) // Warn logs a message at level Warn. Warn(v ...any) // Warnf logs a message at level Warn. Warnf(format string, v ...any) // Error logs a message at level Error. Error(v ...any) // Errorf logs a message at level Error. Errorf(format string, v ...any) }
func DefaultLogger ¶ added in v0.1.1
func DefaultLogger() Logger
type NewTaskWorkerOptions ¶ added in v0.1.1
type NewTaskWorkerOptions func(*WorkerOptions)
func WithMaxParallelism ¶ added in v0.1.1
func WithMaxParallelism(n int32) NewTaskWorkerOptions
type OrchestrationIdReusePolicyOptions ¶ added in v0.3.2
type OrchestrationIdReusePolicyOptions func(*protos.OrchestrationIdReusePolicy) error
func WithOrchestrationIdReusePolicy ¶ added in v0.3.2
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) OrchestrationIdReusePolicyOptions
type OrchestrationRuntimeState ¶
type OrchestrationRuntimeState struct { CustomStatus *wrapperspb.StringValue // contains filtered or unexported fields }
func NewOrchestrationRuntimeState ¶
func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []*HistoryEvent) *OrchestrationRuntimeState
func (*OrchestrationRuntimeState) AddEvent ¶
func (s *OrchestrationRuntimeState) AddEvent(e *HistoryEvent) error
AddEvent appends a new history event to the orchestration history
func (*OrchestrationRuntimeState) ApplyActions ¶
func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction, currentTraceContext *protos.TraceContext) (bool, error)
ApplyActions takes a set of actions and updates its internal state, including populating the outbox.
func (*OrchestrationRuntimeState) CompletedTime ¶
func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
func (*OrchestrationRuntimeState) ContinuedAsNew ¶
func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
func (*OrchestrationRuntimeState) CreatedTime ¶
func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
func (*OrchestrationRuntimeState) FailureDetails ¶
func (s *OrchestrationRuntimeState) FailureDetails() (*TaskFailureDetails, error)
func (*OrchestrationRuntimeState) Input ¶
func (s *OrchestrationRuntimeState) Input() (string, error)
func (*OrchestrationRuntimeState) InstanceID ¶
func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
func (*OrchestrationRuntimeState) IsCompleted ¶
func (s *OrchestrationRuntimeState) IsCompleted() bool
func (*OrchestrationRuntimeState) IsValid ¶
func (s *OrchestrationRuntimeState) IsValid() bool
func (*OrchestrationRuntimeState) LastUpdatedTime ¶ added in v0.1.1
func (s *OrchestrationRuntimeState) LastUpdatedTime() (time.Time, error)
func (*OrchestrationRuntimeState) Name ¶
func (s *OrchestrationRuntimeState) Name() (string, error)
func (*OrchestrationRuntimeState) NewEvents ¶
func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent
func (*OrchestrationRuntimeState) OldEvents ¶
func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent
func (*OrchestrationRuntimeState) Output ¶
func (s *OrchestrationRuntimeState) Output() (string, error)
func (*OrchestrationRuntimeState) PendingMessages ¶
func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
func (*OrchestrationRuntimeState) PendingTasks ¶
func (s *OrchestrationRuntimeState) PendingTasks() []*HistoryEvent
func (*OrchestrationRuntimeState) PendingTimers ¶
func (s *OrchestrationRuntimeState) PendingTimers() []*HistoryEvent
func (*OrchestrationRuntimeState) RuntimeStatus ¶
func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
func (*OrchestrationRuntimeState) String ¶
func (s *OrchestrationRuntimeState) String() string
type OrchestrationWorkItem ¶
type OrchestrationWorkItem struct { InstanceID api.InstanceID NewEvents []*HistoryEvent LockedBy string RetryCount int32 State *OrchestrationRuntimeState Properties map[string]interface{} }
func (*OrchestrationWorkItem) GetAbandonDelay ¶
func (wi *OrchestrationWorkItem) GetAbandonDelay() time.Duration
func (OrchestrationWorkItem) IsWorkItem ¶ added in v0.5.0
func (wi OrchestrationWorkItem) IsWorkItem() bool
IsWorkItem implements core.WorkItem
func (OrchestrationWorkItem) String ¶ added in v0.5.0
func (wi OrchestrationWorkItem) String() string
String implements core.WorkItem and fmt.Stringer
type OrchestratorExecutor ¶
type OrchestratorExecutor interface { ExecuteOrchestrator( ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error) }
type OrchestratorMessage ¶
type OrchestratorMessage struct { HistoryEvent *HistoryEvent TargetInstanceID string }
type TaskFailureDetails ¶ added in v0.1.1
type TaskFailureDetails = protos.TaskFailureDetails
type TaskHubClient ¶ added in v0.1.1
type TaskHubClient interface { ScheduleNewOrchestration(ctx context.Context, orchestrator interface{}, opts ...api.NewOrchestrationOptions) (api.InstanceID, error) FetchOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) WaitForOrchestrationStart(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) TerminateOrchestration(ctx context.Context, id api.InstanceID, opts ...api.TerminateOptions) error RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, opts ...api.RaiseEventOptions) error SuspendOrchestration(ctx context.Context, id api.InstanceID, reason string) error ResumeOrchestration(ctx context.Context, id api.InstanceID, reason string) error PurgeOrchestrationState(ctx context.Context, id api.InstanceID, opts ...api.PurgeOptions) error }
func NewTaskHubClient ¶ added in v0.1.1
func NewTaskHubClient(be Backend) TaskHubClient
type TaskHubWorker ¶
type TaskHubWorker interface { // Start starts the backend and the configured internal workers. Start(context.Context) error // Shutdown stops the backend and all internal workers. Shutdown(context.Context) error }
func NewTaskHubWorker ¶
func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker TaskWorker, logger Logger) TaskHubWorker
type TaskProcessor ¶
type TaskWorker ¶
type TaskWorker interface { // Start starts background polling for the activity work items. Start(context.Context) // ProcessNext attempts to fetch and process a work item. This method returns // true if a work item was found and processing started; false otherwise. An // error is returned if the context is cancelled. ProcessNext(context.Context) (bool, error) // StopAndDrain stops the worker and waits for all outstanding work items to finish. StopAndDrain() }
func NewActivityTaskWorker ¶
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
func NewOrchestrationWorker ¶
func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
func NewTaskWorker ¶
func NewTaskWorker(p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
type WorkerOptions ¶ added in v0.1.1
type WorkerOptions struct {
MaxParallelWorkItems int32
}
func NewWorkerOptions ¶ added in v0.1.1
func NewWorkerOptions() *WorkerOptions