Documentation ¶
Overview ¶
Package retry implements rpc retry
Index ¶
- Constants
- Variables
- func RegisterDDLStop(f DDLStopFunc)
- type BackOff
- type BackOffCfgKey
- type BackOffPolicy
- type BackOffType
- type BackupPolicy
- type CBPolicy
- type Container
- func (rc *Container) Dump() interface{}
- func (rc *Container) Init(mp map[string]Policy, rr *ShouldResultRetry) (err error)
- func (rc *Container) InitWithPolicies(methodPolicies map[string]Policy) (inited bool, err error)
- func (rc *Container) NotifyPolicyChange(method string, p Policy)
- func (rc *Container) WithRetryIfNeeded(ctx context.Context, callOptRetry Policy, rpcCall RPCCallFunc, ...) (recycleRI bool, err error)
- type DDLStopFunc
- type FailurePolicy
- func (p *FailurePolicy) DisableChainRetryStop()
- func (p *FailurePolicy) Equals(np *FailurePolicy) bool
- func (p FailurePolicy) IsErrorRetryNonNil() bool
- func (p FailurePolicy) IsRespRetryNonNil() bool
- func (p FailurePolicy) String() string
- func (p *FailurePolicy) WithDDLStop()
- func (p *FailurePolicy) WithFixedBackOff(fixMS int)
- func (p *FailurePolicy) WithMaxDurationMS(maxMS uint32)
- func (p *FailurePolicy) WithMaxRetryTimes(retryTimes int)
- func (p *FailurePolicy) WithRandomBackOff(minMS, maxMS int)
- func (p *FailurePolicy) WithRetryBreaker(errRate float64)
- func (p *FailurePolicy) WithRetrySameNode()
- func (p *FailurePolicy) WithSpecifiedResultRetry(rr *ShouldResultRetry)
- type Policy
- type RPCCallFunc
- type Retryer
- type ShouldResultRetry
- type StopPolicy
- type Type
Constants ¶
const ( // TransitKey is transited persistently when the req is a retry call. // When a request with this key, the downstream will not do retry(just support in kitex now). TransitKey = "RetryReq" // CtxReqOp is used to ignore RPC Request concurrent write CtxReqOp ctxKey = "K_REQ_OP" // CtxRespOp is used to ignore RPC Response concurrent write/read. CtxRespOp ctxKey = "K_RESP_OP" // Wildcard stands for 'any method' when associated with a retryer. Wildcard = "*" )
const ( OpNo int32 = iota OpDoing OpDone )
Req or Resp operation state, just be useful when concurrent write may happen
Variables ¶
var NoneBackOff = &noneBackOff{}
NoneBackOff means there's no backoff.
Functions ¶
Types ¶
type BackOff ¶
type BackOff interface {
Wait(callTimes int)
}
BackOff is the interface of back off implements
type BackOffCfgKey ¶
type BackOffCfgKey string
BackOffCfgKey represents the keys for BackOff.
const ( FixMSBackOffCfgKey BackOffCfgKey = "fix_ms" MinMSBackOffCfgKey BackOffCfgKey = "min_ms" MaxMSBackOffCfgKey BackOffCfgKey = "max_ms" InitialMSBackOffCfgKey BackOffCfgKey = "initial_ms" MultiplierBackOffCfgKey BackOffCfgKey = "multiplier" )
the keys of all back off configs
type BackOffPolicy ¶
type BackOffPolicy struct { BackOffType BackOffType `json:"backoff_type"` CfgItems map[BackOffCfgKey]float64 `json:"cfg_items,omitempty"` }
BackOffPolicy is the BackOff policy.
func (*BackOffPolicy) Equals ¶
func (p *BackOffPolicy) Equals(np *BackOffPolicy) bool
Equals to check if BackOffPolicy is equal.
type BackOffType ¶
type BackOffType string
BackOffType means the BackOff type.
const ( NoneBackOffType BackOffType = "none" FixedBackOffType BackOffType = "fixed" RandomBackOffType BackOffType = "random" )
all back off types
type BackupPolicy ¶
type BackupPolicy struct { RetryDelayMS uint32 `json:"retry_delay_ms"` StopPolicy StopPolicy `json:"stop_policy"` RetrySameNode bool `json:"retry_same_node"` }
BackupPolicy for backup request
func NewBackupPolicy ¶
func NewBackupPolicy(delayMS uint32) *BackupPolicy
NewBackupPolicy init backup request policy the param delayMS is suggested to set as TP99
func (*BackupPolicy) DisableChainRetryStop ¶
func (p *BackupPolicy) DisableChainRetryStop()
DisableChainRetryStop disables chain stop
func (*BackupPolicy) Equals ¶
func (p *BackupPolicy) Equals(np *BackupPolicy) bool
Equals to check if BackupPolicy is equal
func (BackupPolicy) String ¶
func (p BackupPolicy) String() string
String is used to print human readable debug info.
func (*BackupPolicy) WithMaxRetryTimes ¶
func (p *BackupPolicy) WithMaxRetryTimes(retryTimes int)
WithMaxRetryTimes default is 1, not include first call
func (*BackupPolicy) WithRetryBreaker ¶
func (p *BackupPolicy) WithRetryBreaker(errRate float64)
WithRetryBreaker sets error rate.
func (*BackupPolicy) WithRetrySameNode ¶
func (p *BackupPolicy) WithRetrySameNode()
WithRetrySameNode set retry to use the same node.
type CBPolicy ¶
type CBPolicy struct {
ErrorRate float64 `json:"error_rate"`
}
CBPolicy is the circuit breaker policy
type Container ¶
Container is a wrapper for Retryer.
func NewRetryContainer ¶
func NewRetryContainer() *Container
NewRetryContainer build Container that need to build circuit breaker and do circuit breaker statistic.
func NewRetryContainerWithCB ¶
func NewRetryContainerWithCB(cc *circuitbreak.Control, cp circuitbreaker.Panel) *Container
NewRetryContainerWithCB build Container that doesn't do circuit breaker statistic but get statistic result. Which is used in case that circuit breaker is enable. eg:
cbs := circuitbreak.NewCBSuite(circuitbreak.RPCInfo2Key) retryC := retry.NewRetryContainerWithCB(cbs.ServiceControl(), cbs.ServicePanel()) var opts []client.Option opts = append(opts, client.WithRetryContainer(retryC)) // enable service circuit breaker opts = append(opts, client.WithMiddleware(cbs.ServiceCBMW()))
func NewRetryContainerWithCBStat ¶
func NewRetryContainerWithCBStat(cc *circuitbreak.Control, cp circuitbreaker.Panel) *Container
NewRetryContainerWithCBStat build Container that need to do circuit breaker statistic. Which is used in case that the service CB key is customized. eg:
cbs := circuitbreak.NewCBSuite(YourGenServiceCBKeyFunc) retry.NewRetryContainerWithCBStat(cbs.ServiceControl(), cbs.ServicePanel())
func (*Container) Dump ¶
func (rc *Container) Dump() interface{}
Dump is used to show current retry policy
func (*Container) Init ¶
func (rc *Container) Init(mp map[string]Policy, rr *ShouldResultRetry) (err error)
Init to build Retryer with code config.
func (*Container) InitWithPolicies ¶
InitWithPolicies to init Retryer with methodPolicies Notice, InitWithPolicies is export func, the lock should be added inside
func (*Container) NotifyPolicyChange ¶
NotifyPolicyChange to receive policy when it changes
func (*Container) WithRetryIfNeeded ¶
func (rc *Container) WithRetryIfNeeded(ctx context.Context, callOptRetry Policy, rpcCall RPCCallFunc, ri rpcinfo.RPCInfo, request interface{}) (recycleRI bool, err error)
WithRetryIfNeeded to check if there is a retryer can be used and if current call can retry. When the retry condition is satisfied, use retryer to call
type DDLStopFunc ¶
type DDLStopFunc func(ctx context.Context, policy StopPolicy) (bool, string)
DDLStopFunc is the definition of ddlStop func
type FailurePolicy ¶
type FailurePolicy struct { StopPolicy StopPolicy `json:"stop_policy"` BackOffPolicy *BackOffPolicy `json:"backoff_policy,omitempty"` RetrySameNode bool `json:"retry_same_node"` ShouldResultRetry *ShouldResultRetry `json:"-"` }
FailurePolicy for failure retry
func NewFailurePolicy ¶
func NewFailurePolicy() *FailurePolicy
NewFailurePolicy init default failure retry policy
func NewFailurePolicyWithResultRetry ¶ added in v0.4.0
func NewFailurePolicyWithResultRetry(rr *ShouldResultRetry) *FailurePolicy
NewFailurePolicyWithResultRetry init failure retry policy with ShouldResultRetry
func (*FailurePolicy) DisableChainRetryStop ¶
func (p *FailurePolicy) DisableChainRetryStop()
DisableChainRetryStop disables chain stop
func (*FailurePolicy) Equals ¶
func (p *FailurePolicy) Equals(np *FailurePolicy) bool
Equals to check if FailurePolicy is equal
func (FailurePolicy) IsErrorRetryNonNil ¶ added in v0.4.0
func (p FailurePolicy) IsErrorRetryNonNil() bool
IsErrorRetryNonNil is used to judge if ErrorRetry is nil
func (FailurePolicy) IsRespRetryNonNil ¶ added in v0.4.0
func (p FailurePolicy) IsRespRetryNonNil() bool
IsRespRetryNonNil is used to judge if RespRetry is nil
func (FailurePolicy) String ¶
func (p FailurePolicy) String() string
String prints human readable information.
func (*FailurePolicy) WithDDLStop ¶
func (p *FailurePolicy) WithDDLStop()
WithDDLStop sets ddl stop to true.
func (*FailurePolicy) WithFixedBackOff ¶
func (p *FailurePolicy) WithFixedBackOff(fixMS int)
WithFixedBackOff set fixed time.
func (*FailurePolicy) WithMaxDurationMS ¶
func (p *FailurePolicy) WithMaxDurationMS(maxMS uint32)
WithMaxDurationMS include first call and retry call, if the total duration exceed limit then stop retry
func (*FailurePolicy) WithMaxRetryTimes ¶
func (p *FailurePolicy) WithMaxRetryTimes(retryTimes int)
WithMaxRetryTimes default is 2, not include first call
func (*FailurePolicy) WithRandomBackOff ¶
func (p *FailurePolicy) WithRandomBackOff(minMS, maxMS int)
WithRandomBackOff sets the random time.
func (*FailurePolicy) WithRetryBreaker ¶
func (p *FailurePolicy) WithRetryBreaker(errRate float64)
WithRetryBreaker sets the error rate.
func (*FailurePolicy) WithRetrySameNode ¶
func (p *FailurePolicy) WithRetrySameNode()
WithRetrySameNode sets to retry the same node.
func (*FailurePolicy) WithSpecifiedResultRetry ¶ added in v0.4.0
func (p *FailurePolicy) WithSpecifiedResultRetry(rr *ShouldResultRetry)
WithSpecifiedResultRetry sets customized ShouldResultRetry.
type Policy ¶
type Policy struct { Enable bool `json:"enable"` // 0 is failure retry, 1 is backup Type Type `json:"type"` // notice: only one retry policy can be enabled, which one depend on Policy.Type FailurePolicy *FailurePolicy `json:"failure_policy,omitempty"` BackupPolicy *BackupPolicy `json:"backup_policy,omitempty"` }
Policy contains all retry policies
func BuildBackupRequest ¶ added in v0.4.0
func BuildBackupRequest(p *BackupPolicy) Policy
BuildBackupRequest is used to build Policy with *BackupPolicy
func BuildFailurePolicy ¶ added in v0.4.0
func BuildFailurePolicy(p *FailurePolicy) Policy
BuildFailurePolicy is used to build Policy with *FailurePolicy
type RPCCallFunc ¶
type RPCCallFunc func(context.Context, Retryer) (rpcinfo rpcinfo.RPCInfo, resp interface{}, err error)
RPCCallFunc is the definition with wrap rpc call
type Retryer ¶
type Retryer interface { // AllowRetry to check if current request satisfy retry condition[eg: circuit, retry times == 0, chain stop, ddl]. // If not satisfy won't execute Retryer.Do and return the reason message // Execute anyway for the first time regardless of able to retry. AllowRetry(ctx context.Context) (msg string, ok bool) // ShouldRetry to check if retry request can be called, it is checked in retryer.Do. // If not satisfy will return the reason message ShouldRetry(ctx context.Context, err error, callTimes int, req interface{}, cbKey string) (msg string, ok bool) UpdatePolicy(policy Policy) error // Retry policy execute func. recycleRI is to decide if the firstRI can be recycled. Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpcinfo.RPCInfo, request interface{}) (recycleRI bool, err error) AppendErrMsgIfNeeded(err error, ri rpcinfo.RPCInfo, msg string) // Prepare to do something needed before retry call. Prepare(ctx context.Context, prevRI, retryRI rpcinfo.RPCInfo) Dump() map[string]interface{} Type() Type }
Retryer is the interface for Retry implements
func NewRetryer ¶
func NewRetryer(p Policy, r *ShouldResultRetry, cbC *cbContainer) (retryer Retryer, err error)
NewRetryer build a retryer with policy
type ShouldResultRetry ¶ added in v0.4.0
type ShouldResultRetry struct { ErrorRetry func(err error, ri rpcinfo.RPCInfo) bool RespRetry func(resp interface{}, ri rpcinfo.RPCInfo) bool }
ShouldResultRetry is used for specifying which error or resp need to be retried
func AllErrorRetry ¶ added in v0.4.0
func AllErrorRetry() *ShouldResultRetry
AllErrorRetry is common choice for ShouldResultRetry.
type StopPolicy ¶
type StopPolicy struct { MaxRetryTimes int `json:"max_retry_times"` MaxDurationMS uint32 `json:"max_duration_ms"` DisableChainStop bool `json:"disable_chain_stop"` DDLStop bool `json:"ddl_stop"` CBPolicy CBPolicy `json:"cb_policy"` }
StopPolicy is a group policies to decide when stop retry