nexusoperations

package
v1.25.0-116.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2024 License: MIT Imports: 41 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// OperationMachineType is a unique type identifier for the Operation state machine.
	OperationMachineType = "nexusoperations.Operation"

	// CancelationMachineType is a unique type identifier for the Cancelation state machine.
	CancelationMachineType = "nexusoperations.Cancelation"
)
View Source
const (
	// CompletionSourceUnspecified indicates that the source is unspecified (e.g. when reapplying a history event that
	// doesn't record this information).
	CompletionSourceUnspecified = CompletionSource(iota)
	// CompletionSourceResponse indicates that a completion came synchronously from a response to a StartOperation
	// request.
	CompletionSourceResponse
	// CompletionSourceResponse indicates that a completion came asynchronously from a callback.
	CompletionSourceCallback
	// CompletionSourceCancelRequested indicates that the operation was canceled due to workflow cancelation request.
	CompletionSourceCancelRequested
)
View Source
const (
	TaskTypeTimeout            = "nexusoperations.Timeout"
	TaskTypeInvocation         = "nexusoperations.Invocation"
	TaskTypeBackoff            = "nexusoperations.Backoff"
	TaskTypeCancelation        = "nexusoperations.Cancelation"
	TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff"
)
View Source
const NexusCallbackSourceHeader = "Nexus-Callback-Source"

Variables

View Source
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
	"component.nexusoperations.callback.endpoint.template",
	"unset",
	`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver asynchronous completion.
The template can be used to interpolate the {{.NamepaceName}} and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set in order to use Nexus Operations.`,
)
View Source
var CancelationMachineKey = hsm.Key{Type: CancelationMachineType, ID: ""}

CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.

View Source
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
View Source
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.operation.concurrency",
	1000,
	`MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution.
Once the limit is reached, ScheduleNexusOperation commands will be rejected.`,
)
View Source
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.operation.name.length",
	1000,
	`MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name.
ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
View Source
var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.nexusoperations.limit.scheduleToCloseTimeout",
	0,
	`MaxOperationScheduleToCloseTimeout limits the maximum allowed duration of a Nexus Operation. ScheduleOperation
commands that specify no schedule-to-close timeout or a longer timeout than permitted will have their
schedule-to-close timeout capped to this value. 0 implies no limit.`,
)
View Source
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.service.name.length",
	1000,
	`MaxServiceNameLength limits the maximum allowed length for a Nexus Service name.
ScheduleNexusOperation commands with a service name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
View Source
var OutboundRequestCounter = metrics.NewCounterDef(
	"nexus_outbound_requests",
	metrics.WithDescription("The number of Nexus outbound requests made by the history service."),
)
View Source
var OutboundRequestLatencyHistogram = metrics.NewTimerDef(
	"nexus_outbound_latency",
	metrics.WithDescription("Latency histogram of outbound Nexus requests made by the history service."),
)
View Source
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"component.nexusoperations.request.timeout",
	time.Second*10,
	`RequestTimeout is the timeout for making a single nexus start or cancel request.`,
)
View Source
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.nexusoperations.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
)
View Source
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.nexusoperations.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
)
View Source
var TransitionAttemptFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED},
	enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
	func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) {
		op.recordAttempt(event.Time)

		nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(op.Attempt), event.Err)
		nextAttemptScheduleTime := event.Time.Add(nextDelay)
		op.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime)
		op.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: false,
				},
			},
		}
		return op.output(event.Node)
	},
)
View Source
var TransitionCancelationAttemptFailed = hsm.NewTransition(
	[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
	enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF,
	func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) {
		c.recordAttempt(event.Time)

		nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(c.Attempt), event.Err)
		nextAttemptScheduleTime := event.Time.Add(nextDelay)
		c.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime)
		c.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: false,
				},
			},
		}
		return c.output(event.Node)
	},
)
View Source
var TransitionCancelationFailed = hsm.NewTransition(
	[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
	enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED,
	func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) {
		c.recordAttempt(event.Time)
		c.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: true,
				},
			},
		}
		return c.output(event.Node)
	},
)
View Source
var TransitionCanceled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_CANCELED,
	func(op Operation, event EventCanceled) (hsm.TransitionOutput, error) {

		if event.CompletionSource == CompletionSourceResponse ||
			event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED {
			op.recordAttempt(event.Time)
			op.LastAttemptFailure = nil
		}

		return op.output(event.Node)
	},
)
View Source
var TransitionFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_FAILED,
	func(op Operation, event EventFailed) (hsm.TransitionOutput, error) {

		if event.CompletionSource == CompletionSourceResponse ||
			event.CompletionSource == CompletionSourceUnspecified && op.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED {
			op.recordAttempt(event.Time)
			op.LastAttemptFailure = &failurepb.Failure{

				Message: event.Attributes.GetFailure().GetCause().GetMessage(),
				FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
					ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
						NonRetryable: true,
					},
				},
			}
		}

		return op.output(event.Node)
	},
)
View Source
var TransitionRescheduled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF},
	enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
	func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) {
		op.NextAttemptScheduleTime = nil
		return op.output(event.Node)
	},
)
View Source
var TransitionStarted = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED},
	enumsspb.NEXUS_OPERATION_STATE_STARTED,
	func(op Operation, event EventStarted) (hsm.TransitionOutput, error) {
		op.recordAttempt(event.Time)
		op.OperationId = event.Attributes.OperationId
		return op.output(event.Node)
	},
)

Functions

func AddChild

func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error)

AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.

func CallbackTokenGeneratorProvider

func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator

func CompletionHandler

func CompletionHandler(
	ctx context.Context,
	env hsm.Environment,
	ref hsm.Ref,
	result *commonpb.Payload,
	opFailedError *nexus.UnsuccessfulOperationError,
) error

func EndpointRegistryLifetimeHooks added in v1.25.0

func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)

func EndpointRegistryProvider added in v1.25.0

func EndpointRegistryProvider(
	matchingClient resource.MatchingClient,
	endpointManager persistence.NexusEndpointManager,
	logger log.Logger,
	dc *dynamicconfig.Collection,
) commonnexus.EndpointRegistry

func MachineCollection

func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]

MachineCollection creates a new typed [statemachines.Collection] for operations.

func RegisterEventDefinitions

func RegisterEventDefinitions(reg *hsm.Registry) error

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	options TaskExecutorOptions,
) error

func RegisterStateMachines

func RegisterStateMachines(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type BackoffTask

type BackoffTask struct {
	Deadline time.Time
}

func (BackoffTask) Concurrent

func (BackoffTask) Concurrent() bool

func (BackoffTask) Kind

func (t BackoffTask) Kind() hsm.TaskKind

func (BackoffTask) Type

func (BackoffTask) Type() string

type BackoffTaskSerializer

type BackoffTaskSerializer struct{}

func (BackoffTaskSerializer) Deserialize

func (BackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (BackoffTaskSerializer) Serialize

func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type CancelRequestedEventDefinition

type CancelRequestedEventDefinition struct{}

func (CancelRequestedEventDefinition) Apply added in v1.25.0

func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger

func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool

func (CancelRequestedEventDefinition) Type

type Cancelation

type Cancelation struct {
	*persistencespb.NexusOperationCancellationInfo
}

Cancelation state machine for canceling an operation.

func (Cancelation) RegenerateTasks

func (c Cancelation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Cancelation) SetState

func (Cancelation) State

type CancelationBackoffTask

type CancelationBackoffTask struct {
	Deadline time.Time
}

func (CancelationBackoffTask) Concurrent

func (CancelationBackoffTask) Concurrent() bool

func (CancelationBackoffTask) Kind

func (CancelationBackoffTask) Type

type CancelationBackoffTaskSerializer

type CancelationBackoffTaskSerializer struct{}

func (CancelationBackoffTaskSerializer) Deserialize

func (CancelationBackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (CancelationBackoffTaskSerializer) Serialize

type CancelationTask

type CancelationTask struct {
	EndpointName string
}

func (CancelationTask) Concurrent

func (CancelationTask) Concurrent() bool

func (CancelationTask) Kind

func (t CancelationTask) Kind() hsm.TaskKind

func (CancelationTask) Type

func (CancelationTask) Type() string

type CancelationTaskSerializer

type CancelationTaskSerializer struct{}

func (CancelationTaskSerializer) Deserialize

func (CancelationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (CancelationTaskSerializer) Serialize

func (CancelationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type CanceledEventDefinition

type CanceledEventDefinition struct{}

func (CanceledEventDefinition) Apply added in v1.25.0

func (CanceledEventDefinition) IsWorkflowTaskTrigger

func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool

func (CanceledEventDefinition) Type

type ClientProvider

type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexus.Client, error)

ClientProvider provides a nexus client for a given endpoint.

func ClientProviderFactory

func ClientProviderFactory(
	namespaceRegistry namespace.Registry,
	endpointRegistry commonnexus.EndpointRegistry,
	httpTransportProvider NexusTransportProvider,
	clusterMetadata cluster.Metadata,
	rpcFactory common.RPCFactory,
) (ClientProvider, error)

type CompletedEventDefinition

type CompletedEventDefinition struct{}

func (CompletedEventDefinition) Apply added in v1.25.0

func (CompletedEventDefinition) IsWorkflowTaskTrigger

func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool

func (CompletedEventDefinition) Type

type CompletionSource added in v1.25.0

type CompletionSource int

CompletionSource is an enum specifying where an operation completion originated from.

type EventAttemptFailed

type EventAttemptFailed struct {
	Time        time.Time
	Err         error
	Node        *hsm.Node
	RetryPolicy backoff.RetryPolicy
}

EventAttemptFailed is triggered when an invocation attempt is failed with a retryable error.

type EventCancelationAttemptFailed

type EventCancelationAttemptFailed struct {
	Time        time.Time
	Err         error
	Node        *hsm.Node
	RetryPolicy backoff.RetryPolicy
}

EventCancelationAttemptFailed is triggered when a cancelation attempt is failed with a retryable error.

type EventCancelationFailed

type EventCancelationFailed struct {
	Time time.Time
	Err  error
	Node *hsm.Node
}

EventCancelationFailed is triggered when a cancelation attempt is failed with a non retryable error.

type EventCancelationRescheduled

type EventCancelationRescheduled struct {
	Node *hsm.Node
}

EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.

type EventCancelationScheduled

type EventCancelationScheduled struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.

type EventCancelationSucceeded

type EventCancelationSucceeded struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationSucceeded is triggered when a cancelation attempt succeeds.

type EventCanceled

type EventCanceled struct {
	Time             time.Time
	Node             *hsm.Node
	CompletionSource CompletionSource
}

EventCanceled is triggered when an invocation attempt succeeds.

type EventFailed

type EventFailed struct {
	Time             time.Time
	Node             *hsm.Node
	Attributes       *historypb.NexusOperationFailedEventAttributes
	CompletionSource CompletionSource
}

EventFailed is triggered when an invocation attempt is failed with a non retryable error.

type EventRescheduled

type EventRescheduled struct {
	Node *hsm.Node
}

EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

type EventScheduled struct {
	Node *hsm.Node
}

EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.

type EventStarted

type EventStarted struct {
	Time       time.Time
	Node       *hsm.Node
	Attributes *historypb.NexusOperationStartedEventAttributes
}

EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.

type EventSucceeded

type EventSucceeded struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	Time             time.Time
	Node             *hsm.Node
	CompletionSource CompletionSource
}

EventSucceeded is triggered when an invocation attempt succeeds.

type EventTimedOut

type EventTimedOut struct {
	Node *hsm.Node
}

EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.

type FailedEventDefinition

type FailedEventDefinition struct{}

func (FailedEventDefinition) Apply added in v1.25.0

func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error

func (FailedEventDefinition) IsWorkflowTaskTrigger

func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool

func (FailedEventDefinition) Type

type InvocationTask

type InvocationTask struct {
	EndpointName string
}

func (InvocationTask) Concurrent

func (InvocationTask) Concurrent() bool

func (InvocationTask) Kind

func (t InvocationTask) Kind() hsm.TaskKind

func (InvocationTask) Type

func (InvocationTask) Type() string

type InvocationTaskSerializer

type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize

func (InvocationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (InvocationTaskSerializer) Serialize

func (InvocationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type LimitedReadCloser

type LimitedReadCloser struct {
	R io.ReadCloser
	N int64
}

A LimitedReaderCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.

func NewLimitedReadCloser

func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser

func (*LimitedReadCloser) Close

func (l *LimitedReadCloser) Close() error

func (*LimitedReadCloser) Read

func (l *LimitedReadCloser) Read(p []byte) (n int, err error)

type NexusTransportProvider

type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper

NexusTransportProvider type alias allows a provider to customize the default implementation specifically for Nexus.

func DefaultNexusTransportProvider

func DefaultNexusTransportProvider() NexusTransportProvider

type Operation

type Operation struct {
	*persistencespb.NexusOperationInfo
}

Operation state machine.

func (Operation) Cancel

func (o Operation) Cancel(node *hsm.Node, t time.Time) (hsm.TransitionOutput, error)

Cancel marks the Operation machine as canceled by spawning a child Cancelation machine and transitioning the child to the SCHEDULED state.

func (Operation) Cancelation

func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)

func (Operation) RegenerateTasks

func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Operation) SetState

func (o Operation) SetState(state enumsspb.NexusOperationState)

func (Operation) State

type ResponseSizeLimiter

type ResponseSizeLimiter struct {
	// contains filtered or unexported fields
}

func (ResponseSizeLimiter) RoundTrip

func (r ResponseSizeLimiter) RoundTrip(request *http.Request) (*http.Response, error)

type ScheduledEventDefinition

type ScheduledEventDefinition struct{}

func (ScheduledEventDefinition) Apply added in v1.25.0

func (ScheduledEventDefinition) IsWorkflowTaskTrigger

func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool

func (ScheduledEventDefinition) Type

type StartedEventDefinition

type StartedEventDefinition struct{}

func (StartedEventDefinition) Apply added in v1.25.0

func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error

func (StartedEventDefinition) IsWorkflowTaskTrigger

func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool

func (StartedEventDefinition) Type

type TaskExecutorOptions added in v1.25.0

type TaskExecutorOptions struct {
	fx.In

	Config                 *Config
	NamespaceRegistry      namespace.Registry
	MetricsHandler         metrics.Handler
	Logger                 log.Logger
	CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
	ClientProvider         ClientProvider
	EndpointRegistry       commonnexus.EndpointRegistry
}

type TimedOutEventDefinition

type TimedOutEventDefinition struct{}

func (TimedOutEventDefinition) Apply added in v1.25.0

func (TimedOutEventDefinition) IsWorkflowTaskTrigger

func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool

func (TimedOutEventDefinition) Type

type TimeoutTask

type TimeoutTask struct {
	Deadline time.Time
}

func (TimeoutTask) Concurrent

func (TimeoutTask) Concurrent() bool

func (TimeoutTask) Kind

func (t TimeoutTask) Kind() hsm.TaskKind

func (TimeoutTask) Type

func (TimeoutTask) Type() string

func (TimeoutTask) Validate

func (t TimeoutTask) Validate(node *hsm.Node) error

Validate checks if the timeout task is still valid to execute for the given node state.

type TimeoutTaskSerializer

type TimeoutTaskSerializer struct{}

func (TimeoutTaskSerializer) Deserialize

func (TimeoutTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (TimeoutTaskSerializer) Serialize

func (TimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL