Documentation ¶
Index ¶
- Constants
- func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[interface{}]interface{}) error
- func WasRateLimited(err error) bool
- type AdaptiveRateLimitTracker
- type AdaptiveRateLimiter
- type AdaptiveRateLimiterConfig
- type Command
- type CreateEntityCommand
- type Decoder
- type DecoderF
- type Decoders
- type DefaultRateLimiter
- type DeleteEntityCommand
- type Dispatcher
- type EntityCreator
- type EntityDeleter
- type EntityManager
- type EntityMarshaller
- type EntityUpdater
- type LocalDispatcher
- type NoOpAdaptiveRateLimitTracker
- type NoOpAdaptiveRateLimiter
- type NoOpRateLimiter
- type RateLimitControl
- type RateLimiter
- type RateLimiterConfig
- type SyncSnapshotCommand
- type UpdateEntityCommand
- type Validatable
Constants ¶
const ( MetricCommandLimiterCurrentQueuedCount = "command.limiter.queued_count" MetricCommandLimiterWorkTimer = "command.limiter.work_timer" DefaultLimiterSize = 100 MinLimiterSize = 10 DefaultAdaptiveRateLimiterEnabled = true DefaultAdaptiveRateLimiterMinWindowSize = 5 DefaultAdaptiveRateLimiterMaxWindowSize = 250 DefaultAdaptiveRateLimiterTimeout = 30 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func LoadAdaptiveRateLimiterConfig ¶ added in v0.33.0
func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[interface{}]interface{}) error
func WasRateLimited ¶ added in v0.32.2
Types ¶
type AdaptiveRateLimitTracker ¶ added in v0.33.0
type AdaptiveRateLimitTracker interface { RunRateLimited(label string) (RateLimitControl, error) RunRateLimitedF(label string, f func(control RateLimitControl) error) error IsRateLimited() bool }
An AdaptiveRateLimitTracker works similarly to an AdaptiveRateLimiter, except it just manages the rate limiting without actually running the work. Because it doesn't run the work itself, it has to account for the possibility that some work may never report as complete or failed. It thus has a configurable timeout at which point outstanding work will be marked as failed.
func NewAdaptiveRateLimitTracker ¶ added in v0.33.0
func NewAdaptiveRateLimitTracker(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) AdaptiveRateLimitTracker
type AdaptiveRateLimiter ¶ added in v0.32.0
type AdaptiveRateLimiter interface {
RunRateLimited(f func() error) (RateLimitControl, error)
}
An AdaptiveRateLimiter allows running arbitrary, sequential operations with a limiter, so that only N operations can be queued to run at any given time. If the system is too busy, the rate limiter will return an ApiError indicating that the server is too busy.
The rate limiter returns a RateLimitControl, allow the calling code to indicate if the operation finished in time. If operations are timing out before the results are available, the rate limiter should allow fewer operations in, as they will likely time out before the results can be used.
The rate limiter doesn't have a set queue size, it has a window which can grow and shrink. When a timeout is signaled, using the RateLimitControl, it shrinks the window based on queue position of the timed out operation. For example, if an operation was queued at position 200, but the times out, we assume that we need to limit the queue size to something less than 200 for now.
The limiter will also reject already queued operations if the window size changes and the operation was queued at a position larger than the current window size.
The window size will slowly grow back towards the max as successes are noted in the RateLimitControl.
func NewAdaptiveRateLimiter ¶ added in v0.32.0
func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) AdaptiveRateLimiter
type AdaptiveRateLimiterConfig ¶ added in v0.32.0
type AdaptiveRateLimiterConfig struct { // Enabled - if false, a no-op rate limiter will be created, which doesn't enforce any rate limiting Enabled bool // MaxSize - the maximum window size to allow MaxSize uint32 // MinSize - the smallest window size to allow MinSize uint32 // WorkTimerMetric - the name of the timer metric for timing how long operations take to execute WorkTimerMetric string // QueueSize - the name of the gauge metric showing the current number of operations queued QueueSizeMetric string // WindowSizeMetric - the name of the metric show the current window size WindowSizeMetric string // Timeout - only used for AdaptiveRateLimitTracker, sets when a piece of outstanding work will be assumed to // have failed if it hasn't been marked completed yet, so that work slots aren't lost Timeout time.Duration }
AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter
func (*AdaptiveRateLimiterConfig) SetDefaults ¶ added in v0.33.0
func (self *AdaptiveRateLimiterConfig) SetDefaults()
type Command ¶
type Command interface { // Apply runs the commands Apply(ctx boltz.MutateContext) error // GetChangeContext returns the change context associated with the command GetChangeContext() *change.Context // Encode returns a serialized representation of the command Encode() ([]byte, error) }
Command instances represent actions to be taken by the fabric controller. They are serializable, so they can be shipped from one controller for RAFT coordination
type CreateEntityCommand ¶
type CreateEntityCommand[T models.Entity] struct { Context *change.Context Creator EntityCreator[T] Entity T PostCreateHook func(ctx boltz.MutateContext, entity T) error Flags uint32 }
func (*CreateEntityCommand[T]) Apply ¶
func (self *CreateEntityCommand[T]) Apply(ctx boltz.MutateContext) error
func (*CreateEntityCommand[T]) Encode ¶
func (self *CreateEntityCommand[T]) Encode() ([]byte, error)
func (*CreateEntityCommand[T]) GetChangeContext ¶
func (self *CreateEntityCommand[T]) GetChangeContext() *change.Context
type Decoders ¶
type Decoders interface { Register(id int32, decoder Decoder) RegisterF(id int32, decoder DecoderF) Decode(data []byte) (Command, error) GetDecoder(id int32) Decoder Clear() }
func GetDefaultDecoders ¶
func GetDefaultDecoders() Decoders
func NewDecoders ¶
func NewDecoders() Decoders
type DefaultRateLimiter ¶ added in v0.31.0
type DefaultRateLimiter struct {
// contains filtered or unexported fields
}
func (*DefaultRateLimiter) GetQueueFillPct ¶ added in v0.33.0
func (self *DefaultRateLimiter) GetQueueFillPct() float64
func (*DefaultRateLimiter) RunRateLimited ¶ added in v0.31.0
func (self *DefaultRateLimiter) RunRateLimited(f func() error) error
type DeleteEntityCommand ¶
type DeleteEntityCommand struct { Context *change.Context Deleter EntityDeleter Id string }
func (*DeleteEntityCommand) Apply ¶
func (self *DeleteEntityCommand) Apply(ctx boltz.MutateContext) error
func (*DeleteEntityCommand) Encode ¶
func (self *DeleteEntityCommand) Encode() ([]byte, error)
func (*DeleteEntityCommand) GetChangeContext ¶
func (self *DeleteEntityCommand) GetChangeContext() *change.Context
type Dispatcher ¶
type Dispatcher interface { Dispatch(command Command) error IsLeaderOrLeaderless() bool GetPeers() map[string]channel.Channel GetRateLimiter() RateLimiter }
Dispatcher instances will take a command and either send it to the leader to be applied, or if the current system is the leader, apply it locally
type EntityCreator ¶
type EntityCreator[T models.Entity] interface { EntityMarshaller[T] // ApplyCreate creates the entity described by the given command ApplyCreate(cmd *CreateEntityCommand[T], ctx boltz.MutateContext) error }
EntityCreator instances can apply a create entity command to create entities of a given type
type EntityDeleter ¶
type EntityDeleter interface { GetEntityTypeId() string // ApplyDelete deletes the entity described by the given command ApplyDelete(cmd *DeleteEntityCommand, ctx boltz.MutateContext) error }
EntityDeleter instances can apply a delete entity command to delete entities of a given type
type EntityManager ¶
type EntityManager[T models.Entity] interface { EntityCreator[T] EntityUpdater[T] EntityDeleter }
EntityManager instances can handle create, update and delete entities of a specific type
type EntityMarshaller ¶
type EntityMarshaller[T any] interface { // GetEntityTypeId returns the entity type id. This is distinct from the Store entity id // which may be shared by types, such as service and router. The entity type is unique // for each type GetEntityTypeId() string // Marshall marshals the entity to a bytes encoded format Marshall(entity T) ([]byte, error) // Unmarshall unmarshalls the bytes back into an entity Unmarshall(bytes []byte) (T, error) }
EntityMarshaller instances can marshal and unmarshal entities of the type that they manage as well as knowing their entity type
type EntityUpdater ¶
type EntityUpdater[T models.Entity] interface { EntityMarshaller[T] // ApplyUpdate updates the entity described by the given command ApplyUpdate(cmd *UpdateEntityCommand[T], ctx boltz.MutateContext) error }
EntityUpdater instances can apply an update entity command to update entities of a given type
type LocalDispatcher ¶
type LocalDispatcher struct { EncodeDecodeCommands bool Limiter RateLimiter }
LocalDispatcher should be used when running a non-clustered system
func (*LocalDispatcher) Dispatch ¶
func (self *LocalDispatcher) Dispatch(command Command) error
func (*LocalDispatcher) GetPeers ¶
func (self *LocalDispatcher) GetPeers() map[string]channel.Channel
func (*LocalDispatcher) GetRateLimiter ¶ added in v0.33.0
func (self *LocalDispatcher) GetRateLimiter() RateLimiter
func (*LocalDispatcher) IsLeaderOrLeaderless ¶
func (self *LocalDispatcher) IsLeaderOrLeaderless() bool
type NoOpAdaptiveRateLimitTracker ¶ added in v0.33.0
type NoOpAdaptiveRateLimitTracker struct{}
func (NoOpAdaptiveRateLimitTracker) IsRateLimited ¶ added in v0.33.0
func (n NoOpAdaptiveRateLimitTracker) IsRateLimited() bool
func (NoOpAdaptiveRateLimitTracker) RunRateLimited ¶ added in v0.33.0
func (n NoOpAdaptiveRateLimitTracker) RunRateLimited(string) (RateLimitControl, error)
func (NoOpAdaptiveRateLimitTracker) RunRateLimitedF ¶ added in v0.33.0
func (n NoOpAdaptiveRateLimitTracker) RunRateLimitedF(_ string, f func(control RateLimitControl) error) error
type NoOpAdaptiveRateLimiter ¶ added in v0.32.0
type NoOpAdaptiveRateLimiter struct{}
func (NoOpAdaptiveRateLimiter) RunRateLimited ¶ added in v0.32.0
func (self NoOpAdaptiveRateLimiter) RunRateLimited(f func() error) (RateLimitControl, error)
type NoOpRateLimiter ¶ added in v0.31.0
type NoOpRateLimiter struct{}
func (NoOpRateLimiter) GetQueueFillPct ¶ added in v0.33.0
func (self NoOpRateLimiter) GetQueueFillPct() float64
func (NoOpRateLimiter) RunRateLimited ¶ added in v0.31.0
func (self NoOpRateLimiter) RunRateLimited(f func() error) error
type RateLimitControl ¶ added in v0.32.0
type RateLimitControl interface { // Success indicates the operation was a success Success() // Backoff indicates that we need to backoff Backoff() // Failed indicates the operation was not a success, but a backoff isn't required Failed() }
func NoOpRateLimitControl ¶ added in v0.33.0
func NoOpRateLimitControl() RateLimitControl
type RateLimiter ¶ added in v0.31.0
A RateLimiter allows running arbitrary, sequential operations with a limiter, so that only N operations can be queued to run at any given time. If the system is too busy, the rate limiter will return an ApiError indicating that the server is too busy
func NewRateLimiter ¶ added in v0.31.0
func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) RateLimiter
type RateLimiterConfig ¶ added in v0.31.0
type SyncSnapshotCommand ¶
type SyncSnapshotCommand struct { SnapshotId string Snapshot []byte SnapshotSink func(cmd *SyncSnapshotCommand) error }
func (*SyncSnapshotCommand) Apply ¶
func (self *SyncSnapshotCommand) Apply(boltz.MutateContext) error
func (*SyncSnapshotCommand) Encode ¶
func (self *SyncSnapshotCommand) Encode() ([]byte, error)
func (*SyncSnapshotCommand) GetChangeContext ¶
func (self *SyncSnapshotCommand) GetChangeContext() *change.Context
type UpdateEntityCommand ¶
type UpdateEntityCommand[T models.Entity] struct { Context *change.Context Updater EntityUpdater[T] Entity T UpdatedFields fields.UpdatedFields Flags uint32 }
func (*UpdateEntityCommand[T]) Apply ¶
func (self *UpdateEntityCommand[T]) Apply(ctx boltz.MutateContext) error
func (*UpdateEntityCommand[T]) Encode ¶
func (self *UpdateEntityCommand[T]) Encode() ([]byte, error)
func (*UpdateEntityCommand[T]) GetChangeContext ¶
func (self *UpdateEntityCommand[T]) GetChangeContext() *change.Context
type Validatable ¶
type Validatable interface {
Validate() error
}
Validatable instances can be validated. Command instances which implement Validable will be validated before Command.Apply is called