Documentation ¶
Overview ¶
Package primitives contains integration tests for primitives in beam.
Index ¶
- func Checkpoints(s beam.Scope)
- func CoGBK() *beam.Pipeline
- func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection
- func Drain(s beam.Scope)
- func Flatten() *beam.Pipeline
- func FlattenDup() *beam.Pipeline
- func Panes(s beam.Scope)
- func PanesFn(pn beam.PaneInfo, value float64, emit func(int))
- func ParDoKVSideInput() *beam.Pipeline
- func ParDoMultiMapSideInput() *beam.Pipeline
- func ParDoMultiOutput() *beam.Pipeline
- func ParDoPipelineOptions() *beam.Pipeline
- func ParDoSideInput() *beam.Pipeline
- func Reshuffle() *beam.Pipeline
- func ReshuffleKV() *beam.Pipeline
- func TestStreamBoolSequence() *beam.Pipeline
- func TestStreamByteSliceSequence() *beam.Pipeline
- func TestStreamFloat64Sequence() *beam.Pipeline
- func TestStreamInt64Sequence() *beam.Pipeline
- func TestStreamStrings() *beam.Pipeline
- func TestStreamTwoBoolSequences() *beam.Pipeline
- func TestStreamTwoFloat64Sequences() *beam.Pipeline
- func TestStreamTwoInt64Sequences() *beam.Pipeline
- func TriggerAfterAll(s beam.Scope)
- func TriggerAfterAny(s beam.Scope)
- func TriggerAfterEach(s beam.Scope)
- func TriggerAfterEndOfWindow(s beam.Scope)
- func TriggerAfterProcessingTime(s beam.Scope)
- func TriggerAfterSynchronizedProcessingTime(s beam.Scope)
- func TriggerAlways(s beam.Scope)
- func TriggerDefault(s beam.Scope)
- func TriggerElementCount(s beam.Scope)
- func TriggerNever(s beam.Scope)
- func TriggerOrFinally(s beam.Scope)
- func TriggerRepeat(s beam.Scope)
- func ValidateWindowedSideInputs(s beam.Scope)
- func WindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection)
- func WindowSums_GBK(s beam.Scope)
- func WindowSums_Lifted(s beam.Scope)
- type RangeEstimator
- type TruncateFn
- func (fn *TruncateFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction
- func (fn *TruncateFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker
- func (fn *TruncateFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation
- func (fn *TruncateFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64
- func (fn *TruncateFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction
- func (fn *TruncateFn) TruncateRestriction(rt *sdf.LockRTracker, _ []byte) offsetrange.Restriction
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Checkpoints ¶ added in v2.40.0
Checkpoints is a small test pipeline to establish the correctness of the simple test case.
func CreateAndSplit ¶ added in v2.35.0
func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection
CreateAndSplit makes a KV PCollection from a list of stringPair values.
func FlattenDup ¶
FlattenDup tests Flatten with the same input multiple times.
func Panes ¶ added in v2.37.0
Panes constructs a teststream and applies a pardo to get the pane timings.
func ParDoKVSideInput ¶
ParDoKVSideInput computes the sum of ints using a KV side input.
func ParDoMultiMapSideInput ¶ added in v2.35.0
ParDoMultiMapSideInput checks that the multimap side input access pattern works correctly, properly producing the correct output with an asymmetric join.
func ParDoMultiOutput ¶
ParDoMultiOutput tests a DoFn with multiple outputs.
func ParDoPipelineOptions ¶ added in v2.37.0
ParDoPipelineOptions creates a pipeline with flag options to validate that a DoFn can access them as PipelineOptions.
func ParDoSideInput ¶
ParDoSideInput computes the sum of ints using a side input.
func ReshuffleKV ¶
ReshuffleKV tests Reshuffle with KV PCollections.
func TestStreamBoolSequence ¶
TestStreamBoolSequence tests the TestStream primitive by inserting boolean elements then advancing the watermark past the point where they were inserted.
func TestStreamByteSliceSequence ¶
TestStreamByteSliceSequence tests the TestStream primitive by inserting byte slice elements then advancing the watermark to infinity and comparing the output.
func TestStreamFloat64Sequence ¶
TestStreamFloat64Sequence tests the TestStream primitive by inserting float64 elements then advancing the watermark past the point where they were inserted.
func TestStreamInt64Sequence ¶
TestStreamInt64Sequence tests the TestStream primitive by inserting int64 elements then advancing the watermark past the point where they were inserted.
func TestStreamStrings ¶
TestStreamStrings tests the TestStream primitive by inserting string elements then advancing the watermark past the point where they were inserted.
func TestStreamTwoBoolSequences ¶
TestStreamTwoBoolSequences tests the TestStream primitive by inserting two sets of boolean elements that arrive on time into the TestStream.
func TestStreamTwoFloat64Sequences ¶
TestStreamTwoFloat64Sequences tests the TestStream primitive by inserting two sets of float64 elements that arrive on time into the TestStream.
func TestStreamTwoInt64Sequences ¶
TestStreamTwoInt64Sequences tests the TestStream primitive by inserting two sets of int64 elements that arrive on time into the TestStream.
func TriggerAfterAll ¶ added in v2.40.0
TriggerAfterAll tests the AfterAll trigger. The output pane is fired when all of the subtriggers are ready. In this test, trigger.AfterCount(int32(5)) is not ready until 5 elements have been seen, so trigger.Always() cannot fire the pane on its own before then. The pane therefore fires only once, when the 5th element is seen.
func TriggerAfterAny ¶ added in v2.40.0
TriggerAfterAny tests the AfterAny trigger. In this test, trigger.Always() is ready every time, so panes are expected to fire at every element regardless of the other subtriggers.
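The difference between the AfterAll and AfterAny descriptions can be illustrated with a toy model. This is only a sketch of the readiness logic in plain Go, not Beam's trigger implementation: the `ready` type and the helpers `afterCount`, `always`, `afterAll`, `afterAny`, and `firstFiring` are hypothetical names introduced for illustration, and the stream of 10 elements is an assumed input.

```go
package main

import "fmt"

// ready is a hypothetical stand-in for a trigger: it reports whether the
// trigger would be ready once `seen` elements have arrived in the window.
type ready func(seen int) bool

// afterCount is ready once at least n elements have been seen.
func afterCount(n int) ready { return func(seen int) bool { return seen >= n } }

// always is ready as soon as any element has been seen.
func always() ready { return func(seen int) bool { return seen >= 1 } }

// afterAll is ready only when every subtrigger is ready.
func afterAll(subs ...ready) ready {
	return func(seen int) bool {
		for _, s := range subs {
			if !s(seen) {
				return false
			}
		}
		return true
	}
}

// afterAny is ready as soon as at least one subtrigger is ready.
func afterAny(subs ...ready) ready {
	return func(seen int) bool {
		for _, s := range subs {
			if s(seen) {
				return true
			}
		}
		return false
	}
}

// firstFiring returns the 1-based element count at which t first becomes
// ready, or 0 if it never becomes ready within total elements.
func firstFiring(t ready, total int) int {
	for seen := 1; seen <= total; seen++ {
		if t(seen) {
			return seen
		}
	}
	return 0
}

func main() {
	fmt.Println(firstFiring(afterAll(afterCount(5), always()), 10)) // prints 5
	fmt.Println(firstFiring(afterAny(afterCount(5), always()), 10)) // prints 1
}
```

In this model the AfterAll composite waits for its slowest subtrigger, while the AfterAny composite follows its fastest one, which matches the behavior the two tests describe.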
func TriggerAfterEach ¶ added in v2.40.0
TriggerAfterEach tests the AfterEach trigger. Output panes are fired as each subtrigger becomes ready, in the order the subtriggers are listed. In this test, trigger.AfterCount(int32(3)) comes first, so the first pane is fired after 3 elements; trigger.Always() then fires one pane each for elements 5.0 and 8.0.
func TriggerAfterEndOfWindow ¶ added in v2.34.0
TriggerAfterEndOfWindow tests the AfterEndOfWindow trigger, with AfterCount(2) as the early-firing trigger and AfterCount(1) as the late-firing trigger. It fires twice: the early firing happens once there are two elements, while the third element waits; that third element is then emitted by the late firing.
func TriggerAfterProcessingTime ¶
TriggerAfterProcessingTime tests the AfterProcessingTime trigger, which fires output panes once 't' processing time has passed. Not yet supported by the Flink runner: java.lang.UnsupportedOperationException: Advancing Processing time is not supported by the Flink Runner.
func TriggerAfterSynchronizedProcessingTime ¶ added in v2.40.0
TriggerAfterSynchronizedProcessingTime tests the AfterSynchronizedProcessingTime trigger. It fires at window expiration, since the times don't synchronize in this test case.
func TriggerAlways ¶
TriggerAlways tests the Always trigger. Every input value is expected to be received as output.
func TriggerDefault ¶
TriggerDefault tests the default trigger, which fires the pane after the end of the window.
func TriggerElementCount ¶
TriggerElementCount tests the ElementCount trigger, which waits for at least N elements to be ready before firing an output pane.
func TriggerNever ¶ added in v2.40.0
TriggerNever tests the Never trigger. It fires at window expiration.
func TriggerOrFinally ¶ added in v2.40.0
TriggerOrFinally tests the OrFinally trigger. The main trigger in this test case, trigger.Always(), is always ready, but output is produced only when the finally trigger is ready. The finally trigger is ready at the second element in the first window, and two output panes are produced. The same holds for the second window.
func TriggerRepeat ¶
TriggerRepeat tests the Repeat trigger. As of now, it is configured to take only one trigger as a subtrigger. In this test, three output panes with two elements each are expected.
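The expected shape of the output (three panes of two elements each) can be reproduced with a small simulation. This is a sketch in plain Go, not Beam code: it assumes the repeated subtrigger is an element-count trigger with a count of 2, that fired elements are discarded rather than accumulated, and that six elements arrive in one window. The function name `repeatAfterCount` and the input values are hypothetical.

```go
package main

import "fmt"

// repeatAfterCount models a repeated element-count trigger: every time n
// elements have been buffered, a pane is emitted and the buffer is cleared
// (discarding-style accumulation is assumed). Elements left over when the
// input ends are not emitted by this toy model.
func repeatAfterCount(elems []float64, n int) [][]float64 {
	var panes [][]float64
	var buf []float64
	for _, e := range elems {
		buf = append(buf, e)
		if len(buf) == n {
			panes = append(panes, buf)
			buf = nil // start a fresh buffer for the next pane
		}
	}
	return panes
}

func main() {
	// Six hypothetical input values in a single window.
	panes := repeatAfterCount([]float64{1, 2, 3, 4, 5, 6}, 2)
	fmt.Println(len(panes), panes) // prints 3 [[1 2] [3 4] [5 6]]
}
```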
func ValidateWindowedSideInputs ¶ added in v2.35.0
ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
func WindowSums ¶
func WindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection)
WindowSums produces a pipeline that generates the numbers of a 3x3 magic square and sums them over windows. The sumPerKey argument is a closure that handles summing data over the window, in a few conditions.
func WindowSums_GBK ¶
func WindowSums_Lifted ¶
Types ¶
type RangeEstimator ¶ added in v2.40.0
type RangeEstimator struct {
// contains filtered or unexported fields
}
RangeEstimator implements the offsetrange.RangeEndEstimator interface. It provides the estimated end for a restriction.
func (*RangeEstimator) Estimate ¶ added in v2.40.0
func (r *RangeEstimator) Estimate() int64
Estimate returns the estimated end.
func (*RangeEstimator) SetEstimate ¶ added in v2.40.0
func (r *RangeEstimator) SetEstimate(estimate int64)
SetEstimate sets the estimated end.
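A minimal sketch of how such an estimator might look is given below. The real RangeEstimator has unexported fields that are not shown here, so the `end` field is an assumption, and the local `rangeEndEstimator` interface is a stand-in that assumes offsetrange.RangeEndEstimator requires a single `Estimate() int64` method.

```go
package main

import "fmt"

// rangeEndEstimator is a local stand-in for offsetrange.RangeEndEstimator,
// assumed here to require only Estimate() int64.
type rangeEndEstimator interface {
	Estimate() int64
}

// rangeEstimator is a hypothetical re-creation of RangeEstimator. The name
// and the `end` field are assumptions made for illustration.
type rangeEstimator struct {
	end int64
}

// Estimate returns the estimated end.
func (r *rangeEstimator) Estimate() int64 { return r.end }

// SetEstimate sets the estimated end.
func (r *rangeEstimator) SetEstimate(estimate int64) { r.end = estimate }

// Compile-time check that the sketch satisfies the stand-in interface.
var _ rangeEndEstimator = (*rangeEstimator)(nil)

func main() {
	est := &rangeEstimator{}
	est.SetEstimate(10)
	fmt.Println(est.Estimate()) // prints 10
}
```

Both methods use pointer receivers so that a value stored by SetEstimate is visible to later Estimate calls on the same estimator.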
type TruncateFn ¶ added in v2.40.0
type TruncateFn struct {
Estimator RangeEstimator
}
TruncateFn is an SDF.
func (*TruncateFn) CreateInitialRestriction ¶ added in v2.40.0
func (fn *TruncateFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction
CreateInitialRestriction creates an initial restriction.
func (*TruncateFn) CreateTracker ¶ added in v2.40.0
func (fn *TruncateFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker
CreateTracker wraps the given restriction into a LockRTracker type.
func (*TruncateFn) ProcessElement ¶ added in v2.40.0
func (fn *TruncateFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation
ProcessElement continually gets the start position of the restriction and emits the element as it is.
func (*TruncateFn) RestrictionSize ¶ added in v2.40.0
func (fn *TruncateFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64
RestrictionSize returns the size of the current restriction.
func (*TruncateFn) SplitRestriction ¶ added in v2.40.0
func (fn *TruncateFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction
SplitRestriction is similar to the one used in the checkpointing.go test.
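The splitting logic itself is not shown on this page. As a rough illustration of what splitting an offset range can look like, the sketch below divides a half-open range into contiguous, roughly equal parts. The `restriction` struct is a local stand-in for offsetrange.Restriction, and `evenSplits` and the choice of three parts are assumptions, not the behavior of TruncateFn.

```go
package main

import "fmt"

// restriction is a local stand-in for an offset range covering [Start, End).
type restriction struct {
	Start, End int64
}

// evenSplits divides r into at most n contiguous sub-ranges of roughly
// equal size. Ranges that are empty, or n <= 1, are returned unsplit.
func evenSplits(r restriction, n int64) []restriction {
	size := r.End - r.Start
	if n <= 1 || size <= 0 {
		return []restriction{r}
	}
	if n > size {
		n = size // never produce empty sub-ranges
	}
	out := make([]restriction, 0, n)
	for i := int64(0); i < n; i++ {
		out = append(out, restriction{
			Start: r.Start + i*size/n,
			End:   r.Start + (i+1)*size/n,
		})
	}
	return out
}

func main() {
	fmt.Println(evenSplits(restriction{Start: 0, End: 10}, 3))
	// prints [{0 3} {3 6} {6 10}]
}
```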
func (*TruncateFn) TruncateRestriction ¶ added in v2.40.0
func (fn *TruncateFn) TruncateRestriction(rt *sdf.LockRTracker, _ []byte) offsetrange.Restriction
TruncateRestriction truncates the restriction when the pipeline is drained.