internal

package
v1.2.16-prerelease01 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package internal protects these types' concurrency primitives and other internals from accidental misuse.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewShadowedLimiter added in v1.2.12

func NewShadowedLimiter(primary, shadow quotas.Limiter) quotas.Limiter

NewShadowedLimiter mirrors all quotas.Limiter to its two wrapped limiters, but only returns results from / waits as long as the "primary" one (first arg).

This is intended for when you want to use one limit but also run a second limit at the same time for comparison (to decide before switching) or to warm data (to reduce spikes when switching).

Types

type AtomicMap

type AtomicMap[Key comparable, Value any] struct {
	// contains filtered or unexported fields
}

AtomicMap adds type safety around a sync.Map (which has atomic-like behavior), and:

  • implicitly constructs values as needed, not relying on zero values
  • simplifies the API quite a bit because very few methods are in use. in particular there is no "Store" currently because it is not needed.
  • tracks length (atomically, so values are only an estimate)

Due to length tracking, this is marginally more costly when modifying contents than "just" a type-safe sync.Map. It should only be used when length is needed.

func NewAtomicMap

func NewAtomicMap[Key comparable, Value any](create func(key Key) Value) *AtomicMap[Key, Value]

NewAtomicMap makes a simplified type-safe sync.Map that creates values as needed, and tracks length.

The `create` callback will be called when creating a new value, possibly multiple times, without synchronization. It must be concurrency safe and should return ASAP to reduce the window for storage races, so ideally it should be simple and non-blocking, or pulling from a pre-populated cache if not.

Due to length tracking, this is marginally more costly when modifying contents than "just" a type-safe sync.Map. It should only be used when length is needed.

func (*AtomicMap[Key, Value]) Delete

func (t *AtomicMap[Key, Value]) Delete(k Key)

Delete removes an entry from the map, and updates the length.

Like the underlying sync.Map.LoadAndDelete, this can be called concurrently with Range.

func (*AtomicMap[Key, Value]) Len

func (t *AtomicMap[Key, Value]) Len() int

Len returns the currently-known size of the collection. It cannot be guaranteed to be precise, as the collection may change at any time during or after this call.

In particular, Range may iterate over more or fewer entries.

func (*AtomicMap[Key, Value]) Load

func (t *AtomicMap[Key, Value]) Load(key Key) Value

Load will get the current Value for a Key, initializing it if necessary.

func (*AtomicMap[Key, Value]) Range

func (t *AtomicMap[Key, Value]) Range(f func(k Key, v Value) bool)

Range calls sync.Map.Range on the underlying sync.Map, and has the same semantics.

This can be used while concurrently modifying the map, and it may result in ranging over more or fewer entries than Len would imply.

type AtomicUsage added in v1.2.12

type AtomicUsage struct {
	// contains filtered or unexported fields
}

func (*AtomicUsage) Collect added in v1.2.12

func (a *AtomicUsage) Collect() UsageMetrics

func (*AtomicUsage) Count added in v1.2.12

func (a *AtomicUsage) Count(allowed bool)

type CountedLimiter added in v1.2.12

type CountedLimiter struct {
	// contains filtered or unexported fields
}

func NewCountedLimiter added in v1.2.12

func NewCountedLimiter(limiter quotas.Limiter) CountedLimiter

func (CountedLimiter) Allow added in v1.2.12

func (c CountedLimiter) Allow() bool

func (CountedLimiter) Collect added in v1.2.12

func (c CountedLimiter) Collect() UsageMetrics

func (CountedLimiter) Limit added in v1.2.13

func (c CountedLimiter) Limit() rate.Limit

func (CountedLimiter) Reserve added in v1.2.12

func (c CountedLimiter) Reserve() clock.Reservation

func (CountedLimiter) Wait added in v1.2.12

func (c CountedLimiter) Wait(ctx context.Context) error

type FallbackLimiter

type FallbackLimiter struct {
	// contains filtered or unexported fields
}

FallbackLimiter wraps a "primary" rate.Limiter with a "fallback" Limiter (i.e. a github.com/uber/cadence/common/quotas.Limiter) to use before the primary is fully ready, or after too many attempts to update the "primary" limit have failed.

Intended use is:

  • "primary" is the global-load-balanced ratelimiter, with weights updating every few seconds
  • "fallback" is the "rps / num-of-hosts" ratelimiters we use many other places, which do not need to exchange data to work
  • When "primary" is not yet initialized (no updates yet at startup) or the global-load-balancing system is broken (too many failures), this limiter internally switches to the "fallback" limiter until it recovers. Otherwise, "primary" is preferred.
  • both limiters are called all the time to keep their used-tokens roughly in sync (via shadowedLimiter), so they can be switched between without allowing bursts due to the "other" limit's token bucket filling up.

The owning global/collection.Collection uses this as follows:

  • every couple seconds, Collect() usage information from all limiters
  • submit this to aggregator hosts, which return how much weight to allow this host per limiter
  • per limiter:
  • if a weight was returned, Update(rps) it so the new value is used in the future
  • else, call FailedUpdate() to shorten the "use fallback" fuse, possibly switching to the fallback by doing so

During this sequence, a requested limit may not be returned by an aggregator for two major reasons, and will result in a FailedUpdate() call to shorten a "use fallback logic" fuse:

  1. this limit is legitimately unused (or idle too long) and no data exists for it
  2. ring re-sharding led to losing track of the limit, and it is now owned by a host with insufficient data

To mitigate the impact of the second case, missing data is temporarily ignored, and the previously-configured Update rate is used without any changes. This gives the aggregating host time to fill in its data, and a later cycle should return "real" values that match actual usage, Update-ing this limiter.

If no data has been returned for a sufficiently long time, the "primary" ratelimit will be ignored, and the fallback limit will be used exclusively. This is intended as a safety fallback, e.g. during initial rollout/rollback and outage scenarios, normal use is not expected to rely on it.

-----

Implementation notes:

  • atomics are used as values instead of pointers simply for data locality, which requires FallbackLimiter to be used as a pointer. `go vet -copylocks` should be enough to ensure this is done correctly, which is enforced by `make lint`.
  • the variety of atomics here is almost certainly NOT necessary for performance, it was just possible and simple enough to use, and it makes deadlocks trivially impossible. but rate.Limiter already use mutexes internally, so adding another to track counts / fallback decisions / etc should be entirely fine, if it becomes needed.

func NewFallbackLimiter

func NewFallbackLimiter(fallback quotas.Limiter) *FallbackLimiter

NewFallbackLimiter returns a quotas.Limiter that uses a simpler fallback when necessary, and attempts to keep both the fallback and the "real" limiter "warm" by mirroring calls between the two regardless of which is being used.

func (*FallbackLimiter) Allow

func (b *FallbackLimiter) Allow() bool

func (*FallbackLimiter) Collect

func (b *FallbackLimiter) Collect() (usage UsageMetrics, starting, failing bool)

Collect returns the current allowed/rejected values and resets them to zero (like CountedLimiter). it also returns info about if this limiter is currently starting up or in a "use fallback due to failures" mode. "starting" is expected any time a limit is new (or not recently used), "failing" may happen rarely but should not ever be a steady event unless something is broken.

func (*FallbackLimiter) FailedUpdate

func (b *FallbackLimiter) FailedUpdate() (failures int)

FailedUpdate should be called when a limit fails to update from an aggregator, possibly implying some kind of problem, which may be unique to this limit.

After crossing a threshold of failures (currently 10), the fallback will be switched to.

func (*FallbackLimiter) FallbackLimit added in v1.2.13

func (b *FallbackLimiter) FallbackLimit() rate.Limit

func (*FallbackLimiter) Limit added in v1.2.13

func (b *FallbackLimiter) Limit() rate.Limit

func (*FallbackLimiter) Reserve added in v1.2.12

func (b *FallbackLimiter) Reserve() clock.Reservation

func (*FallbackLimiter) Reset

func (b *FallbackLimiter) Reset()

Reset defers to the fallback limiter until an update is received.

This is intended to be used when the current limit is no longer trustworthy for some reason, determined externally rather than from too many FailedUpdate calls.

func (*FallbackLimiter) Update

func (b *FallbackLimiter) Update(lim rate.Limit)

Update adjusts the underlying "primary" ratelimit, and resets the fallback fuse. This implies switching to the "primary" limiter - if that is not desired, call Reset() immediately after.

func (*FallbackLimiter) Wait added in v1.2.12

func (b *FallbackLimiter) Wait(ctx context.Context) error

type UsageMetrics added in v1.2.12

type UsageMetrics struct {
	Allowed, Rejected, Idle int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL