callbacks

package
v1.25.0-114.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2024 License: MIT Imports: 24 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Task types declared by the callbacks package. Each pairs a numeric ID with a
// "callbacks."-prefixed name.
var (
	// TaskTypeInvocation is the task type with ID 1, named "callbacks.Invocation".
	TaskTypeInvocation = hsm.TaskType{
		ID:   1,
		Name: "callbacks.Invocation",
	}
	// TaskTypeBackoff is the task type with ID 2, named "callbacks.Backoff".
	TaskTypeBackoff = hsm.TaskType{
		ID:   2,
		Name: "callbacks.Backoff",
	}
)
View Source
// RequestCounter is the counter metric definition named
// "callback_outbound_requests"; per its description it counts the callback
// outbound requests made by the history service.
var RequestCounter = metrics.NewCounterDef(
	"callback_outbound_requests",
	metrics.WithDescription("The number of callback outbound requests made by the history service."),
)
View Source
// RequestLatencyHistogram is the timer metric definition named
// "callback_outbound_latency"; per its description it tracks the latency of
// outbound callback requests made by the history service.
var RequestLatencyHistogram = metrics.NewTimerDef(
	"callback_outbound_latency",
	metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."),
)
View Source
// RequestTimeout is the dynamic config setting, keyed
// "component.callbacks.request.timeout", for the timeout of a single callback
// request.
// NOTE(review): the second argument (10s) is presumably the default value and
// the setting is presumably resolved per destination — confirm against
// dynamicconfig.NewDestinationDurationSetting.
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"component.callbacks.request.timeout",
	time.Second*10,
	`RequestTimeout is the timeout for executing a single callback request.`,
)
View Source
// RetryPolicyInitialInterval is the dynamic config setting, keyed
// "component.callbacks.retryPolicy.initialInterval", for the initial backoff
// interval between callback request attempts.
// NOTE(review): the second argument (1s) is presumably the default value —
// confirm against dynamicconfig.NewGlobalDurationSetting.
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.callbacks.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every callback request attempt for a given callback.`,
)
View Source
// RetryPolicyMaximumInterval is the dynamic config setting, keyed
// "component.callbacks.retryPolicy.maxInterval", for the upper bound on the
// backoff interval between callback request attempts.
//
// The default is one hour. It must be larger than the default of
// RetryPolicyInitialInterval (one second): when both are equal the backoff can
// never grow, and a failing destination is retried at a constant one-second
// cadence.
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.callbacks.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every callback request attempt for a given callback.`,
)
View Source
// StateMachineType is the unique type identifier for the callback state
// machine: ID 2, named "callbacks.Callback".
var StateMachineType = hsm.MachineType{
	ID:   2,
	Name: "callbacks.Callback",
}

Unique type identifier for this state machine.

View Source
// TransitionAttemptFailed is the transition applied for an EventAttemptFailed.
// It is declared with CALLBACK_STATE_SCHEDULED as its source state and
// CALLBACK_STATE_BACKING_OFF as its destination.
//
// Applying it records the attempt at the event time, schedules the next
// attempt at event time plus the delay computed by the event's retry policy,
// and stores the error message as a retryable (NonRetryable=false) application
// failure in LastAttemptFailure.
var TransitionAttemptFailed = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED},
	enumspb.CALLBACK_STATE_BACKING_OFF,
	func(cb Callback, ev EventAttemptFailed) (hsm.TransitionOutput, error) {
		cb.recordAttempt(ev.Time)

		// The delay is derived from the attempt count as it stands after
		// recordAttempt; the elapsed-time argument is always 0.
		delay := ev.RetryPolicy.ComputeNextDelay(0, int(cb.PublicInfo.Attempt))
		cb.PublicInfo.NextAttemptScheduleTime = timestamppb.New(ev.Time.Add(delay))

		appFailure := &failurepb.ApplicationFailureInfo{NonRetryable: false}
		cb.PublicInfo.LastAttemptFailure = &failurepb.Failure{
			Message:     ev.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: appFailure},
		}

		return cb.output()
	},
)
View Source
// TransitionFailed is the transition applied for an EventFailed. It is
// declared with CALLBACK_STATE_SCHEDULED as its source state and
// CALLBACK_STATE_FAILED as its destination.
//
// Applying it records the attempt at the event time and stores the error
// message as a non-retryable application failure in LastAttemptFailure.
var TransitionFailed = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED},
	enumspb.CALLBACK_STATE_FAILED,
	func(cb Callback, ev EventFailed) (hsm.TransitionOutput, error) {
		cb.recordAttempt(ev.Time)

		appFailure := &failurepb.ApplicationFailureInfo{NonRetryable: true}
		cb.PublicInfo.LastAttemptFailure = &failurepb.Failure{
			Message:     ev.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: appFailure},
		}

		return cb.output()
	},
)
View Source
// TransitionRescheduled is the transition applied for an EventRescheduled. It
// is declared with CALLBACK_STATE_BACKING_OFF as its source state and
// CALLBACK_STATE_SCHEDULED as its destination.
var TransitionRescheduled = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_BACKING_OFF},
	enumspb.CALLBACK_STATE_SCHEDULED,
	func(cb Callback, _ EventRescheduled) (hsm.TransitionOutput, error) {
		// Leaving BACKING_OFF: drop the pending next-attempt time. The event
		// itself carries no data.
		cb.PublicInfo.NextAttemptScheduleTime = nil

		return cb.output()
	},
)
View Source
// TransitionSucceeded is the transition applied for an EventSucceeded. It is
// declared with CALLBACK_STATE_SCHEDULED as its source state and
// CALLBACK_STATE_SUCCEEDED as its destination.
var TransitionSucceeded = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED},
	enumspb.CALLBACK_STATE_SUCCEEDED,
	func(cb Callback, ev EventSucceeded) (hsm.TransitionOutput, error) {
		cb.recordAttempt(ev.Time)

		// Clear any failure recorded by an earlier attempt.
		cb.PublicInfo.LastAttemptFailure = nil

		return cb.output()
	},
)

Functions

func MachineCollection

func MachineCollection(tree *hsm.Node) hsm.Collection[Callback]

MachineCollection creates a new typed [hsm.Collection] for callbacks.

func NewWorkflowClosedTrigger

func NewWorkflowClosedTrigger() *workflowpb.CallbackInfo_Trigger

NewWorkflowClosedTrigger creates a WorkflowClosed trigger variant.

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	activeExecutorOptions ActiveExecutorOptions,
	standbyExecutorOptions StandbyExecutorOptions,
	config *Config,
) error

func RegisterStateMachine

func RegisterStateMachine(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type ActiveExecutorOptions

// ActiveExecutorOptions groups the dependencies handed to RegisterExecutor as
// its activeExecutorOptions argument.
type ActiveExecutorOptions struct {
	// NamespaceRegistry is the namespace registry supplied to the executor.
	NamespaceRegistry namespace.Registry
	// MetricsHandler is the metrics handler supplied to the executor.
	MetricsHandler    metrics.Handler
	// CallerProvider returns the HTTPCaller to use for a given namespace ID
	// and destination pair.
	CallerProvider    func(queues.NamespaceIDAndDestination) HTTPCaller
}

func ActiveExecutorOptionsProvider

func ActiveExecutorOptionsProvider(
	namespaceRegistry namespace.Registry,
	metricsHandler metrics.Handler,
) ActiveExecutorOptions

type BackoffTask

// BackoffTask is the task type carrying a deadline; it reports its kind, type
// and concurrency through the Kind, Type and Concurrent methods.
type BackoffTask struct {
	// Deadline is the time associated with this task.
	// NOTE(review): presumably the time at which the backoff ends and the
	// callback is rescheduled — confirm against the task executor.
	Deadline time.Time
}

func (BackoffTask) Concurrent

func (BackoffTask) Concurrent() bool

func (BackoffTask) Kind

func (t BackoffTask) Kind() hsm.TaskKind

func (BackoffTask) Type

func (BackoffTask) Type() hsm.TaskType

type BackoffTaskSerializer

// BackoffTaskSerializer is a stateless type providing Serialize and
// Deserialize methods for hsm.Task values (presumably BackoffTask — confirm).
type BackoffTaskSerializer struct{}

func (BackoffTaskSerializer) Deserialize

func (BackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (BackoffTaskSerializer) Serialize

func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type Callback

// Callback is the callback state machine. It wraps the persisted CallbackInfo
// by embedding a pointer to it, so methods with a value receiver still mutate
// the shared underlying record.
type Callback struct {
	*persistencespb.CallbackInfo
}

Callback state machine.

func NewCallback

func NewCallback(registrationTime *timestamppb.Timestamp, trigger *workflowpb.CallbackInfo_Trigger, cb *commonpb.Callback) Callback

NewCallback creates a new callback in the STANDBY state from given params.

func (Callback) RegenerateTasks

func (c Callback) RegenerateTasks(*hsm.Node) ([]hsm.Task, error)

func (Callback) SetState

func (c Callback) SetState(state enumspb.CallbackState)

func (Callback) State

func (c Callback) State() enumspb.CallbackState

type CanGetNexusCompletion

// CanGetNexusCompletion is implemented by types that can produce a
// nexus.OperationCompletion.
type CanGetNexusCompletion interface {
	// GetNexusCompletion returns the Nexus operation completion, or an error
	// if it cannot be obtained.
	GetNexusCompletion(ctx context.Context) (nexus.OperationCompletion, error)
}

type Config

// Config holds the configuration accessors for the callbacks component.
type Config struct {
	// RequestTimeout resolves the request timeout using a destination filter.
	RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
	// RetryPolicy returns the retry policy to apply to callback attempts.
	RetryPolicy    func() backoff.RetryPolicy
}

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection) *Config

type EventAttemptFailed

// EventAttemptFailed is the input to TransitionAttemptFailed.
type EventAttemptFailed struct {
	// Time is recorded as the attempt time and is the base to which the
	// retry delay is added to obtain the next attempt schedule time.
	Time        time.Time
	// Err is the attempt error; its message is stored in LastAttemptFailure.
	// It must be non-nil because the transition calls Err.Error().
	Err         error
	// RetryPolicy computes the delay before the next attempt.
	RetryPolicy backoff.RetryPolicy
}

EventAttemptFailed is triggered when an attempt fails with a retryable error.

type EventFailed

// EventFailed is the input to TransitionFailed.
type EventFailed struct {
	// Time is recorded as the attempt time.
	Time time.Time
	// Err is the attempt error; its message is stored in LastAttemptFailure.
	// It must be non-nil because the transition calls Err.Error().
	Err  error
}

EventFailed is triggered when an attempt fails with a non-retryable error.

type EventRescheduled

// EventRescheduled is the input to TransitionRescheduled. It carries no data.
type EventRescheduled struct{}

EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

// EventScheduled is a data-less event type.
// NOTE(review): the transition that consumes it is not visible here — confirm
// where it is applied.
type EventScheduled struct{}

EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.

type EventSucceeded

// EventSucceeded is the input to TransitionSucceeded.
type EventSucceeded struct {
	// Time is recorded as the attempt time.
	Time time.Time
}

EventSucceeded is triggered when an attempt succeeds.

type HTTPCaller

// HTTPCaller is a function type that takes an *http.Request and returns the
// *http.Response or an error.
type HTTPCaller func(*http.Request) (*http.Response, error)

HTTPCaller is a function that can be used to invoke HTTP requests.

type InvocationTask

// InvocationTask is the task type carrying a destination; it reports its kind,
// type and concurrency through the Kind, Type and Concurrent methods.
type InvocationTask struct {
	// Destination identifies the target of the invocation.
	// NOTE(review): the exact format (e.g. host, URL) is not visible here — confirm.
	Destination string
}

func (InvocationTask) Concurrent

func (InvocationTask) Concurrent() bool

func (InvocationTask) Kind

func (t InvocationTask) Kind() hsm.TaskKind

func (InvocationTask) Type

func (InvocationTask) Type() hsm.TaskType

type InvocationTaskSerializer

// InvocationTaskSerializer is a stateless type providing Serialize and
// Deserialize methods for hsm.Task values (presumably InvocationTask — confirm).
type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize

func (InvocationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (InvocationTaskSerializer) Serialize

func (InvocationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type StandbyExecutorOptions

// StandbyExecutorOptions is the options type handed to RegisterExecutor as its
// standbyExecutorOptions argument. It currently has no fields.
type StandbyExecutorOptions struct{}

func StandbyExecutorOptionsProvider

func StandbyExecutorOptionsProvider() StandbyExecutorOptions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL