Documentation ¶
Overview ¶
Package wmb represents the offset-timeline pair and its corresponding encoder and decoder.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var InitialWatermark = Watermark(time.UnixMilli(-1))
Functions ¶
This section is empty.
Types ¶
type IdleManager ¶ added in v0.7.3
type IdleManager struct {
// contains filtered or unexported fields
}
IdleManager manages the idle watermark whether the control message is a duplicate and also keeps track of the idle WMB's offset.
func NewIdleManager ¶ added in v0.7.3
func NewIdleManager(length int) *IdleManager
NewIdleManager returns an IdleManager object to track the watermark idle status.
func (*IdleManager) Exists ¶ added in v0.7.3
func (im *IdleManager) Exists(toBufferName string) bool
Exists returns true if the given toBuffer partition name exists in the IdleManager map.
func (*IdleManager) Get ¶ added in v0.7.3
func (im *IdleManager) Get(toBufferName string) isb.Offset
Get gets the offset for the given toBuffer partition name.
func (*IdleManager) Reset ¶ added in v0.7.3
func (im *IdleManager) Reset(toBufferName string)
Reset will clear the item for the given toBuffer partition name.
type WMB ¶
type WMB struct { // Idle is set to true if the given processor entity hasn't published anything // to the offset timeline bucket in a batch processing cycle. // Idle is used to signal an idle watermark. Idle bool // Offset is the monotonically increasing index/offset of the buffer (buffer is the physical representation // of the partition of the edge). Offset int64 // Watermark is tightly coupled with the offset and will be monotonically increasing for a given ProcessorEntity // as the offset increases. // When it is idling (Idle==true), for a given offset, the watermark can monotonically increase without offset // increasing. Watermark int64 // Partition to identify the partition to which the watermark belongs. Partition int32 }
WMB is used in the KV offset timeline bucket as the value for the given processor entity key.
func DecodeToWMB ¶
DecodeToWMB decodes the given byte array into a WMB object.
func (WMB) EncodeToBytes ¶
EncodeToBytes encodes a WMB object into byte array.
type WMBChecker ¶ added in v0.7.3
type WMBChecker struct {
// contains filtered or unexported fields
}
WMBChecker checks if the idle watermark is valid. It checks by making sure the Idle WMB's offset has not been changed in X iterations. This check is required because we have to make sure the time at which the idleness has been detected matches with reality (processes could hang in between). The only way to do that is by multiple iterations.
func NewWMBChecker ¶ added in v0.7.3
func NewWMBChecker(numOfIteration int) WMBChecker
NewWMBChecker returns a WMBChecker to check if the wmb is idle. If all the iterations get the same wmb offset, the wmb is considered as valid and will be used to publish a wmb to pods of the next vertex.
func (*WMBChecker) GetCounter ¶ added in v0.7.3
func (c *WMBChecker) GetCounter() int
GetCounter gets the current iterationCounter value for the WMBChecker, it's used in log and tests
func (*WMBChecker) ValidateHeadWMB ¶ added in v0.7.3
func (c *WMBChecker) ValidateHeadWMB(w WMB) bool
ValidateHeadWMB checks if the head wmb is idle, and it has the same wmb offset from the previous iteration. If all the iterations get the same wmb offset, returns true.
type Watermark ¶
Watermark is the monotonically increasing watermark. It is tightly coupled with ProcessorEntitier as the processor is responsible for monotonically increasing Watermark for that processor. NOTE: today we support only second progression of watermark, we need to support millisecond too.