staleness

package
v0.113.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NowFunc = time.Now

We override how Now() is returned, so we can have deterministic tests

Functions

This section is empty.

Types

type PriorityQueue

type PriorityQueue interface {
	// Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted
	Update(id identity.Stream, newPrio time.Time)
	// Peek will return the entry at the HEAD of the queue *without* removing it from the queue
	Peek() (identity.Stream, time.Time)
	// Pop will remove the entry at the HEAD of the queue and return it
	Pop() (identity.Stream, time.Time)
	// Len will return the number of entries in the queue
	Len() int
}

PriorityQueue represents a way to store entries sorted by their priority. Pop() will return the oldest entry of the set.

func NewPriorityQueue

func NewPriorityQueue() PriorityQueue

type Staleness

type Staleness[T any] struct {
	Max time.Duration
	// contains filtered or unexported fields
}

Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is older than the `max`

NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded environment, then it is the user's responsibility to properly serialize calls to Staleness methods

func NewStaleness

func NewStaleness[T any](max time.Duration, items streams.Map[T]) *Staleness[T]

func (*Staleness[T]) Clear added in v0.100.0

func (s *Staleness[T]) Clear()

func (*Staleness[T]) Delete added in v0.97.0

func (s *Staleness[T]) Delete(id identity.Stream)

func (*Staleness[T]) Evict added in v0.97.0

func (s *Staleness[T]) Evict() (identity.Stream, bool)

func (*Staleness[T]) ExpireOldEntries

func (s *Staleness[T]) ExpireOldEntries()

ExpireOldEntries will remove all entries whose staleness value is older than `now() - max` For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed.

func (*Staleness[T]) Items

func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool

Items returns an iterator function that in future go version can be used with range See: https://go.dev/wiki/RangefuncExperiment

func (*Staleness[T]) Len added in v0.97.0

func (s *Staleness[T]) Len() int

func (*Staleness[T]) Load

func (s *Staleness[T]) Load(id identity.Stream) (T, bool)

Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value

func (*Staleness[T]) Next added in v0.97.0

func (s *Staleness[T]) Next() time.Time

func (*Staleness[T]) Store

func (s *Staleness[T]) Store(id identity.Stream, v T) error

Store the given key value pair in the map, and update the pair's staleness value to "now"

type Tracker added in v0.111.0

type Tracker struct {
	// contains filtered or unexported fields
}

func NewTracker added in v0.111.0

func NewTracker() Tracker

func (Tracker) Collect added in v0.111.0

func (stale Tracker) Collect(max time.Duration) []identity.Stream

func (Tracker) Refresh added in v0.111.0

func (stale Tracker) Refresh(ts time.Time, ids ...identity.Stream)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL