nexusoperations

package
v1.24.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2024 License: MIT Imports: 32 Imported by: 1

Documentation

Index

Constants

View Source
const Enabled = dynamicconfig.Key("component.nexusoperations.enabled")

Enabled toggles accepting of API requests and workflow commands that create or modify Nexus operations.

View Source
const MaxConcurrentOperations = dynamicconfig.Key("component.nexusoperations.limit.operation.concurrency")

MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution. Once the limit is reached, ScheduleNexusOperation commands will be rejected.

View Source
const MaxOperationNameLength = dynamicconfig.Key("component.nexusoperations.limit.operation.name.length")

MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name. ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected. Uses Go's len() function to determine the length.

View Source
const RequestTimeout = dynamicconfig.Key("component.nexusoperations.request.timeout")

RequestTimeout is the timeout for making a single nexus start or cancel request.

Variables

View Source
// State machine type identifiers and keys declared by this package.
var (
	// OperationMachineType is a unique type identifier for the Operation state machine.
	OperationMachineType = hsm.MachineType{
		ID:   3,
		Name: "nexusoperations.Operation",
	}

	// CancelationMachineType is a unique type identifier for the Cancelation state machine.
	CancelationMachineType = hsm.MachineType{
		ID:   4,
		Name: "nexusoperations.Cancelation",
	}

	// CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.
	// Its Type is CancelationMachineType.ID and its ID is the empty string.
	// NOTE(review): presumably the ID can be empty because an operation has at most one cancelation child — confirm.
	CancelationMachineKey = hsm.Key{Type: CancelationMachineType.ID, ID: ""}
)
View Source
// Task type identifiers declared by this package. Each pairs a numeric ID with a
// "nexusoperations."-prefixed name.
// NOTE(review): presumably these are the values returned by the Type methods of the
// corresponding task structs (TimeoutTask, InvocationTask, ...) — confirm against their implementations.
var (
	// TaskTypeTimeout is the type identifier for the "nexusoperations.Timeout" task.
	TaskTypeTimeout = hsm.TaskType{
		ID:   3,
		Name: "nexusoperations.Timeout",
	}
	// TaskTypeInvocation is the type identifier for the "nexusoperations.Invocation" task.
	TaskTypeInvocation = hsm.TaskType{
		ID:   4,
		Name: "nexusoperations.Invocation",
	}
	// TaskTypeBackoff is the type identifier for the "nexusoperations.Backoff" task.
	TaskTypeBackoff = hsm.TaskType{
		ID:   5,
		Name: "nexusoperations.Backoff",
	}
	// TaskTypeCancelation is the type identifier for the "nexusoperations.Cancelation" task.
	TaskTypeCancelation = hsm.TaskType{
		ID:   6,
		Name: "nexusoperations.Cancelation",
	}
	// TaskTypeCancelationBackoff is the type identifier for the "nexusoperations.CancelationBackoff" task.
	TaskTypeCancelationBackoff = hsm.TaskType{
		ID:   7,
		Name: "nexusoperations.CancelationBackoff",
	}
)
View Source
// ErrResponseBodyTooLarge is the error that LimitedReadCloser.Read returns once its
// remaining byte budget N is <= 0.
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
View Source
// TransitionAttemptFailed moves an Operation from SCHEDULED to BACKING_OFF in response to an
// EventAttemptFailed. It records the attempt, computes when the next attempt should be scheduled
// from an exponential retry policy, and stores the error as a retryable application failure.
var TransitionAttemptFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED},
	enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
	func(o Operation, ev EventAttemptFailed) (hsm.TransitionOutput, error) {
		// Keep this first: the delay below is derived from o.Attempt, which recordAttempt
		// presumably updates.
		o.recordAttempt(ev.Time)

		retryPolicy := backoff.NewExponentialRetryPolicy(time.Second)
		delay := retryPolicy.ComputeNextDelay(0, int(o.Attempt))
		o.NextAttemptScheduleTime = timestamppb.New(ev.Time.Add(delay))

		// The failure is explicitly marked retryable since the operation is going to back off and retry.
		retryable := &failurepb.ApplicationFailureInfo{NonRetryable: false}
		o.LastAttemptFailure = &failurepb.Failure{
			Message:     ev.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: retryable},
		}
		return o.output(ev.Node)
	},
)
View Source
// TransitionCancelationAttemptFailed moves a Cancelation from SCHEDULED to BACKING_OFF in response
// to an EventCancelationAttemptFailed. It records the attempt, computes when the next attempt should
// be scheduled from an exponential retry policy, and stores the error as a retryable application failure.
var TransitionCancelationAttemptFailed = hsm.NewTransition(
	[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
	enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF,
	func(c Cancelation, ev EventCancelationAttemptFailed) (hsm.TransitionOutput, error) {
		// Keep this first: the delay below is derived from c.Attempt, which recordAttempt
		// presumably updates.
		c.recordAttempt(ev.Time)

		retryPolicy := backoff.NewExponentialRetryPolicy(time.Second)
		delay := retryPolicy.ComputeNextDelay(0, int(c.Attempt))
		c.NextAttemptScheduleTime = timestamppb.New(ev.Time.Add(delay))

		// The failure is explicitly marked retryable since the cancelation is going to back off and retry.
		retryable := &failurepb.ApplicationFailureInfo{NonRetryable: false}
		c.LastAttemptFailure = &failurepb.Failure{
			Message:     ev.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: retryable},
		}
		return c.output(ev.Node)
	},
)
View Source
// TransitionCancelationFailed moves a Cancelation from SCHEDULED to FAILED in response to an
// EventCancelationFailed. It records the attempt and stores the error as a non-retryable
// application failure.
var TransitionCancelationFailed = hsm.NewTransition(
	[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
	enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED,
	func(c Cancelation, ev EventCancelationFailed) (hsm.TransitionOutput, error) {
		c.recordAttempt(ev.Time)

		// FAILED is entered with the failure flagged as non-retryable.
		nonRetryable := &failurepb.ApplicationFailureInfo{NonRetryable: true}
		c.LastAttemptFailure = &failurepb.Failure{
			Message:     ev.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: nonRetryable},
		}
		return c.output(ev.Node)
	},
)
View Source
// TransitionCanceled moves an Operation from SCHEDULED, BACKING_OFF or STARTED to CANCELED in
// response to an EventCanceled. When the event carries an AttemptFailure, the attempt is recorded
// and the error is stored as a canceled failure; otherwise attempt bookkeeping is left untouched.
var TransitionCanceled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_CANCELED,
	func(o Operation, ev EventCanceled) (hsm.TransitionOutput, error) {
		// AttemptFailure is optional on this event; only touch attempt state when it is present.
		if failure := ev.AttemptFailure; failure != nil {
			o.recordAttempt(failure.Time)

			canceledInfo := &failurepb.Failure_CanceledFailureInfo{
				CanceledFailureInfo: &failurepb.CanceledFailureInfo{},
			}
			o.LastAttemptFailure = &failurepb.Failure{
				Message:     failure.Err.Error(),
				FailureInfo: canceledInfo,
			}
		}

		return o.output(ev.Node)
	},
)
View Source
// TransitionFailed moves an Operation from SCHEDULED, BACKING_OFF or STARTED to FAILED in response
// to an EventFailed. When the event carries an AttemptFailure, the attempt is recorded and the error
// is stored as a non-retryable application failure; otherwise attempt bookkeeping is left untouched.
var TransitionFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_FAILED,
	func(o Operation, ev EventFailed) (hsm.TransitionOutput, error) {
		// AttemptFailure is optional on this event; only touch attempt state when it is present.
		if failure := ev.AttemptFailure; failure != nil {
			o.recordAttempt(failure.Time)

			nonRetryable := &failurepb.ApplicationFailureInfo{NonRetryable: true}
			o.LastAttemptFailure = &failurepb.Failure{
				Message:     failure.Err.Error(),
				FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: nonRetryable},
			}
		}

		return o.output(ev.Node)
	},
)
View Source
// TransitionRescheduled moves an Operation from BACKING_OFF back to SCHEDULED in response to an
// EventRescheduled, clearing the next attempt schedule time.
var TransitionRescheduled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
	},
	enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
	func(o Operation, ev EventRescheduled) (hsm.TransitionOutput, error) {
		// The schedule time set while entering BACKING_OFF no longer applies.
		o.NextAttemptScheduleTime = nil

		return o.output(ev.Node)
	},
)
View Source
// TransitionStarted moves an Operation from SCHEDULED to STARTED in response to an EventStarted.
// It records the attempt and copies the operation ID from the event attributes onto the machine.
var TransitionStarted = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
	},
	enumsspb.NEXUS_OPERATION_STATE_STARTED,
	func(o Operation, ev EventStarted) (hsm.TransitionOutput, error) {
		o.recordAttempt(ev.Time)

		// Attributes is dereferenced directly, so the event is expected to carry it.
		attrs := ev.Attributes
		o.OperationId = attrs.OperationId

		return o.output(ev.Node)
	},
)
View Source
// TransitionSucceeded moves an Operation from SCHEDULED, BACKING_OFF or STARTED to SUCCEEDED in
// response to an EventSucceeded. The attempt is recorded only when the event carries an AttemptTime.
var TransitionSucceeded = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_SUCCEEDED,
	func(o Operation, ev EventSucceeded) (hsm.TransitionOutput, error) {
		// AttemptTime is optional on this event; without it there is no attempt to record.
		if attemptTime := ev.AttemptTime; attemptTime != nil {
			o.recordAttempt(*attemptTime)
		}

		return o.output(ev.Node)
	},
)

Functions

func AddChild

func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error)

AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.

func CallbackTokenGeneratorProvider

func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator

func CompletionHandler

func CompletionHandler(
	ctx context.Context,
	env hsm.Environment,
	ref hsm.Ref,
	result *commonpb.Payload,
	opFailedError *nexus.UnsuccessfulOperationError,
) error

func MachineCollection

func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]

MachineCollection creates a new typed [hsm.Collection] for operations.

func RegisterEventDefinitions

func RegisterEventDefinitions(reg *hsm.Registry) error

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	options ActiveExecutorOptions,
) error

func RegisterStateMachines

func RegisterStateMachines(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type ActiveExecutorOptions

type ActiveExecutorOptions struct {
	fx.In

	Config                 *Config
	NamespaceRegistry      namespace.Registry
	CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
	ClientProvider         ClientProvider
}

type AttemptFailure

type AttemptFailure struct {
	Time time.Time
	Err  error
}

AttemptFailure carries failure information of an invocation attempt.

type BackoffTask

type BackoffTask struct {
	Deadline time.Time
}

func (BackoffTask) Concurrent

func (BackoffTask) Concurrent() bool

func (BackoffTask) Kind

func (t BackoffTask) Kind() hsm.TaskKind

func (BackoffTask) Type

func (BackoffTask) Type() hsm.TaskType

type BackoffTaskSerializer

type BackoffTaskSerializer struct{}

func (BackoffTaskSerializer) Deserialize

func (BackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (BackoffTaskSerializer) Serialize

func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type CancelRequestedEventDefinition

type CancelRequestedEventDefinition struct{}

func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger

func (n CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool

func (CancelRequestedEventDefinition) Type

type Cancelation

type Cancelation struct {
	*persistencespb.NexusOperationCancellationInfo
}

Cancelation state machine for canceling an operation.

func (Cancelation) RegenerateTasks

func (c Cancelation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Cancelation) SetState

func (Cancelation) State

type CancelationBackoffTask

type CancelationBackoffTask struct {
	Deadline time.Time
}

func (CancelationBackoffTask) Concurrent

func (CancelationBackoffTask) Concurrent() bool

func (CancelationBackoffTask) Kind

func (CancelationBackoffTask) Type

type CancelationBackoffTaskSerializer

type CancelationBackoffTaskSerializer struct{}

func (CancelationBackoffTaskSerializer) Deserialize

func (CancelationBackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (CancelationBackoffTaskSerializer) Serialize

type CancelationTask

type CancelationTask struct {
	Destination string
}

func (CancelationTask) Concurrent

func (CancelationTask) Concurrent() bool

func (CancelationTask) Kind

func (t CancelationTask) Kind() hsm.TaskKind

func (CancelationTask) Type

func (CancelationTask) Type() hsm.TaskType

type CancelationTaskSerializer

type CancelationTaskSerializer struct{}

func (CancelationTaskSerializer) Deserialize

func (CancelationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (CancelationTaskSerializer) Serialize

func (CancelationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type CanceledEventDefinition

type CanceledEventDefinition struct{}

func (CanceledEventDefinition) IsWorkflowTaskTrigger

func (n CanceledEventDefinition) IsWorkflowTaskTrigger() bool

func (CanceledEventDefinition) Type

type ClientProvider

func ClientProviderFactory

func ClientProviderFactory(namespaceRegistry namespace.Registry, httpTransportProvider NexusTransportProvider) ClientProvider

type CompletedEventDefinition

type CompletedEventDefinition struct{}

func (CompletedEventDefinition) IsWorkflowTaskTrigger

func (n CompletedEventDefinition) IsWorkflowTaskTrigger() bool

func (CompletedEventDefinition) Type

type EventAttemptFailed

type EventAttemptFailed struct {
	AttemptFailure
	Node *hsm.Node
}

EventAttemptFailed is triggered when an invocation attempt fails with a retryable error.

type EventCancelationAttemptFailed

type EventCancelationAttemptFailed struct {
	Time time.Time
	Err  error
	Node *hsm.Node
}

EventCancelationAttemptFailed is triggered when a cancelation attempt fails with a retryable error.

type EventCancelationFailed

type EventCancelationFailed struct {
	Time time.Time
	Err  error
	Node *hsm.Node
}

EventCancelationFailed is triggered when a cancelation attempt fails with a non-retryable error.

type EventCancelationRescheduled

type EventCancelationRescheduled struct {
	Node *hsm.Node
}

EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.

type EventCancelationScheduled

type EventCancelationScheduled struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.

type EventCancelationSucceeded

type EventCancelationSucceeded struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationSucceeded is triggered when a cancelation attempt succeeds.

type EventCanceled

type EventCanceled struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	AttemptFailure *AttemptFailure
	Node           *hsm.Node
}

EventCanceled is triggered when the operation completes as canceled.

type EventFailed

type EventFailed struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	AttemptFailure *AttemptFailure
	Node           *hsm.Node
}

EventFailed is triggered when an invocation attempt fails with a non-retryable error.

type EventRescheduled

type EventRescheduled struct {
	Node *hsm.Node
}

EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

type EventScheduled struct {
	Node *hsm.Node
}

EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.

type EventStarted

type EventStarted struct {
	Time       time.Time
	Node       *hsm.Node
	Attributes *historypb.NexusOperationStartedEventAttributes
}

EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.

type EventSucceeded

type EventSucceeded struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	AttemptTime *time.Time
	Node        *hsm.Node
}

EventSucceeded is triggered when an invocation attempt succeeds.

type EventTimedOut

type EventTimedOut struct {
	Node *hsm.Node
}

EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.

type FailedEventDefinition

type FailedEventDefinition struct{}

func (FailedEventDefinition) IsWorkflowTaskTrigger

func (n FailedEventDefinition) IsWorkflowTaskTrigger() bool

func (FailedEventDefinition) Type

type InvocationTask

type InvocationTask struct {
	Destination string
}

func (InvocationTask) Concurrent

func (InvocationTask) Concurrent() bool

func (InvocationTask) Kind

func (t InvocationTask) Kind() hsm.TaskKind

func (InvocationTask) Type

func (InvocationTask) Type() hsm.TaskType

type InvocationTaskSerializer

type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize

func (InvocationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (InvocationTaskSerializer) Serialize

func (InvocationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type LimitedReadCloser

type LimitedReadCloser struct {
	R io.ReadCloser
	N int64
}

A LimitedReadCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.

func NewLimitedReadCloser

func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser

func (*LimitedReadCloser) Close

func (l *LimitedReadCloser) Close() error

func (*LimitedReadCloser) Read

func (l *LimitedReadCloser) Read(p []byte) (n int, err error)

type NexusTransportProvider

type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper

NexusTransportProvider is a function type that allows a provider to customize the default HTTP transport (http.RoundTripper) implementation specifically for Nexus.

func DefaultNexusTransportProvider

func DefaultNexusTransportProvider() NexusTransportProvider

type Operation

type Operation struct {
	*persistencespb.NexusOperationInfo
}

Operation state machine.

func (Operation) Cancel

func (o Operation) Cancel(node *hsm.Node, t time.Time) (hsm.TransitionOutput, error)

Cancel marks the Operation machine as canceled by spawning a child Cancelation machine and transitioning the child to the SCHEDULED state.

func (Operation) Cancelation

func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)

func (Operation) RegenerateTasks

func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Operation) SetState

func (o Operation) SetState(state enumsspb.NexusOperationState)

func (Operation) State

type ResponseSizeLimiter

type ResponseSizeLimiter struct {
	// contains filtered or unexported fields
}

func (ResponseSizeLimiter) RoundTrip

func (r ResponseSizeLimiter) RoundTrip(request *http.Request) (*http.Response, error)

type ScheduledEventDefinition

type ScheduledEventDefinition struct{}

func (ScheduledEventDefinition) IsWorkflowTaskTrigger

func (n ScheduledEventDefinition) IsWorkflowTaskTrigger() bool

func (ScheduledEventDefinition) Type

type StartedEventDefinition

type StartedEventDefinition struct{}

func (StartedEventDefinition) IsWorkflowTaskTrigger

func (n StartedEventDefinition) IsWorkflowTaskTrigger() bool

func (StartedEventDefinition) Type

type TimedOutEventDefinition

type TimedOutEventDefinition struct{}

func (TimedOutEventDefinition) IsWorkflowTaskTrigger

func (n TimedOutEventDefinition) IsWorkflowTaskTrigger() bool

func (TimedOutEventDefinition) Type

type TimeoutTask

type TimeoutTask struct {
	Deadline time.Time
}

func (TimeoutTask) Concurrent

func (TimeoutTask) Concurrent() bool

func (TimeoutTask) Kind

func (t TimeoutTask) Kind() hsm.TaskKind

func (TimeoutTask) Type

func (TimeoutTask) Type() hsm.TaskType

func (TimeoutTask) Validate

func (t TimeoutTask) Validate(node *hsm.Node) error

Validate checks if the timeout task is still valid to execute for the given node state.

type TimeoutTaskSerializer

type TimeoutTaskSerializer struct{}

func (TimeoutTaskSerializer) Deserialize

func (TimeoutTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (TimeoutTaskSerializer) Serialize

func (TimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL