callbacks

package
v1.25.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2024 License: MIT Imports: 34 Imported by: 1

Documentation

Index

Constants

View Source
const (
	TaskTypeInvocation = "callbacks.Invocation"
	TaskTypeBackoff    = "callbacks.Backoff"
)
View Source
const (
	// StateMachineType is a unique type identifier for this state machine.
	StateMachineType = "callbacks.Callback"
)

Variables

View Source
var AllowedAddresses = dynamicconfig.NewNamespaceTypedSettingWithConverter(
	"component.callbacks.allowedAddresses",
	allowedAddressConverter,
	[]AddressMatchRule(nil),
	`The per-namespace list of addresses that are allowed for callbacks and whether secure connections (https) are required.
URLs are checked against each in order when starting a workflow with attached callbacks and only need to match one to pass validation.
Default is no address rules, meaning all callbacks will be rejected. Any invalid entries are ignored. Each entry is a map with possible values:
	 - "Pattern":string (required) the host:port pattern to which this config applies.
		Wildcards, '*', are supported and can match any number of characters (e.g. '*' matches everything, 'prefix.*.domain' matches 'prefix.a.domain' as well as 'prefix.a.b.domain').
	 - "AllowInsecure":bool (optional, default=false) indicates whether https is required`)
View Source
var RequestCounter = metrics.NewCounterDef(
	"callback_outbound_requests",
	metrics.WithDescription("The number of callback outbound requests made by the history service."),
)
View Source
var RequestLatencyHistogram = metrics.NewTimerDef(
	"callback_outbound_latency",
	metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."),
)
View Source
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"component.callbacks.request.timeout",
	time.Second*10,
	`RequestTimeout is the timeout for executing a single callback request.`,
)
View Source
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.callbacks.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every callback request attempt for a given callback.`,
)
View Source
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.callbacks.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every callback request attempt for a given callback.`,
)
View Source
var TransitionAttemptFailed = hsm.NewTransition(
	[]enumsspb.CallbackState{enumsspb.CALLBACK_STATE_SCHEDULED},
	enumsspb.CALLBACK_STATE_BACKING_OFF,
	func(cb Callback, event EventAttemptFailed) (hsm.TransitionOutput, error) {
		cb.recordAttempt(event.Time)

		nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(cb.Attempt), event.Err)
		nextAttemptScheduleTime := event.Time.Add(nextDelay)
		cb.CallbackInfo.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime)
		cb.CallbackInfo.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: false,
				},
			},
		}
		return cb.output()
	},
)
View Source
var TransitionFailed = hsm.NewTransition(
	[]enumsspb.CallbackState{enumsspb.CALLBACK_STATE_SCHEDULED},
	enumsspb.CALLBACK_STATE_FAILED,
	func(cb Callback, event EventFailed) (hsm.TransitionOutput, error) {
		cb.recordAttempt(event.Time)
		cb.CallbackInfo.LastAttemptFailure = &failurepb.Failure{
			Message: event.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
					NonRetryable: true,
				},
			},
		}
		return cb.output()
	},
)
View Source
var TransitionRescheduled = hsm.NewTransition(
	[]enumsspb.CallbackState{enumsspb.CALLBACK_STATE_BACKING_OFF},
	enumsspb.CALLBACK_STATE_SCHEDULED,
	func(cb Callback, event EventRescheduled) (hsm.TransitionOutput, error) {
		cb.CallbackInfo.NextAttemptScheduleTime = nil
		return cb.output()
	},
)
View Source
var TransitionSucceeded = hsm.NewTransition(
	[]enumsspb.CallbackState{enumsspb.CALLBACK_STATE_SCHEDULED},
	enumsspb.CALLBACK_STATE_SUCCEEDED,
	func(cb Callback, event EventSucceeded) (hsm.TransitionOutput, error) {
		cb.recordAttempt(event.Time)
		cb.CallbackInfo.LastAttemptFailure = nil
		return cb.output()
	},
)

Functions

func MachineCollection

func MachineCollection(tree *hsm.Node) hsm.Collection[Callback]

MachineCollection creates a new typed [statemachines.Collection] for callbacks.

func NewWorkflowClosedTrigger

func NewWorkflowClosedTrigger() *persistencespb.CallbackInfo_Trigger

NewWorkflowClosedTrigger creates a WorkflowClosed trigger variant.

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	executorOptions TaskExecutorOptions,
) error

func RegisterStateMachine

func RegisterStateMachine(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type AddressMatchRule added in v1.25.0

type AddressMatchRule struct {
	Regexp        *regexp.Regexp
	AllowInsecure bool
}

type BackoffTask

type BackoffTask struct {
	Deadline time.Time
}

func (BackoffTask) Concurrent

func (BackoffTask) Concurrent() bool

func (BackoffTask) Kind

func (t BackoffTask) Kind() hsm.TaskKind

func (BackoffTask) Type

func (BackoffTask) Type() string

type BackoffTaskSerializer

type BackoffTaskSerializer struct{}

func (BackoffTaskSerializer) Deserialize

func (BackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (BackoffTaskSerializer) Serialize

func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type Callback

type Callback struct {
	*persistencespb.CallbackInfo
}

Callback state machine.

func NewCallback

func NewCallback(registrationTime *timestamppb.Timestamp, trigger *persistencespb.CallbackInfo_Trigger, cb *persistencespb.Callback) Callback

NewCallback creates a new callback in the STANDBY state from given params.

func (Callback) RegenerateTasks

func (c Callback) RegenerateTasks(*hsm.Node) ([]hsm.Task, error)

func (Callback) SetState

func (c Callback) SetState(state enumsspb.CallbackState)

func (Callback) State

func (c Callback) State() enumsspb.CallbackState

type CanGetHSMCompletionCallbackArg added in v1.25.0

type CanGetHSMCompletionCallbackArg interface {
	GetHSMCompletionCallbackArg(ctx context.Context) (*persistencespb.HSMCompletionCallbackArg, error)
}

type CanGetNexusCompletion

type CanGetNexusCompletion interface {
	GetNexusCompletion(ctx context.Context) (nexus.OperationCompletion, error)
}

type Config

type Config struct {
	RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
	RetryPolicy    func() backoff.RetryPolicy
}

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection) *Config

type EventAttemptFailed

type EventAttemptFailed struct {
	Time        time.Time
	Err         error
	RetryPolicy backoff.RetryPolicy
}

EventAttemptFailed is triggered when an attempt is failed with a retryable error.

type EventFailed

type EventFailed struct {
	Time time.Time
	Err  error
}

EventFailed is triggered when an attempt is failed with a non retryable error.

type EventRescheduled

type EventRescheduled struct{}

EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

type EventScheduled struct{}

EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.

type EventSucceeded

type EventSucceeded struct {
	Time time.Time
}

EventSucceeded is triggered when an attempt succeeds.

type HTTPCaller

type HTTPCaller func(*http.Request) (*http.Response, error)

HTTPCaller is a method that can be used to invoke HTTP requests.

type HTTPCallerProvider added in v1.25.0

type HTTPCallerProvider func(queues.NamespaceIDAndDestination) HTTPCaller

func HTTPCallerProviderProvider added in v1.25.0

func HTTPCallerProviderProvider(
	clusterMetadata cluster.Metadata,
	rpcFactory common.RPCFactory,
	httpClientCache *cluster.FrontendHTTPClientCache,
	logger log.Logger,
) (HTTPCallerProvider, error)

type InvocationTask

type InvocationTask struct {
	// The base URL for nexus callbacks.
	// Will have other meanings as more callback use cases are added.
	Destination string
}

func (InvocationTask) Concurrent

func (InvocationTask) Concurrent() bool

func (InvocationTask) Kind

func (t InvocationTask) Kind() hsm.TaskKind

func (InvocationTask) Type

func (InvocationTask) Type() string

type InvocationTaskSerializer

type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize

func (InvocationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (InvocationTaskSerializer) Serialize

func (InvocationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type TaskExecutorOptions added in v1.25.0

type TaskExecutorOptions struct {
	fx.In

	Config             *Config
	NamespaceRegistry  namespace.Registry
	MetricsHandler     metrics.Handler
	Logger             log.Logger
	HTTPCallerProvider HTTPCallerProvider
	HistoryClient      resource.HistoryClient
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL