nexusoperations

package
v1.25.0-114.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2024 License: MIT Imports: 38 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// OperationMachineType is a unique type identifier for the Operation state machine.
	OperationMachineType = hsm.MachineType{
		ID:   3,
		Name: "nexusoperations.Operation",
	}

	// CancelationMachineType is a unique type identifier for the Cancelation state machine.
	CancelationMachineType = hsm.MachineType{
		ID:   4,
		Name: "nexusoperations.Cancelation",
	}

	// CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.
	CancelationMachineKey = hsm.Key{Type: CancelationMachineType.ID, ID: ""}
)
View Source
var (
	TaskTypeTimeout = hsm.TaskType{
		ID:   3,
		Name: "nexusoperations.Timeout",
	}
	TaskTypeInvocation = hsm.TaskType{
		ID:   4,
		Name: "nexusoperations.Invocation",
	}
	TaskTypeBackoff = hsm.TaskType{
		ID:   5,
		Name: "nexusoperations.Backoff",
	}
	TaskTypeCancelation = hsm.TaskType{
		ID:   6,
		Name: "nexusoperations.Cancelation",
	}
	TaskTypeCancelationBackoff = hsm.TaskType{
		ID:   7,
		Name: "nexusoperations.CancelationBackoff",
	}
)
View Source
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
	"component.nexusoperations.callback.endpoint.template",
	"unset",
	`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver asynchronous completion.
The template can be used to interpolate the {{.NamepaceName}} and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set in order to use Nexus Operations.`,
)
View Source
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
View Source
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.operation.concurrency",
	1000,
	`MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution.
Once the limit is reached, ScheduleNexusOperation commands will be rejected.`,
)
View Source
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.operation.name.length",
	1000,
	`MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name.
ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
View Source
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.service.name.length",
	1000,
	`MaxServiceNameLength limits the maximum allowed length for a Nexus Service name.
ScheduleNexusOperation commands with a service name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
View Source
var OutboundRequestCounter = metrics.NewCounterDef(
	"nexus_outbound_requests",
	metrics.WithDescription("The number of Nexus outbound requests made by the history service."),
)
View Source
var OutboundRequestLatencyHistogram = metrics.NewTimerDef(
	"nexus_outbound_latency",
	metrics.WithDescription("Latency histogram of outbound Nexus requests made by the history service."),
)
View Source
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"component.nexusoperations.request.timeout",
	time.Second*10,
	`RequestTimeout is the timeout for making a single nexus start or cancel request.`,
)
View Source
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.nexusoperations.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
)
View Source
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.nexusoperations.retryPolicy.maxInterval",
	time.Second,
	`The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
)
View Source
var TransitionAttemptFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED},
	enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
	func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) {
		op.recordAttempt(event.Time)

		nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(op.Attempt))
		nextAttemptScheduleTime := event.Time.Add(nextDelay)
		op.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime)
		op.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: false,
				},
			},
		}
		return op.output(event.Node)
	},
)
View Source
var TransitionCancelationAttemptFailed = hsm.NewTransition(
	[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
	enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF,
	func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) {
		c.recordAttempt(event.Time)

		nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(c.Attempt))
		nextAttemptScheduleTime := event.Time.Add(nextDelay)
		c.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime)
		c.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: false,
				},
			},
		}
		return c.output(event.Node)
	},
)
View Source
var TransitionCancelationFailed = hsm.NewTransition(
	[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
	enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED,
	func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) {
		c.recordAttempt(event.Time)
		c.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: true,
				},
			},
		}
		return c.output(event.Node)
	},
)
View Source
var TransitionCanceled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_CANCELED,
	func(op Operation, event EventCanceled) (hsm.TransitionOutput, error) {
		if event.AttemptFailure != nil {
			op.recordAttempt(event.AttemptFailure.Time)
			op.LastAttemptFailure = &failurepb.Failure{
				Message: event.AttemptFailure.Err.Error(),
				FailureInfo: &failurepb.Failure_CanceledFailureInfo{
					CanceledFailureInfo: &failurepb.CanceledFailureInfo{},
				},
			}
		}

		return op.output(event.Node)
	},
)
View Source
var TransitionFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_FAILED,
	func(op Operation, event EventFailed) (hsm.TransitionOutput, error) {
		if event.AttemptFailure != nil {
			op.recordAttempt(event.AttemptFailure.Time)
			op.LastAttemptFailure = &failurepb.Failure{
				Message: event.AttemptFailure.Err.Error(),
				FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
					ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
						NonRetryable: true,
					},
				},
			}
		}

		return op.output(event.Node)
	},
)
View Source
var TransitionRescheduled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF},
	enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
	func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) {
		op.NextAttemptScheduleTime = nil
		return op.output(event.Node)
	},
)
View Source
var TransitionStarted = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED},
	enumsspb.NEXUS_OPERATION_STATE_STARTED,
	func(op Operation, event EventStarted) (hsm.TransitionOutput, error) {
		op.recordAttempt(event.Time)
		op.OperationId = event.Attributes.OperationId
		return op.output(event.Node)
	},
)
View Source
var TransitionSucceeded = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_SUCCEEDED,
	func(op Operation, event EventSucceeded) (hsm.TransitionOutput, error) {
		if event.AttemptTime != nil {
			op.recordAttempt(*event.AttemptTime)
		}

		return op.output(event.Node)
	},
)

Functions

func AddChild

func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error)

AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.

func CallbackTokenGeneratorProvider

func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator

func CompletionHandler

func CompletionHandler(
	ctx context.Context,
	env hsm.Environment,
	ref hsm.Ref,
	result *commonpb.Payload,
	opFailedError *nexus.UnsuccessfulOperationError,
) error

func EndpointRegistryLifetimeHooks added in v1.25.0

func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)

func EndpointRegistryProvider added in v1.25.0

func EndpointRegistryProvider(
	matchingClient resource.MatchingClient,
	endpointManager persistence.NexusEndpointManager,
	logger log.Logger,
	dc *dynamicconfig.Collection,
) commonnexus.EndpointRegistry

func MachineCollection

func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]

MachineCollection creates a new typed [statemachines.Collection] for operations.

func RegisterEventDefinitions

func RegisterEventDefinitions(reg *hsm.Registry) error

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	options ActiveExecutorOptions,
) error

func RegisterStateMachines

func RegisterStateMachines(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type ActiveExecutorOptions

type ActiveExecutorOptions struct {
	fx.In

	Config                 *Config
	NamespaceRegistry      namespace.Registry
	MetricsHandler         metrics.Handler
	CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
	ClientProvider         ClientProvider
	EndpointChecker        EndpointChecker
}

type AttemptFailure

type AttemptFailure struct {
	Time time.Time
	Err  error
}

AttemptFailure carries failure information of an invocation attempt.

type BackoffTask

type BackoffTask struct {
	Deadline time.Time
}

func (BackoffTask) Concurrent

func (BackoffTask) Concurrent() bool

func (BackoffTask) Kind

func (t BackoffTask) Kind() hsm.TaskKind

func (BackoffTask) Type

func (BackoffTask) Type() hsm.TaskType

type BackoffTaskSerializer

type BackoffTaskSerializer struct{}

func (BackoffTaskSerializer) Deserialize

func (BackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (BackoffTaskSerializer) Serialize

func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type CancelRequestedEventDefinition

type CancelRequestedEventDefinition struct{}

func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger

func (n CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool

func (CancelRequestedEventDefinition) Type

type Cancelation

type Cancelation struct {
	*persistencespb.NexusOperationCancellationInfo
}

Cancelation state machine for canceling an operation.

func (Cancelation) RegenerateTasks

func (c Cancelation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Cancelation) SetState

func (Cancelation) State

type CancelationBackoffTask

type CancelationBackoffTask struct {
	Deadline time.Time
}

func (CancelationBackoffTask) Concurrent

func (CancelationBackoffTask) Concurrent() bool

func (CancelationBackoffTask) Kind

func (CancelationBackoffTask) Type

type CancelationBackoffTaskSerializer

type CancelationBackoffTaskSerializer struct{}

func (CancelationBackoffTaskSerializer) Deserialize

func (CancelationBackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (CancelationBackoffTaskSerializer) Serialize

type CancelationTask

type CancelationTask struct {
	Destination string
}

func (CancelationTask) Concurrent

func (CancelationTask) Concurrent() bool

func (CancelationTask) Kind

func (t CancelationTask) Kind() hsm.TaskKind

func (CancelationTask) Type

func (CancelationTask) Type() hsm.TaskType

type CancelationTaskSerializer

type CancelationTaskSerializer struct{}

func (CancelationTaskSerializer) Deserialize

func (CancelationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (CancelationTaskSerializer) Serialize

func (CancelationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type CanceledEventDefinition

type CanceledEventDefinition struct{}

func (CanceledEventDefinition) IsWorkflowTaskTrigger

func (n CanceledEventDefinition) IsWorkflowTaskTrigger() bool

func (CanceledEventDefinition) Type

type ClientProvider

type ClientProvider func(ctx context.Context, key queues.NamespaceIDAndDestination, service string) (*nexus.Client, error)

func ClientProviderFactory

func ClientProviderFactory(
	namespaceRegistry namespace.Registry,
	endpointRegistry commonnexus.EndpointRegistry,
	httpTransportProvider NexusTransportProvider,
	clusterMetadata cluster.Metadata,
	httpClientCache *cluster.FrontendHTTPClientCache,
) ClientProvider

type CompletedEventDefinition

type CompletedEventDefinition struct{}

func (CompletedEventDefinition) IsWorkflowTaskTrigger

func (n CompletedEventDefinition) IsWorkflowTaskTrigger() bool

func (CompletedEventDefinition) Type

type Config

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection) *Config

type EndpointChecker

type EndpointChecker func(ctx context.Context, namespaceName, endpointName string) error

EndpointChecker checks if an endpoint exists, should return serviceerror.NotFound if the endpoint does not exist or any other error to indicate lookup has failed.

func EndpointCheckerProvider

func EndpointCheckerProvider(reg commonnexus.EndpointRegistry) EndpointChecker

type EventAttemptFailed

type EventAttemptFailed struct {
	AttemptFailure
	Node        *hsm.Node
	RetryPolicy backoff.RetryPolicy
}

EventAttemptFailed is triggered when an invocation attempt is failed with a retryable error.

type EventCancelationAttemptFailed

type EventCancelationAttemptFailed struct {
	Time        time.Time
	Err         error
	Node        *hsm.Node
	RetryPolicy backoff.RetryPolicy
}

EventCancelationAttemptFailed is triggered when a cancelation attempt is failed with a retryable error.

type EventCancelationFailed

type EventCancelationFailed struct {
	Time time.Time
	Err  error
	Node *hsm.Node
}

EventCancelationFailed is triggered when a cancelation attempt is failed with a non retryable error.

type EventCancelationRescheduled

type EventCancelationRescheduled struct {
	Node *hsm.Node
}

EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.

type EventCancelationScheduled

type EventCancelationScheduled struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.

type EventCancelationSucceeded

type EventCancelationSucceeded struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationSucceeded is triggered when a cancelation attempt succeeds.

type EventCanceled

type EventCanceled struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	AttemptFailure *AttemptFailure
	Node           *hsm.Node
}

EventCanceled is triggered when an invocation attempt succeeds.

type EventFailed

type EventFailed struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	AttemptFailure *AttemptFailure
	Node           *hsm.Node
}

EventFailed is triggered when an invocation attempt is failed with a non retryable error.

type EventRescheduled

type EventRescheduled struct {
	Node *hsm.Node
}

EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

type EventScheduled struct {
	Node *hsm.Node
}

EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.

type EventStarted

type EventStarted struct {
	Time       time.Time
	Node       *hsm.Node
	Attributes *historypb.NexusOperationStartedEventAttributes
}

EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.

type EventSucceeded

type EventSucceeded struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	AttemptTime *time.Time
	Node        *hsm.Node
}

EventSucceeded is triggered when an invocation attempt succeeds.

type EventTimedOut

type EventTimedOut struct {
	Node *hsm.Node
}

EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.

type FailedEventDefinition

type FailedEventDefinition struct{}

func (FailedEventDefinition) IsWorkflowTaskTrigger

func (n FailedEventDefinition) IsWorkflowTaskTrigger() bool

func (FailedEventDefinition) Type

type InvocationTask

type InvocationTask struct {
	Destination string
}

func (InvocationTask) Concurrent

func (InvocationTask) Concurrent() bool

func (InvocationTask) Kind

func (t InvocationTask) Kind() hsm.TaskKind

func (InvocationTask) Type

func (InvocationTask) Type() hsm.TaskType

type InvocationTaskSerializer

type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize

func (InvocationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (InvocationTaskSerializer) Serialize

func (InvocationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type LimitedReadCloser

type LimitedReadCloser struct {
	R io.ReadCloser
	N int64
}

A LimitedReaderCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.

func NewLimitedReadCloser

func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser

func (*LimitedReadCloser) Close

func (l *LimitedReadCloser) Close() error

func (*LimitedReadCloser) Read

func (l *LimitedReadCloser) Read(p []byte) (n int, err error)

type NexusTransportProvider

type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper

NexusTransportProvider type alias allows a provider to customize the default implementation specifically for Nexus.

func DefaultNexusTransportProvider

func DefaultNexusTransportProvider() NexusTransportProvider

type Operation

type Operation struct {
	*persistencespb.NexusOperationInfo
}

Operation state machine.

func (Operation) Cancel

func (o Operation) Cancel(node *hsm.Node, t time.Time) (hsm.TransitionOutput, error)

Cancel marks the Operation machine as canceled by spawning a child Cancelation machine and transitioning the child to the SCHEDULED state.

func (Operation) Cancelation

func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)

func (Operation) RegenerateTasks

func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Operation) SetState

func (o Operation) SetState(state enumsspb.NexusOperationState)

func (Operation) State

type ResponseSizeLimiter

type ResponseSizeLimiter struct {
	// contains filtered or unexported fields
}

func (ResponseSizeLimiter) RoundTrip

func (r ResponseSizeLimiter) RoundTrip(request *http.Request) (*http.Response, error)

type ScheduledEventDefinition

type ScheduledEventDefinition struct{}

func (ScheduledEventDefinition) IsWorkflowTaskTrigger

func (n ScheduledEventDefinition) IsWorkflowTaskTrigger() bool

func (ScheduledEventDefinition) Type

type StartedEventDefinition

type StartedEventDefinition struct{}

func (StartedEventDefinition) IsWorkflowTaskTrigger

func (n StartedEventDefinition) IsWorkflowTaskTrigger() bool

func (StartedEventDefinition) Type

type TimedOutEventDefinition

type TimedOutEventDefinition struct{}

func (TimedOutEventDefinition) IsWorkflowTaskTrigger

func (n TimedOutEventDefinition) IsWorkflowTaskTrigger() bool

func (TimedOutEventDefinition) Type

type TimeoutTask

type TimeoutTask struct {
	Deadline time.Time
}

func (TimeoutTask) Concurrent

func (TimeoutTask) Concurrent() bool

func (TimeoutTask) Kind

func (t TimeoutTask) Kind() hsm.TaskKind

func (TimeoutTask) Type

func (TimeoutTask) Type() hsm.TaskType

func (TimeoutTask) Validate

func (t TimeoutTask) Validate(node *hsm.Node) error

Validate checks if the timeout task is still valid to execute for the given node state.

type TimeoutTaskSerializer

type TimeoutTaskSerializer struct{}

func (TimeoutTaskSerializer) Deserialize

func (TimeoutTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (TimeoutTaskSerializer) Serialize

func (TimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL