internal

package
v1.2.11
Published: Jun 21, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package internal protects these types' concurrency primitives and other internals from accidental misuse.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AtomicMap

type AtomicMap[Key comparable, Value any] struct {
	// contains filtered or unexported fields
}

AtomicMap adds type safety around a sync.Map (which has atomic-like behavior), and:

  • implicitly constructs values as needed, not relying on zero values
  • simplifies the API quite a bit, because very few methods are in use. In particular, there is currently no "Store" because it is not needed.
  • tracks length (atomically, so values are only an estimate)

Due to length tracking, this is marginally more costly when modifying contents than "just" a type-safe sync.Map. It should only be used when length is needed.

func NewAtomicMap

func NewAtomicMap[Key comparable, Value any](create func(key Key) Value) *AtomicMap[Key, Value]

NewAtomicMap makes a simplified type-safe sync.Map that creates values as needed, and tracks length.

The `create` callback is called whenever a new value is needed, possibly multiple times, and without synchronization. It must be safe for concurrent use and should return quickly to narrow the window for storage races. Ideally it is simple and non-blocking; if it cannot be, it should pull from a pre-populated cache.

Due to length tracking, this is marginally more costly when modifying contents than "just" a type-safe sync.Map. It should only be used when length is needed.
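The behavior described above can be sketched with a small wrapper around sync.Map. This is a minimal illustration under stated assumptions, not this package's code: `lazyMap`, `newLazyMap`, and their fields are hypothetical names, and it assumes Go 1.19 or later for `atomic.Int64`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lazyMap is a hypothetical stand-in for AtomicMap, written only to
// illustrate lazy value creation and length tracking.
type lazyMap[K comparable, V any] struct {
	contents sync.Map
	length   atomic.Int64
	create   func(key K) V
}

func newLazyMap[K comparable, V any](create func(key K) V) *lazyMap[K, V] {
	return &lazyMap[K, V]{create: create}
}

// Load returns the value for key, creating and storing one if none exists.
func (m *lazyMap[K, V]) Load(key K) V {
	if v, ok := m.contents.Load(key); ok {
		return v.(V)
	}
	// Several goroutines may reach this point for the same key, so create
	// can run more than once. LoadOrStore keeps only the first stored value.
	v, loaded := m.contents.LoadOrStore(key, m.create(key))
	if !loaded {
		m.length.Add(1) // this call stored a new entry
	}
	return v.(V)
}

// Delete removes key and decrements the length only if an entry existed.
func (m *lazyMap[K, V]) Delete(key K) {
	if _, loaded := m.contents.LoadAndDelete(key); loaded {
		m.length.Add(-1)
	}
}

// Len reports the tracked length, which may already be stale when it returns.
func (m *lazyMap[K, V]) Len() int { return int(m.length.Load()) }

func main() {
	m := newLazyMap(func(key string) *int { return new(int) })
	*m.Load("a")++ // creates the counter for "a", then increments it
	*m.Load("a")++ // reuses the same counter
	m.Load("b")
	fmt.Println(*m.Load("a"), m.Len()) // prints "2 2"
	m.Delete("b")
	fmt.Println(m.Len()) // prints "1"
}
```

Because the length counter is updated separately from the map contents, a concurrent reader can briefly observe a length that does not match the number of stored entries, which is why the length is described as an estimate.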

func (*AtomicMap[Key, Value]) Delete

func (t *AtomicMap[Key, Value]) Delete(k Key)

Delete removes an entry from the map, and updates the length.

Like the underlying sync.Map.LoadAndDelete, this can be called concurrently with Range.

func (*AtomicMap[Key, Value]) Len

func (t *AtomicMap[Key, Value]) Len() int

Len returns the currently-known size of the collection. It cannot be guaranteed to be precise, as the collection may change at any time during or after this call.

In particular, Range may iterate over more or fewer entries.

func (*AtomicMap[Key, Value]) Load

func (t *AtomicMap[Key, Value]) Load(key Key) Value

Load will get the current Value for a Key, initializing it if necessary.

func (*AtomicMap[Key, Value]) Range

func (t *AtomicMap[Key, Value]) Range(f func(k Key, v Value) bool)

Range calls sync.Map.Range on the underlying sync.Map, and has the same semantics.

This can be used while concurrently modifying the map, and it may result in ranging over more or fewer entries than Len would imply.

type FallbackLimiter

type FallbackLimiter struct {
	// contains filtered or unexported fields
}

FallbackLimiter wraps a rate.Limiter with a fallback Limiter (i.e. a github.com/uber/cadence/common/quotas.Limiter) to use after a configurable number of failed updates.

Intended use is:

  • collect allowed vs rejected metrics (implicitly tracked by calling Allow())
  • periodically, the limiting host gathers all FallbackLimiter metrics and zeros them (with Collect())
  • this info is submitted to aggregating hosts, who compute new target RPS values
  • these new target values are used to adjust this ratelimiter (with Update(...))

During this sequence, an aggregator may fail to return a requested limit for two major reasons. Either one results in a FailedUpdate() call, which shortens a "use fallback logic" fuse:

  • this limit is legitimately unused and no data exists for it
  • ring re-sharding led to losing track of the limit, and it is now owned by a host with insufficient data

To mitigate the impact of the second case, "insufficient data" responses from aggregating hosts (which are currently modeled as "no data") are temporarily ignored, and the previously-configured update is used. This gives the aggregating host time to fill in its data, and then the next cycle should use "real" values that match actual usage.

If no data has been returned for a sufficiently long time, the "main" ratelimit will be dropped, and the fallback limit will be used exclusively. This is intended as a safety fallback, e.g. during initial rollout/rollback and outage scenarios; normal use is not expected to rely on it.
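The fuse behavior described above can be sketched as follows. This is an illustration under several assumptions, not the package's implementation: `fuseLimiter`, `fixed`, and `maxFailedUpdates` are hypothetical names, the "main" limiter is a stand-in that allows requests whenever the last rps was positive (the real type wraps a rate.Limiter), the sketch starts on the fallback until the first Update, and it switches to the fallback once the failure count reaches 10.

```go
package main

import (
	"fmt"
	"sync"
)

// Limiter mirrors the single-method interface documented in this package.
type Limiter interface {
	Allow() bool
}

// fixed is a stand-in limiter that always gives the same answer.
type fixed bool

func (f fixed) Allow() bool { return bool(f) }

// maxFailedUpdates is the documented threshold; treating "reaching 10" as
// the switch point is an assumption of this sketch.
const maxFailedUpdates = 10

// fuseLimiter is a hypothetical sketch of the "use fallback logic" fuse.
type fuseLimiter struct {
	mu       sync.Mutex
	primary  Limiter // stand-in for the wrapped rate.Limiter
	fallback Limiter
	failures int
}

func newFuseLimiter(fallback Limiter) *fuseLimiter {
	// Assumption: defer to the fallback until the first Update arrives.
	return &fuseLimiter{primary: fixed(false), fallback: fallback, failures: maxFailedUpdates}
}

// Allow uses the fallback once the fuse has burned down, else the primary.
func (l *fuseLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.failures >= maxFailedUpdates {
		return l.fallback.Allow()
	}
	return l.primary.Allow()
}

// Update installs a new target and resets the fuse.
func (l *fuseLimiter) Update(rps float64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.primary = fixed(rps > 0) // stand-in for adjusting a real rate limit
	l.failures = 0
}

// FailedUpdate shortens the fuse and returns the current failure count.
func (l *fuseLimiter) FailedUpdate() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.failures++
	return l.failures
}

// Reset defers to the fallback until the next Update.
func (l *fuseLimiter) Reset() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.failures = maxFailedUpdates
}

func main() {
	l := newFuseLimiter(fixed(false)) // a fallback that rejects everything
	fmt.Println(l.Allow())            // false: no update received yet
	l.Update(100)
	fmt.Println(l.Allow()) // true: the primary limit is in use
	for i := 0; i < maxFailedUpdates; i++ {
		l.FailedUpdate()
	}
	fmt.Println(l.Allow()) // false: the fuse burned down to the fallback
}
```

A single successful Update restores the primary limit immediately, so a brief gap in aggregator data costs at most a few cycles on the previously configured value.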

func NewFallbackLimiter

func NewFallbackLimiter(fallback Limiter) *FallbackLimiter

func (*FallbackLimiter) Allow

func (b *FallbackLimiter) Allow() bool

Allow returns true if a request is allowed right now.

func (*FallbackLimiter) Collect

func (b *FallbackLimiter) Collect() (accepted int, rejected int, usingFallback bool)

Collect returns the current accepted/rejected values, and resets them to zero. Small amounts of imprecise counting / time-bucketing due to concurrent limiting are expected and allowed, as they should be small enough in practice not to matter.
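One way to get this read-and-reset behavior is a pair of atomic counters that are swapped to zero on collection. The sketch below is an assumption about how such counting could work, not this package's code; `usage`, `record`, and `collect` are hypothetical names, and it assumes Go 1.19 or later for `atomic.Int64`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// usage is a hypothetical pair of counters for allowed and rejected calls.
type usage struct {
	accepted atomic.Int64
	rejected atomic.Int64
}

// record counts the outcome of a single Allow call.
func (u *usage) record(allowed bool) {
	if allowed {
		u.accepted.Add(1)
	} else {
		u.rejected.Add(1)
	}
}

// collect returns both counters and resets them to zero. Each Swap is
// atomic, but the pair is not a single snapshot: a call recorded between
// the two swaps lands in the next collection period.
func (u *usage) collect() (accepted, rejected int) {
	return int(u.accepted.Swap(0)), int(u.rejected.Swap(0))
}

func main() {
	var u usage
	for _, allowed := range []bool{true, true, false, true} {
		u.record(allowed)
	}
	fmt.Println(u.collect()) // prints "3 1"
	fmt.Println(u.collect()) // prints "0 0"
}
```

No counts are lost with this approach. A request recorded during collection is simply attributed to one period or the next, which matches the kind of small time-bucketing imprecision described above.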

func (*FallbackLimiter) FailedUpdate

func (b *FallbackLimiter) FailedUpdate() (failures int)

FailedUpdate should be called when a limit fails to update from an aggregator, possibly implying some kind of problem, which may be unique to this limit.

After crossing a threshold of failures (currently 10), the limiter switches to the fallback.

func (*FallbackLimiter) Reset

func (b *FallbackLimiter) Reset()

Reset defers to the fallback until an update is received.

This is intended to be used when the current limit is no longer trustworthy for some reason, determined externally rather than from too many FailedUpdate calls.

func (*FallbackLimiter) Update

func (b *FallbackLimiter) Update(rps float64)

Update adjusts the underlying "main" ratelimit, and resets the fallback fuse.

type Limiter

type Limiter interface {
	Allow() bool
}

Limiter is a simplified version of github.com/uber/cadence/common/quotas.Limiter, both to simplify mocking and to remove the need to import it.

Wait and Reserve are intentionally excluded here. They are not currently needed, and it seems likely to be much more difficult to correctly track their usage.
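A single-method interface makes test doubles very small. The sketch below redeclares the interface locally so that it is self-contained; `allowFunc` and `allowFirst` are hypothetical helpers, not part of this package, and the closure is not safe for concurrent use.

```go
package main

import "fmt"

// Limiter is redeclared here to match the interface documented above.
type Limiter interface {
	Allow() bool
}

// allowFunc adapts a plain function to the Limiter interface.
type allowFunc func() bool

func (f allowFunc) Allow() bool { return f() }

// allowFirst returns a Limiter that allows the first n calls and rejects
// the rest. It is meant for single-goroutine tests only.
func allowFirst(n int) Limiter {
	remaining := n
	return allowFunc(func() bool {
		if remaining > 0 {
			remaining--
			return true
		}
		return false
	})
}

func main() {
	l := allowFirst(2)
	fmt.Println(l.Allow(), l.Allow(), l.Allow()) // prints "true true false"
}
```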
