wmb

package
v1.3.3
Warning

This package is not in the latest version of its module.

Published: Oct 10, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package wmb represents the offset-timeline pair and its corresponding encoder and decoder.

Constants

const PARTITION_0 = 0

PARTITION_0 is the partition index used when a buffer has exactly one partition. It applies only to cases where more than one partition is not possible (e.g., Reduce vertex, Source, etc.).

Variables

var InitialWatermark = Watermark(time.UnixMilli(-1))

Functions

This section is empty.

Types

type IdleManager added in v0.7.3

type IdleManager interface {
	// NeedToSendCtrlMsg validates whether to send a control message for the given partition or not.
	NeedToSendCtrlMsg(toBufferPartitionName string) bool
	// Get gets the isb.Offset from the given partition.
	Get(toBufferPartitionName string) isb.Offset
	// Update updates the isb.Offset for the given partition using the new offset if the partition exists, otherwise
	// create a new entry.
	Update(fromBufferPartitionIndex int32, toBufferPartitionName string, newOffset isb.Offset)
	// MarkActive marks the active status for the given partition.
	MarkActive(fromBufferPartitionIndex int32, toBufferPartitionName string)
	// MarkIdle marks idle for the given toBuffer partition name if it's not idle.
	MarkIdle(fromBufferPartitionIndex int32, toBufferPartitionName string)
}

IdleManager decides when to send a control message and also keeps track of idle watermark's offset.
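
The following is a minimal sketch of how a caller might drive such a manager. It does not use the package itself: `idleTracker` is a trimmed-down, hypothetical stand-in for IdleManager (an `int64` replaces `isb.Offset`, and the from-partition index and MarkIdle are left out), and `mapTracker` and `idleOffset` are illustrative names. The rule that a control message is needed whenever no offset is stored is an assumption of this sketch.

```go
package main

import "fmt"

// idleTracker is a hypothetical, simplified stand-in for IdleManager.
type idleTracker interface {
	NeedToSendCtrlMsg(toPartition string) bool
	Get(toPartition string) int64
	Update(toPartition string, newOffset int64)
	MarkActive(toPartition string)
}

// mapTracker remembers the control-message offset of each idle partition.
type mapTracker struct {
	offsets map[string]int64
}

// NeedToSendCtrlMsg reports true when no offset is stored (assumed rule).
func (m *mapTracker) NeedToSendCtrlMsg(p string) bool {
	_, ok := m.offsets[p]
	return !ok
}

func (m *mapTracker) Get(p string) int64       { return m.offsets[p] }
func (m *mapTracker) Update(p string, o int64) { m.offsets[p] = o }
func (m *mapTracker) MarkActive(p string)      { delete(m.offsets, p) }

// idleOffset returns the offset to publish with an idle watermark. It writes
// a control message only when the tracker says one is needed.
func idleOffset(t idleTracker, p string, writeCtrlMsg func() int64) int64 {
	if t.NeedToSendCtrlMsg(p) {
		t.Update(p, writeCtrlMsg())
	}
	return t.Get(p)
}

func main() {
	var t idleTracker = &mapTracker{offsets: map[string]int64{}}
	next := int64(100)
	write := func() int64 { next++; return next }

	fmt.Println(idleOffset(t, "p-0", write)) // first idle cycle: writes a control message
	fmt.Println(idleOffset(t, "p-0", write)) // still idle: reuses the stored offset
	t.MarkActive("p-0")                      // data was written, so the partition is active
	fmt.Println(idleOffset(t, "p-0", write)) // idle again: a new control message is written
}
```

The point of the pattern is that repeated idle cycles reuse one control message per partition instead of writing a new one each time.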

func NewIdleManager added in v0.7.3

func NewIdleManager(numOfFromPartitions int, numOfToPartitions int) (IdleManager, error)

NewIdleManager returns an idleManager object as the IdleManager interface type to track the watermark idle status.

func NewNoOpIdleManager added in v0.10.1

func NewNoOpIdleManager() IdleManager

NewNoOpIdleManager returns a no-op idleManager object as the IdleManager interface type.

type WMB

type WMB struct {
	// Idle is set to true if the given processor entity hasn't published anything
	// to the offset timeline bucket in a batch processing cycle.
	// Idle is used to signal an idle watermark.
	Idle bool
	// Offset is the monotonically increasing index/offset of the buffer (buffer is the physical representation
	// of the partition of the edge).
	Offset int64
	// Watermark is tightly coupled with the offset and will be monotonically increasing for a given ProcessorEntity
	// as the offset increases.
	// When it is idling (Idle==true), for a given offset, the watermark can monotonically increase without offset
	// increasing.
	Watermark int64
	// Partition to identify the partition to which the watermark belongs.
	Partition int32
}

WMB is used in the KV offset timeline bucket as the value for the given processor entity key.
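
The field comments above describe how Offset and Watermark move together, and how an idle entry can advance the watermark while the offset stays put. The sketch below illustrates that reading with a local copy of the struct; `neverRegresses` is a hypothetical helper, not part of the package, and the sample values are made up.

```go
package main

import "fmt"

// WMB mirrors the fields of the package's WMB struct so the sketch runs standalone.
type WMB struct {
	Idle      bool
	Offset    int64
	Watermark int64
	Partition int32
}

// neverRegresses is a hypothetical check: neither the offset nor the
// watermark may move backwards between two consecutive entries.
func neverRegresses(prev, next WMB) bool {
	return next.Offset >= prev.Offset && next.Watermark >= prev.Watermark
}

func main() {
	timeline := []WMB{
		{Offset: 10, Watermark: 1000},
		{Offset: 11, Watermark: 2000},
		{Idle: true, Offset: 11, Watermark: 3000}, // idle: watermark advances, offset does not
		{Idle: true, Offset: 11, Watermark: 4000},
	}
	for i := 1; i < len(timeline); i++ {
		fmt.Println(timeline[i].Idle, neverRegresses(timeline[i-1], timeline[i]))
	}
}
```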

func DecodeToWMB

func DecodeToWMB(b []byte) (WMB, error)

DecodeToWMB decodes the given byte array into a WMB object using protobuf.

func (WMB) EncodeToBytes

func (w WMB) EncodeToBytes() ([]byte, error)

EncodeToBytes encodes a WMB object into byte array using protobuf.

type WMBChecker added in v0.7.3

type WMBChecker struct {
	// contains filtered or unexported fields
}

WMBChecker checks whether an idle watermark is valid by making sure the idle WMB's offset has not changed over a fixed number of iterations (the numOfIteration passed to NewWMBChecker). This check is required because the time at which idleness was detected must match reality (processes could hang in between), and the only way to confirm that is to observe it over multiple iterations.

func NewWMBChecker added in v0.7.3

func NewWMBChecker(numOfIteration int) WMBChecker

NewWMBChecker returns a WMBChecker to check if the wmb is idle. If all the iterations get the same wmb offset, the wmb is considered valid and will be used to publish a wmb to the toBuffer partitions of the next vertex.

func (*WMBChecker) GetCounter added in v0.7.3

func (c *WMBChecker) GetCounter() int

GetCounter returns the current iterationCounter value of the WMBChecker. It is used in logs and tests.

func (*WMBChecker) ValidateHeadWMB added in v0.7.3

func (c *WMBChecker) ValidateHeadWMB(w WMB) bool

ValidateHeadWMB checks whether the head wmb is idle and has the same offset as in the previous iteration. It returns true once all the iterations have seen the same wmb offset.
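
The multi-iteration check can be sketched as a small counter. This is a simplified illustration under stated assumptions, not the package's implementation: `headWMB`, `idleChecker`, and `validate` are hypothetical names, and how the real checker handles an offset change or the reset after success may differ.

```go
package main

import "fmt"

// headWMB holds the two WMB fields this sketch needs.
type headWMB struct {
	Idle   bool
	Offset int64
}

// idleChecker is a hypothetical stand-in for WMBChecker.
type idleChecker struct {
	required int   // number of consecutive observations that must agree
	seen     int   // consecutive idle observations with the same offset so far
	offset   int64 // offset of the current run of observations
}

// validate returns true once `required` consecutive idle observations carry
// the same offset. A non-idle observation resets the run, and a changed
// offset restarts it with the new offset counted as the first observation.
func (c *idleChecker) validate(w headWMB) bool {
	if !w.Idle {
		c.seen = 0
		return false
	}
	if c.seen > 0 && w.Offset != c.offset {
		c.seen = 0
	}
	c.offset = w.Offset
	c.seen++
	if c.seen >= c.required {
		c.seen = 0 // start a fresh run after a successful validation
		return true
	}
	return false
}

func main() {
	c := &idleChecker{required: 3}
	observations := []headWMB{
		{Idle: true, Offset: 5},
		{Idle: true, Offset: 5},
		{Idle: true, Offset: 6}, // offset moved, so the run restarts
		{Idle: true, Offset: 6},
		{Idle: true, Offset: 6}, // third matching observation validates
	}
	for _, w := range observations {
		fmt.Println(w.Offset, c.validate(w))
	}
}
```

Requiring several matching observations trades a short delay in publishing the idle watermark for protection against acting on a stale reading.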

type Watermark

type Watermark time.Time

Watermark is the monotonically increasing watermark. It is tightly coupled with ProcessorEntitier, as the processor is responsible for monotonically increasing the Watermark for that processor. NOTE: today only second-level progression of the watermark is supported; millisecond progression needs to be supported too.

func (Watermark) Add added in v1.1.0

func (w Watermark) Add(t time.Duration) time.Time

func (Watermark) After

func (w Watermark) After(t time.Time) bool

func (Watermark) AfterWatermark added in v0.10.0

func (w Watermark) AfterWatermark(compare Watermark) bool

func (Watermark) Before

func (w Watermark) Before(t time.Time) bool

func (Watermark) BeforeWatermark added in v0.10.0

func (w Watermark) BeforeWatermark(compare Watermark) bool

func (Watermark) String

func (w Watermark) String() string

func (Watermark) UnixMilli

func (w Watermark) UnixMilli() int64
