Documentation ¶
Index ¶
- Constants
- Variables
- func MachineCollection(tree *hsm.Node) hsm.Collection[Callback]
- func NewWorkflowClosedTrigger() *persistencespb.CallbackInfo_Trigger
- func RegisterExecutor(registry *hsm.Registry, executorOptions TaskExecutorOptions) error
- func RegisterStateMachine(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type AddressMatchRule
- type BackoffTask
- type BackoffTaskSerializer
- type Callback
- type CanGetHSMCompletionCallbackArg
- type CanGetNexusCompletion
- type Config
- type EventAttemptFailed
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventSucceeded
- type HTTPCaller
- type HTTPCallerProvider
- type InvocationTask
- type InvocationTaskSerializer
- type TaskExecutorOptions
Constants ¶
const ( TaskTypeInvocation = "callbacks.Invocation" TaskTypeBackoff = "callbacks.Backoff" )
const (
// StateMachineType is a unique type identifier for this state machine.
StateMachineType = "callbacks.Callback"
)
Variables ¶
var AllowedAddresses = dynamicconfig.NewNamespaceTypedSettingWithConverter( "component.callbacks.allowedAddresses", allowedAddressConverter, []AddressMatchRule(nil), `The per-namespace list of addresses that are allowed for callbacks and whether secure connections (https) are required. URLs are checked against each in order when starting a workflow with attached callbacks and only need to match one to pass validation. Default is no address rules, meaning all callbacks will be rejected. Any invalid entries are ignored. Each entry is a map with possible values: - "Pattern":string (required) the host:port pattern to which this config applies. Wildcards, '*', are supported and can match any number of characters (e.g. '*' matches everything, 'prefix.*.domain' matches 'prefix.a.domain' as well as 'prefix.a.b.domain'). - "AllowInsecure":bool (optional, default=false) indicates whether https is required`)
var Module = fx.Module( "component.callbacks", fx.Provide(ConfigProvider), fx.Provide(HTTPCallerProviderProvider), fx.Invoke(RegisterTaskSerializers), fx.Invoke(RegisterStateMachine), fx.Invoke(RegisterExecutor), )
var RequestCounter = metrics.NewCounterDef( "callback_outbound_requests", metrics.WithDescription("The number of callback outbound requests made by the history service."), )
var RequestLatencyHistogram = metrics.NewTimerDef( "callback_outbound_latency", metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."), )
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting( "component.callbacks.request.timeout", time.Second*10, `RequestTimeout is the timeout for executing a single callback request.`, )
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "component.callbacks.retryPolicy.initialInterval", time.Second, `The initial backoff interval between every callback request attempt for a given callback.`, )
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "component.callbacks.retryPolicy.maxInterval", time.Hour, `The maximum backoff interval between every callback request attempt for a given callback.`, )
var TransitionAttemptFailed = hsm.NewTransition( []enumsspb.CallbackState{enumsspb.CALLBACK_STATE_SCHEDULED}, enumsspb.CALLBACK_STATE_BACKING_OFF, func(cb Callback, event EventAttemptFailed) (hsm.TransitionOutput, error) { cb.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(cb.Attempt), event.Err) nextAttemptScheduleTime := event.Time.Add(nextDelay) cb.CallbackInfo.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) cb.CallbackInfo.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } return cb.output() }, )
var TransitionFailed = hsm.NewTransition( []enumsspb.CallbackState{enumsspb.CALLBACK_STATE_SCHEDULED}, enumsspb.CALLBACK_STATE_FAILED, func(cb Callback, event EventFailed) (hsm.TransitionOutput, error) { cb.recordAttempt(event.Time) cb.CallbackInfo.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } return cb.output() }, )
var TransitionRescheduled = hsm.NewTransition( []enumsspb.CallbackState{enumsspb.CALLBACK_STATE_BACKING_OFF}, enumsspb.CALLBACK_STATE_SCHEDULED, func(cb Callback, event EventRescheduled) (hsm.TransitionOutput, error) { cb.CallbackInfo.NextAttemptScheduleTime = nil return cb.output() }, )
var TransitionScheduled = hsm.NewTransition( []enumsspb.CallbackState{enumsspb.CALLBACK_STATE_STANDBY}, enumsspb.CALLBACK_STATE_SCHEDULED, func(cb Callback, event EventScheduled) (hsm.TransitionOutput, error) { return cb.output() }, )
var TransitionSucceeded = hsm.NewTransition( []enumsspb.CallbackState{enumsspb.CALLBACK_STATE_SCHEDULED}, enumsspb.CALLBACK_STATE_SUCCEEDED, func(cb Callback, event EventSucceeded) (hsm.TransitionOutput, error) { cb.recordAttempt(event.Time) cb.CallbackInfo.LastAttemptFailure = nil return cb.output() }, )
Functions ¶
func MachineCollection ¶
func MachineCollection(tree *hsm.Node) hsm.Collection[Callback]
MachineCollection creates a new typed [statemachines.Collection] for callbacks.
func NewWorkflowClosedTrigger ¶
func NewWorkflowClosedTrigger() *persistencespb.CallbackInfo_Trigger
NewWorkflowClosedTrigger creates a WorkflowClosed trigger variant.
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, executorOptions TaskExecutorOptions, ) error
func RegisterStateMachine ¶
func RegisterTaskSerializers ¶
Types ¶
type AddressMatchRule ¶ added in v1.25.0
type BackoffTask ¶
func (BackoffTask) Concurrent ¶
func (BackoffTask) Concurrent() bool
func (BackoffTask) Kind ¶
func (t BackoffTask) Kind() hsm.TaskKind
func (BackoffTask) Type ¶
func (BackoffTask) Type() string
type BackoffTaskSerializer ¶
type BackoffTaskSerializer struct{}
func (BackoffTaskSerializer) Deserialize ¶
type Callback ¶
type Callback struct {
*persistencespb.CallbackInfo
}
Callback state machine.
func NewCallback ¶
func NewCallback(registrationTime *timestamppb.Timestamp, trigger *persistencespb.CallbackInfo_Trigger, cb *persistencespb.Callback) Callback
NewCallback creates a new callback in the STANDBY state from given params.
func (Callback) SetState ¶
func (c Callback) SetState(state enumsspb.CallbackState)
func (Callback) State ¶
func (c Callback) State() enumsspb.CallbackState
type CanGetHSMCompletionCallbackArg ¶ added in v1.25.0
type CanGetHSMCompletionCallbackArg interface {
GetHSMCompletionCallbackArg(ctx context.Context) (*persistencespb.HSMCompletionCallbackArg, error)
}
type CanGetNexusCompletion ¶
type CanGetNexusCompletion interface {
GetNexusCompletion(ctx context.Context) (nexus.OperationCompletion, error)
}
type Config ¶
type Config struct { RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter RetryPolicy func() backoff.RetryPolicy }
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventAttemptFailed ¶
type EventAttemptFailed struct { Time time.Time Err error RetryPolicy backoff.RetryPolicy }
EventAttemptFailed is triggered when an attempt is failed with a retryable error.
type EventFailed ¶
EventFailed is triggered when an attempt is failed with a non retryable error.
type EventRescheduled ¶
type EventRescheduled struct{}
EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
type EventScheduled struct{}
EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.
type EventSucceeded ¶
EventSucceeded is triggered when an attempt succeeds.
type HTTPCaller ¶
HTTPCaller is a method that can be used to invoke HTTP requests.
type HTTPCallerProvider ¶ added in v1.25.0
type HTTPCallerProvider func(queues.NamespaceIDAndDestination) HTTPCaller
func HTTPCallerProviderProvider ¶ added in v1.25.0
func HTTPCallerProviderProvider( clusterMetadata cluster.Metadata, rpcFactory common.RPCFactory, httpClientCache *cluster.FrontendHTTPClientCache, logger log.Logger, ) (HTTPCallerProvider, error)
type InvocationTask ¶
type InvocationTask struct { // The base URL for nexus callbacks. // Will have other meanings as more callback use cases are added. Destination string }
func (InvocationTask) Concurrent ¶
func (InvocationTask) Concurrent() bool
func (InvocationTask) Kind ¶
func (t InvocationTask) Kind() hsm.TaskKind
func (InvocationTask) Type ¶
func (InvocationTask) Type() string
type InvocationTaskSerializer ¶
type InvocationTaskSerializer struct{}