resiliency

package
v1.13.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2024 License: Apache-2.0 Imports: 25 Imported by: 1

Documentation

Index

Examples

Constants

View Source
const (
	BuiltInServiceRetries         BuiltInPolicyName     = "DaprBuiltInServiceRetries"
	BuiltInActorRetries           BuiltInPolicyName     = "DaprBuiltInActorRetries"
	BuiltInActorReminderRetries   BuiltInPolicyName     = "DaprBuiltInActorReminderRetries"
	BuiltInActorNotFoundRetries   BuiltInPolicyName     = "DaprBuiltInActorNotFoundRetries"
	BuiltInInitializationRetries  BuiltInPolicyName     = "DaprBuiltInInitializationRetries"
	DefaultRetryTemplate          DefaultPolicyTemplate = "Default%sRetryPolicy"
	DefaultTimeoutTemplate        DefaultPolicyTemplate = "Default%sTimeoutPolicy"
	DefaultCircuitBreakerTemplate DefaultPolicyTemplate = "Default%sCircuitBreakerPolicy"
	Endpoint                      PolicyTypeName        = "App"
	Component                     PolicyTypeName        = "Component"
	Actor                         PolicyTypeName        = "Actor"
	Binding                       ComponentType         = "Binding"
	Configuration                 ComponentType         = "Configuration"
	Lock                          ComponentType         = "Lock"
	Pubsub                        ComponentType         = "Pubsub"
	Crypto                        ComponentType         = "Crypto"
	Secretstore                   ComponentType         = "Secretstore"
	Statestore                    ComponentType         = "Statestore"
	Inbound                       ComponentDirection    = "Inbound"
	Outbound                      ComponentDirection    = "Outbound"
)

Variables

View Source
var ComponentInboundPolicy = ComponentPolicy{
	// contains filtered or unexported fields
}
View Source
var ComponentOutboundPolicy = ComponentPolicy{
	// contains filtered or unexported fields
}

Functions

func DisposerCloser added in v1.10.0

func DisposerCloser[T io.Closer](obj T)

DisposerCloser is a Disposer function for RunnerOpts that invokes Close() on the object.

func GetAttempt added in v1.13.0

func GetAttempt(ctx context.Context) int32

GetAttempt returns the attempt number from a context Attempts are numbered from 1 onwards. If the context doesn't have an attempt number, returns 0

func IsCircuitBreakerError added in v1.10.0

func IsCircuitBreakerError(err error) bool

IsCircuitBreakerError returns true if the error is cicuit breaker open or too many requests in half-open state.

func IsTimeoutExeceeded added in v1.10.0

func IsTimeoutExeceeded(err error) bool

IsTimeExceeded returns true if the context timeout has elapsed.

func LoadKubernetesResiliency

func LoadKubernetesResiliency(log logger.Logger, runtimeID, namespace string, operatorClient operatorv1pb.OperatorClient) []*resiliencyV1alpha.Resiliency

LoadKubernetesResiliency loads resiliency configurations from the Kubernetes operator.

func LoadLocalResiliency added in v1.11.0

func LoadLocalResiliency(log logger.Logger, runtimeID string, paths ...string) []*resiliencyV1alpha.Resiliency

LoadLocalResiliency loads resiliency configurations from local folders.

Types

type ActorCircuitBreakerScope

type ActorCircuitBreakerScope int

ActorCircuitBreakerScope indicates the scope of the circuit breaker for an actor.

const (
	// ActorCircuitBreakerScopeType indicates the type scope (less granular).
	ActorCircuitBreakerScopeType ActorCircuitBreakerScope = iota
	// ActorCircuitBreakerScopeID indicates the type+id scope (more granular).
	ActorCircuitBreakerScopeID
	// ActorCircuitBreakerScopeBoth indicates both type and type+id are used for scope.
	ActorCircuitBreakerScopeBoth // Usage is TODO.

)

func ParseActorCircuitBreakerScope

func ParseActorCircuitBreakerScope(val string) (ActorCircuitBreakerScope, error)

ParseActorCircuitBreakerScope parses a string to a `ActorCircuitBreakerScope`.

type ActorPolicies

type ActorPolicies struct {
	PreLockPolicies  ActorPreLockPolicyNames
	PostLockPolicies ActorPostLockPolicyNames
}

Actors have different behavior before and after locking.

type ActorPolicy added in v1.9.0

type ActorPolicy struct{}

type ActorPostLockPolicyNames

type ActorPostLockPolicyNames struct {
	Timeout string
}

Policy used after an actor is locked. It only uses timeout as retry/circuit breaker is handled before locking.

type ActorPreLockPolicyNames

type ActorPreLockPolicyNames struct {
	Retry               string
	CircuitBreaker      string
	CircuitBreakerScope ActorCircuitBreakerScope
}

Policy used before an actor is locked. It does not include a timeout as we want to wait forever for the actor.

type BuiltInPolicyName added in v1.8.0

type BuiltInPolicyName string

type ComponentDirection added in v1.10.0

type ComponentDirection string

type ComponentPolicy added in v1.9.0

type ComponentPolicy struct {
	// contains filtered or unexported fields
}

type ComponentPolicyNames

type ComponentPolicyNames struct {
	Inbound  PolicyNames
	Outbound PolicyNames
}

ComponentPolicyNames contains the policies for component input and output.

type ComponentType added in v1.9.0

type ComponentType string

type DefaultPolicyTemplate added in v1.9.0

type DefaultPolicyTemplate string

type EndpointPolicy added in v1.9.0

type EndpointPolicy struct{}

type NoOp

type NoOp struct{}

NoOp is a true bypass implementation of `Provider`.

func (NoOp) ActorPostLockPolicy

func (NoOp) ActorPostLockPolicy(actorType string, id string) *PolicyDefinition

ActorPostLockPolicy returns a NoOp policy definition for an actor instance.

func (NoOp) ActorPreLockPolicy

func (NoOp) ActorPreLockPolicy(actorType string, id string) *PolicyDefinition

ActorPreLockPolicy returns a NoOp policy definition for an actor instance.

func (NoOp) BuiltInPolicy added in v1.8.0

func (NoOp) BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition

BuildInPolicy returns a NoOp policy definition for a built-in policy.

func (NoOp) ComponentInboundPolicy

func (NoOp) ComponentInboundPolicy(name string, componentName ComponentType) *PolicyDefinition

ComponentInboundPolicy returns a NoOp inbound policy definition for a component.

func (NoOp) ComponentOutboundPolicy

func (NoOp) ComponentOutboundPolicy(name string, componentName ComponentType) *PolicyDefinition

ComponentOutboundPolicy returns a NoOp outbound policy definition for a component.

func (NoOp) EndpointPolicy

func (NoOp) EndpointPolicy(service string, endpoint string) *PolicyDefinition

EndpointPolicy returns a NoOp policy definition for a service endpoint.

func (NoOp) PolicyDefined added in v1.8.0

func (NoOp) PolicyDefined(target string, policyType PolicyType) bool

func (NoOp) RoutePolicy

func (NoOp) RoutePolicy(name string) *PolicyDefinition

RoutePolicy returns a NoOp policy definition for a route.

type Operation

type Operation[T any] func(ctx context.Context) (T, error)

Operation represents a function to invoke with resiliency policies applied.

type PolicyDefinition added in v1.10.0

type PolicyDefinition struct {
	// contains filtered or unexported fields
}

PolicyDefinition contains a definition for a policy, used to create a Runner.

func NewPolicyDefinition added in v1.10.0

func NewPolicyDefinition(log logger.Logger, name string, t time.Duration, r *retry.Config, cb *breaker.CircuitBreaker) *PolicyDefinition

NewPolicyDefinition returns a PolicyDefinition object with the given parameters.

func (PolicyDefinition) HasRetries added in v1.10.0

func (p PolicyDefinition) HasRetries() bool

HasRetries returns true if the policy is configured to have more than 1 retry.

func (PolicyDefinition) String added in v1.10.0

func (p PolicyDefinition) String() string

String implements fmt.Stringer and is used for debugging.

type PolicyNames

type PolicyNames struct {
	Timeout        string
	Retry          string
	CircuitBreaker string
}

PolicyNames contains the policy names for a timeout, retry, and circuit breaker. Empty values mean that no policy is configured.

type PolicyType added in v1.8.0

type PolicyType interface {
	// contains filtered or unexported methods
}

PolicyTypes have to return an array of their possible levels. Ex. [App], Actor, [Component, Inbound|Outbound, ComponentType]

type PolicyTypeName added in v1.9.0

type PolicyTypeName string

type Provider

type Provider interface {
	// EndpointPolicy returns the policy for a service endpoint.
	EndpointPolicy(service string, endpoint string) *PolicyDefinition
	// ActorPolicy returns the policy for an actor instance to be used before the lock is acquired.
	ActorPreLockPolicy(actorType string, id string) *PolicyDefinition
	// ActorPolicy returns the policy for an actor instance to be used after the lock is acquired.
	ActorPostLockPolicy(actorType string, id string) *PolicyDefinition
	// ComponentOutboundPolicy returns the outbound policy for a component.
	ComponentOutboundPolicy(name string, componentType ComponentType) *PolicyDefinition
	// ComponentInboundPolicy returns the inbound policy for a component.
	ComponentInboundPolicy(name string, componentType ComponentType) *PolicyDefinition
	// BuiltInPolicy are used to replace existing retries in Dapr which may not bind specifically to one of the above categories.
	BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition
	// PolicyDefined returns true if there's policy that applies to the target.
	PolicyDefined(target string, policyType PolicyType) (exists bool)
}

Provider is the interface for returning a `*PolicyDefinition` for the various resiliency scenarios in the runtime.

type Resiliency

type Resiliency struct {
	// contains filtered or unexported fields
}

Resiliency encapsulates configuration for timeouts, retries, and circuit breakers. It maps services, actors, components, and routes to each of these configurations. Lastly, it maintains circuit breaker state across invocations.

func FromConfigurations

func FromConfigurations(log logger.Logger, c ...*resiliencyV1alpha.Resiliency) *Resiliency

FromConfigurations creates a resiliency provider and decodes the configurations from `c`.

func New

func New(log logger.Logger) *Resiliency

New creates a new Resiliency.

func (*Resiliency) ActorPostLockPolicy

func (r *Resiliency) ActorPostLockPolicy(actorType string, id string) *PolicyDefinition

ActorPostLockPolicy returns the policy for an actor instance to be used after an actor lock is acquired.

func (*Resiliency) ActorPreLockPolicy

func (r *Resiliency) ActorPreLockPolicy(actorType string, id string) *PolicyDefinition

ActorPreLockPolicy returns the policy for an actor instance to be used before an actor lock is acquired.

func (*Resiliency) BuiltInPolicy added in v1.8.0

func (r *Resiliency) BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition

BuiltInPolicy returns a policy that represents a specific built-in retry scenario.

func (*Resiliency) ComponentInboundPolicy

func (r *Resiliency) ComponentInboundPolicy(name string, componentType ComponentType) *PolicyDefinition

ComponentInboundPolicy returns the inbound policy for a component.

func (*Resiliency) ComponentOutboundPolicy

func (r *Resiliency) ComponentOutboundPolicy(name string, componentType ComponentType) *PolicyDefinition

ComponentOutboundPolicy returns the outbound policy for a component.

func (*Resiliency) DecodeConfiguration

func (r *Resiliency) DecodeConfiguration(c *resiliencyV1alpha.Resiliency) error

DecodeConfiguration reads in a single resiliency configuration.

func (*Resiliency) EndpointPolicy

func (r *Resiliency) EndpointPolicy(app string, endpoint string) *PolicyDefinition

EndpointPolicy returns the policy for a service endpoint.

func (*Resiliency) PolicyDefined added in v1.8.0

func (r *Resiliency) PolicyDefined(target string, policyType PolicyType) (exists bool)

PolicyDefined returns true if there's policy that applies to the target.

type Runner

type Runner[T any] func(oper Operation[T]) (T, error)

Runner represents a function to invoke `oper` with resiliency policies applied.

func NewRunner added in v1.10.0

func NewRunner[T any](ctx context.Context, def *PolicyDefinition) Runner[T]

NewRunner returns a policy runner that encapsulates the configured resiliency policies in a simple execution wrapper. We can't implement this as a method of the Resiliency struct because we can't yet use generic methods in structs.

func NewRunnerWithOptions added in v1.10.0

func NewRunnerWithOptions[T any](ctx context.Context, def *PolicyDefinition, opts RunnerOpts[T]) Runner[T]

NewRunnerWithOptions is like NewRunner but allows setting additional options

Example (Accumulator)

Example of using NewRunnerWithOptions with an Accumulator function

// Example polocy definition
policyDef := &PolicyDefinition{
	log:  testLog,
	name: "retry",
	t:    10 * time.Millisecond,
	r:    &retry.Config{MaxRetries: 6},
}

// Handler function
val := atomic.Int32{}
fn := func(ctx context.Context) (int32, error) {
	v := val.Add(1)
	// When the value is "2", we add a sleep that will trip the timeout
	// As a consequence, the accumulator is not called, so the value "2" should not be included in the result
	if v == 2 {
		time.Sleep(50 * time.Millisecond)
	}
	// Make this method be executed 4 times in total
	if v <= 3 {
		return v, errors.New("continue")
	}
	return v, nil
}

// Invoke the policy and collect all received values
received := []int32{}
policy := NewRunnerWithOptions(context.Background(), policyDef, RunnerOpts[int32]{
	Accumulator: func(i int32) {
		// Safe to use non-atomic operations in the next line because "received" is not used in the operation function ("fn")
		received = append(received, i)
	},
})

// When using accumulators, the result only contains the last value and is normally ignored
res, err := policy(fn)

fmt.Println(res, err, received)
Output:

4 <nil> [1 3 4]
Example (Disposer)

Example of using NewRunnerWithOptions with a Disposer function

// Example polocy definition
policyDef := &PolicyDefinition{
	log:  testLog,
	name: "retry",
	t:    10 * time.Millisecond,
	r:    &retry.Config{MaxRetries: 6},
}

// Handler function
counter := atomic.Int32{}
fn := func(ctx context.Context) (int32, error) {
	v := counter.Add(1)
	// When the value is "2", we add a sleep that will trip the timeout
	if v == 2 {
		time.Sleep(50 * time.Millisecond)
	}
	if v <= 3 {
		return v, errors.New("continue")
	}
	return v, nil
}

// Invoke the policy and collect all disposed values
disposerCalled := make(chan int32, 5)
policy := NewRunnerWithOptions(context.Background(), policyDef, RunnerOpts[int32]{
	Disposer: func(val int32) {
		// Dispose the object as needed, for example calling things like:
		// val.Close()

		// Use a buffered channel here because the disposer method should not block
		disposerCalled <- val
	},
})

// Execute the policy
res, err := policy(fn)

// The disposer should be 3 times called with values 1, 2, 3
disposed := []int32{}
for i := 0; i < 3; i++ {
	disposed = append(disposed, <-disposerCalled)
}
slices.Sort(disposed)

fmt.Println(res, err, disposed)
Output:

4 <nil> [1 2 3]

type RunnerOpts added in v1.10.0

type RunnerOpts[T any] struct {
	// The disposer is a function which is invoked when the operation fails, including due to timing out in a background goroutine. It receives the value returned by the operation function as long as it's non-zero (e.g. non-nil for pointer types).
	// The disposer can be used to perform cleanup tasks on values returned by the operation function that would otherwise leak (because they're not returned by the result of the runner).
	Disposer func(T)

	// The accumulator is a function that is invoked synchronously when an operation completes without timing out, whether successfully or not. It receives the value returned by the operation function as long as it's non-zero (e.g. non-nil for pointer types).
	// The accumulator can be used to collect intermediate results and not just the final ones, for example in case of working with batched operations.
	Accumulator func(T)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL