Documentation ¶
Index ¶
- Constants
- Variables
- func DisposerCloser[T io.Closer](obj T)
- func GetAttempt(ctx context.Context) int32
- func IsCircuitBreakerError(err error) bool
- func IsTimeoutExeceeded(err error) bool
- func LoadKubernetesResiliency(log logger.Logger, runtimeID, namespace string, ...) []*resiliencyV1alpha.Resiliency
- func LoadLocalResiliency(log logger.Logger, runtimeID string, paths ...string) []*resiliencyV1alpha.Resiliency
- type ActorCircuitBreakerScope
- type ActorPolicies
- type ActorPolicy
- type ActorPostLockPolicyNames
- type ActorPreLockPolicyNames
- type BuiltInPolicyName
- type ComponentDirection
- type ComponentPolicy
- type ComponentPolicyNames
- type ComponentType
- type DefaultPolicyTemplate
- type EndpointPolicy
- type NoOp
- func (NoOp) ActorPostLockPolicy(actorType string, id string) *PolicyDefinition
- func (NoOp) ActorPreLockPolicy(actorType string, id string) *PolicyDefinition
- func (NoOp) BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition
- func (NoOp) ComponentInboundPolicy(name string, componentName ComponentType) *PolicyDefinition
- func (NoOp) ComponentOutboundPolicy(name string, componentName ComponentType) *PolicyDefinition
- func (NoOp) EndpointPolicy(service string, endpoint string) *PolicyDefinition
- func (NoOp) PolicyDefined(target string, policyType PolicyType) bool
- func (NoOp) RoutePolicy(name string) *PolicyDefinition
- type Operation
- type PolicyDefinition
- type PolicyNames
- type PolicyType
- type PolicyTypeName
- type Provider
- type Resiliency
- func (r *Resiliency) ActorPostLockPolicy(actorType string, id string) *PolicyDefinition
- func (r *Resiliency) ActorPreLockPolicy(actorType string, id string) *PolicyDefinition
- func (r *Resiliency) BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition
- func (r *Resiliency) ComponentInboundPolicy(name string, componentType ComponentType) *PolicyDefinition
- func (r *Resiliency) ComponentOutboundPolicy(name string, componentType ComponentType) *PolicyDefinition
- func (r *Resiliency) DecodeConfiguration(c *resiliencyV1alpha.Resiliency) error
- func (r *Resiliency) EndpointPolicy(app string, endpoint string) *PolicyDefinition
- func (r *Resiliency) PolicyDefined(target string, policyType PolicyType) (exists bool)
- type Runner
- type RunnerOpts
Examples ¶
Constants ¶
const ( BuiltInServiceRetries BuiltInPolicyName = "DaprBuiltInServiceRetries" BuiltInActorRetries BuiltInPolicyName = "DaprBuiltInActorRetries" BuiltInActorReminderRetries BuiltInPolicyName = "DaprBuiltInActorReminderRetries" BuiltInActorNotFoundRetries BuiltInPolicyName = "DaprBuiltInActorNotFoundRetries" BuiltInInitializationRetries BuiltInPolicyName = "DaprBuiltInInitializationRetries" DefaultRetryTemplate DefaultPolicyTemplate = "Default%sRetryPolicy" DefaultTimeoutTemplate DefaultPolicyTemplate = "Default%sTimeoutPolicy" DefaultCircuitBreakerTemplate DefaultPolicyTemplate = "Default%sCircuitBreakerPolicy" Endpoint PolicyTypeName = "App" Component PolicyTypeName = "Component" Actor PolicyTypeName = "Actor" Binding ComponentType = "Binding" Configuration ComponentType = "Configuration" Lock ComponentType = "Lock" Pubsub ComponentType = "Pubsub" Crypto ComponentType = "Crypto" Conversation ComponentType = "Conversation" Secretstore ComponentType = "Secretstore" Statestore ComponentType = "Statestore" Inbound ComponentDirection = "Inbound" Outbound ComponentDirection = "Outbound" )
Variables ¶
var ComponentInboundPolicy = ComponentPolicy{ // contains filtered or unexported fields }
var ComponentOutboundPolicy = ComponentPolicy{ // contains filtered or unexported fields }
Functions ¶
func DisposerCloser ¶
DisposerCloser is a Disposer function for RunnerOpts that invokes Close() on the object.
func GetAttempt ¶
GetAttempt returns the attempt number from a context Attempts are numbered from 1 onwards. If the context doesn't have an attempt number, returns 0
func IsCircuitBreakerError ¶
IsCircuitBreakerError returns true if the error is cicuit breaker open or too many requests in half-open state.
func IsTimeoutExeceeded ¶
IsTimeExceeded returns true if the context timeout has elapsed.
func LoadKubernetesResiliency ¶
func LoadKubernetesResiliency(log logger.Logger, runtimeID, namespace string, operatorClient operatorv1pb.OperatorClient) []*resiliencyV1alpha.Resiliency
LoadKubernetesResiliency loads resiliency configurations from the Kubernetes operator.
func LoadLocalResiliency ¶
func LoadLocalResiliency(log logger.Logger, runtimeID string, paths ...string) []*resiliencyV1alpha.Resiliency
LoadLocalResiliency loads resiliency configurations from local folders.
Types ¶
type ActorCircuitBreakerScope ¶
type ActorCircuitBreakerScope int
ActorCircuitBreakerScope indicates the scope of the circuit breaker for an actor.
const ( // ActorCircuitBreakerScopeType indicates the type scope (less granular). ActorCircuitBreakerScopeType ActorCircuitBreakerScope = iota // ActorCircuitBreakerScopeID indicates the type+id scope (more granular). ActorCircuitBreakerScopeID // ActorCircuitBreakerScopeBoth indicates both type and type+id are used for scope. ActorCircuitBreakerScopeBoth // Usage is TODO. )
func ParseActorCircuitBreakerScope ¶
func ParseActorCircuitBreakerScope(val string) (ActorCircuitBreakerScope, error)
ParseActorCircuitBreakerScope parses a string to a `ActorCircuitBreakerScope`.
type ActorPolicies ¶
type ActorPolicies struct { PreLockPolicies ActorPreLockPolicyNames PostLockPolicies ActorPostLockPolicyNames }
Actors have different behavior before and after locking.
type ActorPolicy ¶
type ActorPolicy struct{}
type ActorPostLockPolicyNames ¶
type ActorPostLockPolicyNames struct {
Timeout string
}
Policy used after an actor is locked. It only uses timeout as retry/circuit breaker is handled before locking.
type ActorPreLockPolicyNames ¶
type ActorPreLockPolicyNames struct { Retry string CircuitBreaker string CircuitBreakerScope ActorCircuitBreakerScope }
Policy used before an actor is locked. It does not include a timeout as we want to wait forever for the actor.
type BuiltInPolicyName ¶
type BuiltInPolicyName string
type ComponentDirection ¶
type ComponentDirection string
type ComponentPolicy ¶
type ComponentPolicy struct {
// contains filtered or unexported fields
}
type ComponentPolicyNames ¶
type ComponentPolicyNames struct { Inbound PolicyNames Outbound PolicyNames }
ComponentPolicyNames contains the policies for component input and output.
type ComponentType ¶
type ComponentType string
type DefaultPolicyTemplate ¶
type DefaultPolicyTemplate string
type EndpointPolicy ¶
type EndpointPolicy struct{}
type NoOp ¶
type NoOp struct{}
NoOp is a true bypass implementation of `Provider`.
func (NoOp) ActorPostLockPolicy ¶
func (NoOp) ActorPostLockPolicy(actorType string, id string) *PolicyDefinition
ActorPostLockPolicy returns a NoOp policy definition for an actor instance.
func (NoOp) ActorPreLockPolicy ¶
func (NoOp) ActorPreLockPolicy(actorType string, id string) *PolicyDefinition
ActorPreLockPolicy returns a NoOp policy definition for an actor instance.
func (NoOp) BuiltInPolicy ¶
func (NoOp) BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition
BuiltInPolicy returns a NoOp policy definition for a built-in policy.
func (NoOp) ComponentInboundPolicy ¶
func (NoOp) ComponentInboundPolicy(name string, componentName ComponentType) *PolicyDefinition
ComponentInboundPolicy returns a NoOp inbound policy definition for a component.
func (NoOp) ComponentOutboundPolicy ¶
func (NoOp) ComponentOutboundPolicy(name string, componentName ComponentType) *PolicyDefinition
ComponentOutboundPolicy returns a NoOp outbound policy definition for a component.
func (NoOp) EndpointPolicy ¶
func (NoOp) EndpointPolicy(service string, endpoint string) *PolicyDefinition
EndpointPolicy returns a NoOp policy definition for a service endpoint.
func (NoOp) PolicyDefined ¶
func (NoOp) PolicyDefined(target string, policyType PolicyType) bool
func (NoOp) RoutePolicy ¶
func (NoOp) RoutePolicy(name string) *PolicyDefinition
RoutePolicy returns a NoOp policy definition for a route.
type PolicyDefinition ¶
type PolicyDefinition struct {
// contains filtered or unexported fields
}
PolicyDefinition contains a definition for a policy, used to create a Runner.
func NewPolicyDefinition ¶
func NewPolicyDefinition(log logger.Logger, name string, t time.Duration, r *retry.Config, cb *breaker.CircuitBreaker) *PolicyDefinition
NewPolicyDefinition returns a PolicyDefinition object with the given parameters.
func (PolicyDefinition) HasRetries ¶
func (p PolicyDefinition) HasRetries() bool
HasRetries returns true if the policy is configured to have more than 1 retry.
func (PolicyDefinition) String ¶
func (p PolicyDefinition) String() string
String implements fmt.Stringer and is used for debugging.
type PolicyNames ¶
PolicyNames contains the policy names for a timeout, retry, and circuit breaker. Empty values mean that no policy is configured.
type PolicyType ¶
type PolicyType interface {
// contains filtered or unexported methods
}
PolicyTypes have to return an array of their possible levels. Ex. [App], Actor, [Component, Inbound|Outbound, ComponentType]
type PolicyTypeName ¶
type PolicyTypeName string
type Provider ¶
type Provider interface { // EndpointPolicy returns the policy for a service endpoint. EndpointPolicy(service string, endpoint string) *PolicyDefinition // ActorPolicy returns the policy for an actor instance to be used before the lock is acquired. ActorPreLockPolicy(actorType string, id string) *PolicyDefinition // ActorPolicy returns the policy for an actor instance to be used after the lock is acquired. ActorPostLockPolicy(actorType string, id string) *PolicyDefinition // ComponentOutboundPolicy returns the outbound policy for a component. ComponentOutboundPolicy(name string, componentType ComponentType) *PolicyDefinition // ComponentInboundPolicy returns the inbound policy for a component. ComponentInboundPolicy(name string, componentType ComponentType) *PolicyDefinition // BuiltInPolicy are used to replace existing retries in Dapr which may not bind specifically to one of the above categories. BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition // PolicyDefined returns true if there's policy that applies to the target. PolicyDefined(target string, policyType PolicyType) (exists bool) }
Provider is the interface for returning a `*PolicyDefinition` for the various resiliency scenarios in the runtime.
type Resiliency ¶
type Resiliency struct {
// contains filtered or unexported fields
}
Resiliency encapsulates configuration for timeouts, retries, and circuit breakers. It maps services, actors, components, and routes to each of these configurations. Lastly, it maintains circuit breaker state across invocations.
func FromConfigurations ¶
func FromConfigurations(log logger.Logger, c ...*resiliencyV1alpha.Resiliency) *Resiliency
FromConfigurations creates a resiliency provider and decodes the configurations from `c`.
func (*Resiliency) ActorPostLockPolicy ¶
func (r *Resiliency) ActorPostLockPolicy(actorType string, id string) *PolicyDefinition
ActorPostLockPolicy returns the policy for an actor instance to be used after an actor lock is acquired.
func (*Resiliency) ActorPreLockPolicy ¶
func (r *Resiliency) ActorPreLockPolicy(actorType string, id string) *PolicyDefinition
ActorPreLockPolicy returns the policy for an actor instance to be used before an actor lock is acquired.
func (*Resiliency) BuiltInPolicy ¶
func (r *Resiliency) BuiltInPolicy(name BuiltInPolicyName) *PolicyDefinition
BuiltInPolicy returns a policy that represents a specific built-in retry scenario.
func (*Resiliency) ComponentInboundPolicy ¶
func (r *Resiliency) ComponentInboundPolicy(name string, componentType ComponentType) *PolicyDefinition
ComponentInboundPolicy returns the inbound policy for a component.
func (*Resiliency) ComponentOutboundPolicy ¶
func (r *Resiliency) ComponentOutboundPolicy(name string, componentType ComponentType) *PolicyDefinition
ComponentOutboundPolicy returns the outbound policy for a component.
func (*Resiliency) DecodeConfiguration ¶
func (r *Resiliency) DecodeConfiguration(c *resiliencyV1alpha.Resiliency) error
DecodeConfiguration reads in a single resiliency configuration.
func (*Resiliency) EndpointPolicy ¶
func (r *Resiliency) EndpointPolicy(app string, endpoint string) *PolicyDefinition
EndpointPolicy returns the policy for a service endpoint.
func (*Resiliency) PolicyDefined ¶
func (r *Resiliency) PolicyDefined(target string, policyType PolicyType) (exists bool)
PolicyDefined returns true if there's policy that applies to the target.
type Runner ¶
Runner represents a function to invoke `oper` with resiliency policies applied.
func NewRunner ¶
func NewRunner[T any](ctx context.Context, def *PolicyDefinition) Runner[T]
NewRunner returns a policy runner that encapsulates the configured resiliency policies in a simple execution wrapper. We can't implement this as a method of the Resiliency struct because we can't yet use generic methods in structs.
func NewRunnerWithOptions ¶
func NewRunnerWithOptions[T any](ctx context.Context, def *PolicyDefinition, opts RunnerOpts[T]) Runner[T]
NewRunnerWithOptions is like NewRunner but allows setting additional options
Example (Accumulator) ¶
Example of using NewRunnerWithOptions with an Accumulator function
// Example polocy definition policyDef := &PolicyDefinition{ log: testLog, name: "retry", t: 10 * time.Millisecond, r: &retry.Config{MaxRetries: 6}, } // Handler function val := atomic.Int32{} fn := func(ctx context.Context) (int32, error) { v := val.Add(1) // When the value is "2", we add a sleep that will trip the timeout // As a consequence, the accumulator is not called, so the value "2" should not be included in the result if v == 2 { time.Sleep(50 * time.Millisecond) } // Make this method be executed 4 times in total if v <= 3 { return v, errors.New("continue") } return v, nil } // Invoke the policy and collect all received values received := []int32{} policy := NewRunnerWithOptions(context.Background(), policyDef, RunnerOpts[int32]{ Accumulator: func(i int32) { // Safe to use non-atomic operations in the next line because "received" is not used in the operation function ("fn") received = append(received, i) }, }) // When using accumulators, the result only contains the last value and is normally ignored res, err := policy(fn) fmt.Println(res, err, received)
Output: 4 <nil> [1 3 4]
Example (Disposer) ¶
Example of using NewRunnerWithOptions with a Disposer function
// Example polocy definition policyDef := &PolicyDefinition{ log: testLog, name: "retry", t: 10 * time.Millisecond, r: &retry.Config{MaxRetries: 6}, } // Handler function counter := atomic.Int32{} fn := func(ctx context.Context) (int32, error) { v := counter.Add(1) // When the value is "2", we add a sleep that will trip the timeout if v == 2 { time.Sleep(50 * time.Millisecond) } if v <= 3 { return v, errors.New("continue") } return v, nil } // Invoke the policy and collect all disposed values disposerCalled := make(chan int32, 5) policy := NewRunnerWithOptions(context.Background(), policyDef, RunnerOpts[int32]{ Disposer: func(val int32) { // Dispose the object as needed, for example calling things like: // val.Close() // Use a buffered channel here because the disposer method should not block disposerCalled <- val }, }) // Execute the policy res, err := policy(fn) // The disposer should be 3 times called with values 1, 2, 3 disposed := []int32{} for range 3 { disposed = append(disposed, <-disposerCalled) } slices.Sort(disposed) fmt.Println(res, err, disposed)
Output: 4 <nil> [1 2 3]
type RunnerOpts ¶
type RunnerOpts[T any] struct { // The disposer is a function which is invoked when the operation fails, including due to timing out in a background goroutine. It receives the value returned by the operation function as long as it's non-zero (e.g. non-nil for pointer types). // The disposer can be used to perform cleanup tasks on values returned by the operation function that would otherwise leak (because they're not returned by the result of the runner). Disposer func(T) // The accumulator is a function that is invoked synchronously when an operation completes without timing out, whether successfully or not. It receives the value returned by the operation function as long as it's non-zero (e.g. non-nil for pointer types). // The accumulator can be used to collect intermediate results and not just the final ones, for example in case of working with batched operations. Accumulator func(T) }