sdf

package
v3.0.0-...-16f56ce
Published: Jun 3, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 3 Imported by: 0

Documentation

Overview

Package sdf contains interfaces used specifically for splittable DoFns.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BoundableRTracker

type BoundableRTracker interface {
	RTracker
	// IsBounded returns the boundedness of the current restriction. If the current restriction represents a
	// finite amount of work, it should return true. Otherwise, it should return false.
	IsBounded() bool
}

BoundableRTracker is an interface used to interact with restrictions that may be bounded or unbounded while processing elements in splittable DoFns (specifically, in the ProcessElement method and TruncateRestriction method). Each BoundableRTracker tracks the progress of a single restriction.

All BoundableRTracker methods should be thread-safe for dynamic splits to function correctly.
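As a rough illustration of how a tracker might report boundedness, the sketch below defines local stand-ins for RTracker and BoundableRTracker (trimmed to two methods so it compiles without the Beam module) and a hypothetical `streamTracker`. The type name, its fields, and the "negative end means unbounded" convention are assumptions made for this example, not part of the package. Locking is omitted for brevity, so this tracker would not be thread-safe as written.

```go
package main

import "fmt"

// RTracker is a trimmed local stand-in for sdf.RTracker, kept to two
// methods so the example compiles without the Beam module.
type RTracker interface {
	TryClaim(pos any) (ok bool)
	IsDone() bool
}

// BoundableRTracker mirrors the shape of the interface documented above.
type BoundableRTracker interface {
	RTracker
	IsBounded() bool
}

// streamTracker is a hypothetical tracker over an offset range.
// A negative end means "no known end", so the work is unbounded.
type streamTracker struct {
	next int64 // lowest offset that may still be claimed
	end  int64 // exclusive end; negative means unbounded
}

// TryClaim accepts int64 positions that are not behind the cursor and,
// for bounded ranges, lie before the end.
func (t *streamTracker) TryClaim(pos any) bool {
	p, ok := pos.(int64)
	if !ok || p < t.next || (t.end >= 0 && p >= t.end) {
		return false
	}
	t.next = p + 1
	return true
}

// IsDone can only become true for a bounded range.
func (t *streamTracker) IsDone() bool { return t.end >= 0 && t.next >= t.end }

// IsBounded reports whether the restriction is a finite amount of work.
func (t *streamTracker) IsBounded() bool { return t.end >= 0 }

func main() {
	var unbounded BoundableRTracker = &streamTracker{next: 0, end: -1}
	var bounded BoundableRTracker = &streamTracker{next: 0, end: 10}
	fmt.Println(unbounded.IsBounded(), bounded.IsBounded()) // false true
}
```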

type LockRTracker

type LockRTracker struct {
	Mu sync.Mutex // Lock on accessing underlying tracker.
	// The underlying tracker. If accessing directly, consider thread safety.
	// Lock the mutex if thread safety is needed.
	Rt RTracker
}

LockRTracker is a restriction tracker that wraps another restriction tracker and adds thread safety to it by locking a mutex in each method, before delegating to the underlying tracker.

func NewLockRTracker

func NewLockRTracker(rt RTracker) *LockRTracker

NewLockRTracker creates a LockRTracker initialized with the specified restriction tracker as its underlying restriction tracker.
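The lock-then-delegate pattern described for LockRTracker can be sketched with local types. Everything below (`claimer`, `countingTracker`, `lockedClaimer`, `claimConcurrently`) is a hypothetical stand-in written for this example; it is not the package's implementation, only an illustration of why the mutex matters when several goroutines touch the same tracker.

```go
package main

import (
	"fmt"
	"sync"
)

// claimer is a hypothetical one-method stand-in for sdf.RTracker.
type claimer interface {
	TryClaim(pos any) bool
}

// countingTracker is deliberately not thread-safe: it accepts every
// claim and counts them in a plain int.
type countingTracker struct{ claims int }

func (c *countingTracker) TryClaim(pos any) bool {
	c.claims++
	return true
}

// lockedClaimer mirrors the LockRTracker pattern: lock, then delegate.
type lockedClaimer struct {
	mu sync.Mutex
	rt claimer
}

func (l *lockedClaimer) TryClaim(pos any) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.rt.TryClaim(pos)
}

// claimConcurrently issues perWorker claims from each of workers goroutines.
func claimConcurrently(rt claimer, workers, perWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				rt.TryClaim(i)
			}
		}()
	}
	wg.Wait()
}

func main() {
	inner := &countingTracker{}
	claimConcurrently(&lockedClaimer{rt: inner}, 8, 1000)
	fmt.Println(inner.claims) // 8000
}
```

Without the wrapper, the unsynchronized increments in `countingTracker` would be a data race and the final count would not be reliable.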

func (*LockRTracker) GetError

func (rt *LockRTracker) GetError() error

GetError locks a mutex for thread safety, and then delegates to the underlying tracker's GetError.

func (*LockRTracker) GetProgress

func (rt *LockRTracker) GetProgress() (float64, float64)

GetProgress locks a mutex for thread safety, and then delegates to the underlying tracker's GetProgress.

func (*LockRTracker) GetRestriction

func (rt *LockRTracker) GetRestriction() any

GetRestriction locks a mutex for thread safety, and then delegates to the underlying tracker's GetRestriction.

func (*LockRTracker) IsBounded

func (rt *LockRTracker) IsBounded() bool

IsBounded locks a mutex for thread safety, and then delegates to the underlying tracker's IsBounded. If the underlying tracker does not implement BoundableRTracker, it is considered bounded by default.
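One plausible way to get the "bounded by default" behavior is a type assertion on the underlying tracker. The sketch below uses local, trimmed interfaces and a hypothetical helper `isBounded`; it shows the general Go idiom and is not taken from the package source.

```go
package main

import "fmt"

// tracker is a trimmed local stand-in for sdf.RTracker.
type tracker interface {
	IsDone() bool
}

// boundable is a local stand-in for sdf.BoundableRTracker.
type boundable interface {
	tracker
	IsBounded() bool
}

// isBounded is a hypothetical helper: it asks the tracker when it can,
// and otherwise falls back to treating the tracker as bounded.
func isBounded(rt tracker) bool {
	if b, ok := rt.(boundable); ok {
		return b.IsBounded()
	}
	return true
}

// plainTracker implements only the base interface.
type plainTracker struct{}

func (plainTracker) IsDone() bool { return false }

// endlessTracker additionally declares itself unbounded.
type endlessTracker struct{}

func (endlessTracker) IsDone() bool    { return false }
func (endlessTracker) IsBounded() bool { return false }

func main() {
	fmt.Println(isBounded(plainTracker{}), isBounded(endlessTracker{})) // true false
}
```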

func (*LockRTracker) IsDone

func (rt *LockRTracker) IsDone() bool

IsDone locks a mutex for thread safety, and then delegates to the underlying tracker's IsDone.

func (*LockRTracker) String

func (rt *LockRTracker) String() string

func (*LockRTracker) TryClaim

func (rt *LockRTracker) TryClaim(pos any) (ok bool)

TryClaim locks a mutex for thread safety, and then delegates to the underlying tracker's TryClaim.

func (*LockRTracker) TrySplit

func (rt *LockRTracker) TrySplit(fraction float64) (any, any, error)

TrySplit locks a mutex for thread safety, and then delegates to the underlying tracker's TrySplit.

type ManualWatermarkEstimator

type ManualWatermarkEstimator struct {
	State time.Time
}

ManualWatermarkEstimator is a watermark estimator that advances the current DoFn's output watermark when a user calls UpdateWatermark from within ProcessElement.

func (*ManualWatermarkEstimator) CurrentWatermark

func (e *ManualWatermarkEstimator) CurrentWatermark() time.Time

CurrentWatermark returns the most recent timestamp set from ProcessElement. It is used by the SDK harness to set the current DoFn's output watermark on splits and checkpoints.

func (*ManualWatermarkEstimator) UpdateWatermark

func (e *ManualWatermarkEstimator) UpdateWatermark(t time.Time)

UpdateWatermark is a convenience function that can be used to update the current watermark from inside ProcessElement.

type ProcessContinuation

type ProcessContinuation interface {
	// ShouldResume returns a boolean indicating whether the process should be
	// resumed at a later time.
	ShouldResume() bool

	// ResumeDelay returns a suggested time.Duration to wait before resuming the
	// process. The runner is not guaranteed to follow this suggestion.
	ResumeDelay() time.Duration
}

ProcessContinuation is an interface used to signal that a splittable DoFn should be split and resumed at a later time. A DoFn can return a ProcessContinuation to indicate either that processing is complete or that it needs to be resumed.

func ResumeProcessingIn

func ResumeProcessingIn(delay time.Duration) ProcessContinuation

ResumeProcessingIn returns a ProcessContinuation that will resume the process later with a suggested delay passed as a time.Duration.

func StopProcessing

func StopProcessing() ProcessContinuation

StopProcessing returns a ProcessContinuation that will not resume the process later.

type RTracker

type RTracker interface {
	// TryClaim attempts to claim the block of work located in the given position of the
	// restriction. This method must be called in ProcessElement to claim work before it can be
	// processed. Processing work without claiming it first can lead to incorrect output.
	//
	// The position type is up to individual implementations, and will usually be related to the
	// kind of restriction used. For example, a simple restriction representing a numeric range
	// might use an int64. A more complex restriction, such as one representing a multidimensional
	// space, might use a more complex type.
	//
	// If the claim is successful, the DoFn must process the entire block. If the claim is
	// unsuccessful, the ProcessElement method of the DoFn must return without performing
	// any additional work or emitting any outputs.
	//
	// If the claim fails due to an error, that error is stored and will be automatically emitted
	// when the RTracker is validated, or can be manually retrieved with GetError.
	//
	// This pseudocode example illustrates the typical usage of TryClaim:
	//
	// 	pos = position of first block within the restriction
	// 	for TryClaim(pos) == true {
	// 		// Do all work in the claimed block and emit outputs.
	// 		pos = position of next block within the restriction
	// 	}
	// 	return
	TryClaim(pos any) (ok bool)

	// GetError returns the error that made this RTracker stop executing, and returns nil if no
	// error occurred. This is the error that is emitted if automated validation fails.
	GetError() error

	// TrySplit splits the current restriction into a primary (currently executing work) and
	// residual (work to be split off) based on a fraction of work remaining. The split is performed
	// at the first valid split point located after the given fraction of remaining work.
	//
	// For example, a fraction of 0.5 means to split at the halfway point of remaining work only. If
	// 50% of work is done and 50% remaining, then a fraction of 0.5 would split after 75% of work.
	//
	// This method modifies the underlying restriction in the RTracker to reflect the primary. It
	// then returns a copy of the newly modified restriction as a primary, and returns a new
	// restriction for the residual. If the split would produce an empty residual (either because
	// the only split point is the end of the restriction, or the split failed for some recoverable
	// reason), then this function returns nil as the residual.
	//
	// If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return
	// a primary restriction that represents no remaining work, and the residual should
	// contain all remaining work. The RTracker should be marked as done
	// (and return true when IsDone() is called) after that split.
	// This will ensure that there is no data loss, which would result in
	// the pipeline failing during the checkpoint.
	//
	// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
	TrySplit(fraction float64) (primary, residual any, err error)

	// GetProgress returns two abstract scalars representing the amount of done and remaining work.
	// These values have no specific units, but are used to estimate work in relation to each other
	// and should be self-consistent.
	GetProgress() (done float64, remaining float64)

	// IsDone returns a boolean indicating whether all blocks inside the restriction have been
	// claimed. This method is called by the SDK Harness to validate that a splittable DoFn has
	// correctly processed all work in a restriction before finishing. If this method still returns
	// false after processing, then GetError is expected to return a non-nil error.
	//
	// When called immediately following a checkpointing TrySplit() call (with value 0.0), this
	// should return true.
	IsDone() bool

	// GetRestriction returns the restriction this tracker is tracking, or nil if the restriction
	// is unavailable for some reason.
	GetRestriction() any
}

RTracker is an interface used to interact with restrictions while processing elements in splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress of a single restriction.

All RTracker methods should be thread-safe for dynamic splits to function correctly.
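The contract above can be illustrated with a small tracker over an integer offset range. This is a simplified sketch written for this page, not the SDK's own tracker: `offsetRange`, `offsetTracker`, and `newOffsetTracker` are hypothetical names, out-of-range fractions are clamped as a design choice of the example, and locking is omitted. It reproduces the worked example from the TrySplit comment (50% done, fraction 0.5, split at 75%) and the checkpointing split with fraction 0.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// offsetRange is a hypothetical restriction covering offsets [Start, End).
type offsetRange struct{ Start, End int64 }

// offsetTracker is a simplified, non-thread-safe sketch of an RTracker.
type offsetTracker struct {
	rest    offsetRange
	claimed int64 // last claimed offset; Start-1 before any claim
	done    bool  // set when a claim at or past End is attempted
	err     error
}

func newOffsetTracker(r offsetRange) *offsetTracker {
	return &offsetTracker{rest: r, claimed: r.Start - 1}
}

func (t *offsetTracker) TryClaim(pos any) bool {
	p, ok := pos.(int64)
	switch {
	case !ok:
		t.err = errors.New("position must be an int64")
	case p <= t.claimed:
		t.err = errors.New("positions must be claimed in increasing order")
	case p >= t.rest.End:
		t.done = true // no work left in this restriction; not an error
	default:
		t.claimed = p
		return true
	}
	return false
}

func (t *offsetTracker) GetError() error { return t.err }

func (t *offsetTracker) TrySplit(fraction float64) (primary, residual any, err error) {
	fraction = math.Max(0, math.Min(1, fraction)) // clamp to [0, 1]
	next := t.claimed + 1                         // first unclaimed offset
	splitPt := next + int64(math.Ceil(fraction*float64(t.rest.End-next)))
	if t.done || splitPt >= t.rest.End {
		return t.rest, nil, nil // nothing to split off
	}
	residual = offsetRange{Start: splitPt, End: t.rest.End}
	t.rest.End = splitPt // the tracker now tracks only the primary
	return t.rest, residual, nil
}

func (t *offsetTracker) GetProgress() (done, remaining float64) {
	next := t.claimed + 1
	return float64(next - t.rest.Start), float64(t.rest.End - next)
}

func (t *offsetTracker) IsDone() bool {
	return t.err == nil && (t.done || t.claimed+1 >= t.rest.End)
}

func (t *offsetTracker) GetRestriction() any { return t.rest }

func main() {
	t := newOffsetTracker(offsetRange{0, 100})
	// Typical claim loop: claim each block before doing its work.
	for pos := int64(0); pos < 50 && t.TryClaim(pos); pos++ {
		// Work for the claimed offset would happen here.
	}
	primary, residual, _ := t.TrySplit(0.5)
	fmt.Println(primary, residual) // {0 75} {75 100}
	primary, residual, _ = t.TrySplit(0) // checkpoint
	fmt.Println(primary, residual, t.IsDone()) // {0 50} {50 75} true
}
```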

type TimestampObservingEstimator

type TimestampObservingEstimator interface {
	WatermarkEstimator
	// ObserveTimestamp is called any time a DoFn emits an element and can use that element's
	// event time to modify the state of the estimator.
	ObserveTimestamp(ts time.Time)
}

TimestampObservingEstimator is an interface used to represent a user-defined watermark estimator that has the ability to observe timestamps of elements output from a ParDo's emit function.

type TimestampObservingWatermarkEstimator

type TimestampObservingWatermarkEstimator struct {
	State time.Time
}

TimestampObservingWatermarkEstimator is a watermark estimator that advances the current DoFn's output watermark to the timestamp of the most recently emitted element.

func (*TimestampObservingWatermarkEstimator) CurrentWatermark

func (e *TimestampObservingWatermarkEstimator) CurrentWatermark() time.Time

CurrentWatermark returns the current watermark. It is used by the SDK harness to set the current DoFn's output watermark on splits and checkpoints.

func (*TimestampObservingWatermarkEstimator) ObserveTimestamp

func (e *TimestampObservingWatermarkEstimator) ObserveTimestamp(t time.Time)

ObserveTimestamp updates the watermark to the timestamp of the most recently emitted element. It is invoked by the SDK after each emit. The updated watermark will not be reflected until a split or checkpoint occurs.

type WallTimeWatermarkEstimator

type WallTimeWatermarkEstimator struct{}

WallTimeWatermarkEstimator is a watermark estimator that advances the current DoFn's output watermark to the current wallclock time on splits or checkpoints.

func (*WallTimeWatermarkEstimator) CurrentWatermark

func (e *WallTimeWatermarkEstimator) CurrentWatermark() time.Time

CurrentWatermark returns the current time. It is used by the SDK harness to set the current DoFn's output watermark on splits and checkpoints.

type WatermarkEstimator

type WatermarkEstimator interface {
	// CurrentWatermark returns the estimator's current watermark. It is called any time a DoFn
	// splits or checkpoints to advance the output watermark of the restriction's stage.
	CurrentWatermark() time.Time
}

WatermarkEstimator is an interface used to represent a user-defined watermark estimator. Watermark estimators allow users to advance the output watermark of the current splittable DoFn (SDF).

type WrappedTracker

type WrappedTracker struct {
	RTracker
}

WrappedTracker wraps an implementation of an RTracker and adds an IsBounded() function that returns true, allowing RTrackers to be handled as bounded BoundableRTrackers when necessary (for example, in self-checkpointing evaluation).

func NewWrappedTracker

func NewWrappedTracker(underlying RTracker) *WrappedTracker

NewWrappedTracker is a constructor that wraps an RTracker so that it can be used as a BoundableRTracker.

func (*WrappedTracker) IsBounded

func (t *WrappedTracker) IsBounded() bool

IsBounded returns true, indicating that the underlying RTracker represents a bounded amount of work.
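The wrapping relies on Go's struct embedding: the embedded tracker's methods are promoted to the wrapper, and the extra IsBounded method completes the larger interface. The sketch below shows the idea with trimmed local interfaces; `wrapped` and `finishedTracker` are hypothetical names used only for this example.

```go
package main

import "fmt"

// RTracker is a trimmed local stand-in for sdf.RTracker.
type RTracker interface {
	IsDone() bool
}

// BoundableRTracker is a local stand-in for the boundable interface.
type BoundableRTracker interface {
	RTracker
	IsBounded() bool
}

// wrapped embeds an RTracker, so IsDone is promoted from the
// underlying tracker, and adds an IsBounded that always returns true.
type wrapped struct {
	RTracker
}

func (w *wrapped) IsBounded() bool { return true }

// finishedTracker is a hypothetical tracker with no IsBounded method.
type finishedTracker struct{}

func (finishedTracker) IsDone() bool { return true }

func main() {
	// The wrapper satisfies BoundableRTracker even though the
	// underlying tracker does not.
	var b BoundableRTracker = &wrapped{RTracker: finishedTracker{}}
	fmt.Println(b.IsDone(), b.IsBounded()) // true true
}
```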
