window

package
v1.2.0-rc3
Warning: This package is not in the latest version of its module.
Published: Apr 9, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package window implements windowing constructs. In stream processing over unbounded data, windowing groups data using temporal boundaries. We use event time to discover temporal boundaries on an unbounded, infinite stream, and the Watermark to ensure that the dataset within those boundaries is complete. A reduce function can then be applied to each group of data.

Windows come in different types; the most popular are Fixed windows and Sliding windows. Sessions are managed via a somewhat less popular windowing strategy called Session windows. Windowing is implemented as a two-stage process:

  • Assign windows - assign the event to a window
  • Merge windows - group all the events that belong to the same window

The two-stage approach is required because window assignment can happen as elements stream in, while merging can happen any time before the data is materialized. This is especially important for session windows, where a new event can change the end time of the window.
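
The two stages can be illustrated for session windows with a minimal sketch. Everything here is an assumption made for illustration: the `session` type, the `assign` and `merge` helpers, the millisecond timestamps, and the inactivity `gap` are hypothetical and are not part of this package's API.

```go
package main

import (
	"fmt"
	"sort"
)

// session is a hypothetical half-open window [start, end) in milliseconds.
type session struct{ start, end int64 }

// assign is stage one: it creates a candidate window for a single event.
// gap is an assumed inactivity timeout.
func assign(eventTime, gap int64) session {
	return session{start: eventTime, end: eventTime + gap}
}

// merge is stage two: it collapses overlapping candidates into sessions.
func merge(ws []session) []session {
	sort.Slice(ws, func(i, j int) bool { return ws[i].start < ws[j].start })
	var out []session
	for _, w := range ws {
		last := len(out) - 1
		if last >= 0 && w.start < out[last].end {
			// The candidate overlaps the current session, so a later
			// event may push the session's end time forward.
			if w.end > out[last].end {
				out[last].end = w.end
			}
			continue
		}
		out = append(out, w)
	}
	return out
}

// sessionsFor runs both stages over a batch of event times.
func sessionsFor(eventTimes []int64, gap int64) []session {
	var candidates []session
	for _, t := range eventTimes {
		candidates = append(candidates, assign(t, gap))
	}
	return merge(candidates)
}

func main() {
	fmt.Println(sessionsFor([]int64{0, 4, 8, 30}, 10))
}
```

In this sketch the events at 0, 4 and 8 end up in one session whose end time grows with each event, while the event at 30 starts a new session.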

For simplicity, we truncate window boundaries to the nearest time unit (say, 1-minute windows are truncated to the 0th second). Truncating window times to the nearest boundary lets us map an event to its window in constant time without affecting correctness, except for the very first materialization of a result (e.g., if we started at 9:00:11, the result will be materialized at 9:01:00 and not at 9:01:11).
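
A minimal sketch of this truncation for a fixed window, using the standard library's `time.Time.Truncate`. The `fixedWindowFor` and `firstWindowLabel` helpers are hypothetical names used only for illustration; the package's own strategies may compute boundaries differently.

```go
package main

import (
	"fmt"
	"time"
)

// fixedWindowFor returns the [start, end) boundaries of the fixed window that
// contains eventTime. Because the start is simply eventTime rounded down to a
// multiple of length, the mapping is a constant-time computation.
func fixedWindowFor(eventTime time.Time, length time.Duration) (start, end time.Time) {
	start = eventTime.Truncate(length)
	return start, start.Add(length)
}

// firstWindowLabel formats the one-minute window that contains 9:00:11.
func firstWindowLabel() string {
	eventTime := time.Date(2024, time.April, 9, 9, 0, 11, 0, time.UTC)
	start, end := fixedWindowFor(eventTime, time.Minute)
	return start.Format("15:04:05") + "-" + end.Format("15:04:05")
}

func main() {
	fmt.Println(firstWindowLabel()) // prints "09:00:00-09:01:00"
}
```

An event arriving at 9:00:11 therefore lands in the window that ends at 9:01:00, which matches the first-materialization example above.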

Windows may be either aligned (e.g., Fixed, Sliding), i.e., applied across all the data for the window of time in question, or unaligned (e.g., Session), i.e., applied across only specific subsets of the data (e.g., per key) for the given window of time.

Constants

This section is empty.

Variables

var SharedUnalignedPartition = partition.ID{
	Start: time.UnixMilli(0),
	End:   time.UnixMilli(math.MaxInt64),
	Slot:  "slot-0",
}

SharedUnalignedPartition is a common partition for unaligned windows. Unaligned windows share a common PBQ; this partition is used to identify the PBQ instance.

Functions

This section is empty.

Types

type Operation added in v1.2.0

type Operation int

Operation represents the event type of the operation on the window

const (
	// Open creates a new Window (Open the Book).
	Open Operation = iota
	// Delete closes the partition (this means all the keyed-windows have been closed).
	// PBQ gets closed when Delete is called.
	Delete
	// Close operation for the keyed-window (Close of Book). Only the keyed-window on the SDK side will be closed,
	// other keyed-windows for the same partition can be open. `Delete` has to be called once all the keyed-windows
	// are closed.
	Close
	// Merge merges two or more windows, particularly used for SessionWindows.
	// Perhaps in future we will use it for hot-key partitioning.
	Merge
	// Append inserts more data into the opened Window. Append implicitly does Open if window has not been opened yet.
	Append
	// Expand expands the existing window, used in SessionWindow after adding a new element or after a window merge operation.
	Expand
)

func (Operation) String added in v1.2.0

func (e Operation) String() string
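
Since the constants are declared with iota, their numeric values follow the declaration order (Open is 0, Expand is 5). The sketch below redeclares the type locally to show how a String method of this shape might behave; the label strings and the fallback format are assumptions, because this page does not show what the real method returns.

```go
package main

import "fmt"

// Operation mirrors the declaration above so the example is self-contained.
type Operation int

const (
	Open Operation = iota
	Delete
	Close
	Merge
	Append
	Expand
)

// operationNames holds assumed labels; the real strings are not documented here.
var operationNames = [...]string{"Open", "Delete", "Close", "Merge", "Append", "Expand"}

// String returns the assumed label, or a numeric fallback for unknown values.
func (e Operation) String() string {
	if e < 0 || int(e) >= len(operationNames) {
		return fmt.Sprintf("Operation(%d)", int(e))
	}
	return operationNames[e]
}

func main() {
	// fmt uses the String method automatically for %v and Println.
	fmt.Println(Close, int(Close))
}
```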

type SortedWindowListByEndTime added in v1.2.0

type SortedWindowListByEndTime struct {
	// contains filtered or unexported fields
}

SortedWindowListByEndTime is a thread-safe list implementation, sorted by window end time from lowest to highest. All search operations use binary search because the list is always sorted.

func NewSortedWindowListByEndTime added in v1.2.0

func NewSortedWindowListByEndTime() *SortedWindowListByEndTime

NewSortedWindowListByEndTime creates a window list ordered by end time. The Front/Head of the list always holds the element with the smallest end time, while the End/Tail holds the element with the largest end time.
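
One way such a list could be structured is sketched below: a slice kept in end-time order, guarded by a read-write mutex, with the insert position found by binary search. The `window` and `sortedByEnd` types are hypothetical stand-ins and do not reflect the package's unexported fields.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// window is a hypothetical stand-in for TimedWindow, with millisecond times.
type window struct{ start, end int64 }

// sortedByEnd keeps windows ordered by end time, lowest first.
type sortedByEnd struct {
	mu      sync.RWMutex
	windows []window
}

// insert finds the position with a binary search and shifts the tail right.
func (s *sortedByEnd) insert(w window) {
	s.mu.Lock()
	defer s.mu.Unlock()
	i := sort.Search(len(s.windows), func(i int) bool { return s.windows[i].end > w.end })
	s.windows = append(s.windows, window{})
	copy(s.windows[i+1:], s.windows[i:])
	s.windows[i] = w
}

// front returns the window with the smallest end time, if any.
func (s *sortedByEnd) front() (window, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if len(s.windows) == 0 {
		return window{}, false
	}
	return s.windows[0], true
}

// back returns the window with the largest end time, if any.
func (s *sortedByEnd) back() (window, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if len(s.windows) == 0 {
		return window{}, false
	}
	return s.windows[len(s.windows)-1], true
}

func main() {
	l := &sortedByEnd{}
	for _, end := range []int64{120, 60, 180} {
		l.insert(window{start: end - 60, end: end})
	}
	f, _ := l.front()
	b, _ := l.back()
	fmt.Println(f.end, b.end)
}
```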

func (*SortedWindowListByEndTime) Back added in v1.2.0

Back returns the largest element from the list.

func (*SortedWindowListByEndTime) Delete added in v1.2.0

func (s *SortedWindowListByEndTime) Delete(window TimedWindow) (deleted bool)

Delete deletes a window from the list.

func (*SortedWindowListByEndTime) FindWindowForTime added in v1.2.0

func (s *SortedWindowListByEndTime) FindWindowForTime(t time.Time) (TimedWindow, bool)

FindWindowForTime finds a window for a given time. If there are multiple windows for the given time, it returns the first window.
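
A lookup of this kind can be sketched as a binary search over end times followed by a short forward scan. The sketch assumes half-open windows [start, end) and uses a hypothetical `window` type with millisecond timestamps; the real method's boundary handling is not specified on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// window is a hypothetical stand-in for TimedWindow, with millisecond times.
type window struct{ start, end int64 }

// findWindowForTime expects ws to be sorted by end time. It skips every window
// that has already ended at t, then returns the first remaining window that
// has started by t.
func findWindowForTime(ws []window, t int64) (window, bool) {
	i := sort.Search(len(ws), func(i int) bool { return ws[i].end > t })
	for ; i < len(ws); i++ {
		if ws[i].start <= t {
			return ws[i], true
		}
	}
	return window{}, false
}

func main() {
	// Overlapping (sliding-style) windows: t=45 falls in the first two.
	ws := []window{{0, 60}, {30, 90}, {60, 120}}
	w, ok := findWindowForTime(ws, 45)
	fmt.Println(w, ok)
}
```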

func (*SortedWindowListByEndTime) Front added in v1.2.0

Front returns the smallest element from the list.

func (*SortedWindowListByEndTime) Insert added in v1.2.0

func (s *SortedWindowListByEndTime) Insert(window TimedWindow)

Insert inserts a window to the list.

func (*SortedWindowListByEndTime) InsertBack added in v1.2.0

func (s *SortedWindowListByEndTime) InsertBack(window TimedWindow)

InsertBack inserts a window to the back of the list.

func (*SortedWindowListByEndTime) InsertFront added in v1.2.0

func (s *SortedWindowListByEndTime) InsertFront(window TimedWindow)

InsertFront inserts a window to the front of the list.

func (*SortedWindowListByEndTime) InsertIfNotPresent added in v1.2.0

func (s *SortedWindowListByEndTime) InsertIfNotPresent(window TimedWindow) (TimedWindow, bool)

InsertIfNotPresent inserts a window to the list of active windows if not present and returns the window.

func (*SortedWindowListByEndTime) Items added in v1.2.0

Items returns the entire window list.

func (*SortedWindowListByEndTime) Len added in v1.2.0

func (s *SortedWindowListByEndTime) Len() int

Len returns the number of windows in the list.

func (*SortedWindowListByEndTime) RemoveWindows added in v1.2.0

func (s *SortedWindowListByEndTime) RemoveWindows(t time.Time) []TimedWindow

RemoveWindows removes a set of windows whose end time is smaller than or equal to the given time. It returns the removed windows.
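
Because the list is ordered by end time, the windows to remove always form a prefix. The sketch below shows that idea on a plain slice; the `window` type and the `removeWindows` helper are hypothetical and ignore the locking the real list performs.

```go
package main

import (
	"fmt"
	"sort"
)

// window is a hypothetical stand-in for TimedWindow, with millisecond times.
type window struct{ start, end int64 }

// removeWindows expects ws to be sorted by end time. It splits the slice at
// the first window whose end time is strictly greater than t, so every window
// with end <= t is returned in removed.
func removeWindows(ws []window, t int64) (removed, kept []window) {
	i := sort.Search(len(ws), func(i int) bool { return ws[i].end > t })
	return ws[:i], ws[i:]
}

func main() {
	ws := []window{{0, 60}, {60, 120}, {120, 180}}
	removed, kept := removeWindows(ws, 120)
	fmt.Println(len(removed), len(kept))
}
```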

func (*SortedWindowListByEndTime) WindowToBeMerged added in v1.2.0

func (s *SortedWindowListByEndTime) WindowToBeMerged(window TimedWindow) (TimedWindow, bool)

WindowToBeMerged finds a window to be merged with the given window. It returns the window to be merged and a boolean indicating if a window was found.

type Strategy added in v1.2.0

type Strategy int

Strategy represents the windowing strategy

const (
	Fixed Strategy = iota
	Sliding
	Session
	Global
)

func (Strategy) String added in v1.2.0

func (s Strategy) String() string

type TimedWindow added in v1.2.0

type TimedWindow interface {
	// StartTime returns the start time of the window.
	StartTime() time.Time
	// EndTime returns the end time of the window.
	EndTime() time.Time
	// Slot returns the slot to which the window belongs.
	Slot() string
	// Partition returns the partition id of the window, partition is
	// combination of start time, end time and slot.
	// This will be used to map to the pbq instance where the messages
	// should be persisted.
	Partition() *partition.ID
	// Keys returns the keys of the window tracked for Unaligned windows.
	// This will return empty for Aligned windows.
	Keys() []string
	// ID returns the id which is the unique identifier for the window.
	// This is used to compare the windows. For Aligned windows, this is the
	// combination of start time, end time and slot. For Unaligned windows,
	// this is the combination of start time, end time, slot and keys.
	ID() string
	// Merge merges the window with the new window. It is used only for
	// Unaligned window.
	Merge(tw TimedWindow)
	// Expand expands the window end time to the new endTime. It is used only for
	// Unaligned window.
	Expand(endTime time.Time)
}

TimedWindow represents a time-based window.
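
The difference between aligned and unaligned identifiers can be sketched as follows. The `alignedID` and `unalignedID` helpers and the string layout are assumptions for illustration only; the page states which fields are combined but not how they are encoded.

```go
package main

import (
	"fmt"
	"strings"
)

// alignedID combines start time, end time and slot (assumed layout).
func alignedID(startMs, endMs int64, slot string) string {
	return fmt.Sprintf("%d-%d-%s", startMs, endMs, slot)
}

// unalignedID additionally includes the keys, so two windows with identical
// boundaries but different keys compare as different windows.
func unalignedID(startMs, endMs int64, slot string, keys []string) string {
	return fmt.Sprintf("%d-%d-%s-%s", startMs, endMs, slot, strings.Join(keys, ":"))
}

func main() {
	fmt.Println(alignedID(0, 60000, "slot-0"))
	fmt.Println(unalignedID(0, 60000, "slot-0", []string{"user-1"}))
	fmt.Println(unalignedID(0, 60000, "slot-0", []string{"user-2"}))
}
```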

func NewAlignedTimedWindow added in v1.2.0

func NewAlignedTimedWindow(st time.Time, et time.Time, slot string) TimedWindow

NewAlignedTimedWindow returns a new TimedWindow for the given start time, end time and slot.

func NewUnalignedTimedWindow added in v1.2.0

func NewUnalignedTimedWindow(st time.Time, et time.Time, slot string, keys []string) TimedWindow

NewUnalignedTimedWindow returns a new TimedWindow for the given start time, end time, slot and keys. We track the keys for Unaligned windows because the start and end times of each unaligned window depend only on the specific key.

type TimedWindowRequest added in v1.2.0

type TimedWindowRequest struct {
	// Operation is the `Operation` on the windows
	Operation Operation
	// ReadMessage represents the isb message
	ReadMessage *isb.ReadMessage
	// ID represents the partition id
	// this is to map to the pbq instance to which the message should be assigned
	ID *partition.ID
	// Windows is the list of windows on which the operation is performed
	Windows []TimedWindow
}

TimedWindowRequest represents the operation to be performed on the window. TimedWindowRequest is sent to the UDF and it contains enough context to execute the operation.

type TimedWindowResponse added in v1.2.0

type TimedWindowResponse struct {
	// WriteMessage represents the isb message
	WriteMessage *isb.WriteMessage
	// Window represents the window to which the message belongs
	Window TimedWindow
	// EOF represents the end of the response for the given window.
	// When EOF is true, it will be just a metadata payload, there won't be any WriteMessage.
	EOF bool
}

TimedWindowResponse is the response from the UDF based on how the result is propagated back. It could be one or more responses based on how many results the user is streaming out.
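
A consumer of such a response stream might group results per window until it sees the EOF marker. The sketch below uses a simplified, hypothetical `response` type (a window identifier, a string payload standing in for WriteMessage, and an EOF flag) and is not the package's own handling.

```go
package main

import "fmt"

// response is a hypothetical, simplified stand-in for TimedWindowResponse.
type response struct {
	windowID string // identifies the window the response belongs to
	payload  string // stands in for WriteMessage; unused when eof is true
	eof      bool   // marks the end of the responses for windowID
}

// drain groups payloads by window and records which windows have seen EOF.
// An EOF response is treated as metadata only and carries no payload.
func drain(responses []response) (results map[string][]string, done []string) {
	results = make(map[string][]string)
	for _, r := range responses {
		if r.eof {
			done = append(done, r.windowID)
			continue
		}
		results[r.windowID] = append(results[r.windowID], r.payload)
	}
	return results, done
}

func main() {
	results, done := drain([]response{
		{windowID: "w1", payload: "sum=3"},
		{windowID: "w1", payload: "count=2"},
		{windowID: "w1", eof: true},
	})
	fmt.Println(len(results["w1"]), done)
}
```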

type TimedWindower added in v1.2.0

type TimedWindower interface {
	// Strategy returns the window strategy.
	Strategy() Strategy
	// Type returns the window type.
	Type() Type
	// AssignWindows assigns the event to the window based on the given window configuration.
	AssignWindows(message *isb.ReadMessage) []*TimedWindowRequest
	// CloseWindows closes the windows that are past the watermark.
	CloseWindows(time time.Time) []*TimedWindowRequest
	// InsertWindow inserts a window to the list of active windows.
	InsertWindow(tw TimedWindow)
	// NextWindowToBeClosed returns the next window yet to be closed.
	NextWindowToBeClosed() TimedWindow
	// DeleteClosedWindow deletes the window from the closed windows list.
	DeleteClosedWindow(tw TimedWindow)
	// OldestWindowEndTime returns the end time of the oldest window among both active and closed windows.
	// If there are no windows, it returns -1.
	OldestWindowEndTime() time.Time
}

TimedWindower is the interface for windowing strategy. It manages the lifecycle of timed windows for a reduce vertex. It maintains a list of timed windows locally, generates window requests to be sent to the reduce UDF, and reflects the changes to the list of timed windows based on the response from the UDF.

type Type added in v1.2.0

type Type int

Type represents window type

const (
	Aligned Type = iota
	Unaligned
)

func (Type) String added in v1.2.0

func (t Type) String() string

Directories

Path        Synopsis
strategy
  fixed     Package fixed implements Fixed windows.
  sliding   Package sliding implements Sliding windows.
