congestion

package
v3.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2024 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var RetriesExceeded = errors.New("retries exceeded")

Functions

This section is empty.

Types

type AIMD

type AIMD struct {
	Start         uint    `yaml:"start"`
	UpperBound    uint    `yaml:"upper_bound"`
	BackoffFactor float64 `yaml:"backoff_factor"`
}

type AIMDController

type AIMDController struct {
	// contains filtered or unexported fields
}

AIMDController implements the Additive-Increase/Multiplicative-Decrease algorithm which is used in TCP congestion avoidance. https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease

func NewAIMDController

func NewAIMDController(cfg Config) *AIMDController

func (*AIMDController) DeleteObject

func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error

func (*AIMDController) GetObject

func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)

func (*AIMDController) GetObjectRange added in v3.2.0

func (a *AIMDController) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error)

func (*AIMDController) IsObjectNotFoundErr

func (a *AIMDController) IsObjectNotFoundErr(err error) bool

func (*AIMDController) IsRetryableErr

func (a *AIMDController) IsRetryableErr(err error) bool

func (*AIMDController) List

func (*AIMDController) ObjectExists

func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bool, error)

func (*AIMDController) PutObject

func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.Reader) error

func (*AIMDController) Stop

func (a *AIMDController) Stop()

func (*AIMDController) Wrap

type Config

type Config struct {
	Enabled    bool             `yaml:"enabled"`
	Controller ControllerConfig `yaml:"controller"`
	Retry      RetrierConfig    `yaml:"retry"`
	Hedge      HedgerConfig     `yaml:"hedging"`
}

func (*Config) RegisterFlagsWithPrefix

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type Controller

type Controller interface {
	client.ObjectClient

	// Wrap wraps a given object store client and handles congestion against its backend service
	Wrap(client client.ObjectClient) client.ObjectClient
	// contains filtered or unexported methods
}

Controller handles congestion by: - determining if calls to object storage can be retried - defining and enforcing a back-pressure mechanism - centralising retries & hedging

func NewController

func NewController(cfg Config, logger log.Logger, metrics *Metrics) Controller

type ControllerConfig

type ControllerConfig struct {
	Strategy string `yaml:"strategy"`
	AIMD     AIMD   `yaml:"aimd"`
}

func (*ControllerConfig) RegisterFlags

func (c *ControllerConfig) RegisterFlags(f *flag.FlagSet)

func (*ControllerConfig) RegisterFlagsWithPrefix

func (c *ControllerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

nolint:goconst

type DoRequestFunc

type DoRequestFunc func(attempt int) (io.ReadCloser, int64, error)

type Hedger

type Hedger interface {
	// HTTPClient returns an HTTP client which is responsible for handling both the initial and all hedged requests.
	// It is recommended that retries are not hedged.
	// Bear in mind this function can be called several times, and should return the same client each time.
	HTTPClient(cfg hedging.Config) (*http.Client, error)
	// contains filtered or unexported methods
}

Hedger orchestrates request "hedging", which is the process of sending a new request when the old request is taking too long, and returning the response that is received first

type HedgerConfig

type HedgerConfig struct {
	hedging.Config

	Strategy string `yaml:"strategy"`
}

func (*HedgerConfig) RegisterFlags

func (c *HedgerConfig) RegisterFlags(f *flag.FlagSet)

func (*HedgerConfig) RegisterFlagsWithPrefix

func (c *HedgerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type IsRetryableErrFunc

type IsRetryableErrFunc func(err error) bool

type LimitedRetrier

type LimitedRetrier struct {
	// contains filtered or unexported fields
}

LimitedRetrier executes the initial request plus a configurable limit of subsequent retries. limit=0 is equivalent to NoopRetrier

func NewLimitedRetrier

func NewLimitedRetrier(cfg Config) *LimitedRetrier

func (*LimitedRetrier) Do

func (l *LimitedRetrier) Do(fn DoRequestFunc, isRetryable IsRetryableErrFunc, onSuccess func(), onError func()) (io.ReadCloser, int64, error)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(name string, cfg Config) *Metrics

NewMetrics creates metrics to be used for monitoring congestion control. It needs to accept a "name" because congestion control is used in object clients, and there can be many object clients creates for the same store (multiple period configs, etc). It is the responsibility of the caller to ensure uniqueness, otherwise a duplicate registration panic will occur.

func (Metrics) Unregister

func (m Metrics) Unregister()

type NoopController

type NoopController struct {
	// contains filtered or unexported fields
}

func NewNoopController

func NewNoopController(Config) *NoopController

func (*NoopController) DeleteObject

func (n *NoopController) DeleteObject(context.Context, string) error

func (*NoopController) GetObject

func (*NoopController) GetObjectRange added in v3.2.0

func (n *NoopController) GetObjectRange(context.Context, string, int64, int64) (io.ReadCloser, error)

func (*NoopController) IsObjectNotFoundErr

func (n *NoopController) IsObjectNotFoundErr(error) bool

func (*NoopController) IsRetryableErr

func (n *NoopController) IsRetryableErr(error) bool

func (*NoopController) ObjectExists

func (n *NoopController) ObjectExists(context.Context, string) (bool, error)

func (*NoopController) PutObject

func (*NoopController) Stop

func (n *NoopController) Stop()

func (*NoopController) Wrap

type NoopHedger

type NoopHedger struct{}

func NewNoopHedger

func NewNoopHedger(Config) *NoopHedger

func (*NoopHedger) HTTPClient

func (n *NoopHedger) HTTPClient(hedging.Config) (*http.Client, error)

type NoopRetrier

type NoopRetrier struct{}

func NewNoopRetrier

func NewNoopRetrier(Config) *NoopRetrier

func (*NoopRetrier) Do

func (n *NoopRetrier) Do(fn DoRequestFunc, _ IsRetryableErrFunc, _ func(), _ func()) (io.ReadCloser, int64, error)

type Retrier

type Retrier interface {
	// Do executes a given function which is expected to be a GetObject call, and its return signature matches that.
	// Any failed requests will be retried.
	//
	// count is the current request count; any positive number indicates retries, 0 indicates first attempt.
	Do(fn DoRequestFunc, isRetryable IsRetryableErrFunc, onSuccess func(), onError func()) (io.ReadCloser, int64, error)
	// contains filtered or unexported methods
}

Retrier orchestrates requests & subsequent retries (if configured). NOTE: this only supports ObjectClient.GetObject calls right now.

type RetrierConfig

type RetrierConfig struct {
	Strategy string `yaml:"strategy"`
	Limit    int    `yaml:"limit"`
}

func (*RetrierConfig) RegisterFlags

func (c *RetrierConfig) RegisterFlags(f *flag.FlagSet)

func (*RetrierConfig) RegisterFlagsWithPrefix

func (c *RetrierConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL