Documentation ¶
Overview ¶
Package window implements windowing constructs. In data processing over an unbounded stream, windowing is the concept of grouping data using temporal boundaries. We use event time to discover temporal boundaries on the unbounded stream, and the watermark to ensure that the data within those boundaries is complete. A reduce function can then be applied to each group of data.
Windows come in different types; the most common are Fixed windows and Sliding windows. Sessions are handled by a less common windowing strategy called Session windows. Windowing is implemented as a two-stage process:
- Assign windows - assign the event to a window
- Merge windows - group all the events that belong to the same window
The two-stage approach is required because window assignment can happen as elements stream in, while merging can happen at any point before the data is materialized. This is especially important for session windows, where a new event can change the end time of the window.
For simplicity, we truncate window boundaries to the nearest time unit (say, 1-minute windows are truncated to the 0th second). Truncating window time to the nearest boundary lets us map an event to its window in constant time without affecting correctness, except for the very first materialization of the result (e.g., if we started at 9:00:11, the result will be materialized at 9:01:00 and not at 9:01:11).
Windows may be either aligned (e.g., Fixed, Sliding), i.e., applied across all the data for the window of time in question, or unaligned (e.g., Session), i.e., applied across only specific subsets of the data (e.g., per key) for the given window of time.
Index ¶
- Variables
- type Operation
- type SortedWindowListByEndTime
- func (s *SortedWindowListByEndTime) Back() TimedWindow
- func (s *SortedWindowListByEndTime) Delete(window TimedWindow) (deleted bool)
- func (s *SortedWindowListByEndTime) FindWindowForTime(t time.Time) (TimedWindow, bool)
- func (s *SortedWindowListByEndTime) Front() TimedWindow
- func (s *SortedWindowListByEndTime) Insert(window TimedWindow)
- func (s *SortedWindowListByEndTime) InsertBack(window TimedWindow)
- func (s *SortedWindowListByEndTime) InsertFront(window TimedWindow)
- func (s *SortedWindowListByEndTime) InsertIfNotPresent(window TimedWindow) (TimedWindow, bool)
- func (s *SortedWindowListByEndTime) Items() []TimedWindow
- func (s *SortedWindowListByEndTime) Len() int
- func (s *SortedWindowListByEndTime) RemoveWindows(t time.Time) []TimedWindow
- func (s *SortedWindowListByEndTime) WindowToBeMerged(window TimedWindow) (TimedWindow, bool)
- type Strategy
- type TimedWindow
- type TimedWindowRequest
- type TimedWindowResponse
- type TimedWindower
- type Type
Constants ¶
This section is empty.
Variables ¶
SharedUnalignedPartition is a common partition for unaligned windows. Unaligned windows share a common pbq, and this partition is used to identify the pbq instance.
Functions ¶
This section is empty.
Types ¶
type Operation ¶ added in v1.2.0
type Operation int
Operation represents the event type of the operation on the window.
const (
	// Open creates a new Window (Open the Book).
	Open Operation = iota
	// Delete closes the partition (this means all the keyed-windows have been closed).
	// PBQ gets closed when Delete is called.
	Delete
	// Close operation for the keyed-window (Close of Book). Only the keyed-window on the SDK side will be closed,
	// other keyed-windows for the same partition can be open. `Delete` has to be called once all the keyed-windows
	// are closed.
	Close
	// Merge merges two or more windows, particularly used for SessionWindows.
	// Perhaps in future we will use it for hot-key partitioning.
	Merge
	// Append inserts more data into the opened Window. Append implicitly does Open if window has not been opened yet.
	Append
	// Expand expands the existing window, used in SessionWindow after adding a new element or after a window merge operation.
	Expand
)
type SortedWindowListByEndTime ¶ added in v1.2.0
type SortedWindowListByEndTime struct {
// contains filtered or unexported fields
}
SortedWindowListByEndTime is a thread-safe list implementation, sorted by window end time from lowest to highest. Because the list is always sorted, all search operations use binary search.
func NewSortedWindowListByEndTime ¶ added in v1.2.0
func NewSortedWindowListByEndTime() *SortedWindowListByEndTime
NewSortedWindowListByEndTime returns a new window list ordered by end time. The Front/Head of the list will always have the smallest element while the End/Tail will have the largest element (by end time).
func (*SortedWindowListByEndTime) Back ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) Back() TimedWindow
Back returns the largest element from the list.
func (*SortedWindowListByEndTime) Delete ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) Delete(window TimedWindow) (deleted bool)
Delete deletes a window from the list.
func (*SortedWindowListByEndTime) FindWindowForTime ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) FindWindowForTime(t time.Time) (TimedWindow, bool)
FindWindowForTime finds a window for a given time. If there are multiple windows for the given time, it returns the first window.
func (*SortedWindowListByEndTime) Front ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) Front() TimedWindow
Front returns the smallest element from the list.
func (*SortedWindowListByEndTime) Insert ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) Insert(window TimedWindow)
Insert inserts a window to the list.
func (*SortedWindowListByEndTime) InsertBack ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) InsertBack(window TimedWindow)
InsertBack inserts a window to the back of the list.
func (*SortedWindowListByEndTime) InsertFront ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) InsertFront(window TimedWindow)
InsertFront inserts a window to the front of the list.
func (*SortedWindowListByEndTime) InsertIfNotPresent ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) InsertIfNotPresent(window TimedWindow) (TimedWindow, bool)
InsertIfNotPresent inserts a window to the list of active windows if not present and returns the window.
func (*SortedWindowListByEndTime) Items ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) Items() []TimedWindow
Items returns the entire window list.
func (*SortedWindowListByEndTime) Len ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) Len() int
Len returns the number of windows in the list.
func (*SortedWindowListByEndTime) RemoveWindows ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) RemoveWindows(t time.Time) []TimedWindow
RemoveWindows removes a set of windows whose end time is smaller than or equal to the given time. It returns the removed windows.
func (*SortedWindowListByEndTime) WindowToBeMerged ¶ added in v1.2.0
func (s *SortedWindowListByEndTime) WindowToBeMerged(window TimedWindow) (TimedWindow, bool)
WindowToBeMerged finds a window to be merged with the given window. It returns the window to be merged and a boolean indicating if a window was found.
type TimedWindow ¶ added in v1.2.0
type TimedWindow interface {
	// StartTime returns the start time of the window.
	StartTime() time.Time
	// EndTime returns the end time of the window.
	EndTime() time.Time
	// Slot returns the slot to which the window belongs.
	Slot() string
	// Partition returns the partition id of the window, partition is
	// combination of start time, end time and slot.
	// This will be used to map to the pbq instance where the messages
	// should be persisted.
	Partition() *partition.ID
	// Keys returns the keys of the window tracked for Unaligned windows.
	// This will return empty for Aligned windows.
	Keys() []string
	// ID returns the id which is the unique identifier for the window.
	// This is used to compare the windows. For Aligned windows, this is the
	// combination of start time, end time and slot. For Unaligned windows,
	// this is the combination of start time, end time, slot and keys.
	ID() string
	// Merge merges the window with the new window. It is used only for
	// Unaligned window.
	Merge(tw TimedWindow)
	// Expand expands the window end time to the new endTime. It is used only for
	// Unaligned window.
	Expand(endTime time.Time)
}
TimedWindow represents a time-based window.
func NewAlignedTimedWindow ¶ added in v1.2.0
NewAlignedTimedWindow returns a new TimedWindow for the given start time, end time and slot.
func NewUnalignedTimedWindow ¶ added in v1.2.0
NewUnalignedTimedWindow returns a new TimedWindow for the given start time, end time, slot and keys. We track the keys for Unaligned windows because the start and end times of each such window depend only on the specific key.
type TimedWindowRequest ¶ added in v1.2.0
type TimedWindowRequest struct {
	// Operation is the `Operation` on the windows
	Operation Operation
	// ReadMessage represents the isb message
	ReadMessage *isb.ReadMessage
	// ID represents the partition id
	// this is to map to the pbq instance to which the message should be assigned
	ID *partition.ID
	// windows is the list of windows on which the operation is performed
	Windows []TimedWindow
}
TimedWindowRequest represents the operation to be performed on the window. TimedWindowRequest is sent to the UDF and it contains enough context to execute the operation.
type TimedWindowResponse ¶ added in v1.2.0
type TimedWindowResponse struct {
	// WriteMessage represents the isb message
	WriteMessage *isb.WriteMessage
	// Window represents the window to which the message belongs
	Window TimedWindow
	// EOF represents the end of the response for the given window.
	// When EOF is true, it will be just a metadata payload, there won't be any WriteMessage.
	EOF bool
}
TimedWindowResponse is the response from the UDF based on how the result is propagated back. It could be one or more responses based on how many results the user is streaming out.
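A consumer of such a response stream has to treat EOF responses as markers rather than data. The sketch below shows one way to do that with simplified local types; the `response` struct and the `drain` function are hypothetical stand-ins, with `payload` playing the role of WriteMessage and `windowID` the role of Window.

```go
package main

import "fmt"

// response is a simplified, hypothetical stand-in for TimedWindowResponse.
type response struct {
	windowID string // identifies the window the response belongs to
	payload  string // stands in for WriteMessage; unused when eof is true
	eof      bool   // marks the end of the responses for windowID
}

// drain reads responses until the channel is closed. It collects payloads
// per window and records which windows have signalled EOF.
func drain(responses <-chan response) (results map[string][]string, finished []string) {
	results = make(map[string][]string)
	for r := range responses {
		if r.eof {
			// Metadata only: there is no payload to forward.
			finished = append(finished, r.windowID)
			continue
		}
		results[r.windowID] = append(results[r.windowID], r.payload)
	}
	return results, finished
}

func main() {
	ch := make(chan response, 4)
	ch <- response{windowID: "w1", payload: "sum=3"}
	ch <- response{windowID: "w1", payload: "count=2"}
	ch <- response{windowID: "w1", eof: true}
	ch <- response{windowID: "w2", payload: "sum=7"}
	close(ch)

	results, finished := drain(ch)
	fmt.Println(len(results["w1"]), len(results["w2"]), finished)
}
```

Here window `w1` streams two results and then an EOF, while `w2` has produced a result but has not yet finished.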
type TimedWindower ¶ added in v1.2.0
type TimedWindower interface {
	// Strategy returns the window strategy.
	Strategy() Strategy
	// Type returns the window type.
	Type() Type
	// AssignWindows assigns the event to the window based on the given window configuration.
	AssignWindows(message *isb.ReadMessage) []*TimedWindowRequest
	// CloseWindows closes the windows that are past the watermark.
	CloseWindows(time time.Time) []*TimedWindowRequest
	// InsertWindow inserts a window to the list of active windows.
	InsertWindow(tw TimedWindow)
	// NextWindowToBeClosed returns the next window yet to be closed.
	NextWindowToBeClosed() TimedWindow
	// DeleteClosedWindow deletes the window from the closed windows list.
	DeleteClosedWindow(tw TimedWindow)
	// OldestWindowEndTime returns the end time of the oldest window among both active and closed windows.
	// If there are no windows, it returns -1.
	OldestWindowEndTime() time.Time
}
TimedWindower is the interface for windowing strategy. It manages the lifecycle of timed windows for a reduce vertex. It maintains a list of timed windows locally, generates window requests to be sent to the reduce UDF, and reflects the changes to the list of timed windows based on the response from the UDF.