Documentation ¶
Index ¶
- Constants
- Variables
- func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, ...) (*hsm.Node, error)
- func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
- func CompletionHandler(ctx context.Context, env hsm.Environment, ref hsm.Ref, requestID string, ...) error
- func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
- func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
- func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
- func EndpointRegistryProvider(matchingClient resource.MatchingClient, ...) commonnexus.EndpointRegistry
- func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
- func RegisterEventDefinitions(reg *hsm.Registry) error
- func RegisterExecutor(registry *hsm.Registry, options TaskExecutorOptions) error
- func RegisterStateMachines(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type BackoffTask
- type BackoffTaskSerializer
- type CancelRequestedEventDefinition
- func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestedEventDefinition) Type() enumspb.EventType
- type Cancelation
- type CancelationBackoffTask
- type CancelationBackoffTaskSerializer
- type CancelationTask
- type CancelationTaskSerializer
- type CanceledEventDefinition
- func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CanceledEventDefinition) Type() enumspb.EventType
- type ClientProvider
- type CompletedEventDefinition
- func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CompletedEventDefinition) Type() enumspb.EventType
- type CompletionSource
- type Config
- type EventAttemptFailed
- type EventCancelationAttemptFailed
- type EventCancelationFailed
- type EventCancelationRescheduled
- type EventCancelationScheduled
- type EventCancelationSucceeded
- type EventCanceled
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventStarted
- type EventSucceeded
- type EventTimedOut
- type FailedEventDefinition
- func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d FailedEventDefinition) Type() enumspb.EventType
- type InvocationTask
- type InvocationTaskSerializer
- type LimitedReadCloser
- type NexusTransportProvider
- type Operation
- func (o Operation) Cancel(node *hsm.Node, t time.Time) (hsm.TransitionOutput, error)
- func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
- func (o Operation) CancelationNode(node *hsm.Node) (*hsm.Node, error)
- func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)
- func (o Operation) SetState(state enumsspb.NexusOperationState)
- func (o Operation) State() enumsspb.NexusOperationState
- type ResponseSizeLimiter
- type ScheduledEventDefinition
- func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d ScheduledEventDefinition) Type() enumspb.EventType
- type StartedEventDefinition
- func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d StartedEventDefinition) Type() enumspb.EventType
- type TaskExecutorOptions
- type TimedOutEventDefinition
- func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
- func (d TimedOutEventDefinition) Type() enumspb.EventType
- type TimeoutTask
- type TimeoutTaskSerializer
Constants ¶
const ( // OperationMachineType is a unique type identifier for the Operation state machine. OperationMachineType = "nexusoperations.Operation" // CancelationMachineType is a unique type identifier for the Cancelation state machine. CancelationMachineType = "nexusoperations.Cancelation" )
const ( // CompletionSourceUnspecified indicates that the source is unspecified (e.g. when reapplying a history event that // doesn't record this information). CompletionSourceUnspecified = CompletionSource(iota) // CompletionSourceResponse indicates that a completion came synchronously from a response to a StartOperation // request. CompletionSourceResponse // CompletionSourceCallback indicates that a completion came asynchronously from a callback. CompletionSourceCallback // CompletionSourceCancelRequested indicates that the operation was canceled due to workflow cancelation request. CompletionSourceCancelRequested )
const ( TaskTypeTimeout = "nexusoperations.Timeout" TaskTypeInvocation = "nexusoperations.Invocation" TaskTypeBackoff = "nexusoperations.Backoff" TaskTypeCancelation = "nexusoperations.Cancelation" TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff" )
const NexusCallbackSourceHeader = "Nexus-Callback-Source"
Variables ¶
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
"component.nexusoperations.callback.endpoint.template",
"unset",
`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver asynchronous completion.
The template can be used to interpolate the {{.NamespaceName}} and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set in order to use Nexus Operations.`,
)
var CancelationMachineKey = hsm.Key{Type: CancelationMachineType, ID: ""}
CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.
var DisallowedOperationHeaders = dynamicconfig.NewNamespaceTypedSettingWithConverter( "component.nexusoperations.disallowedHeaders", func(a any) ([]string, error) { keys, ok := a.([]string) if !ok { return nil, fmt.Errorf("expected a string slice, got: %v", a) } for i, k := range keys { keys[i] = strings.ToLower(k) } return keys, nil }, []string{ "request-timeout", interceptor.DCRedirectionApiHeaderName, interceptor.DCRedirectionContextHeaderName, headers.CallerNameHeaderName, headers.CallerTypeHeaderName, headers.CallOriginHeaderName, }, `Case insensitive list of disallowed header keys for Nexus Operations. ScheduleNexusOperation commands with a "nexus_header" field that contains any of these disallowed keys will be rejected.`, )
var EndpointNotFoundAlwaysNonRetryable = dynamicconfig.NewNamespaceBoolSetting( "component.nexusoperations.endpointNotFoundAlwaysNonRetryable", false, `When set to true, if an endpoint is not found when processing a ScheduleNexusOperation command, the command will be accepted and the operation will fail on the first attempt. This defaults to false to prevent endpoint registry propagation delay from failing operations.`, )
var ErrOperationTimeoutBelowMin = errors.New("remaining operation timeout is less than required minimum")
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.concurrency",
30,
`MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution.
Once the limit is reached, ScheduleNexusOperation commands will be rejected.`,
)
var MaxOperationHeaderSize = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.header.size",
4096,
`The maximum allowed header size for a Nexus Operation.
ScheduleNexusOperation commands with a "nexus_header" field that exceeds this limit will be rejected.
Uses Go's len() function on header keys and values to determine the total size.`,
)
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.name.length",
1000,
`MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name.
ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetting(
"component.nexusoperations.limit.scheduleToCloseTimeout",
0,
`MaxOperationScheduleToCloseTimeout limits the maximum allowed duration of a Nexus Operation. ScheduleOperation
commands that specify no schedule-to-close timeout or a longer timeout than permitted will have their
schedule-to-close timeout capped to this value. 0 implies no limit.`,
)
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.service.name.length",
1000,
`MaxServiceNameLength limits the maximum allowed length for a Nexus Service name.
ScheduleNexusOperation commands with a service name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
var MinOperationTimeout = dynamicconfig.NewNamespaceDurationSetting(
"componenet.nexusoperations.limit.operation.timeout.min",
0,
`MinOperationTimeout is the minimum time remaining for an operation to complete for the server to make
RPCs. If the remaining operation timeout is less than this value, a non-retryable timeout error will be returned.`,
)
var Module = fx.Module( "component.nexusoperations", fx.Provide(ConfigProvider), fx.Provide(ClientProviderFactory), fx.Provide(DefaultNexusTransportProvider), fx.Provide(CallbackTokenGeneratorProvider), fx.Provide(EndpointRegistryProvider), fx.Invoke(EndpointRegistryLifetimeHooks), fx.Invoke(RegisterStateMachines), fx.Invoke(RegisterTaskSerializers), fx.Invoke(RegisterEventDefinitions), fx.Invoke(RegisterExecutor), )
var OutboundRequestCounter = metrics.NewCounterDef( "nexus_outbound_requests", metrics.WithDescription("The number of Nexus outbound requests made by the history service."), )
var OutboundRequestLatency = metrics.NewTimerDef( "nexus_outbound_latency", metrics.WithDescription("Latency of outbound Nexus requests made by the history service."), )
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting( "component.nexusoperations.request.timeout", time.Second*10, `RequestTimeout is the timeout for making a single nexus start or cancel request.`, )
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.initialInterval", time.Second, `The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.maxInterval", time.Hour, `The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var TransitionAttemptFailed = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED}, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(op.Attempt), event.Err) nextAttemptScheduleTime := event.Time.Add(nextDelay) op.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) op.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } return op.output() }, )
var TransitionCancelationAttemptFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(c.Attempt), event.Err) nextAttemptScheduleTime := event.Time.Add(nextDelay) c.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) c.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } return c.output(event.Node) }, )
var TransitionCancelationFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{ enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, }, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) c.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } return c.output(event.Node) }, )
var TransitionCancelationRescheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(c Cancelation, event EventCancelationRescheduled) (hsm.TransitionOutput, error) { c.NextAttemptScheduleTime = nil return c.output(event.Node) }, )
var TransitionCancelationScheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(op Cancelation, event EventCancelationScheduled) (hsm.TransitionOutput, error) { op.RequestedTime = timestamppb.New(event.Time) return op.output(event.Node) }, )
var TransitionCancelationSucceeded = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SUCCEEDED, func(c Cancelation, event EventCancelationSucceeded) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) return c.output(event.Node) }, )
var TransitionCanceled = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_CANCELED, func(op Operation, event EventCanceled) (hsm.TransitionOutput, error) { if event.CompletionSource == CompletionSourceResponse || event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { op.recordAttempt(event.Time) op.LastAttemptFailure = nil } return op.output() }, )
var TransitionFailed = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_FAILED, func(op Operation, event EventFailed) (hsm.TransitionOutput, error) { if event.CompletionSource == CompletionSourceResponse || event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { op.recordAttempt(event.Time) op.LastAttemptFailure = &failurepb.Failure{ Message: event.Attributes.GetFailure().GetCause().GetMessage(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } } return op.output() }, )
var TransitionRescheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) { op.NextAttemptScheduleTime = nil return op.output() }, )
var TransitionScheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventScheduled) (hsm.TransitionOutput, error) { return op.output() }, )
var TransitionStarted = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_STARTED, func(op Operation, event EventStarted) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) op.OperationId = event.Attributes.OperationId child, err := op.CancelationNode(event.Node) if err != nil { return hsm.TransitionOutput{}, err } if child != nil { return hsm.TransitionOutput{}, hsm.MachineTransition(child, func(c Cancelation) (hsm.TransitionOutput, error) { return TransitionCancelationScheduled.Apply(c, EventCancelationScheduled{ Time: event.Time, Node: child, }) }) } return op.output() }, )
var TransitionSucceeded = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_SUCCEEDED, func(op Operation, event EventSucceeded) (hsm.TransitionOutput, error) { if event.CompletionSource == CompletionSourceResponse || event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { op.recordAttempt(event.Time) } return op.output() }, )
var TransitionTimedOut = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT, func(op Operation, event EventTimedOut) (hsm.TransitionOutput, error) { return op.output() }, )
Functions ¶
func AddChild ¶
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error)
AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.
func CallbackTokenGeneratorProvider ¶
func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
func CompletionHandler ¶
func ConvertLinkWorkflowEventToNexusLink ¶ added in v1.25.1
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.
func ConvertNexusLinkToLinkWorkflowEvent ¶ added in v1.25.1
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
func EndpointRegistryLifetimeHooks ¶ added in v1.25.0
func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
func EndpointRegistryProvider ¶ added in v1.25.0
func EndpointRegistryProvider( matchingClient resource.MatchingClient, endpointManager persistence.NexusEndpointManager, dc *dynamicconfig.Collection, logger log.Logger, metricsHandler metrics.Handler, ) commonnexus.EndpointRegistry
func MachineCollection ¶
func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
MachineCollection creates a new typed [hsm.Collection] for operations.
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, options TaskExecutorOptions, ) error
func RegisterStateMachines ¶
func RegisterTaskSerializers ¶
Types ¶
type BackoffTask ¶
type BackoffTask struct {
// contains filtered or unexported fields
}
func (BackoffTask) Deadline ¶
func (t BackoffTask) Deadline() time.Time
func (BackoffTask) Destination ¶ added in v1.26.2
func (t BackoffTask) Destination() string
func (BackoffTask) Type ¶
func (BackoffTask) Type() string
func (BackoffTask) Validate ¶ added in v1.25.2
func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error
type BackoffTaskSerializer ¶
type BackoffTaskSerializer struct{}
func (BackoffTaskSerializer) Deserialize ¶
func (BackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelRequestedEventDefinition ¶
type CancelRequestedEventDefinition struct{}
func (CancelRequestedEventDefinition) Apply ¶ added in v1.25.0
func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestedEventDefinition) CherryPick ¶ added in v1.25.0
func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestedEventDefinition) Type ¶
func (d CancelRequestedEventDefinition) Type() enumspb.EventType
type Cancelation ¶
type Cancelation struct {
*persistencespb.NexusOperationCancellationInfo
}
Cancelation state machine for canceling an operation.
func (Cancelation) RegenerateTasks ¶
func (Cancelation) SetState ¶
func (c Cancelation) SetState(state enumspb.NexusOperationCancellationState)
func (Cancelation) State ¶
func (c Cancelation) State() enumspb.NexusOperationCancellationState
type CancelationBackoffTask ¶
type CancelationBackoffTask struct {
// contains filtered or unexported fields
}
func (CancelationBackoffTask) Deadline ¶
func (t CancelationBackoffTask) Deadline() time.Time
func (CancelationBackoffTask) Destination ¶ added in v1.26.2
func (CancelationBackoffTask) Destination() string
func (CancelationBackoffTask) Type ¶
func (CancelationBackoffTask) Type() string
func (CancelationBackoffTask) Validate ¶ added in v1.26.2
func (CancelationBackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationBackoffTaskSerializer ¶
type CancelationBackoffTaskSerializer struct{}
func (CancelationBackoffTaskSerializer) Deserialize ¶
func (CancelationBackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelationTask ¶
type CancelationTask struct {
EndpointName string
}
func (CancelationTask) Deadline ¶ added in v1.26.2
func (CancelationTask) Deadline() time.Time
func (CancelationTask) Destination ¶
func (t CancelationTask) Destination() string
func (CancelationTask) Type ¶
func (CancelationTask) Type() string
func (CancelationTask) Validate ¶ added in v1.26.2
func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationTaskSerializer ¶
type CancelationTaskSerializer struct{}
func (CancelationTaskSerializer) Deserialize ¶
func (CancelationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CanceledEventDefinition ¶
type CanceledEventDefinition struct{}
func (CanceledEventDefinition) Apply ¶ added in v1.25.0
func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CanceledEventDefinition) CherryPick ¶ added in v1.25.0
func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CanceledEventDefinition) IsWorkflowTaskTrigger ¶
func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
func (CanceledEventDefinition) Type ¶
func (d CanceledEventDefinition) Type() enumspb.EventType
type ClientProvider ¶
type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexus.Client, error)
ClientProvider provides a nexus client for a given endpoint.
func ClientProviderFactory ¶
func ClientProviderFactory( namespaceRegistry namespace.Registry, endpointRegistry commonnexus.EndpointRegistry, httpTransportProvider NexusTransportProvider, clusterMetadata cluster.Metadata, rpcFactory common.RPCFactory, ) (ClientProvider, error)
type CompletedEventDefinition ¶
type CompletedEventDefinition struct{}
func (CompletedEventDefinition) Apply ¶ added in v1.25.0
func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CompletedEventDefinition) CherryPick ¶ added in v1.25.0
func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CompletedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
func (CompletedEventDefinition) Type ¶
func (d CompletedEventDefinition) Type() enumspb.EventType
type CompletionSource ¶ added in v1.25.0
type CompletionSource int
CompletionSource is an enum specifying where an operation completion originated from.
type Config ¶
type Config struct { Enabled dynamicconfig.BoolPropertyFn RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter MinOperationTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter MaxServiceNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter MaxOperationHeaderSize dynamicconfig.IntPropertyFnWithNamespaceFilter DisallowedOperationHeaders dynamicconfig.TypedPropertyFnWithNamespaceFilter[[]string] MaxOperationScheduleToCloseTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter CallbackURLTemplate dynamicconfig.StringPropertyFn EndpointNotFoundAlwaysNonRetryable dynamicconfig.BoolPropertyFnWithNamespaceFilter RetryPolicy func() backoff.RetryPolicy }
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventAttemptFailed ¶
type EventAttemptFailed struct { Time time.Time Err error Node *hsm.Node RetryPolicy backoff.RetryPolicy }
EventAttemptFailed is triggered when an invocation attempt is failed with a retryable error.
type EventCancelationAttemptFailed ¶
type EventCancelationAttemptFailed struct { Time time.Time Err error Node *hsm.Node RetryPolicy backoff.RetryPolicy }
EventCancelationAttemptFailed is triggered when a cancelation attempt is failed with a retryable error.
type EventCancelationFailed ¶
EventCancelationFailed is triggered when a cancelation attempt is failed with a non retryable error.
type EventCancelationRescheduled ¶
EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.
type EventCancelationScheduled ¶
EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.
type EventCancelationSucceeded ¶
EventCancelationSucceeded is triggered when a cancelation attempt succeeds.
type EventCanceled ¶
type EventCanceled struct { Time time.Time Node *hsm.Node CompletionSource CompletionSource }
EventCanceled is triggered when the operation is canceled.
type EventFailed ¶
type EventFailed struct { Time time.Time Node *hsm.Node Attributes *historypb.NexusOperationFailedEventAttributes CompletionSource CompletionSource }
EventFailed is triggered when an invocation attempt is failed with a non retryable error.
type EventRescheduled ¶
EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.
type EventStarted ¶
type EventStarted struct { Time time.Time Node *hsm.Node Attributes *historypb.NexusOperationStartedEventAttributes }
EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.
type EventSucceeded ¶
type EventSucceeded struct { // Only set if the operation completed synchronously, as a response to a StartOperation RPC. Time time.Time Node *hsm.Node CompletionSource CompletionSource }
EventSucceeded is triggered when an invocation attempt succeeds.
type EventTimedOut ¶
EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
type FailedEventDefinition ¶
type FailedEventDefinition struct{}
func (FailedEventDefinition) Apply ¶ added in v1.25.0
func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (FailedEventDefinition) CherryPick ¶ added in v1.25.0
func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (FailedEventDefinition) IsWorkflowTaskTrigger ¶
func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
func (FailedEventDefinition) Type ¶
func (d FailedEventDefinition) Type() enumspb.EventType
type InvocationTask ¶
type InvocationTask struct {
EndpointName string
}
func (InvocationTask) Deadline ¶ added in v1.26.2
func (InvocationTask) Deadline() time.Time
func (InvocationTask) Destination ¶
func (t InvocationTask) Destination() string
func (InvocationTask) Type ¶
func (InvocationTask) Type() string
func (InvocationTask) Validate ¶ added in v1.25.2
func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type InvocationTaskSerializer ¶
type InvocationTaskSerializer struct{}
func (InvocationTaskSerializer) Deserialize ¶
func (InvocationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type LimitedReadCloser ¶
type LimitedReadCloser struct { R io.ReadCloser N int64 }
A LimitedReadCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.
func NewLimitedReadCloser ¶
func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser
func (*LimitedReadCloser) Close ¶
func (l *LimitedReadCloser) Close() error
type NexusTransportProvider ¶
type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper
NexusTransportProvider is a function type that allows a provider to customize the default implementation specifically for Nexus.
func DefaultNexusTransportProvider ¶
func DefaultNexusTransportProvider() NexusTransportProvider
type Operation ¶
type Operation struct {
*persistencespb.NexusOperationInfo
}
Operation state machine.
func (Operation) Cancel ¶
Cancel marks the Operation machine as canceled by spawning a child Cancelation machine and transitioning the child to the SCHEDULED state.
func (Operation) Cancelation ¶
func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
func (Operation) CancelationNode ¶ added in v1.25.2
func (Operation) RegenerateTasks ¶
func (Operation) SetState ¶
func (o Operation) SetState(state enumsspb.NexusOperationState)
func (Operation) State ¶
func (o Operation) State() enumsspb.NexusOperationState
type ResponseSizeLimiter ¶
type ResponseSizeLimiter struct {
// contains filtered or unexported fields
}
type ScheduledEventDefinition ¶
type ScheduledEventDefinition struct{}
func (ScheduledEventDefinition) Apply ¶ added in v1.25.0
func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (ScheduledEventDefinition) CherryPick ¶ added in v1.25.0
func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (ScheduledEventDefinition) IsWorkflowTaskTrigger ¶
func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
func (ScheduledEventDefinition) Type ¶
func (d ScheduledEventDefinition) Type() enumspb.EventType
type StartedEventDefinition ¶
type StartedEventDefinition struct{}
func (StartedEventDefinition) Apply ¶ added in v1.25.0
func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (StartedEventDefinition) CherryPick ¶ added in v1.25.0
func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (StartedEventDefinition) IsWorkflowTaskTrigger ¶
func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
func (StartedEventDefinition) Type ¶
func (d StartedEventDefinition) Type() enumspb.EventType
type TaskExecutorOptions ¶ added in v1.25.0
type TaskExecutorOptions struct { fx.In Config *Config NamespaceRegistry namespace.Registry MetricsHandler metrics.Handler Logger log.Logger CallbackTokenGenerator *commonnexus.CallbackTokenGenerator ClientProvider ClientProvider EndpointRegistry commonnexus.EndpointRegistry }
type TimedOutEventDefinition ¶
type TimedOutEventDefinition struct{}
func (TimedOutEventDefinition) Apply ¶ added in v1.25.0
func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (TimedOutEventDefinition) CherryPick ¶ added in v1.25.0
func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (TimedOutEventDefinition) IsWorkflowTaskTrigger ¶
func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
func (TimedOutEventDefinition) Type ¶
func (d TimedOutEventDefinition) Type() enumspb.EventType
type TimeoutTask ¶
type TimeoutTask struct {
// contains filtered or unexported fields
}
func (TimeoutTask) Deadline ¶
func (t TimeoutTask) Deadline() time.Time
func (TimeoutTask) Destination ¶ added in v1.26.2
func (TimeoutTask) Destination() string
func (TimeoutTask) Type ¶
func (TimeoutTask) Type() string
func (TimeoutTask) Validate ¶
func (t TimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
Validate checks if the timeout task is still valid to execute for the given node state.
type TimeoutTaskSerializer ¶
type TimeoutTaskSerializer struct{}
func (TimeoutTaskSerializer) Deserialize ¶
func (TimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)