Overview
Package window implements windowing constructs. In data processing on an unbounded stream, windowing is the concept of grouping data using temporal boundaries. We use event time to discover temporal boundaries on an unbounded, infinite stream, and watermarks to ensure that the data within those boundaries is complete. A reduce function can then be applied to each group of data.
Windows come in different types; the most popular are Fixed windows and Sliding windows. Sessions are handled by a less common windowing strategy called Session windows. Windowing is implemented as a two-stage process:
- Assign windows - assign the event to a window
- Merge windows - group all the events that belong to the same window
The two-stage approach is required because windows can be assigned as elements stream in, whereas merging can be deferred until just before the data is materialized. This is especially important for session windows, where a new event can change the end time of an existing window.
For simplicity, window boundaries are truncated to the start of the time unit (for example, 1-minute windows are truncated to the 0th second of the minute). Truncating window times to a boundary lets us map an event to its window in constant time without affecting correctness, except for the very first materialization of results: if we start at 9:00:11, the first result is materialized at 9:01:00, not at 9:01:11.
Windows may be either aligned (e.g., Fixed, Sliding), i.e., applied across all the data for the window of time in question, or unaligned (e.g., Session), i.e., applied across only specific subsets of the data (e.g., per key) for the given window of time.
Index
- type AlignedKeyedWindower
- type AlignedWindower
- type SortedWindowList
- func NewSortedWindowList[W AlignedWindower]() *SortedWindowList[W]
- func (s *SortedWindowList[W]) Back() W
- func (s *SortedWindowList[W]) DeleteWindow(kw W)
- func (s *SortedWindowList[W]) Front() W
- func (s *SortedWindowList[W]) InsertBack(kw W)
- func (s *SortedWindowList[W]) InsertFront(kw W)
- func (s *SortedWindowList[W]) InsertIfNotPresent(kw W) (aw W, isPresent bool)
- func (s *SortedWindowList[W]) Items() []W
- func (s *SortedWindowList[W]) Len() int
- func (s *SortedWindowList[W]) RemoveWindows(tm time.Time) []W
- type Windower
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type AlignedKeyedWindower added in v0.6.3
type AlignedKeyedWindower interface {
	AlignedWindower
	// AddSlot adds a slot to the window. Slots are hash-ranges for keys.
	AddSlot(string)
	// Partitions returns a slice of partition IDs.
	Partitions() []partition.ID
	// Slots returns a slice of keys.
	Slots() []string
}
AlignedKeyedWindower represents a keyed or non-keyed aligned window (bounded by start and end time).
type AlignedWindower added in v0.7.3
type AlignedWindower interface {
	// StartTime returns the start time of the window.
	StartTime() time.Time
	// EndTime returns the end time of the window.
	EndTime() time.Time
}
AlignedWindower defines a window that is aligned and bounded by start and end time.
type SortedWindowList added in v0.7.3
type SortedWindowList[W AlignedWindower] struct {
	// contains filtered or unexported fields
}
SortedWindowList is a thread-safe list implementation, sorted by window start time from lowest to highest.
func NewSortedWindowList added in v0.7.3
func NewSortedWindowList[W AlignedWindower]() *SortedWindowList[W]
NewSortedWindowList returns a window list ordered by start time. The Front (head) of the list always holds the window with the smallest start time, and the Back (tail) holds the window with the largest start time.
func (*SortedWindowList[W]) Back added in v0.7.3
func (s *SortedWindowList[W]) Back() W
Back returns the largest element (by start time) in the list.
func (*SortedWindowList[W]) DeleteWindow added in v0.7.3
func (s *SortedWindowList[W]) DeleteWindow(kw W)
DeleteWindow deletes a window from the list.
func (*SortedWindowList[W]) Front added in v0.7.3
func (s *SortedWindowList[W]) Front() W
Front returns the smallest element (by start time) in the list.
func (*SortedWindowList[W]) InsertBack added in v0.7.3
func (s *SortedWindowList[W]) InsertBack(kw W)
InsertBack tries to insert the window at the Back of the list, provided the window is larger (by start time) than the current Back/Tail.
func (*SortedWindowList[W]) InsertFront added in v0.7.3
func (s *SortedWindowList[W]) InsertFront(kw W)
InsertFront tries to insert the window at the Front of the list, provided the window is smaller (by start time) than the current Front/Head.
func (*SortedWindowList[W]) InsertIfNotPresent added in v0.7.3
func (s *SortedWindowList[W]) InsertIfNotPresent(kw W) (aw W, isPresent bool)
InsertIfNotPresent inserts a window into the list of active windows if it is not already present. It returns the window and reports whether it was already present.
func (*SortedWindowList[W]) Items added in v0.7.3
func (s *SortedWindowList[W]) Items() []W
Items returns all windows in the list.
func (*SortedWindowList[W]) Len added in v0.7.3
func (s *SortedWindowList[W]) Len() int
Len returns the number of windows in the list.
func (*SortedWindowList[W]) RemoveWindows added in v0.7.3
func (s *SortedWindowList[W]) RemoveWindows(tm time.Time) []W
RemoveWindows removes and returns the windows that are smaller than or equal to the given time.
type Windower
type Windower interface {
	// AssignWindow assigns the event to the window based on the given window configuration.
	AssignWindow(eventTime time.Time) []AlignedKeyedWindower
	// InsertIfNotPresent inserts the window into the list of active windows if it is not present;
	// if it is present, it returns the existing window.
	InsertIfNotPresent(aw AlignedKeyedWindower) (AlignedKeyedWindower, bool)
	// RemoveWindows returns the list of windows that can be closed.
	RemoveWindows(time time.Time) []AlignedKeyedWindower
	// NextWindowToBeClosed returns the next window yet to be closed.
	// It is used by the data forwarder to check whether a late message can still be considered for processing.
	NextWindowToBeClosed() AlignedKeyedWindower
}
Windower manages AlignedKeyedWindower instances. It is implemented by each of the windowing strategies.