queue

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2024 License: Apache-2.0 Imports: 8 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddCountsRoute

func AddCountsRoute(lggr logr.Logger, mux *http.ServeMux, q CountReader)

Types

type Count added in v0.8.0

type Count struct {
	Concurrency int
	RPS         float64
}

type CountReader

type CountReader interface {
	// Current returns the current count of pending requests
	// for the given hostname
	Current() (*Counts, error)
}

CountReader represents the size of a virtual HTTP queue, possibly distributed across multiple HTTP server processes. It only can access the current size of the queue, not any other information about requests.

It is concurrency safe.

type Counter

type Counter interface {
	CountReader
	// Increase increases the queue size by delta for the given host.
	Increase(host string, delta int) error
	// Decrease decreases the queue size by delta for the given host.
	Decrease(host string, delta int) error
	// EnsureKey ensures that host is represented in this counter.
	EnsureKey(host string, window, granularity time.Duration)
	// UpdateBuckets update request backets if there are changes
	UpdateBuckets(host string, window, granularity time.Duration)
	// RemoveKey tries to remove the given host and its
	// associated counts from the queue. returns true if it existed,
	// false otherwise.
	RemoveKey(host string) bool
}

QueueCounter represents a virtual HTTP queue, possibly distributed across multiple HTTP server processes. It can only increase or decrease the size of the queue or read the current size of the queue, but not read or modify any other information about it.

Both the mutation and read functionality is concurrency safe, but the read functionality is point-in-time only

type Counts

type Counts struct {
	json.Marshaler
	json.Unmarshaler
	fmt.Stringer
	Counts map[string]Count
}

Count is a snapshot of the HTTP pending request (Concurrency) count and RPS for each host. This is a json.Marshaler, json.Unmarshaler, and fmt.Stringer implementation.

Use NewQueueCounts to create a new one of these.

func GetCounts

func GetCounts(
	httpCl *http.Client,
	interceptorURL url.URL,
) (*Counts, error)

GetQueueCounts issues an RPC call to get the queue counts from the given hostAndPort. Note that the hostAndPort should not end with a "/" and shouldn't include a path.

func NewCounts

func NewCounts() *Counts

NewQueueCounts creates a new empty QueueCounts struct

func (*Counts) Aggregate

func (q *Counts) Aggregate() Count

Aggregate returns the total count across all hosts

func (*Counts) MarshalJSON

func (q *Counts) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler

func (*Counts) String

func (q *Counts) String() string

String implements fmt.Stringer

func (*Counts) UnmarshalJSON

func (q *Counts) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler

type FakeCountReader

type FakeCountReader struct {
	// contains filtered or unexported fields
}

func (*FakeCountReader) Current

func (f *FakeCountReader) Current() (*Counts, error)

type FakeCounter

type FakeCounter struct {
	RetMap        map[string]Count
	ResizedCh     chan HostAndCount
	ResizeTimeout time.Duration
	// contains filtered or unexported fields
}

func NewFakeCounter

func NewFakeCounter() *FakeCounter

func (*FakeCounter) Current

func (f *FakeCounter) Current() (*Counts, error)

func (*FakeCounter) Decrease added in v0.8.0

func (f *FakeCounter) Decrease(host string, i int) error

func (*FakeCounter) EnsureKey added in v0.8.0

func (f *FakeCounter) EnsureKey(host string, _, _ time.Duration)

func (*FakeCounter) Increase added in v0.8.0

func (f *FakeCounter) Increase(host string, i int) error

func (*FakeCounter) RemoveKey added in v0.8.0

func (f *FakeCounter) RemoveKey(host string) bool

func (*FakeCounter) UpdateBuckets added in v0.8.0

func (f *FakeCounter) UpdateBuckets(_ string, _, _ time.Duration)

type HostAndCount

type HostAndCount struct {
	Host  string
	Count int
}

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory is a Counter implementation that holds the HTTP queue in memory only. Always use NewMemory to create one of these.

func NewMemory

func NewMemory() *Memory

NewMemoryQueue creates a new empty in-memory queue

func (*Memory) Current

func (r *Memory) Current() (*Counts, error)

Current returns the current size of the queue.

func (*Memory) Decrease added in v0.8.0

func (r *Memory) Decrease(host string, delta int) error

Decrease changes the size of the queue reducing delta

func (*Memory) EnsureKey added in v0.8.0

func (r *Memory) EnsureKey(host string, window, granularity time.Duration)

func (*Memory) Increase added in v0.8.0

func (r *Memory) Increase(host string, delta int) error

Increase changes the size of the queue adding delta

func (*Memory) RemoveKey added in v0.8.0

func (r *Memory) RemoveKey(host string) bool

func (*Memory) UpdateBuckets added in v0.8.0

func (r *Memory) UpdateBuckets(host string, window, granularity time.Duration)

type RequestsBuckets added in v0.8.0

type RequestsBuckets struct {
	// contains filtered or unexported fields
}

RequestsBuckets keeps buckets that have been collected at a certain time.

func NewRequestsBuckets added in v0.8.0

func NewRequestsBuckets(window, granularity time.Duration) *RequestsBuckets

NewRequestsBuckets generates a new RequestsBuckets with the given granularity.

func (*RequestsBuckets) IsEmpty added in v0.8.0

func (t *RequestsBuckets) IsEmpty(now time.Time) bool

IsEmpty returns true if no data has been recorded for the `window` period.

func (*RequestsBuckets) Record added in v0.8.0

func (t *RequestsBuckets) Record(now time.Time, value int)

Record adds a value with an associated time to the correct bucket. If this record would introduce a gap in the data, any intervening times between the last write and this one will be recorded as zero. If an entire window length has expired without data, the firstWrite time is reset, meaning the WindowAverage will be of a partial window until enough data is received to fill it again.

func (*RequestsBuckets) WindowAverage added in v0.8.0

func (t *RequestsBuckets) WindowAverage(now time.Time) float64

WindowAverage returns the average bucket value over the window.

If the first write was less than the window length ago, an average is returned over the partial window. For example, if firstWrite was 6 seconds ago, the average will be over these 6 seconds worth of buckets, even if the window is 60s. If a window passes with no data being received, the first write time is reset so this behaviour takes effect again.

Similarly, if we have not received recent data, the average is based on a partial window. For example, if the window is 60 seconds but we last received data 10 seconds ago, the window average will be the average over the first 50 seconds.

In other cases, for example if there are gaps in the data shorter than the window length, the missing data is assumed to be 0 and the average is over the whole window length inclusive of the missing data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL