throttler

package
v0.21.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2024 License: Apache-2.0 Imports: 21 Imported by: 17

Documentation

Overview

Package throttler provides a client-side, local throttler which is used to throttle (and actively pace) writes during the resharding process.

The throttler has two main goals: a) allow resharding data into an existing keyspace by throttling at a fixed rate b) ensure that the MySQL replicas do not become overloaded

To support b), the throttler constantly monitors the health of all replicas and reduces the allowed rate if the replication lag is above a certain threshold.

Index

Constants

View Source
const (
	// NotThrottled will be returned by Throttle() if the application is currently
	// not throttled.
	NotThrottled time.Duration = 0

	// ZeroRateNoProgess can be used to set maxRate to 0. In this case, the
	// throttler won't let any requests through until the rate is increased again.
	ZeroRateNoProgess = 0

	// MaxRateModuleDisabled can be set in NewThrottler() to disable throttling
	// by a fixed rate.
	MaxRateModuleDisabled = int64(math.MaxInt64)

	// InvalidMaxRate is a constant which will fail in a NewThrottler() call.
	// It should be used when returning maxRate in an error case.
	InvalidMaxRate = -1

	// ReplicationLagModuleDisabled can be set in NewThrottler() to disable
	// throttling based on the MySQL replication lag.
	ReplicationLagModuleDisabled = int64(math.MaxInt64)

	// InvalidMaxReplicationLag is a constant which will fail in a NewThrottler()
	// call. It should be used when returning maxReplicationlag in an error case.
	InvalidMaxReplicationLag = -1
)

Variables

View Source
var GlobalManager = newManager()

GlobalManager is the per-process manager which manages all active throttlers.

Functions

This section is empty.

Types

type Manager

type Manager interface {
	// MaxRates returns the max rate of all known throttlers.
	MaxRates() map[string]int64

	// SetMaxRate sets the max rate on all known throttlers.
	// It returns the names of the updated throttlers.
	SetMaxRate(rate int64) []string

	// GetConfiguration returns the configuration of the MaxReplicationlag module
	// for the given throttler or all throttlers if "throttlerName" is empty.
	GetConfiguration(throttlerName string) (map[string]*throttlerdatapb.Configuration, error)

	// UpdateConfiguration (partially) updates the configuration of the
	// MaxReplicationlag module for the given throttler or all throttlers if
	// "throttlerName" is empty.
	// If "copyZeroValues" is true, fields with zero values will be copied
	// as well.
	// The function returns the names of the updated throttlers.
	UpdateConfiguration(throttlerName string, configuration *throttlerdatapb.Configuration, copyZeroValues bool) ([]string, error)

	// ResetConfiguration resets the configuration of the MaxReplicationlag module
	// to the initial configuration for the given throttler or all throttlers if
	// "throttlerName" is empty.
	// The function returns the names of the updated throttlers.
	ResetConfiguration(throttlerName string) ([]string, error)
}

Manager defines the public interface of the throttler manager. It is used for example by the different RPC implementations.

type MaxRateModule

type MaxRateModule struct {
	// contains filtered or unexported fields
}

MaxRateModule allows to set and retrieve a maximum rate limit. It implements the Module interface.

func NewMaxRateModule

func NewMaxRateModule(maxRate int64) *MaxRateModule

NewMaxRateModule will create a new module instance and set the initial rate limit to maxRate.

func (*MaxRateModule) MaxRate

func (m *MaxRateModule) MaxRate() int64

MaxRate returns the current maximum allowed rate.

func (*MaxRateModule) SetMaxRate

func (m *MaxRateModule) SetMaxRate(rate int64)

SetMaxRate sets the current max rate and notifies the throttler about the rate update.

func (*MaxRateModule) Start

func (m *MaxRateModule) Start(rateUpdateChan chan<- struct{})

Start currently does nothing. It implements the Module interface.

func (*MaxRateModule) Stop

func (m *MaxRateModule) Stop()

Stop currently does nothing. It implements the Module interface.

type MaxReplicationLagModule

type MaxReplicationLagModule struct {
	// contains filtered or unexported fields
}

MaxReplicationLagModule calculates the maximum rate based on observed replication lag and throttler rate changes. It implements the Module interface. Unless specified, the fields below are not guarded by a Mutex because they are only accessed within the Go routine running ProcessRecords().

func NewMaxReplicationLagModule

func NewMaxReplicationLagModule(config MaxReplicationLagModuleConfig, actualRatesHistory *aggregatedIntervalHistory, nowFunc func() time.Time) (*MaxReplicationLagModule, error)

NewMaxReplicationLagModule will create a new module instance and set the initial max replication lag limit to maxReplicationLag.

func (*MaxReplicationLagModule) MaxRate

func (m *MaxReplicationLagModule) MaxRate() int64

MaxRate returns the current maximum allowed rate. It implements the Module interface.

func (*MaxReplicationLagModule) ProcessRecords

func (m *MaxReplicationLagModule) ProcessRecords()

ProcessRecords is the main loop, run in a separate Go routine, which reacts to any replication lag updates (recordings).

func (*MaxReplicationLagModule) RecordReplicationLag

func (m *MaxReplicationLagModule) RecordReplicationLag(t time.Time, th *discovery.TabletHealth)

RecordReplicationLag records the current replication lag for processing.

func (*MaxReplicationLagModule) Start

func (m *MaxReplicationLagModule) Start(rateUpdateChan chan<- struct{})

Start launches a Go routine which reacts on replication lag records. It implements the Module interface.

func (*MaxReplicationLagModule) Stop

func (m *MaxReplicationLagModule) Stop()

Stop blocks until the module's Go routine is stopped. It implements the Module interface.

type MaxReplicationLagModuleConfig

type MaxReplicationLagModuleConfig struct {
	*throttlerdatapb.Configuration
}

MaxReplicationLagModuleConfig stores all configuration parameters for MaxReplicationLagModule. Internally, the parameters are represented by a protobuf message. This message is also used to update the parameters.

func DefaultMaxReplicationLagModuleConfig

func DefaultMaxReplicationLagModuleConfig() MaxReplicationLagModuleConfig

DefaultMaxReplicationLagModuleConfig returns a copy of the default config object.

func NewMaxReplicationLagModuleConfig

func NewMaxReplicationLagModuleConfig(maxReplicationLag int64) MaxReplicationLagModuleConfig

NewMaxReplicationLagModuleConfig returns a default configuration where only "maxReplicationLag" is set.

func (MaxReplicationLagModuleConfig) AgeBadRateAfter

func (cfg MaxReplicationLagModuleConfig) AgeBadRateAfter() time.Duration

AgeBadRateAfter is a helper function which returns the respective protobuf field as native Go type.

func (MaxReplicationLagModuleConfig) Clone added in v0.11.0

func (MaxReplicationLagModuleConfig) MaxDurationBetweenIncreases

func (cfg MaxReplicationLagModuleConfig) MaxDurationBetweenIncreases() time.Duration

MaxDurationBetweenIncreases is a helper function which returns the respective protobuf field as native Go type.

func (MaxReplicationLagModuleConfig) MinDurationBetweenDecreases

func (cfg MaxReplicationLagModuleConfig) MinDurationBetweenDecreases() time.Duration

MinDurationBetweenDecreases is a helper function which returns the respective protobuf field as native Go type.

func (MaxReplicationLagModuleConfig) MinDurationBetweenIncreases

func (cfg MaxReplicationLagModuleConfig) MinDurationBetweenIncreases() time.Duration

MinDurationBetweenIncreases is a helper function which returns the respective protobuf field as native Go type.

func (MaxReplicationLagModuleConfig) SpreadBacklogAcross

func (cfg MaxReplicationLagModuleConfig) SpreadBacklogAcross() time.Duration

SpreadBacklogAcross is a helper function which returns the respective protobuf field as native Go type.

func (MaxReplicationLagModuleConfig) Verify

func (cfg MaxReplicationLagModuleConfig) Verify() error

Verify returns an error if the config is invalid.

type Module

type Module interface {
	// Start can be implemented to e.g. start own Go routines and initialize the
	// module.
	// rateUpdateChan must be used to notify the Throttler when the module's
	// maximum rate limit has changed and a subsequent MaxRate() call would return
	// a different value.
	Start(rateUpdateChan chan<- struct{})
	// Stop will free all resources and be called by Throttler.Close().
	Stop()

	// MaxRate returns the maximum allowed rate determined by the module.
	MaxRate() int64
}

Module specifies the API for a Decision Module which can tell the throttler to dynamically increase or decrease the current rate limit.

type Result added in v0.21.0

type Result struct {
	Now        time.Time
	RateChange rateChange

	OldState    state
	TestedState state
	NewState    state

	OldRate int64
	NewRate int64
	Reason  string

	CurrentRate      int64
	GoodOrBad        goodOrBadRate
	MemorySkipReason string
	HighestGood      int64
	LowestBad        int64

	LagRecordNow                 replicationLagRecord
	LagRecordBefore              replicationLagRecord
	PrimaryRate                  int64
	GuessedReplicationRate       int64
	GuessedReplicationBacklogOld int
	GuessedReplicationBacklogNew int
	// contains filtered or unexported fields
}

Result is generated by the MaxReplicationLag module for each processed "replicationLagRecord". It captures the details and the decision of the processing.

func (Result) AgeOfBeforeLag added in v0.21.0

func (r Result) AgeOfBeforeLag() string

func (Result) Alias added in v0.21.0

func (r Result) Alias() string

func (Result) LagBefore added in v0.21.0

func (r Result) LagBefore() string

func (Result) String added in v0.21.0

func (r Result) String() string

func (Result) TimeSinceLastRateChange added in v0.21.0

func (r Result) TimeSinceLastRateChange() string

type Throttler

type Throttler interface {
	Throttle(threadID int) time.Duration
	ThreadFinished(threadID int)
	Close()
	MaxRate() int64
	SetMaxRate(rate int64)
	RecordReplicationLag(time time.Time, th *discovery.TabletHealth)
	GetConfiguration() *throttlerdatapb.Configuration
	UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
	ResetConfiguration()
	MaxLag(tabletType topodatapb.TabletType) uint32
	Log() []Result
}

Throttler defines the throttler interface.

func NewThrottler

func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (Throttler, error)

NewThrottler creates a new Throttler instance. Use the constants MaxRateModuleDisabled or ReplicationLagModuleDisabled if you want to disable parts of its functionality. maxRate will be distributed across all threadCount threads and must be >= threadCount. If it's lower, it will be automatically set to threadCount. maxRate can also be set to 0 which will effectively pause the user and constantly block until the rate has been increased again. unit refers to the type of entity you want to throttle e.g. "queries" or "transactions". name describes the Throttler instance and will be used by the webinterface.

func NewThrottlerFromConfig added in v0.17.0

func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (Throttler, error)

type ThrottlerImpl added in v0.21.0

type ThrottlerImpl struct {
	// contains filtered or unexported fields
}

ThrottlerImpl implements a client-side, thread-aware throttler. See the package doc for more information.

Calls of Throttle() and ThreadFinished() take threadID as parameter which is in the range [0, threadCount). (threadCount is set in NewThrottler().) NOTE: Trottle() and ThreadFinished() assume that *per thread* calls to them

are serialized and must not happen concurrently.

func (*ThrottlerImpl) Close added in v0.21.0

func (t *ThrottlerImpl) Close()

Close stops all modules and frees all resources. When Close() returned, the Throttler object must not be used anymore.

func (*ThrottlerImpl) GetConfiguration added in v0.21.0

func (t *ThrottlerImpl) GetConfiguration() *throttlerdatapb.Configuration

GetConfiguration returns the configuration of the MaxReplicationLag module.

func (*ThrottlerImpl) Log added in v0.21.0

func (t *ThrottlerImpl) Log() []Result

Log returns the most recent changes of the MaxReplicationLag module.

func (*ThrottlerImpl) MaxLag added in v0.21.0

func (t *ThrottlerImpl) MaxLag(tabletType topodata.TabletType) uint32

MaxLag returns the max of all the last replication lag values seen across all tablets of the provided type, excluding ignored tablets.

func (*ThrottlerImpl) MaxRate added in v0.21.0

func (t *ThrottlerImpl) MaxRate() int64

MaxRate returns the current rate of the MaxRateModule.

func (*ThrottlerImpl) RecordReplicationLag added in v0.21.0

func (t *ThrottlerImpl) RecordReplicationLag(time time.Time, th *discovery.TabletHealth)

RecordReplicationLag must be called by users to report the "ts" tablet health data observed at "time". Note: After Close() is called, this method must not be called anymore.

func (*ThrottlerImpl) ResetConfiguration added in v0.21.0

func (t *ThrottlerImpl) ResetConfiguration()

ResetConfiguration resets the configuration of the MaxReplicationLag module to its initial settings.

func (*ThrottlerImpl) SetMaxRate added in v0.21.0

func (t *ThrottlerImpl) SetMaxRate(rate int64)

SetMaxRate updates the rate of the MaxRateModule.

func (*ThrottlerImpl) ThreadFinished added in v0.21.0

func (t *ThrottlerImpl) ThreadFinished(threadID int)

ThreadFinished marks threadID as finished and redistributes the thread's rate allotment across the other threads. After ThreadFinished() is called, Throttle() must not be called anymore.

func (*ThrottlerImpl) Throttle added in v0.21.0

func (t *ThrottlerImpl) Throttle(threadID int) time.Duration

Throttle returns a backoff duration which specifies for how long "threadId" should wait before it issues the next request. If the duration is zero, the thread is not throttled. If the duration is not zero, the thread must call Throttle() again after the backoff duration elapsed. The maximum value for the returned backoff is 1 second since the throttler internally operates on a per-second basis.

func (*ThrottlerImpl) UpdateConfiguration added in v0.21.0

func (t *ThrottlerImpl) UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error

UpdateConfiguration updates the configuration of the MaxReplicationLag module.

Directories

Path Synopsis
Package grpcthrottlerserver contains the gRPC implementation of the server side of the throttler service.
Package grpcthrottlerserver contains the gRPC implementation of the server side of the throttler service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL