Documentation ¶
Index ¶
- Variables
- type AIMD
- type AIMDController
- func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error
- func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
- func (a *AIMDController) IsObjectNotFoundErr(err error) bool
- func (a *AIMDController) IsRetryableErr(err error) bool
- func (a *AIMDController) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error)
- func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bool, error)
- func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
- func (a *AIMDController) Stop()
- func (a *AIMDController) Wrap(client client.ObjectClient) client.ObjectClient
- type Config
- type Controller
- type ControllerConfig
- type DoRequestFunc
- type Hedger
- type HedgerConfig
- type IsRetryableErrFunc
- type LimitedRetrier
- type Metrics
- type NoopController
- func (n *NoopController) DeleteObject(context.Context, string) error
- func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error)
- func (n *NoopController) IsObjectNotFoundErr(error) bool
- func (n *NoopController) IsRetryableErr(error) bool
- func (n *NoopController) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error)
- func (n *NoopController) ObjectExists(context.Context, string) (bool, error)
- func (n *NoopController) PutObject(context.Context, string, io.ReadSeeker) error
- func (n *NoopController) Stop()
- func (n *NoopController) Wrap(c client.ObjectClient) client.ObjectClient
- type NoopHedger
- type NoopRetrier
- type Retrier
- type RetrierConfig
Constants ¶
This section is empty.
Variables ¶
var RetriesExceeded = errors.New("retries exceeded")
Functions ¶
This section is empty.
Types ¶
type AIMDController ¶
type AIMDController struct {
// contains filtered or unexported fields
}
AIMDController implements the Additive-Increase/Multiplicative-Decrease algorithm which is used in TCP congestion avoidance. https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease
func NewAIMDController ¶
func NewAIMDController(cfg Config) *AIMDController
func (*AIMDController) DeleteObject ¶
func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error
func (*AIMDController) GetObject ¶
func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
func (*AIMDController) IsObjectNotFoundErr ¶
func (a *AIMDController) IsObjectNotFoundErr(err error) bool
func (*AIMDController) IsRetryableErr ¶
func (a *AIMDController) IsRetryableErr(err error) bool
func (*AIMDController) List ¶
func (a *AIMDController) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error)
func (*AIMDController) ObjectExists ¶
func (*AIMDController) PutObject ¶
func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
func (*AIMDController) Stop ¶
func (a *AIMDController) Stop()
func (*AIMDController) Wrap ¶
func (a *AIMDController) Wrap(client client.ObjectClient) client.ObjectClient
type Config ¶
type Config struct { Enabled bool `yaml:"enabled"` Controller ControllerConfig `yaml:"controller"` Retry RetrierConfig `yaml:"retry"` Hedge HedgerConfig `yaml:"hedging"` }
type Controller ¶
type Controller interface { client.ObjectClient // Wrap wraps a given object store client and handles congestion against its backend service Wrap(client client.ObjectClient) client.ObjectClient // contains filtered or unexported methods }
Controller handles congestion by: - determining if calls to object storage can be retried - defining and enforcing a back-pressure mechanism - centralising retries & hedging
func NewController ¶
func NewController(cfg Config, logger log.Logger, metrics *Metrics) Controller
type ControllerConfig ¶
func (*ControllerConfig) RegisterFlags ¶
func (c *ControllerConfig) RegisterFlags(f *flag.FlagSet)
func (*ControllerConfig) RegisterFlagsWithPrefix ¶
func (c *ControllerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
nolint:goconst
type DoRequestFunc ¶
type DoRequestFunc func(attempt int) (io.ReadCloser, int64, error)
type Hedger ¶
type Hedger interface { // HTTPClient returns an HTTP client which is responsible for handling both the initial and all hedged requests. // It is recommended that retries are not hedged. // Bear in mind this function can be called several times, and should return the same client each time. HTTPClient(cfg hedging.Config) (*http.Client, error) // contains filtered or unexported methods }
Hedger orchestrates request "hedging", which is the process of sending a new request when the old request is taking too long, and returning the response that is received first
type HedgerConfig ¶
func (*HedgerConfig) RegisterFlags ¶
func (c *HedgerConfig) RegisterFlags(f *flag.FlagSet)
func (*HedgerConfig) RegisterFlagsWithPrefix ¶
func (c *HedgerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type IsRetryableErrFunc ¶
type LimitedRetrier ¶
type LimitedRetrier struct {
// contains filtered or unexported fields
}
LimitedRetrier executes the initial request plus a configurable limit of subsequent retries. limit=0 is equivalent to NoopRetrier
func NewLimitedRetrier ¶
func NewLimitedRetrier(cfg Config) *LimitedRetrier
func (*LimitedRetrier) Do ¶
func (l *LimitedRetrier) Do(fn DoRequestFunc, isRetryable IsRetryableErrFunc, onSuccess func(), onError func()) (io.ReadCloser, int64, error)
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
NewMetrics creates metrics to be used for monitoring congestion control. It needs to accept a "name" because congestion control is used in object clients, and there can be many object clients creates for the same store (multiple period configs, etc). It is the responsibility of the caller to ensure uniqueness, otherwise a duplicate registration panic will occur.
func (Metrics) Unregister ¶
func (m Metrics) Unregister()
type NoopController ¶
type NoopController struct {
// contains filtered or unexported fields
}
func NewNoopController ¶
func NewNoopController(Config) *NoopController
func (*NoopController) DeleteObject ¶
func (n *NoopController) DeleteObject(context.Context, string) error
func (*NoopController) GetObject ¶
func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error)
func (*NoopController) IsObjectNotFoundErr ¶
func (n *NoopController) IsObjectNotFoundErr(error) bool
func (*NoopController) IsRetryableErr ¶
func (n *NoopController) IsRetryableErr(error) bool
func (*NoopController) List ¶
func (n *NoopController) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error)
func (*NoopController) ObjectExists ¶
func (*NoopController) PutObject ¶
func (n *NoopController) PutObject(context.Context, string, io.ReadSeeker) error
func (*NoopController) Stop ¶
func (n *NoopController) Stop()
func (*NoopController) Wrap ¶
func (n *NoopController) Wrap(c client.ObjectClient) client.ObjectClient
type NoopHedger ¶
type NoopHedger struct{}
func NewNoopHedger ¶
func NewNoopHedger(Config) *NoopHedger
func (*NoopHedger) HTTPClient ¶
type NoopRetrier ¶
type NoopRetrier struct{}
func NewNoopRetrier ¶
func NewNoopRetrier(Config) *NoopRetrier
func (*NoopRetrier) Do ¶
func (n *NoopRetrier) Do(fn DoRequestFunc, _ IsRetryableErrFunc, _ func(), _ func()) (io.ReadCloser, int64, error)
type Retrier ¶
type Retrier interface { // Do executes a given function which is expected to be a GetObject call, and its return signature matches that. // Any failed requests will be retried. // // count is the current request count; any positive number indicates retries, 0 indicates first attempt. Do(fn DoRequestFunc, isRetryable IsRetryableErrFunc, onSuccess func(), onError func()) (io.ReadCloser, int64, error) // contains filtered or unexported methods }
Retrier orchestrates requests & subsequent retries (if configured). NOTE: this only supports ObjectClient.GetObject calls right now.
type RetrierConfig ¶
func (*RetrierConfig) RegisterFlags ¶
func (c *RetrierConfig) RegisterFlags(f *flag.FlagSet)
func (*RetrierConfig) RegisterFlagsWithPrefix ¶
func (c *RetrierConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)