Documentation ¶
Index ¶
- func Avg[T Numeric](_ context.Context, w Window[T]) T
- func Count[T Numeric](_ context.Context, w Window[T]) T
- func FastPercentile[T Numeric](perc float64) func(_ context.Context, w Window[T]) T
- func Max[T Numeric](_ context.Context, w Window[T]) T
- func Min[T Numeric](_ context.Context, w Window[T]) T
- func Percentile[T Numeric](perc float64) func(_ context.Context, w Window[T]) T
- func Sum[T Numeric](_ context.Context, w Window[T]) T
- type Numeric
- type PointPolicy
- type PointPolicyConcurrent
- type Reduction
- type TimePolicy
- func (self *TimePolicy[T]) Append(ctx context.Context, value T)
- func (self *TimePolicy[T]) AppendWithTimestamp(ctx context.Context, value T, timestamp time.Time)
- func (self *TimePolicy[T]) Reduce(ctx context.Context, f Reduction[T]) T
- func (self *TimePolicy[T]) ReduceWithTimestamp(ctx context.Context, f Reduction[T], timestamp time.Time) T
- type TimePolicyConcurrent
- func (self *TimePolicyConcurrent[T]) Append(ctx context.Context, value T)
- func (self *TimePolicyConcurrent[T]) AppendWithTimestamp(ctx context.Context, value T, timestamp time.Time)
- func (self *TimePolicyConcurrent[T]) Reduce(ctx context.Context, f Reduction[T]) T
- func (self *TimePolicyConcurrent[T]) ReduceWithTimestamp(ctx context.Context, f Reduction[T], timestamp time.Time) T
- type Window
Functions ¶
func FastPercentile ¶
FastPercentile implements the pSquare (P²) percentile estimation algorithm, which estimates percentiles from streams of data using a fixed amount of memory. Percentile values should be given in the form 90.0 or 99.9.
func Percentile ¶
Percentile returns an aggregating function that computes the given percentile for a window. Percentile values should be given in the form 90.0 or 99.9.
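To illustrate the "function that returns an aggregating function" shape, here is a minimal sketch using the nearest-rank method over a bucketed window. The helper name `percentile`, the plain `[][]float64` window, and the choice of nearest-rank are assumptions made for the example; the package may interpolate or round differently.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// percentile is a hypothetical helper, not the package's Percentile.
// It returns a function that computes the nearest-rank percentile of
// every value held in a bucketed window.
func percentile(perc float64) func(w [][]float64) float64 {
	return func(w [][]float64) float64 {
		// Flatten all buckets into one slice so it can be sorted.
		var values []float64
		for _, bucket := range w {
			values = append(values, bucket...)
		}
		if len(values) == 0 {
			return 0
		}
		sort.Float64s(values)
		// Nearest rank: the smallest rank covering perc percent of values.
		rank := int(math.Ceil(perc * float64(len(values)) / 100))
		if rank < 1 {
			rank = 1
		}
		if rank > len(values) {
			rank = len(values)
		}
		return values[rank-1]
	}
}

func main() {
	w := [][]float64{{1, 2, 3}, {4, 5, 6, 7}, {8, 9, 10}}
	p90 := percentile(90.0)
	p999 := percentile(99.9)
	fmt.Println(p90(w), p999(w))
}
```

Because the percentile is captured in a closure, the same returned function can be reused on every evaluation of the window.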
Types ¶
type Numeric ¶
type Numeric interface { constraints.Integer | constraints.Float }
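As a rough illustration of what this constraint admits, the sketch below spells out an equivalent constraint using only the standard library and applies it to a generic function. The local `Numeric` here is a stand-in written for the example, and `double` is a hypothetical helper; neither is part of the package.

```go
package main

import "fmt"

// Numeric is a stdlib-only stand-in for the package's constraint, which
// is declared as constraints.Integer | constraints.Float.
type Numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr |
		~float32 | ~float64
}

// double is a hypothetical helper showing that one generic body serves
// every numeric type admitted by the constraint.
func double[T Numeric](v T) T {
	return v + v
}

func main() {
	fmt.Println(double(21))       // T inferred as int
	fmt.Println(double(1.5))      // T inferred as float64
	fmt.Println(double(uint8(4))) // T given explicitly as uint8
}
```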
type PointPolicy ¶
type PointPolicy[T Numeric] struct { // contains filtered or unexported fields }
PointPolicy is a rolling window policy that tracks the last N values inserted regardless of insertion time.
func NewPointPolicy ¶
func NewPointPolicy[T Numeric](window Window[T]) *PointPolicy[T]
NewPointPolicy generates a Policy that operates on a rolling set of input points. The number of points is determined by the size of the given window. Each bucket will contain, at most, one data point when the window is full.
Note that this implementation is not safe for concurrent use. Use PointPolicyConcurrent if concurrency is required.
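The "last N values" behaviour can be pictured as a ring of single-value buckets. The following is a minimal sketch under that assumption; `pointRing` and its methods are hypothetical names for the example and do not reflect the package's internals.

```go
package main

import "fmt"

// pointRing is a hypothetical stand-in for a point-based policy: it
// keeps one value per bucket and overwrites the oldest bucket on append.
type pointRing struct {
	buckets [][]float64
	offset  int // index of the bucket that the next append overwrites
}

// newPointRing creates a ring that remembers the last n values.
// n must be greater than zero.
func newPointRing(n int) *pointRing {
	return &pointRing{buckets: make([][]float64, n)}
}

// append stores v in the current bucket and advances the ring.
func (r *pointRing) append(v float64) {
	r.buckets[r.offset] = []float64{v}
	r.offset = (r.offset + 1) % len(r.buckets)
}

// sum adds every value currently held in the ring.
func (r *pointRing) sum() float64 {
	var total float64
	for _, bucket := range r.buckets {
		for _, v := range bucket {
			total += v
		}
	}
	return total
}

func main() {
	r := newPointRing(3)
	for _, v := range []float64{1, 2, 3, 4, 5} {
		r.append(v)
	}
	// Only the last three values (3, 4, 5) remain in the ring.
	fmt.Println(r.sum())
}
```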
func (*PointPolicy[T]) Append ¶
func (w *PointPolicy[T]) Append(ctx context.Context, value T)
Append a value to the window.
type PointPolicyConcurrent ¶
type PointPolicyConcurrent[T Numeric] struct { // contains filtered or unexported fields }
PointPolicyConcurrent is a concurrency-safe wrapper for PointPolicy that uses a mutex to serialize calls.
func NewPointPolicyConcurrent ¶
func NewPointPolicyConcurrent[T Numeric](window Window[T]) *PointPolicyConcurrent[T]
NewPointPolicyConcurrent generates a variant of PointPolicy that is safe for use in cases of concurrent reading and writing.
func (*PointPolicyConcurrent[T]) Append ¶
func (self *PointPolicyConcurrent[T]) Append(ctx context.Context, value T)
type TimePolicy ¶
type TimePolicy[T Numeric] struct { // contains filtered or unexported fields }
TimePolicy is a rolling window policy that tracks all values inserted over the last N period of time. Each bucket of a window represents a fixed duration of time, so the window as a whole covers the sum of its bucket durations. For example, 10 buckets with a 100ms bucket duration make a 1s window. This policy rolls data out of the window one bucket at a time so that the window only contains data for the most recent window span.
func NewTimePolicy ¶
func NewTimePolicy[T Numeric](window Window[T], bucketDuration time.Duration) *TimePolicy[T]
NewTimePolicy creates a policy that manages a window with rolling time durations. The given duration is used to bucket data within the window. If data points are received entire windows apart then the window will only contain a single data point. If one or more bucket durations pass without data then those buckets are zeroed out to keep the window consistent.
Note that this implementation is not safe for concurrent use. Use NewTimePolicyConcurrent for a safe implementation.
func (*TimePolicy[T]) Append ¶
func (self *TimePolicy[T]) Append(ctx context.Context, value T)
Append a value to the window using a time bucketing strategy.
func (*TimePolicy[T]) AppendWithTimestamp ¶
func (self *TimePolicy[T]) AppendWithTimestamp(ctx context.Context, value T, timestamp time.Time)
AppendWithTimestamp is the same as Append but takes the timestamp as a parameter. Note that this method is only for advanced use cases where the clock source is external to the system accumulating data. Users of this method must ensure that each call is given a timestamp value that is valid for the active window of time.
Valid timestamps are technically any value greater than or equal to the window's TAIL, which is calculated as (HEAD - (BUCKETS*BUCKET_DURATION)), where HEAD represents the largest timestamp that was previously given to AppendWithTimestamp or ReduceWithTimestamp. Values between HEAD and TAIL are placed within existing buckets. Values less than TAIL are ignored because those timestamps represent a time in the past that is no longer covered by the window. Values greater than HEAD permanently move the window forward in time.
func (*TimePolicy[T]) Reduce ¶
func (self *TimePolicy[T]) Reduce(ctx context.Context, f Reduction[T]) T
Reduce the window to a single value using a reduction function.
func (*TimePolicy[T]) ReduceWithTimestamp ¶
func (self *TimePolicy[T]) ReduceWithTimestamp(ctx context.Context, f Reduction[T], timestamp time.Time) T
ReduceWithTimestamp is the same as Reduce but takes the current timestamp as a parameter. The timestamp value must be valid according to the same rules described in the AppendWithTimestamp method. Invalid timestamp values result in a zero value being returned regardless of the reduction function or the current window contents.
Note that the given timestamp does not necessarily limit the view of the window. This is not a "reduce until" method. The given timestamp represents the point in time at which the window is being evaluated and it is used to roll any expired buckets out of the window before being reduced. This is not a read-only method and the effects of using a future time here are the same as documented in AppendWithTimestamp.
type TimePolicyConcurrent ¶
type TimePolicyConcurrent[T Numeric] struct { // contains filtered or unexported fields }
TimePolicyConcurrent is a concurrency-safe wrapper for TimePolicy that uses a mutex to serialize calls.
func NewTimePolicyConcurrent ¶
func NewTimePolicyConcurrent[T Numeric](window Window[T], bucketDuration time.Duration) *TimePolicyConcurrent[T]
NewTimePolicyConcurrent generates a variant of TimePolicy that is safe for concurrent use.
func (*TimePolicyConcurrent[T]) Append ¶
func (self *TimePolicyConcurrent[T]) Append(ctx context.Context, value T)
func (*TimePolicyConcurrent[T]) AppendWithTimestamp ¶
func (self *TimePolicyConcurrent[T]) AppendWithTimestamp(ctx context.Context, value T, timestamp time.Time)
func (*TimePolicyConcurrent[T]) Reduce ¶
func (self *TimePolicyConcurrent[T]) Reduce(ctx context.Context, f Reduction[T]) T
func (*TimePolicyConcurrent[T]) ReduceWithTimestamp ¶
func (self *TimePolicyConcurrent[T]) ReduceWithTimestamp(ctx context.Context, f Reduction[T], timestamp time.Time) T
type Window ¶
type Window[T Numeric] [][]T
Window represents a bucketed set of data. It should be used in conjunction with a Policy, which populates it with data according to some windowing strategy.
func NewPreallocatedWindow ¶
NewPreallocatedWindow creates a Window with the given number of buckets and a preallocated capacity for each bucket. This constructor may be used when the number of data points per bucket can be estimated, or when the goal is to allocate one large slice up front so that allocations do not happen as the Window is populated by a Policy.
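The signature of this constructor is not shown above, so the following is only a sketch of the general idea: each bucket starts empty but with spare capacity, so appends up to that capacity do not allocate. The function name `newPreallocated` and its parameters are assumptions for the example.

```go
package main

import "fmt"

// newPreallocated is a hypothetical helper, not the package's
// constructor. It builds a bucketed window in which every bucket has
// length 0 and capacity bucketSize.
func newPreallocated(buckets, bucketSize int) [][]float64 {
	w := make([][]float64, buckets)
	for i := range w {
		w[i] = make([]float64, 0, bucketSize)
	}
	return w
}

func main() {
	w := newPreallocated(10, 64)
	fmt.Println(len(w), len(w[0]), cap(w[0]))

	// Appending within the reserved capacity reuses the same backing array.
	w[0] = append(w[0], 1.5)
	fmt.Println(len(w[0]), cap(w[0]))
}
```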