Documentation ¶
Index ¶
- Constants
- Variables
- func MachineCollection(tree *hsm.Node) hsm.Collection[Callback]
- func NewWorkflowClosedTrigger() *workflowpb.CallbackInfo_Trigger
- func RegisterExecutor(registry *hsm.Registry, options ActiveExecutorOptions, config *Config) error
- func RegisterStateMachine(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type ActiveExecutorOptions
- type BackoffTask
- type BackoffTaskSerializer
- type Callback
- type CanGetNexusCompletion
- type Config
- type EventAttemptFailed
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventSucceeded
- type HTTPCaller
- type InvocationTask
- type InvocationTaskSerializer
Constants ¶
const RequestTimeout = dynamicconfig.Key("component.callbacks.request.timeout")
RequestTimeout is the timeout for executing a single callback request.
Variables ¶
var ( TaskTypeInvocation = hsm.TaskType{ ID: 1, Name: "callbacks.Invocation", } TaskTypeBackoff = hsm.TaskType{ ID: 2, Name: "callbacks.Backoff", } )
var Module = fx.Module( "component.callbacks", fx.Provide(ConfigProvider), fx.Provide(CallbackExecutorOptionsProvider), fx.Invoke(RegisterTaskSerializers), fx.Invoke(RegisterStateMachine), fx.Invoke(RegisterExecutor), )
var StateMachineType = hsm.MachineType{
ID: 2,
Name: "callbacks.Callback",
}
Unique type identifier for this state machine.
var TransitionAttemptFailed = hsm.NewTransition( []enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED}, enumspb.CALLBACK_STATE_BACKING_OFF, func(cb Callback, event EventAttemptFailed) (hsm.TransitionOutput, error) { cb.recordAttempt(event.Time) nextDelay := backoff.NewExponentialRetryPolicy(time.Second).ComputeNextDelay(0, int(cb.PublicInfo.Attempt)) nextAttemptScheduleTime := event.Time.Add(nextDelay) cb.PublicInfo.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) cb.PublicInfo.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } return cb.output() }, )
var TransitionFailed = hsm.NewTransition( []enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED}, enumspb.CALLBACK_STATE_FAILED, func(cb Callback, event EventFailed) (hsm.TransitionOutput, error) { cb.recordAttempt(event.Time) cb.PublicInfo.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } return cb.output() }, )
var TransitionRescheduled = hsm.NewTransition( []enumspb.CallbackState{enumspb.CALLBACK_STATE_BACKING_OFF}, enumspb.CALLBACK_STATE_SCHEDULED, func(cb Callback, event EventRescheduled) (hsm.TransitionOutput, error) { cb.PublicInfo.NextAttemptScheduleTime = nil return cb.output() }, )
var TransitionScheduled = hsm.NewTransition( []enumspb.CallbackState{enumspb.CALLBACK_STATE_STANDBY}, enumspb.CALLBACK_STATE_SCHEDULED, func(cb Callback, event EventScheduled) (hsm.TransitionOutput, error) { return cb.output() }, )
var TransitionSucceeded = hsm.NewTransition( []enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED}, enumspb.CALLBACK_STATE_SUCCEEDED, func(cb Callback, event EventSucceeded) (hsm.TransitionOutput, error) { cb.recordAttempt(event.Time) cb.PublicInfo.LastAttemptFailure = nil return cb.output() }, )
Functions ¶
func MachineCollection ¶
func MachineCollection(tree *hsm.Node) hsm.Collection[Callback]
MachineCollection creates a new typed [statemachines.Collection] for callbacks.
func NewWorkflowClosedTrigger ¶
func NewWorkflowClosedTrigger() *workflowpb.CallbackInfo_Trigger
NewWorkflowClosedTrigger creates a WorkflowClosed trigger variant.
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, options ActiveExecutorOptions, config *Config, ) error
func RegisterStateMachine ¶
func RegisterTaskSerializers ¶
Types ¶
type ActiveExecutorOptions ¶
type ActiveExecutorOptions struct {
CallerProvider func(queues.NamespaceIDAndDestination) HTTPCaller
}
func CallbackExecutorOptionsProvider ¶
func CallbackExecutorOptionsProvider() ActiveExecutorOptions
type BackoffTask ¶
func (BackoffTask) Concurrent ¶
func (BackoffTask) Concurrent() bool
func (BackoffTask) Kind ¶
func (t BackoffTask) Kind() hsm.TaskKind
func (BackoffTask) Type ¶
func (BackoffTask) Type() hsm.TaskType
type BackoffTaskSerializer ¶
type BackoffTaskSerializer struct{}
func (BackoffTaskSerializer) Deserialize ¶
type Callback ¶
type Callback struct {
*persistencespb.CallbackInfo
}
Callback state machine.
func NewCallback ¶
func NewCallback(registrationTime *timestamppb.Timestamp, trigger *workflowpb.CallbackInfo_Trigger, cb *commonpb.Callback) Callback
NewCallback creates a new callback in the STANDBY state from given params.
func (Callback) SetState ¶
func (c Callback) SetState(state enumspb.CallbackState)
func (Callback) State ¶
func (c Callback) State() enumspb.CallbackState
type CanGetNexusCompletion ¶
type CanGetNexusCompletion interface {
GetNexusCompletion(ctx context.Context) (nexus.OperationCompletion, error)
}
type Config ¶
type Config struct {
RequestTimeout dynamicconfig.DurationPropertyFn
}
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventAttemptFailed ¶
EventAttemptFailed is triggered when an attempt is failed with a retryable error.
type EventFailed ¶
EventFailed is triggered when an attempt is failed with a non retryable error.
type EventRescheduled ¶
type EventRescheduled struct{}
EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
type EventScheduled struct{}
EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.
type EventSucceeded ¶
EventSucceeded is triggered when an attempt succeeds.
type HTTPCaller ¶
HTTPCaller is a method that can be used to invoke HTTP requests.
type InvocationTask ¶
type InvocationTask struct {
Destination string
}
func (InvocationTask) Concurrent ¶
func (InvocationTask) Concurrent() bool
func (InvocationTask) Kind ¶
func (t InvocationTask) Kind() hsm.TaskKind
func (InvocationTask) Type ¶
func (InvocationTask) Type() hsm.TaskType
type InvocationTaskSerializer ¶
type InvocationTaskSerializer struct{}