Documentation ¶
Index ¶
- Constants
- Variables
- func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, ...) (*hsm.Node, error)
- func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
- func CompletionHandler(ctx context.Context, env hsm.Environment, ref hsm.Ref, requestID string, ...) error
- func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
- func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
- func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
- func EndpointRegistryProvider(matchingClient resource.MatchingClient, ...) commonnexus.EndpointRegistry
- func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
- func RegisterEventDefinitions(reg *hsm.Registry) error
- func RegisterExecutor(registry *hsm.Registry, options TaskExecutorOptions) error
- func RegisterStateMachines(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type BackoffTask
- type BackoffTaskSerializer
- type CancelRequestedEventDefinition
- func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestedEventDefinition) Type() enumspb.EventType
- type Cancelation
- type CancelationBackoffTask
- type CancelationBackoffTaskSerializer
- type CancelationTask
- type CancelationTaskSerializer
- type CanceledEventDefinition
- func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CanceledEventDefinition) Type() enumspb.EventType
- type ClientProvider
- type CompletedEventDefinition
- func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CompletedEventDefinition) Type() enumspb.EventType
- type CompletionSource
- type Config
- type EventAttemptFailed
- type EventCancelationAttemptFailed
- type EventCancelationFailed
- type EventCancelationRescheduled
- type EventCancelationScheduled
- type EventCancelationSucceeded
- type EventCanceled
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventStarted
- type EventSucceeded
- type EventTimedOut
- type FailedEventDefinition
- func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d FailedEventDefinition) Type() enumspb.EventType
- type InvocationTask
- type InvocationTaskSerializer
- type LimitedReadCloser
- type NexusTransportProvider
- type Operation
- func (o Operation) Cancel(node *hsm.Node, t time.Time) (hsm.TransitionOutput, error)
- func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
- func (o Operation) CancelationNode(node *hsm.Node) (*hsm.Node, error)
- func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)
- func (o Operation) SetState(state enumsspb.NexusOperationState)
- func (o Operation) State() enumsspb.NexusOperationState
- type ResponseSizeLimiter
- type ScheduledEventDefinition
- func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d ScheduledEventDefinition) Type() enumspb.EventType
- type StartedEventDefinition
- func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d StartedEventDefinition) Type() enumspb.EventType
- type TaskExecutorOptions
- type TimedOutEventDefinition
- func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
- func (d TimedOutEventDefinition) Type() enumspb.EventType
- type TimeoutTask
- type TimeoutTaskSerializer
Constants ¶
const ( // OperationMachineType is a unique type identifier for the Operation state machine. OperationMachineType = "nexusoperations.Operation" // CancelationMachineType is a unique type identifier for the Cancelation state machine. CancelationMachineType = "nexusoperations.Cancelation" )
const ( // CompletionSourceUnspecified indicates that the source is unspecified (e.g. when reapplying a history event that // doesn't record this information). CompletionSourceUnspecified = CompletionSource(iota) // CompletionSourceResponse indicates that a completion came synchronously from a response to a StartOperation // request. CompletionSourceResponse // CompletionSourceCallback indicates that a completion came asynchronously from a callback. CompletionSourceCallback // CompletionSourceCancelRequested indicates that the operation was canceled due to workflow cancelation request. CompletionSourceCancelRequested )
const ( TaskTypeTimeout = "nexusoperations.Timeout" TaskTypeInvocation = "nexusoperations.Invocation" TaskTypeBackoff = "nexusoperations.Backoff" TaskTypeCancelation = "nexusoperations.Cancelation" TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff" )
const NexusCallbackSourceHeader = "Nexus-Callback-Source"
Variables ¶
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
"component.nexusoperations.callback.endpoint.template",
"unset",
`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver asynchronous completion.
The template can be used to interpolate the {{.NamespaceName}} and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set in order to use Nexus Operations.`,
)
var CancelationMachineKey = hsm.Key{Type: CancelationMachineType, ID: ""}
CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.
var DisallowedOperationHeaders = dynamicconfig.NewNamespaceTypedSettingWithConverter( "component.nexusoperations.disallowedHeaders", func(a any) ([]string, error) { keys, ok := a.([]string) if !ok { return nil, fmt.Errorf("expected a string slice, got: %v", a) } for i, k := range keys { keys[i] = strings.ToLower(k) } return keys, nil }, []string{ "request-timeout", interceptor.DCRedirectionApiHeaderName, interceptor.DCRedirectionContextHeaderName, headers.CallerNameHeaderName, headers.CallerTypeHeaderName, headers.CallOriginHeaderName, }, `Case insensitive list of disallowed header keys for Nexus Operations. ScheduleNexusOperation commands with a "nexus_header" field that contains any of these disallowed keys will be rejected.`, )
var EndpointNotFoundAlwaysNonRetryable = dynamicconfig.NewNamespaceBoolSetting( "component.nexusoperations.endpointNotFoundAlwaysNonRetryable", false, `When set to true, if an endpoint is not found when processing a ScheduleNexusOperation command, the command will be accepted and the operation will fail on the first attempt. This defaults to false to prevent endpoint registry propagation delay from failing operations.`, )
var ErrOperationTimeoutBelowMin = errors.New("remaining operation timeout is less than required minimum")
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.concurrency",
30,
`MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution.
Once the limit is reached, ScheduleNexusOperation commands will be rejected.`,
)
var MaxOperationHeaderSize = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.header.size",
4096,
`The maximum allowed header size for a Nexus Operation.
ScheduleNexusOperation commands with a "nexus_header" field that exceeds this limit will be rejected.
Uses Go's len() function on header keys and values to determine the total size.`,
)
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.name.length",
1000,
`MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name.
ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetting(
"component.nexusoperations.limit.scheduleToCloseTimeout",
0,
`MaxOperationScheduleToCloseTimeout limits the maximum allowed duration of a Nexus Operation. ScheduleNexusOperation
commands that specify no schedule-to-close timeout or a longer timeout than permitted will have their
schedule-to-close timeout capped to this value. 0 implies no limit.`,
)
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.service.name.length",
1000,
`MaxServiceNameLength limits the maximum allowed length for a Nexus Service name.
ScheduleNexusOperation commands with a service name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
var MinOperationTimeout = dynamicconfig.NewNamespaceDurationSetting(
"componenet.nexusoperations.limit.operation.timeout.min",
0,
`MinOperationTimeout is the minimum time remaining for an operation to complete for the server to make
RPCs. If the remaining operation timeout is less than this value, a non-retryable timeout error will be returned.`,
)
var Module = fx.Module( "component.nexusoperations", fx.Provide(ConfigProvider), fx.Provide(ClientProviderFactory), fx.Provide(DefaultNexusTransportProvider), fx.Provide(CallbackTokenGeneratorProvider), fx.Provide(EndpointRegistryProvider), fx.Invoke(EndpointRegistryLifetimeHooks), fx.Invoke(RegisterStateMachines), fx.Invoke(RegisterTaskSerializers), fx.Invoke(RegisterEventDefinitions), fx.Invoke(RegisterExecutor), )
var OutboundRequestCounter = metrics.NewCounterDef( "nexus_outbound_requests", metrics.WithDescription("The number of Nexus outbound requests made by the history service."), )
var OutboundRequestLatency = metrics.NewTimerDef( "nexus_outbound_latency", metrics.WithDescription("Latency of outbound Nexus requests made by the history service."), )
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting( "component.nexusoperations.request.timeout", time.Second*10, `RequestTimeout is the timeout for making a single nexus start or cancel request.`, )
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.initialInterval", time.Second, `The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.maxInterval", time.Hour, `The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var TransitionAttemptFailed = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED}, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(op.Attempt), nil) nextAttemptScheduleTime := event.Time.Add(nextDelay) op.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) op.LastAttemptFailure = event.Failure return op.output() }, )
var TransitionCancelationAttemptFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(c.Attempt), nil) nextAttemptScheduleTime := event.Time.Add(nextDelay) c.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) c.LastAttemptFailure = event.Failure return c.output(event.Node) }, )
var TransitionCancelationFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{ enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, }, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) c.LastAttemptFailure = event.Failure return c.output(event.Node) }, )
var TransitionCancelationRescheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(c Cancelation, event EventCancelationRescheduled) (hsm.TransitionOutput, error) { c.NextAttemptScheduleTime = nil return c.output(event.Node) }, )
var TransitionCancelationScheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(op Cancelation, event EventCancelationScheduled) (hsm.TransitionOutput, error) { op.RequestedTime = timestamppb.New(event.Time) return op.output(event.Node) }, )
var TransitionCancelationSucceeded = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SUCCEEDED, func(c Cancelation, event EventCancelationSucceeded) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) return c.output(event.Node) }, )
var TransitionCanceled = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_CANCELED, func(op Operation, event EventCanceled) (hsm.TransitionOutput, error) { if event.CompletionSource == CompletionSourceResponse || event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { op.recordAttempt(event.Time) op.LastAttemptFailure = nil } return op.output() }, )
var TransitionFailed = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_FAILED, func(op Operation, event EventFailed) (hsm.TransitionOutput, error) { if event.CompletionSource == CompletionSourceResponse || event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { op.recordAttempt(event.Time) op.LastAttemptFailure = &failurepb.Failure{ Message: event.Attributes.GetFailure().GetCause().GetMessage(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } } return op.output() }, )
var TransitionRescheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) { op.NextAttemptScheduleTime = nil return op.output() }, )
var TransitionScheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventScheduled) (hsm.TransitionOutput, error) { return op.output() }, )
var TransitionStarted = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_STARTED, func(op Operation, event EventStarted) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) op.OperationId = event.Attributes.OperationId child, err := op.CancelationNode(event.Node) if err != nil { return hsm.TransitionOutput{}, err } if child != nil { return hsm.TransitionOutput{}, hsm.MachineTransition(child, func(c Cancelation) (hsm.TransitionOutput, error) { return TransitionCancelationScheduled.Apply(c, EventCancelationScheduled{ Time: event.Time, Node: child, }) }) } return op.output() }, )
var TransitionSucceeded = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_SUCCEEDED, func(op Operation, event EventSucceeded) (hsm.TransitionOutput, error) { if event.CompletionSource == CompletionSourceResponse || event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { op.recordAttempt(event.Time) } return op.output() }, )
var TransitionTimedOut = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT, func(op Operation, event EventTimedOut) (hsm.TransitionOutput, error) { return op.output() }, )
Functions ¶
func AddChild ¶
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error)
AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.
func CallbackTokenGeneratorProvider ¶
func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
func CompletionHandler ¶
func ConvertLinkWorkflowEventToNexusLink ¶ added in v1.25.1
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.
func ConvertNexusLinkToLinkWorkflowEvent ¶ added in v1.25.1
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
func EndpointRegistryLifetimeHooks ¶ added in v1.25.0
func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
func EndpointRegistryProvider ¶ added in v1.25.0
func EndpointRegistryProvider( matchingClient resource.MatchingClient, endpointManager persistence.NexusEndpointManager, dc *dynamicconfig.Collection, logger log.Logger, metricsHandler metrics.Handler, ) commonnexus.EndpointRegistry
func MachineCollection ¶
func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
MachineCollection creates a new typed [hsm.Collection] for operations.
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, options TaskExecutorOptions, ) error
func RegisterStateMachines ¶
func RegisterTaskSerializers ¶
Types ¶
type BackoffTask ¶
type BackoffTask struct {
// contains filtered or unexported fields
}
func (BackoffTask) Deadline ¶
func (t BackoffTask) Deadline() time.Time
func (BackoffTask) Destination ¶ added in v1.26.2
func (t BackoffTask) Destination() string
func (BackoffTask) Type ¶
func (BackoffTask) Type() string
func (BackoffTask) Validate ¶ added in v1.25.2
func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error
type BackoffTaskSerializer ¶
type BackoffTaskSerializer struct{}
func (BackoffTaskSerializer) Deserialize ¶
func (BackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelRequestedEventDefinition ¶
type CancelRequestedEventDefinition struct{}
func (CancelRequestedEventDefinition) Apply ¶ added in v1.25.0
func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestedEventDefinition) CherryPick ¶ added in v1.25.0
func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestedEventDefinition) Type ¶
func (d CancelRequestedEventDefinition) Type() enumspb.EventType
type Cancelation ¶
type Cancelation struct {
*persistencespb.NexusOperationCancellationInfo
}
Cancelation state machine for canceling an operation.
func (Cancelation) RegenerateTasks ¶
func (Cancelation) SetState ¶
func (c Cancelation) SetState(state enumspb.NexusOperationCancellationState)
func (Cancelation) State ¶
func (c Cancelation) State() enumspb.NexusOperationCancellationState
type CancelationBackoffTask ¶
type CancelationBackoffTask struct {
// contains filtered or unexported fields
}
func (CancelationBackoffTask) Deadline ¶
func (t CancelationBackoffTask) Deadline() time.Time
func (CancelationBackoffTask) Destination ¶ added in v1.26.2
func (CancelationBackoffTask) Destination() string
func (CancelationBackoffTask) Type ¶
func (CancelationBackoffTask) Type() string
func (CancelationBackoffTask) Validate ¶ added in v1.26.2
func (CancelationBackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationBackoffTaskSerializer ¶
type CancelationBackoffTaskSerializer struct{}
func (CancelationBackoffTaskSerializer) Deserialize ¶
func (CancelationBackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelationTask ¶
type CancelationTask struct {
EndpointName string
}
func (CancelationTask) Deadline ¶ added in v1.26.2
func (CancelationTask) Deadline() time.Time
func (CancelationTask) Destination ¶
func (t CancelationTask) Destination() string
func (CancelationTask) Type ¶
func (CancelationTask) Type() string
func (CancelationTask) Validate ¶ added in v1.26.2
func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationTaskSerializer ¶
type CancelationTaskSerializer struct{}
func (CancelationTaskSerializer) Deserialize ¶
func (CancelationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CanceledEventDefinition ¶
type CanceledEventDefinition struct{}
func (CanceledEventDefinition) Apply ¶ added in v1.25.0
func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CanceledEventDefinition) CherryPick ¶ added in v1.25.0
func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CanceledEventDefinition) IsWorkflowTaskTrigger ¶
func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
func (CanceledEventDefinition) Type ¶
func (d CanceledEventDefinition) Type() enumspb.EventType
type ClientProvider ¶
type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexus.HTTPClient, error)
ClientProvider provides a nexus client for a given endpoint.
func ClientProviderFactory ¶
func ClientProviderFactory( namespaceRegistry namespace.Registry, endpointRegistry commonnexus.EndpointRegistry, httpTransportProvider NexusTransportProvider, clusterMetadata cluster.Metadata, rpcFactory common.RPCFactory, ) (ClientProvider, error)
type CompletedEventDefinition ¶
type CompletedEventDefinition struct{}
func (CompletedEventDefinition) Apply ¶ added in v1.25.0
func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CompletedEventDefinition) CherryPick ¶ added in v1.25.0
func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CompletedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
func (CompletedEventDefinition) Type ¶
func (d CompletedEventDefinition) Type() enumspb.EventType
type CompletionSource ¶ added in v1.25.0
type CompletionSource int
CompletionSource is an enum specifying where an operation completion originated from.
type Config ¶
type Config struct { Enabled dynamicconfig.BoolPropertyFn RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter MinOperationTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter MaxServiceNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter MaxOperationHeaderSize dynamicconfig.IntPropertyFnWithNamespaceFilter DisallowedOperationHeaders dynamicconfig.TypedPropertyFnWithNamespaceFilter[[]string] MaxOperationScheduleToCloseTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter CallbackURLTemplate dynamicconfig.StringPropertyFn EndpointNotFoundAlwaysNonRetryable dynamicconfig.BoolPropertyFnWithNamespaceFilter RetryPolicy func() backoff.RetryPolicy }
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventAttemptFailed ¶
type EventAttemptFailed struct { Time time.Time Failure *failurepb.Failure Node *hsm.Node RetryPolicy backoff.RetryPolicy }
EventAttemptFailed is triggered when an invocation attempt fails with a retryable error.
type EventCancelationAttemptFailed ¶
type EventCancelationAttemptFailed struct { Time time.Time Failure *failurepb.Failure Node *hsm.Node RetryPolicy backoff.RetryPolicy }
EventCancelationAttemptFailed is triggered when a cancelation attempt fails with a retryable error.
type EventCancelationFailed ¶
EventCancelationFailed is triggered when a cancelation attempt fails with a non-retryable error.
type EventCancelationRescheduled ¶
EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.
type EventCancelationScheduled ¶
EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.
type EventCancelationSucceeded ¶
EventCancelationSucceeded is triggered when a cancelation attempt succeeds.
type EventCanceled ¶
type EventCanceled struct { Time time.Time Node *hsm.Node CompletionSource CompletionSource }
EventCanceled is triggered when an operation is canceled.
type EventFailed ¶
type EventFailed struct { Time time.Time Node *hsm.Node Attributes *historypb.NexusOperationFailedEventAttributes CompletionSource CompletionSource }
EventFailed is triggered when an invocation attempt fails with a non-retryable error.
type EventRescheduled ¶
EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.
type EventStarted ¶
type EventStarted struct { Time time.Time Node *hsm.Node Attributes *historypb.NexusOperationStartedEventAttributes }
EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.
type EventSucceeded ¶
type EventSucceeded struct { // Only set if the operation completed synchronously, as a response to a StartOperation RPC. Time time.Time Node *hsm.Node CompletionSource CompletionSource }
EventSucceeded is triggered when an invocation attempt succeeds.
type EventTimedOut ¶
EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
type FailedEventDefinition ¶
type FailedEventDefinition struct{}
func (FailedEventDefinition) Apply ¶ added in v1.25.0
func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (FailedEventDefinition) CherryPick ¶ added in v1.25.0
func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (FailedEventDefinition) IsWorkflowTaskTrigger ¶
func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
func (FailedEventDefinition) Type ¶
func (d FailedEventDefinition) Type() enumspb.EventType
type InvocationTask ¶
type InvocationTask struct {
EndpointName string
}
func (InvocationTask) Deadline ¶ added in v1.26.2
func (InvocationTask) Deadline() time.Time
func (InvocationTask) Destination ¶
func (t InvocationTask) Destination() string
func (InvocationTask) Type ¶
func (InvocationTask) Type() string
func (InvocationTask) Validate ¶ added in v1.25.2
func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type InvocationTaskSerializer ¶
type InvocationTaskSerializer struct{}
func (InvocationTaskSerializer) Deserialize ¶
func (InvocationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type LimitedReadCloser ¶
type LimitedReadCloser struct { R io.ReadCloser N int64 }
A LimitedReadCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.
func NewLimitedReadCloser ¶
func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser
func (*LimitedReadCloser) Close ¶
func (l *LimitedReadCloser) Close() error
type NexusTransportProvider ¶
type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper
NexusTransportProvider is a function type that allows a provider to customize the default transport implementation specifically for Nexus.
func DefaultNexusTransportProvider ¶
func DefaultNexusTransportProvider() NexusTransportProvider
type Operation ¶
type Operation struct {
*persistencespb.NexusOperationInfo
}
Operation state machine.
func (Operation) Cancel ¶
Cancel marks the Operation machine as canceled by spawning a child Cancelation machine. If the Operation has already completed, then it cannot be canceled anymore, and the Cancelation machine will stay in the UNSPECIFIED state. If the Operation is in the STARTED state, then the Cancelation machine is transitioned to the SCHEDULED state. Otherwise, the Cancelation machine will wait for the Operation machine to transition to the STARTED state.
func (Operation) Cancelation ¶
func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
func (Operation) CancelationNode ¶ added in v1.25.2
func (Operation) RegenerateTasks ¶
func (Operation) SetState ¶
func (o Operation) SetState(state enumsspb.NexusOperationState)
func (Operation) State ¶
func (o Operation) State() enumsspb.NexusOperationState
type ResponseSizeLimiter ¶
type ResponseSizeLimiter struct {
// contains filtered or unexported fields
}
type ScheduledEventDefinition ¶
type ScheduledEventDefinition struct{}
func (ScheduledEventDefinition) Apply ¶ added in v1.25.0
func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (ScheduledEventDefinition) CherryPick ¶ added in v1.25.0
func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (ScheduledEventDefinition) IsWorkflowTaskTrigger ¶
func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
func (ScheduledEventDefinition) Type ¶
func (d ScheduledEventDefinition) Type() enumspb.EventType
type StartedEventDefinition ¶
type StartedEventDefinition struct{}
func (StartedEventDefinition) Apply ¶ added in v1.25.0
func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (StartedEventDefinition) CherryPick ¶ added in v1.25.0
func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (StartedEventDefinition) IsWorkflowTaskTrigger ¶
func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
func (StartedEventDefinition) Type ¶
func (d StartedEventDefinition) Type() enumspb.EventType
type TaskExecutorOptions ¶ added in v1.25.0
type TaskExecutorOptions struct { fx.In Config *Config NamespaceRegistry namespace.Registry MetricsHandler metrics.Handler Logger log.Logger CallbackTokenGenerator *commonnexus.CallbackTokenGenerator ClientProvider ClientProvider EndpointRegistry commonnexus.EndpointRegistry }
type TimedOutEventDefinition ¶
type TimedOutEventDefinition struct{}
func (TimedOutEventDefinition) Apply ¶ added in v1.25.0
func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (TimedOutEventDefinition) CherryPick ¶ added in v1.25.0
func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (TimedOutEventDefinition) IsWorkflowTaskTrigger ¶
func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
func (TimedOutEventDefinition) Type ¶
func (d TimedOutEventDefinition) Type() enumspb.EventType
type TimeoutTask ¶
type TimeoutTask struct {
// contains filtered or unexported fields
}
func (TimeoutTask) Deadline ¶
func (t TimeoutTask) Deadline() time.Time
func (TimeoutTask) Destination ¶ added in v1.26.2
func (TimeoutTask) Destination() string
func (TimeoutTask) Type ¶
func (TimeoutTask) Type() string
func (TimeoutTask) Validate ¶
func (t TimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
Validate checks if the timeout task is still valid to execute for the given node state.
type TimeoutTaskSerializer ¶
type TimeoutTaskSerializer struct{}
func (TimeoutTaskSerializer) Deserialize ¶
func (TimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)