Documentation ¶
Index ¶
- Constants
- Variables
- func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, ...) (*hsm.Node, error)
- func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
- func CompletionHandler(ctx context.Context, env hsm.Environment, ref hsm.Ref, ...) error
- func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
- func RegisterEventDefinitions(reg *hsm.Registry) error
- func RegisterExecutor(registry *hsm.Registry, options ActiveExecutorOptions) error
- func RegisterStateMachines(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type ActiveExecutorOptions
- type AttemptFailure
- type BackoffTask
- type BackoffTaskSerializer
- type CancelRequestedEventDefinition
- type Cancelation
- type CancelationBackoffTask
- type CancelationBackoffTaskSerializer
- type CancelationTask
- type CancelationTaskSerializer
- type CanceledEventDefinition
- type ClientProvider
- type CompletedEventDefinition
- type Config
- type EventAttemptFailed
- type EventCancelationAttemptFailed
- type EventCancelationFailed
- type EventCancelationRescheduled
- type EventCancelationScheduled
- type EventCancelationSucceeded
- type EventCanceled
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventStarted
- type EventSucceeded
- type EventTimedOut
- type FailedEventDefinition
- type InvocationTask
- type InvocationTaskSerializer
- type LimitedReadCloser
- type NexusTransportProvider
- type Operation
- func (o Operation) Cancel(node *hsm.Node, t time.Time) (hsm.TransitionOutput, error)
- func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
- func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)
- func (o Operation) SetState(state enumsspb.NexusOperationState)
- func (o Operation) State() enumsspb.NexusOperationState
- type ResponseSizeLimiter
- type ScheduledEventDefinition
- type StartedEventDefinition
- type TimedOutEventDefinition
- type TimeoutTask
- type TimeoutTaskSerializer
Constants ¶
const Enabled = dynamicconfig.Key("component.nexusoperations.enabled")
Enabled toggles acceptance of API requests and workflow commands that create or modify Nexus operations.
const MaxConcurrentOperations = dynamicconfig.Key("component.nexusoperations.limit.operation.concurrency")
MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution. Once the limit is reached, ScheduleNexusOperation commands will be rejected.
const MaxOperationNameLength = dynamicconfig.Key("component.nexusoperations.limit.operation.name.length")
MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name. ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected. Uses Go's len() function to determine the length.
const RequestTimeout = dynamicconfig.Key("component.nexusoperations.request.timeout")
RequestTimeout is the timeout for making a single nexus start or cancel request.
Variables ¶
// State machine type identifiers and keys for this package. Each hsm.MachineType
// below pairs a numeric ID with a namespaced name.
var (
	// OperationMachineType is a unique type identifier for the Operation state machine.
	OperationMachineType = hsm.MachineType{
		ID:   3,
		Name: "nexusoperations.Operation",
	}

	// CancelationMachineType is a unique type identifier for the Cancelation state machine.
	CancelationMachineType = hsm.MachineType{
		ID:   4,
		Name: "nexusoperations.Cancelation",
	}

	// CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.
	// The key's ID is the empty string.
	// NOTE(review): presumably an operation has at most one cancelation child, which is why a
	// constant key suffices - confirm against Operation.Cancel.
	CancelationMachineKey = hsm.Key{Type: CancelationMachineType.ID, ID: ""}
)
// Task type identifiers for the tasks declared in this package. Each hsm.TaskType
// pairs a numeric ID with a namespaced name.
// NOTE(review): these values look like registry/serialization identifiers, so changing
// an ID or Name is presumably a breaking change - confirm before editing.
var (
	// TaskTypeTimeout is the task type named "nexusoperations.Timeout".
	TaskTypeTimeout = hsm.TaskType{
		ID:   3,
		Name: "nexusoperations.Timeout",
	}

	// TaskTypeInvocation is the task type named "nexusoperations.Invocation".
	TaskTypeInvocation = hsm.TaskType{
		ID:   4,
		Name: "nexusoperations.Invocation",
	}

	// TaskTypeBackoff is the task type named "nexusoperations.Backoff".
	TaskTypeBackoff = hsm.TaskType{
		ID:   5,
		Name: "nexusoperations.Backoff",
	}

	// TaskTypeCancelation is the task type named "nexusoperations.Cancelation".
	TaskTypeCancelation = hsm.TaskType{
		ID:   6,
		Name: "nexusoperations.Cancelation",
	}

	// TaskTypeCancelationBackoff is the task type named "nexusoperations.CancelationBackoff".
	TaskTypeCancelationBackoff = hsm.TaskType{
		ID:   7,
		Name: "nexusoperations.CancelationBackoff",
	}
)
// ErrResponseBodyTooLarge is the sentinel error reported when a response body
// exceeds its size limit; LimitedReadCloser is documented to return it from Read
// once its remaining byte budget N is <= 0.
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
// Module is the fx module for the "component.nexusoperations" component.
// It provides the component Config, the ClientProvider, the default
// NexusTransportProvider and the callback token generator, and invokes the
// registration functions for state machines, task serializers, event
// definitions and the executor.
var Module = fx.Module(
	"component.nexusoperations",
	fx.Provide(ConfigProvider),
	fx.Provide(ClientProviderFactory),
	fx.Provide(DefaultNexusTransportProvider),
	fx.Provide(CallbackTokenGeneratorProvider),
	fx.Invoke(RegisterStateMachines),
	fx.Invoke(RegisterTaskSerializers),
	fx.Invoke(RegisterEventDefinitions),
	fx.Invoke(RegisterExecutor),
)
var TransitionAttemptFailed = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED}, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) nextDelay := backoff.NewExponentialRetryPolicy(time.Second).ComputeNextDelay(0, int(op.Attempt)) nextAttemptScheduleTime := event.Time.Add(nextDelay) op.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) op.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } return op.output(event.Node) }, )
var TransitionCancelationAttemptFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) nextDelay := backoff.NewExponentialRetryPolicy(time.Second).ComputeNextDelay(0, int(c.Attempt)) nextAttemptScheduleTime := event.Time.Add(nextDelay) c.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) c.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } return c.output(event.Node) }, )
var TransitionCancelationFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) c.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } return c.output(event.Node) }, )
var TransitionCancelationRescheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(c Cancelation, event EventCancelationRescheduled) (hsm.TransitionOutput, error) { c.NextAttemptScheduleTime = nil return c.output(event.Node) }, )
var TransitionCancelationSucceeded = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SUCCEEDED, func(c Cancelation, event EventCancelationSucceeded) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) return c.output(event.Node) }, )
var TransitionCanceled = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_CANCELED, func(op Operation, event EventCanceled) (hsm.TransitionOutput, error) { if event.AttemptFailure != nil { op.recordAttempt(event.AttemptFailure.Time) op.LastAttemptFailure = &failurepb.Failure{ Message: event.AttemptFailure.Err.Error(), FailureInfo: &failurepb.Failure_CanceledFailureInfo{ CanceledFailureInfo: &failurepb.CanceledFailureInfo{}, }, } } return op.output(event.Node) }, )
var TransitionFailed = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_FAILED, func(op Operation, event EventFailed) (hsm.TransitionOutput, error) { if event.AttemptFailure != nil { op.recordAttempt(event.AttemptFailure.Time) op.LastAttemptFailure = &failurepb.Failure{ Message: event.AttemptFailure.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } } return op.output(event.Node) }, )
var TransitionRescheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) { op.NextAttemptScheduleTime = nil return op.output(event.Node) }, )
// TransitionScheduled moves a freshly created Operation from UNSPECIFIED to
// SCHEDULED. It mutates no fields of its own and only produces the machine's output.
var TransitionScheduled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED,
	},
	enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
	func(op Operation, ev EventScheduled) (hsm.TransitionOutput, error) {
		return op.output(ev.Node)
	},
)
var TransitionStarted = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED}, enumsspb.NEXUS_OPERATION_STATE_STARTED, func(op Operation, event EventStarted) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) op.OperationId = event.Attributes.OperationId return op.output(event.Node) }, )
var TransitionSucceeded = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_SUCCEEDED, func(op Operation, event EventSucceeded) (hsm.TransitionOutput, error) { if event.AttemptTime != nil { op.recordAttempt(*event.AttemptTime) } return op.output(event.Node) }, )
// TransitionTimedOut moves an Operation from SCHEDULED, BACKING_OFF or STARTED
// to TIMED_OUT. It mutates no fields of its own and only produces the machine's output.
var TransitionTimedOut = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT,
	func(op Operation, ev EventTimedOut) (hsm.TransitionOutput, error) {
		return op.output(ev.Node)
	},
)
var TranstionCancelationScheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(op Cancelation, event EventCancelationScheduled) (hsm.TransitionOutput, error) { op.RequestedTime = timestamppb.New(event.Time) return op.output(event.Node) }, )
Functions ¶
func AddChild ¶
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error)
AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.
func CallbackTokenGeneratorProvider ¶
func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
func CompletionHandler ¶
func MachineCollection ¶
func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
MachineCollection creates a new typed [hsm.Collection] for operations.
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, options ActiveExecutorOptions, ) error
func RegisterStateMachines ¶
func RegisterTaskSerializers ¶
Types ¶
type ActiveExecutorOptions ¶
type ActiveExecutorOptions struct { fx.In Config *Config NamespaceRegistry namespace.Registry CallbackTokenGenerator *commonnexus.CallbackTokenGenerator ClientProvider ClientProvider }
type AttemptFailure ¶
AttemptFailure carries failure information of an invocation attempt.
type BackoffTask ¶
func (BackoffTask) Concurrent ¶
func (BackoffTask) Concurrent() bool
func (BackoffTask) Kind ¶
func (t BackoffTask) Kind() hsm.TaskKind
func (BackoffTask) Type ¶
func (BackoffTask) Type() hsm.TaskType
type BackoffTaskSerializer ¶
type BackoffTaskSerializer struct{}
func (BackoffTaskSerializer) Deserialize ¶
type CancelRequestedEventDefinition ¶
type CancelRequestedEventDefinition struct{}
func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger ¶
func (n CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestedEventDefinition) Type ¶
func (n CancelRequestedEventDefinition) Type() enumspb.EventType
type Cancelation ¶
type Cancelation struct {
*persistencespb.NexusOperationCancellationInfo
}
Cancelation state machine for canceling an operation.
func (Cancelation) RegenerateTasks ¶
func (Cancelation) SetState ¶
func (c Cancelation) SetState(state enumspb.NexusOperationCancellationState)
func (Cancelation) State ¶
func (c Cancelation) State() enumspb.NexusOperationCancellationState
type CancelationBackoffTask ¶
func (CancelationBackoffTask) Concurrent ¶
func (CancelationBackoffTask) Concurrent() bool
func (CancelationBackoffTask) Kind ¶
func (t CancelationBackoffTask) Kind() hsm.TaskKind
func (CancelationBackoffTask) Type ¶
func (CancelationBackoffTask) Type() hsm.TaskType
type CancelationBackoffTaskSerializer ¶
type CancelationBackoffTaskSerializer struct{}
func (CancelationBackoffTaskSerializer) Deserialize ¶
type CancelationTask ¶
type CancelationTask struct {
Destination string
}
func (CancelationTask) Concurrent ¶
func (CancelationTask) Concurrent() bool
func (CancelationTask) Kind ¶
func (t CancelationTask) Kind() hsm.TaskKind
func (CancelationTask) Type ¶
func (CancelationTask) Type() hsm.TaskType
type CancelationTaskSerializer ¶
type CancelationTaskSerializer struct{}
func (CancelationTaskSerializer) Deserialize ¶
type CanceledEventDefinition ¶
type CanceledEventDefinition struct{}
func (CanceledEventDefinition) IsWorkflowTaskTrigger ¶
func (n CanceledEventDefinition) IsWorkflowTaskTrigger() bool
func (CanceledEventDefinition) Type ¶
func (n CanceledEventDefinition) Type() enumspb.EventType
type ClientProvider ¶
type ClientProvider func(queues.NamespaceIDAndDestination, *nexuspb.OutgoingServiceSpec) (*nexus.Client, error)
func ClientProviderFactory ¶
func ClientProviderFactory(namespaceRegistry namespace.Registry, httpTransportProvider NexusTransportProvider) ClientProvider
type CompletedEventDefinition ¶
type CompletedEventDefinition struct{}
func (CompletedEventDefinition) IsWorkflowTaskTrigger ¶
func (n CompletedEventDefinition) IsWorkflowTaskTrigger() bool
func (CompletedEventDefinition) Type ¶
func (n CompletedEventDefinition) Type() enumspb.EventType
type Config ¶
type Config struct { Enabled dynamicconfig.BoolPropertyFnWithNamespaceFilter RequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter }
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventAttemptFailed ¶
type EventAttemptFailed struct { AttemptFailure Node *hsm.Node }
EventAttemptFailed is triggered when an invocation attempt fails with a retryable error.
type EventCancelationAttemptFailed ¶
EventCancelationAttemptFailed is triggered when a cancelation attempt fails with a retryable error.
type EventCancelationFailed ¶
EventCancelationFailed is triggered when a cancelation attempt fails with a non-retryable error.
type EventCancelationRescheduled ¶
EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.
type EventCancelationScheduled ¶
EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.
type EventCancelationSucceeded ¶
EventCancelationSucceeded is triggered when a cancelation attempt succeeds.
type EventCanceled ¶
type EventCanceled struct { // Only set if the operation completed synchronously, as a response to a StartOperation RPC. AttemptFailure *AttemptFailure Node *hsm.Node }
EventCanceled is triggered when an operation is canceled.
type EventFailed ¶
type EventFailed struct { // Only set if the operation completed synchronously, as a response to a StartOperation RPC. AttemptFailure *AttemptFailure Node *hsm.Node }
EventFailed is triggered when an invocation attempt fails with a non-retryable error.
type EventRescheduled ¶
EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.
type EventStarted ¶
type EventStarted struct { Time time.Time Node *hsm.Node Attributes *historypb.NexusOperationStartedEventAttributes }
EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.
type EventSucceeded ¶
type EventSucceeded struct { // Only set if the operation completed synchronously, as a response to a StartOperation RPC. AttemptTime *time.Time Node *hsm.Node }
EventSucceeded is triggered when an invocation attempt succeeds.
type EventTimedOut ¶
EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
type FailedEventDefinition ¶
type FailedEventDefinition struct{}
func (FailedEventDefinition) IsWorkflowTaskTrigger ¶
func (n FailedEventDefinition) IsWorkflowTaskTrigger() bool
func (FailedEventDefinition) Type ¶
func (n FailedEventDefinition) Type() enumspb.EventType
type InvocationTask ¶
type InvocationTask struct {
Destination string
}
func (InvocationTask) Concurrent ¶
func (InvocationTask) Concurrent() bool
func (InvocationTask) Kind ¶
func (t InvocationTask) Kind() hsm.TaskKind
func (InvocationTask) Type ¶
func (InvocationTask) Type() hsm.TaskType
type InvocationTaskSerializer ¶
type InvocationTaskSerializer struct{}
func (InvocationTaskSerializer) Deserialize ¶
type LimitedReadCloser ¶
type LimitedReadCloser struct { R io.ReadCloser N int64 }
A LimitedReadCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.
func NewLimitedReadCloser ¶
func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser
func (*LimitedReadCloser) Close ¶
func (l *LimitedReadCloser) Close() error
type NexusTransportProvider ¶
type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper
NexusTransportProvider is a function type that allows a provider to customize the default transport implementation specifically for Nexus.
func DefaultNexusTransportProvider ¶
func DefaultNexusTransportProvider() NexusTransportProvider
type Operation ¶
type Operation struct {
*persistencespb.NexusOperationInfo
}
Operation state machine.
func (Operation) Cancel ¶
Cancel marks the Operation machine as canceled by spawning a child Cancelation machine and transitioning the child to the SCHEDULED state.
func (Operation) Cancelation ¶
func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
func (Operation) RegenerateTasks ¶
func (Operation) SetState ¶
func (o Operation) SetState(state enumsspb.NexusOperationState)
func (Operation) State ¶
func (o Operation) State() enumsspb.NexusOperationState
type ResponseSizeLimiter ¶
type ResponseSizeLimiter struct {
// contains filtered or unexported fields
}
type ScheduledEventDefinition ¶
type ScheduledEventDefinition struct{}
func (ScheduledEventDefinition) IsWorkflowTaskTrigger ¶
func (n ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
func (ScheduledEventDefinition) Type ¶
func (n ScheduledEventDefinition) Type() enumspb.EventType
type StartedEventDefinition ¶
type StartedEventDefinition struct{}
func (StartedEventDefinition) IsWorkflowTaskTrigger ¶
func (n StartedEventDefinition) IsWorkflowTaskTrigger() bool
func (StartedEventDefinition) Type ¶
func (n StartedEventDefinition) Type() enumspb.EventType
type TimedOutEventDefinition ¶
type TimedOutEventDefinition struct{}
func (TimedOutEventDefinition) IsWorkflowTaskTrigger ¶
func (n TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
func (TimedOutEventDefinition) Type ¶
func (n TimedOutEventDefinition) Type() enumspb.EventType
type TimeoutTask ¶
func (TimeoutTask) Concurrent ¶
func (TimeoutTask) Concurrent() bool
func (TimeoutTask) Kind ¶
func (t TimeoutTask) Kind() hsm.TaskKind
func (TimeoutTask) Type ¶
func (TimeoutTask) Type() hsm.TaskType
type TimeoutTaskSerializer ¶
type TimeoutTaskSerializer struct{}