callbacks

package
v1.25.0-115.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2024 License: MIT Imports: 24 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Task types declared by the callbacks package. Each one pairs a numeric ID
// with a "callbacks."-prefixed name.
var (
	// TaskTypeInvocation is the task type with ID 1, named "callbacks.Invocation".
	TaskTypeInvocation = hsm.TaskType{
		ID:   1,
		Name: "callbacks.Invocation",
	}
	// TaskTypeBackoff is the task type with ID 2, named "callbacks.Backoff".
	TaskTypeBackoff = hsm.TaskType{
		ID:   2,
		Name: "callbacks.Backoff",
	}
)
View Source
// RequestCounter is the counter metric definition named "callback_outbound_requests".
// Per its description, it counts callback outbound requests made by the history service.
var RequestCounter = metrics.NewCounterDef(
	"callback_outbound_requests",
	metrics.WithDescription("The number of callback outbound requests made by the history service."),
)
View Source
// RequestLatencyHistogram is the timer metric definition named "callback_outbound_latency".
// Per its description, it tracks the latency of outbound callback requests made by the
// history service.
var RequestLatencyHistogram = metrics.NewTimerDef(
	"callback_outbound_latency",
	metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."),
)
View Source
// RequestTimeout is the dynamic config setting, keyed "component.callbacks.request.timeout",
// that bounds the execution of a single callback request. It is declared as a
// destination-scoped duration setting with a ten second value.
// NOTE(review): the duration argument is presumably the default — confirm against dynamicconfig.
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"component.callbacks.request.timeout",
	10*time.Second,
	`RequestTimeout is the timeout for executing a single callback request.`,
)
View Source
// RetryPolicyInitialInterval is the dynamic config setting, keyed
// "component.callbacks.retryPolicy.initialInterval", for the initial backoff interval between
// callback request attempts. It is declared as a global duration setting with a one second value.
// NOTE(review): the duration argument is presumably the default — confirm against dynamicconfig.
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.callbacks.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every callback request attempt for a given callback.`,
)
View Source
// RetryPolicyMaximumInterval is the dynamic config setting, keyed
// "component.callbacks.retryPolicy.maxInterval", for the maximum backoff interval between
// callback request attempts. It is declared as a global duration setting with a one hour value.
// NOTE(review): the duration argument is presumably the default — confirm against dynamicconfig.
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.callbacks.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every callback request attempt for a given callback.`,
)
View Source
// StateMachineType is the unique type identifier for the callback state machine:
// ID 2, named "callbacks.Callback".
var StateMachineType = hsm.MachineType{
	ID:   2,
	Name: "callbacks.Callback",
}

Unique type identifier for this state machine.

View Source
// TransitionAttemptFailed is the transition declared from CALLBACK_STATE_SCHEDULED to
// CALLBACK_STATE_BACKING_OFF. It records the attempt, computes when the next attempt should be
// scheduled using the event's retry policy, and stores the event's error as a retryable
// application failure on the callback's public info.
var TransitionAttemptFailed = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED},
	enumspb.CALLBACK_STATE_BACKING_OFF,
	func(cb Callback, ev EventAttemptFailed) (hsm.TransitionOutput, error) {
		cb.recordAttempt(ev.Time)

		// The delay is computed from the callback's current attempt count, and the next attempt
		// is scheduled relative to the time carried by the event.
		backoffDelay := ev.RetryPolicy.ComputeNextDelay(0, int(cb.PublicInfo.Attempt))
		cb.PublicInfo.NextAttemptScheduleTime = timestamppb.New(ev.Time.Add(backoffDelay))

		// The failure is explicitly marked as retryable.
		lastFailure := &failurepb.Failure{Message: ev.Err.Error()}
		lastFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
			ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{NonRetryable: false},
		}
		cb.PublicInfo.LastAttemptFailure = lastFailure

		return cb.output()
	},
)
View Source
// TransitionFailed is the transition declared from CALLBACK_STATE_SCHEDULED to
// CALLBACK_STATE_FAILED. It records the attempt and stores the event's error as a
// non-retryable application failure on the callback's public info.
var TransitionFailed = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED},
	enumspb.CALLBACK_STATE_FAILED,
	func(cb Callback, ev EventFailed) (hsm.TransitionOutput, error) {
		cb.recordAttempt(ev.Time)

		// The failure is explicitly marked as non-retryable.
		lastFailure := &failurepb.Failure{Message: ev.Err.Error()}
		lastFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
			ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{NonRetryable: true},
		}
		cb.PublicInfo.LastAttemptFailure = lastFailure

		return cb.output()
	},
)
View Source
// TransitionRescheduled is the transition declared from CALLBACK_STATE_BACKING_OFF back to
// CALLBACK_STATE_SCHEDULED. It clears the next attempt schedule time; the event carries no data.
var TransitionRescheduled = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_BACKING_OFF},
	enumspb.CALLBACK_STATE_SCHEDULED,
	func(cb Callback, _ EventRescheduled) (hsm.TransitionOutput, error) {
		// The callback is no longer waiting on a backoff, so there is no pending schedule time.
		cb.PublicInfo.NextAttemptScheduleTime = nil

		return cb.output()
	},
)
View Source
// TransitionSucceeded is the transition declared from CALLBACK_STATE_SCHEDULED to
// CALLBACK_STATE_SUCCEEDED. It records the attempt and clears the last recorded attempt failure.
var TransitionSucceeded = hsm.NewTransition(
	[]enumspb.CallbackState{enumspb.CALLBACK_STATE_SCHEDULED},
	enumspb.CALLBACK_STATE_SUCCEEDED,
	func(cb Callback, ev EventSucceeded) (hsm.TransitionOutput, error) {
		cb.recordAttempt(ev.Time)

		// Drop whatever failure an earlier attempt may have left behind.
		cb.PublicInfo.LastAttemptFailure = nil

		return cb.output()
	},
)

Functions

func MachineCollection

func MachineCollection(tree *hsm.Node) hsm.Collection[Callback]

MachineCollection creates a new typed [hsm.Collection] for callbacks.

func NewWorkflowClosedTrigger

func NewWorkflowClosedTrigger() *workflowpb.CallbackInfo_Trigger

NewWorkflowClosedTrigger creates a WorkflowClosed trigger variant.

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	executorOptions TaskExecutorOptions,
	config *Config,
) error

func RegisterStateMachine

func RegisterStateMachine(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type BackoffTask

// BackoffTask is a task that carries a single Deadline timestamp.
// NOTE(review): presumably scheduled while a callback is backing off between attempts
// (cf. TaskTypeBackoff) — confirm against the task executor.
type BackoffTask struct {
	// Deadline is the time associated with this task.
	Deadline time.Time
}

func (BackoffTask) Concurrent

func (BackoffTask) Concurrent() bool

func (BackoffTask) Kind

func (t BackoffTask) Kind() hsm.TaskKind

func (BackoffTask) Type

func (BackoffTask) Type() hsm.TaskType

type BackoffTaskSerializer

// BackoffTaskSerializer is a stateless type whose Serialize and Deserialize methods convert
// between hsm.Task values and bytes.
type BackoffTaskSerializer struct{}

func (BackoffTaskSerializer) Deserialize

func (BackoffTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (BackoffTaskSerializer) Serialize

func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type Callback

// Callback is the callback state machine. It embeds *persistencespb.CallbackInfo, so that
// type's fields and methods are promoted onto Callback.
type Callback struct {
	*persistencespb.CallbackInfo
}

Callback state machine.

func NewCallback

func NewCallback(registrationTime *timestamppb.Timestamp, trigger *workflowpb.CallbackInfo_Trigger, cb *commonpb.Callback) Callback

NewCallback creates a new callback in the STANDBY state from given params.

func (Callback) RegenerateTasks

func (c Callback) RegenerateTasks(*hsm.Node) ([]hsm.Task, error)

func (Callback) SetState

func (c Callback) SetState(state enumspb.CallbackState)

func (Callback) State

func (c Callback) State() enumspb.CallbackState

type CanGetNexusCompletion

// CanGetNexusCompletion is implemented by types that can produce a nexus.OperationCompletion.
type CanGetNexusCompletion interface {
	// GetNexusCompletion returns a nexus.OperationCompletion, or an error if one cannot be
	// produced.
	GetNexusCompletion(ctx context.Context) (nexus.OperationCompletion, error)
}

type Config

// Config holds the configuration accessors used by the callbacks component.
type Config struct {
	// RequestTimeout is a duration property function that takes a destination filter.
	// NOTE(review): presumably backed by the RequestTimeout dynamic config setting — confirm
	// in ConfigProvider.
	RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
	// RetryPolicy returns the backoff.RetryPolicy to use.
	RetryPolicy    func() backoff.RetryPolicy
}

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection) *Config

type EventAttemptFailed

// EventAttemptFailed is triggered when an attempt fails with a retryable error.
type EventAttemptFailed struct {
	// Time is recorded as the attempt time and is the base to which the next retry delay is added.
	Time        time.Time
	// Err is the attempt's error; its message is stored as the callback's last attempt failure.
	Err         error
	// RetryPolicy computes the delay before the next attempt.
	RetryPolicy backoff.RetryPolicy
}

EventAttemptFailed is triggered when an attempt fails with a retryable error.

type EventFailed

// EventFailed is triggered when an attempt fails with a non-retryable error.
type EventFailed struct {
	// Time is recorded as the attempt time.
	Time time.Time
	// Err is the attempt's error; its message is stored as the callback's last attempt failure.
	Err  error
}

EventFailed is triggered when an attempt fails with a non-retryable error.

type EventRescheduled

// EventRescheduled is triggered when the callback is meant to be rescheduled after backing off
// from a previous attempt. It carries no data.
type EventRescheduled struct{}

EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

// EventScheduled is triggered when the callback is meant to be scheduled for the first time,
// i.e. when its Trigger condition is met. It carries no data.
type EventScheduled struct{}

EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.

type EventSucceeded

// EventSucceeded is triggered when an attempt succeeds.
type EventSucceeded struct {
	// Time is recorded as the attempt time.
	Time time.Time
}

EventSucceeded is triggered when an attempt succeeds.

type HTTPCaller

// HTTPCaller is a function that can be used to invoke HTTP requests: it takes an *http.Request
// and returns the *http.Response or an error.
type HTTPCaller func(*http.Request) (*http.Response, error)

HTTPCaller is a method that can be used to invoke HTTP requests.

type HTTPCallerProvider added in v1.25.0

// HTTPCallerProvider is a function that returns the HTTPCaller to use for a given
// queues.NamespaceIDAndDestination.
type HTTPCallerProvider func(queues.NamespaceIDAndDestination) HTTPCaller

func HTTPCallerProviderProvider added in v1.25.0

func HTTPCallerProviderProvider() HTTPCallerProvider

type InvocationTask

// InvocationTask is a task that carries a single Destination string.
// NOTE(review): presumably the task that triggers a callback invocation
// (cf. TaskTypeInvocation) — confirm against the task executor.
type InvocationTask struct {
	// Destination is the destination associated with this task.
	Destination string
}

func (InvocationTask) Concurrent

func (InvocationTask) Concurrent() bool

func (InvocationTask) Kind

func (t InvocationTask) Kind() hsm.TaskKind

func (InvocationTask) Type

func (InvocationTask) Type() hsm.TaskType

type InvocationTaskSerializer

// InvocationTaskSerializer is a stateless type whose Serialize and Deserialize methods convert
// between hsm.Task values and bytes.
type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize

func (InvocationTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error)

func (InvocationTaskSerializer) Serialize

func (InvocationTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type TaskExecutorOptions added in v1.25.0

// TaskExecutorOptions groups the dependencies passed to RegisterExecutor.
// It embeds fx.In. NOTE(review): presumably so the fields are populated by fx dependency
// injection — confirm against the module wiring.
type TaskExecutorOptions struct {
	fx.In

	// NamespaceRegistry is the namespace registry dependency.
	NamespaceRegistry namespace.Registry
	// MetricsHandler is the metrics handler dependency.
	MetricsHandler    metrics.Handler
	// CallerProvider supplies HTTPCaller values per namespace ID and destination.
	CallerProvider    HTTPCallerProvider
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL