rolling

package module
v3.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2024 License: Apache-2.0 Imports: 6 Imported by: 1

README

rolling

A rolling window implementation for Go.

Go Reference

Installing

go get github.com/kevinconway/rolling/v3

Rolling Time Windows

Rolling time windows represent a view of data from the current point in time to some configurable duration in the past.

var p = rolling.NewTimePolicy[int](rolling.NewWindow[int](3000), time.Millisecond)
var start = time.Now()
for range time.Tick(time.Millisecond) {
  if time.Since(start) > 3*time.Second {
    break
  }
  p.Append(1)
}

The above creates a time window that contains 3,000 buckets where each bucket contains, at most, 1ms of recorded data. New values are always recorded in the most recent bucket and the oldest bucket is dropped in intervals of the bucket duration. In the above, for example, the oldest bucket is dropped every millisecond and a new, most recent bucket is created to collect values.

This type of window policy is most useful for collecting real-time values such as request rates, error rates, and latency of operations.

Time Window Concurrency Safety

The NewTimePolicy constructor returns a lock free implementation that is not safe for concurrent use. Use NewTimePolicyConcurrent if you need to manage concurrent readers and writers.

Time Window With Manual Timestamp Selection

The time based rolling window defaults to using time.Now() for all timestamp generation but it is possible to provide your own timestamps using the AppendWithTimestamp and ReduceWithTimestamp variants:

var p = rolling.NewTimeWindow[int](rolling.NewWindow[int](3000), time.Millisecond)
var start = time.Now()
for range time.Tick(time.Millisecond) {
  if time.Since(start) > 3*time.Second {
    break
  }
  p.AppendWithTimestamp(1, start.Add(time.Millisecond))
}

Using custom timestamps is an advanced use case and requires careful usage. The target use case for this is when populating the window with a series of data that provides its own timestamps. An example of this is processing a strictly ordered event stream where each event has an embedded timestamp. You should not use both the standard and WithTimestamp method variants for the same window. The reason for this is that the time based policy is driven by the largest given timestamp such that the general expectation is for timestamps to only increase in value. Mixing custom and default timestamps can result in unexpected behavior or a corrupt internal state of the policy.

Out Of Order Timestamps

The risk of feeding out of order timestamps to a time based policy may be greater when using custom timestamps but there is still a risk when using the default timestamp generation. The current implementation leverages wall clock values to determine passage of time and when routing values to window buckets. There are multiple ways for wall clock time to run backwards depending on system configuration and load on the system. Here's how the time policy handles different ordering conditions:

  • If the timestamp is older than the oldest bucket in the window then it is discarded.
  • If the timestamp is within the current window then it is added to the appropriate bucket.
  • If the timestamp is in the future compared to the current window then the future timestamp becomes the new leading edge of the window and any existing buckets that are now outside the window are discarded.

In effect, time windows can only move forward and never backwards. If time runs backwards substantially then it is likely that all data points will be discarded until the timestamp values return to the range within the window. If time runs forward substantially then it will both discard all current values in the window and set a new anchor point. The window never adjusts backwards so a future timestamp effectively invalidates the window until the source of timestamps begins producing values within the window established by the future timestamp.

While these timing conditions are possible, a substantial forward or backwards move in wall clock time is uncommon and would likely have other negative effects on a system beyond this library.

Point Window

Rolling point windows represent a view of the most recently added data points.

var p = rolling.NewPointPolicy[int](rolling.NewWindow[int](5))

for x := 0; x < 5; x = x + 1 {
  p.Append(1)
}
p.Reduce(func(w Window[int]) int {
  fmt.Println(w) // [ [1] [1] [1] [1] [1] ]
  return 0
})
w.Append(5)
p.Reduce(func(w Window[int]) int {
  fmt.Println(w) // [ [5] [1] [1] [1] [1] ]
  return 0
})
w.Append(6)
p.Reduce(func(w Window[int]) int {
  fmt.Println(w) // [ [6] [5] [1] [1] [1] ]
  return 0
})

The above creates a window that always contains 5 data points. When the next value is appended it will overwrite the first value. The window continuously overwrites the oldest value with the newest to preserve the specified value count. This type of window is similar to a circular buffer and is useful for collecting data that have a known interval on which they are captured or for tracking data where time is not a factor.

Point Window Concurrency Safety

The NewPointPolicy constructor returns a lock free implementation that is not safe for concurrent use. Use NewPointPolicyConcurrent if you need to manage concurrent readers and writers.

Aggregating Windows

Each window policy exposes a method with the signature Reduce(func(ctx context.Context, w Window[T]) T) T that can be used to aggregate the data stored within the window. The method takes in a function that can compute the contents of the Window into a single value. For convenience, this package provides some common reductions:

p.Reduce(ctx, rolling.Count[int])
p.Reduce(ctx, rolling.Avg[int])
p.Reduce(ctx, rolling.Min[int])
p.Reduce(ctx, rolling.Max[int])
p.Reduce(ctx, rolling.Sum[int])
p.Reduce(ctx, rolling.Percentile[int](99.9))
p.Reduce(ctx, rolling.FastPercentile[int](99.9))

The Count, Avg, Min, Max, and Sum each perform their expected computation. The Percentile reduction can be constructed based on a target percentile.

For cases of very large datasets, the FastPercentile can be used as a replacement for the standard percentile calculation. This alternative version uses the p-squared algorithm for estimating the percentile by processing only one value at a time, in any order. The results are quite accurate but can vary from the actual percentile by a small amount. It's a tradeoff of accuracy for speed when calculating percentiles from large data sets. For more on the p-squared algorithm see: http://www.cs.wustl.edu/~jain/papers/ftp/psqr.pdf.

Custom Aggregations

Any function that matches the form of func[T rolling.Numeric](context.Context, rolling.Window[T]) T may be given to the Reduce method of any window policy. The Window[T] type is a named version of [][]T where T may be any integer or float type. Calling len(window) will return the number of buckets. Each bucket is, itself, a slice of T where len(bucket) is the number of values measured within that bucket. Most aggregates will take the form of:

func MyAggregate[T rolling.Numeric](ctx context.Context, w rolling.Window[T]) T {
  for _, bucket := range w {
    for _, value := range bucket {
      // aggregate something
    }
  }
}

Performance Considerations

Bucket Size And Allocations

Generally, the window policies leverage fixed sized windows. For example, point policies only contain up to one data point per bucket in the window which means the memory usage is constant and there are no allocations for any number of data points added to a point policy.

Time based policies, however, can record an unbounded number of data points per bucket even if they maintain a fixed number of buckets in the window. To better amortize the cost of allocating new space within buckets to contain more data, time based policies will always re-use previous allocations rather than returning them to garbage collection. A time policy in use for extended periods of time tends to average down to zero allocations (as demonstrated in the benchmarks included with the project). To further reduce runtime allocations you can use the NewPreallocatedWindow(buckets, bucketSize) to generate a window where each bucket contains bucketSize number of spaces. If you match this value to your peak data ingestion rate per unit of time associated with each bucket then you can achieve zero allocation time policy usage.

Reduction Method Complexity

Most of the provided reduction methods (Sum, Avg, Min, etc.) run in O(n) time, where n is the number of data points in the window, and result in zero allocations. The Count reduction runs in O(n) but with n equal to the number of buckets. The Percentile reduction, however, attempts to calculate a perfectly accurate percentile which requires it to sort all values in the window. The sort algorithm complexity is documented in the sort.Stable documentation. The FastPercentile reduction is O(n) and results in zero allocations but it uses an estimation algorithm called p-squared that produces smaller errors when given larger datasets. See the p-squared paper for more details.

Cost Of Concurrency Safety

Another performance factor to consider is that concurrency safe window policies use a mutex lock to guard all reads and writes of the enclosed window. This means that reduction methods hold the lock for the duration of their run. A more complex than usual reduction or even a simple O(n) reduction over a very large dataset could mean blocking. If reductions begin to cause a performance issue then the options under the current design are limited to reducing the complexity of the reduction, reducing the size of the dataset being operated on, and amortizing the reduction cost by, for example, only running it in intervals of time or after a fixed amount of calls that get a cached result.

Fork Of github.com/asecurityteam/rolling

In the past, I was part of the original team that built and maintained github.com/asecurityteam/rolling which was used in service reliability and performance tooling. Since then, the team's priorities and tooling have changed.

I have new use cases for this library so I'm maintaining this fork. This version includes support for multiple numeric types, a small number of bug fixes, and a few small performance optimizations.

Drop-in Replacement Of github.com/asecurityteam/rolling

For convenience, I have created a v2.2.1 tag that matches the last published release of github.com/asecurityteam/rolling. The only difference is that I have updated the module path to github.com/kevinconway/rolling/v2. You should be able to replace github.com/asecurityteam/rolling with github.com/kevinconway/rolling/v2 in either your source code or your go.mod using a replace directive to pull from here instead.

There is no particular advantage to doing this, today, unless it's part of a gradual migration to v3 (the version documented in this file). If someone finds a severe enough bug then I may release an additional v2 patch.

Migrating From github.com/asecurityteam/rolling

This fork of the project has a nearly identical interface to the original except that this version uses Go generics to allow all numeric types in a window rather than only float64. To update, you should only need to add a type parameter to constructor methods and reductions. For example:

p := rolling.NewTimeWindow(rolling.NewWindow(3000), time.Millisecond)
p.Reduce(rolling.Sum)

becomes:

p := rolling.NewTimeWindow[int](rolling.NewWindow[int](3000), time.Millisecond)
p.Reduce(ctx, rolling.Sum[int])

Development

This project has no hard dependencies on any build tools other than Go. You should be able to run go test for any changes and see the results.

If you prefer, the project includes a Makefile with the following rules:

  • update - Update dependencies in the go.mod file.
  • bin - Download all optional build and test tools.
  • fmt - Run goimports on all Go source files.
  • test - Run all tests and create a test coverage record.
  • coverage - Generate a series of coverage reports from test records.

Contributors

For bugs or performance improvements, I welcome pull requests, issues, or comments. If you make a pull request then please be sure to add tests and run make fmt.

For new features, please start a discussion first by creating an issue and explaining the intended change.

License

This project is licensed under the Apache 2.0 license. See LICENSE.txt or http://www.apache.org/licenses/LICENSE-2.0 for the full terms.

This project is forked from https://github.com/asecurityteam/rolling. The original project's copyright attribution and license terms are:

Copyright @ 2017 Atlassian Pty Ltd

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

All files are marked with SPDX tags that both attribute the original copyright as well as identify the author(s) of any significant changes to those files.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Avg

func Avg[T Numeric](_ context.Context, w Window[T]) T

Avg the values within the window.

func Count

func Count[T Numeric](_ context.Context, w Window[T]) T

Count returns the number of elements in a window.

func FastPercentile

func FastPercentile[T Numeric](perc float64) func(_ context.Context, w Window[T]) T

FastPercentile implements the pSquare percentile estimation algorithm for calculating percentiles from streams of data using fixed memory allocations. Percentile values should be given the form 90.0 or 99.9.

func Max

func Max[T Numeric](_ context.Context, w Window[T]) T

Max the values within the window.

func Min

func Min[T Numeric](_ context.Context, w Window[T]) T

Min the values within the window.

func Percentile

func Percentile[T Numeric](perc float64) func(_ context.Context, w Window[T]) T

Percentile returns an aggregating function that computes the given percentile calculation for a window. Percentile values should be given the form 90.0 or 99.9.

func Sum

func Sum[T Numeric](_ context.Context, w Window[T]) T

Sum the values within the window.

Types

type Numeric

type Numeric interface {
	constraints.Integer | constraints.Float
}

type PointPolicy

type PointPolicy[T Numeric] struct {
	// contains filtered or unexported fields
}

PointPolicy is a rolling window policy that tracks the last N values inserted regardless of insertion time.

func NewPointPolicy

func NewPointPolicy[T Numeric](window Window[T]) *PointPolicy[T]

NewPointPolicy generates a Policy that operates on a rolling set of input points. The number of points is determined by the size of the given window. Each bucket will contain, at most, one data point when the window is full.

Note that this implementation is not safe for concurrent use. Use PointPolicyConcurrent if concurrency is required.

func (*PointPolicy[T]) Append

func (w *PointPolicy[T]) Append(ctx context.Context, value T)

Append a value to the window.

func (*PointPolicy[T]) Reduce

func (w *PointPolicy[T]) Reduce(ctx context.Context, f Reduction[T]) T

Reduce the window to a single value using a reduction function.

type PointPolicyConcurrent

type PointPolicyConcurrent[T Numeric] struct {
	// contains filtered or unexported fields
}

PointPolicyConcurrent is a concurrent safe wrapper for PointPolicy that uses a mutex to serialize calls.

func NewPointPolicyConcurrent

func NewPointPolicyConcurrent[T Numeric](window Window[T]) *PointPolicyConcurrent[T]

NewPointPolicyConcurrent generates a variant of PointPolicy that is safe for use in cases of concurrent reading and writing.

func (*PointPolicyConcurrent[T]) Append

func (self *PointPolicyConcurrent[T]) Append(ctx context.Context, value T)

func (*PointPolicyConcurrent[T]) Reduce

func (self *PointPolicyConcurrent[T]) Reduce(ctx context.Context, f Reduction[T]) T

type Reduction

type Reduction[T Numeric] func(ctx context.Context, w Window[T]) T

func MinimumPoints

func MinimumPoints[T Numeric](points int, r Reduction[T]) Reduction[T]

MinimumPoints is a wrapper for any other reduction that prevents the real reduction from running unless there are a sufficient number of data points.

type TimePolicy

type TimePolicy[T Numeric] struct {
	// contains filtered or unexported fields
}

TimePolicy is a rolling window policy that tracks the all values inserted over the last N period of time. Each bucket of a window represents a duration of time such that the window represents an amount of time equal to the sum. For example, 10 buckets in the window and a 100ms duration equal a 1s window. This policy rolls data out of the window by bucket such that it only contains data for the last total window.

func NewTimePolicy

func NewTimePolicy[T Numeric](window Window[T], bucketDuration time.Duration) *TimePolicy[T]

NewTimePolicy manages a window with rolling time durations. The given duration will be used to bucket data within the window. If data points are received entire windows aparts then the window will only contain a single data point. If one or more durations of the window are missed then they are zeroed out to keep the window consistent.

Note that this implementation is not safe for concurrent use. Use NewTimePolicyConcurrent for a safe implementation.

func (*TimePolicy[T]) Append

func (self *TimePolicy[T]) Append(ctx context.Context, value T)

Append a value to the window using a time bucketing strategy.

func (*TimePolicy[T]) AppendWithTimestamp

func (self *TimePolicy[T]) AppendWithTimestamp(ctx context.Context, value T, timestamp time.Time)

AppendWithTimestamp is the same as Append but with timestamp as parameter. Note that this method is only for advanced use cases where the clock source is external to the system accumulating data. Users of this method must ensure that each call is given a timestamp value that is valid for the active window of time.

Valid timestamps are technically any value greater than or equal to the window's TAIL which is calculated as (HEAD - (BUCKETS*BUCKET_DURATION)) where HEAD represents the largest timestamp that was previously given to AppendWithTimestamp or ReduceWithTimestamp. Values between HEAD and TAIL are placed within existing buckets. Values less than TAIL are ignore because those timestamps represent a time in the past that is no longer covered by the window. Values greater than HEAD permanently move the window forward in time.

func (*TimePolicy[T]) Reduce

func (self *TimePolicy[T]) Reduce(ctx context.Context, f Reduction[T]) T

Reduce the window to a single value using a reduction function.

func (*TimePolicy[T]) ReduceWithTimestamp

func (self *TimePolicy[T]) ReduceWithTimestamp(ctx context.Context, f Reduction[T], timestamp time.Time) T

ReduceWithTimestamp is the same as Reduce but takes the current timestamp as a parameter. The timestamp value must be valid according to the same rules described in the AppendWithTimestamp method. Invalid timestamp values result in a zero value being returned regardless of the reduction function or the current window contents.

Note that the given timestamp does not necessarily limit the view of the window. This is not a "reduce until" method. The given timestamp represents the point in time at which the window is being evaluated and it is used to roll any expired buckets out of the window before being reduced. This is not a read-only method and the effects of using a future time here are the same as documented in AppendWithTimestamp.

type TimePolicyConcurrent

type TimePolicyConcurrent[T Numeric] struct {
	// contains filtered or unexported fields
}

TimePolicyConcurrent is a concurrent safe wrapper for TimePolicy that uses a mutex to serialize calls.

func NewTimePolicyConcurrent

func NewTimePolicyConcurrent[T Numeric](window Window[T], bucketDuration time.Duration) *TimePolicyConcurrent[T]

NewTimePolicyConcurrent generates a variant of TimePolicy that is safe for concurrent use.

func (*TimePolicyConcurrent[T]) Append

func (self *TimePolicyConcurrent[T]) Append(ctx context.Context, value T)

func (*TimePolicyConcurrent[T]) AppendWithTimestamp

func (self *TimePolicyConcurrent[T]) AppendWithTimestamp(ctx context.Context, value T, timestamp time.Time)

func (*TimePolicyConcurrent[T]) Reduce

func (self *TimePolicyConcurrent[T]) Reduce(ctx context.Context, f Reduction[T]) T

func (*TimePolicyConcurrent[T]) ReduceWithTimestamp

func (self *TimePolicyConcurrent[T]) ReduceWithTimestamp(ctx context.Context, f Reduction[T], timestamp time.Time) T

type Window

type Window[T Numeric] [][]T

Window represents a bucketed set of data. It should be used in conjunction with a Policy to populate it with data using some windowing policy.

func NewPreallocatedWindow

func NewPreallocatedWindow[T Numeric](buckets int, bucketSize int) Window[T]

NewPreallocatedWindow creates a Window both with the given number of buckets and with a preallocated bucket size. This constructor may be used when the number of data points per-bucket can be estimated and/or when the desire is to allocate a large slice so that allocations do not happen as the Window is populated by a Policy.

func NewWindow

func NewWindow[T Numeric](buckets int) Window[T]

NewWindow creates a Window with the given number of buckets. The number of buckets is meaningful to each Policy. The Policy implementations will describe their use of buckets.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL