Documentation ¶
Overview ¶
Package exec contains runtime plan representation and execution. A pipeline must be translated to a runtime plan to be executed.
Index ¶
- func Convert(v any, to reflect.Type) any
- func ConvertFn(from, to reflect.Type) func(any) any
- func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, typex.PaneInfo, error)
- func EncodeElement(c ElementEncoder, val any) ([]byte, error)
- func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error)
- func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.EventTime, p typex.PaneInfo, ...) error
- func IsEmitterRegistered(t reflect.Type) bool
- func IsInputRegistered(t reflect.Type) bool
- func MultiFinishBundle(ctx context.Context, list ...Node) error
- func MultiStartBundle(ctx context.Context, id string, data DataContext, list ...Node) error
- func RegisterEmitter(t reflect.Type, maker func(ElementProcessor) ReusableEmitter)
- func RegisterInput(t reflect.Type, maker func(ReStream) ReusableInput)
- type Checkpoint
- type Combine
- func (n *Combine) Down(ctx context.Context) error
- func (n *Combine) FinishBundle(ctx context.Context) error
- func (n *Combine) GetPID() string
- func (n *Combine) ID() UnitID
- func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
- func (n *Combine) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *Combine) String() string
- func (n *Combine) Up(ctx context.Context) error
- type ConvertToAccumulators
- type DataContext
- type DataManager
- type DataSample
- type DataSampler
- type DataSink
- func (n *DataSink) Down(ctx context.Context) error
- func (n *DataSink) FinishBundle(ctx context.Context) error
- func (n *DataSink) ID() UnitID
- func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
- func (n *DataSink) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *DataSink) String() string
- func (n *DataSink) Up(ctx context.Context) error
- type DataSource
- func (n *DataSource) Down(ctx context.Context) error
- func (n *DataSource) FinishBundle(ctx context.Context) error
- func (n *DataSource) ID() UnitID
- func (n *DataSource) InitSplittable()
- func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error)
- func (n *DataSource) Progress() ProgressReportSnapshot
- func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error)
- func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *DataSource) String() string
- func (n *DataSource) Up(ctx context.Context) error
- type Decoder
- type Discard
- func (d *Discard) Down(ctx context.Context) error
- func (d *Discard) FinishBundle(ctx context.Context) error
- func (d *Discard) ID() UnitID
- func (d *Discard) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
- func (d *Discard) StartBundle(ctx context.Context, id string, data DataContext) error
- func (d *Discard) String() string
- func (d *Discard) Up(ctx context.Context) error
- type ElementDecoder
- type ElementEncoder
- type ElementProcessor
- type Elements
- type Encoder
- type Expand
- func (n *Expand) Down(ctx context.Context) error
- func (n *Expand) FinishBundle(ctx context.Context) error
- func (n *Expand) ID() UnitID
- func (n *Expand) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *Expand) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *Expand) String() string
- func (n *Expand) Up(ctx context.Context) error
- type ExtractOutput
- type FixedKey
- func (n *FixedKey) Down(ctx context.Context) error
- func (n *FixedKey) FinishBundle(ctx context.Context) error
- func (n *FixedKey) ID() UnitID
- func (n *FixedKey) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *FixedKey) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *FixedKey) String() string
- func (n *FixedKey) Up(ctx context.Context) error
- type FixedReStream
- type FixedStream
- type Flatten
- func (m *Flatten) Down(ctx context.Context) error
- func (m *Flatten) FinishBundle(ctx context.Context) error
- func (m *Flatten) ID() UnitID
- func (m *Flatten) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (m *Flatten) StartBundle(ctx context.Context, id string, data DataContext) error
- func (m *Flatten) String() string
- func (m *Flatten) Up(ctx context.Context) error
- type FullValue
- func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, ...) (*FullValue, error) (deprecated)
- func InvokeWithOpts(ctx context.Context, fn *funcx.Fn, pn typex.PaneInfo, ws []typex.Window, ...) (*FullValue, error)
- func InvokeWithOptsWithoutEventTime(ctx context.Context, fn *funcx.Fn, opts InvokeOpts) (*FullValue, error)
- func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, ...) (*FullValue, error) (deprecated)
- func ReadAll(rs ReStream) ([]FullValue, error)
- type GenID
- type Inject
- func (n *Inject) Down(ctx context.Context) error
- func (n *Inject) FinishBundle(ctx context.Context) error
- func (n *Inject) ID() UnitID
- func (n *Inject) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *Inject) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *Inject) String() string
- func (n *Inject) Up(ctx context.Context) error
- type InvokeOpts
- type LiftedCombine
- func (n *LiftedCombine) Down(ctx context.Context) error
- func (n *LiftedCombine) FinishBundle(ctx context.Context) error
- func (n *LiftedCombine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
- func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *LiftedCombine) String() string
- func (n *LiftedCombine) Up(ctx context.Context) error
- type MainInput
- type MapWindows
- func (m *MapWindows) Down(_ context.Context) error
- func (m *MapWindows) FinishBundle(ctx context.Context) error
- func (m *MapWindows) ID() UnitID
- func (m *MapWindows) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (m *MapWindows) StartBundle(ctx context.Context, id string, data DataContext) error
- func (m *MapWindows) String() string
- func (m *MapWindows) Up(_ context.Context) error
- type MergeAccumulators
- type Multiplex
- func (m *Multiplex) Down(ctx context.Context) error
- func (m *Multiplex) FinishBundle(ctx context.Context) error
- func (m *Multiplex) ID() UnitID
- func (m *Multiplex) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (m *Multiplex) StartBundle(ctx context.Context, id string, data DataContext) error
- func (m *Multiplex) String() string
- func (m *Multiplex) Up(ctx context.Context) error
- type Node
- type PCollection
- func (p *PCollection) Down(ctx context.Context) error
- func (p *PCollection) FinishBundle(ctx context.Context) error
- func (p *PCollection) ID() UnitID
- func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (p *PCollection) StartBundle(ctx context.Context, id string, data DataContext) error
- func (p *PCollection) String() string
- func (p *PCollection) Up(ctx context.Context) error
- type PCollectionSnapshot
- type PairWithRestriction
- func (n *PairWithRestriction) Down(_ context.Context) error
- func (n *PairWithRestriction) FinishBundle(ctx context.Context) error
- func (n *PairWithRestriction) ID() UnitID
- func (n *PairWithRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *PairWithRestriction) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *PairWithRestriction) String() string
- func (n *PairWithRestriction) Up(_ context.Context) error
- type ParDo
- func (n *ParDo) AttachFinalizer(bf *bundleFinalizer)
- func (n *ParDo) Down(ctx context.Context) error
- func (n *ParDo) FinishBundle(_ context.Context) error
- func (n *ParDo) GetPID() string
- func (n *ParDo) HasOnTimer() bool
- func (n *ParDo) ID() UnitID
- func (n *ParDo) ProcessElement(_ context.Context, elm *FullValue, values ...ReStream) error
- func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error)
- func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *ParDo) String() string
- func (n *ParDo) Up(ctx context.Context) error
- type Plan
- func (p *Plan) Checkpoint() []*Checkpoint
- func (p *Plan) Down(ctx context.Context) error
- func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) error
- func (p *Plan) Finalize() error
- func (p *Plan) GetExpirationTime() time.Time
- func (p *Plan) ID() string
- func (p *Plan) Progress() (PlanSnapshot, bool)
- func (p *Plan) SourcePTransformID() string
- func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error)
- func (p *Plan) String() string
- type PlanSnapshot
- type Port
- type ProcessSizedElementsAndRestrictions
- func (n *ProcessSizedElementsAndRestrictions) AttachFinalizer(bf *bundleFinalizer)
- func (n *ProcessSizedElementsAndRestrictions) Checkpoint(ctx context.Context) ([]*FullValue, error)
- func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error
- func (n *ProcessSizedElementsAndRestrictions) FinishBundle(ctx context.Context) error
- func (n *ProcessSizedElementsAndRestrictions) GetInputId() string
- func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() map[string]*timestamppb.Timestamp
- func (n *ProcessSizedElementsAndRestrictions) GetProgress() float64
- func (n *ProcessSizedElementsAndRestrictions) GetTransformId() string
- func (n *ProcessSizedElementsAndRestrictions) ID() UnitID
- func (n *ProcessSizedElementsAndRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *ProcessSizedElementsAndRestrictions) Split(ctx context.Context, f float64) ([]*FullValue, []*FullValue, error)
- func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *ProcessSizedElementsAndRestrictions) String() string
- func (n *ProcessSizedElementsAndRestrictions) Up(ctx context.Context) error
- type ProgressReportSnapshot
- type ReStream
- type ReshuffleInput
- func (n *ReshuffleInput) Down(ctx context.Context) error
- func (n *ReshuffleInput) FinishBundle(ctx context.Context) error
- func (n *ReshuffleInput) ID() UnitID
- func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
- func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *ReshuffleInput) String() string
- func (n *ReshuffleInput) Up(ctx context.Context) error
- type ReshuffleOutput
- func (n *ReshuffleOutput) Down(ctx context.Context) error
- func (n *ReshuffleOutput) FinishBundle(ctx context.Context) error
- func (n *ReshuffleOutput) ID() UnitID
- func (n *ReshuffleOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
- func (n *ReshuffleOutput) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *ReshuffleOutput) String() string
- func (n *ReshuffleOutput) Up(ctx context.Context) error
- type ReusableEmitter
- type ReusableInput
- type ReusableTimestampObservingWatermarkEmitter
- type Root
- type SdfFallback
- func (n *SdfFallback) AttachFinalizer(bf *bundleFinalizer)
- func (n *SdfFallback) Down(ctx context.Context) error
- func (n *SdfFallback) FinishBundle(ctx context.Context) error
- func (n *SdfFallback) ID() UnitID
- func (n *SdfFallback) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *SdfFallback) String() string
- func (n *SdfFallback) Up(ctx context.Context) error
- type SideCache
- type SideInputAdapter
- type SplitAndSizeRestrictions
- func (n *SplitAndSizeRestrictions) Down(_ context.Context) error
- func (n *SplitAndSizeRestrictions) FinishBundle(ctx context.Context) error
- func (n *SplitAndSizeRestrictions) ID() UnitID
- func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *SplitAndSizeRestrictions) String() string
- func (n *SplitAndSizeRestrictions) Up(_ context.Context) error
- type SplitPoints
- type SplitResult
- type SplittableUnit
- type StateReader
- type Status
- type Stream
- type StreamID
- type TimerRecv
- type ToString
- func (m *ToString) Down(ctx context.Context) error
- func (m *ToString) FinishBundle(ctx context.Context) error
- func (m *ToString) ID() UnitID
- func (m *ToString) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (m *ToString) StartBundle(ctx context.Context, id string, data DataContext) error
- func (m *ToString) String() string
- func (m *ToString) Up(ctx context.Context) error
- type TruncateSizedRestriction
- func (n *TruncateSizedRestriction) Down(_ context.Context) error
- func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error
- func (n *TruncateSizedRestriction) ID() UnitID
- func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *TruncateSizedRestriction) String() string
- func (n *TruncateSizedRestriction) Up(ctx context.Context) error
- type Unit
- type UnitID
- type UserStateAdapter
- type WindowDecoder
- type WindowEncoder
- type WindowInto
- func (w *WindowInto) Down(ctx context.Context) error
- func (w *WindowInto) FinishBundle(ctx context.Context) error
- func (w *WindowInto) ID() UnitID
- func (w *WindowInto) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
- func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataContext) error
- func (w *WindowInto) String() string
- func (w *WindowInto) Up(ctx context.Context) error
- type WindowMapper
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Convert ¶
Convert converts the type of the runtime value to the desired one. It is needed to drop the universal type and convert Aggregate types.
func ConvertFn ¶
ConvertFn returns a function that converts the type of the runtime value to the desired one. It is needed to drop the universal type and convert Aggregate types.
func DecodeWindowedValueHeader ¶
func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, typex.PaneInfo, error)
DecodeWindowedValueHeader deserializes a windowed value header.
func EncodeElement ¶
func EncodeElement(c ElementEncoder, val any) ([]byte, error)
EncodeElement is a convenience function for encoding a single element into a byte slice.
func EncodeWindow ¶
func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error)
EncodeWindow is a convenience function for encoding a single window into a byte slice.
func EncodeWindowedValueHeader ¶
func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.EventTime, p typex.PaneInfo, w io.Writer) error
EncodeWindowedValueHeader serializes a windowed value header.
func IsEmitterRegistered ¶
IsEmitterRegistered returns whether an emitter maker has already been registered.
func IsInputRegistered ¶
IsInputRegistered returns whether an input maker has already been registered.
func MultiFinishBundle ¶
MultiFinishBundle calls FinishBundle on multiple nodes. Convenience function.
func MultiStartBundle ¶
MultiStartBundle calls StartBundle on multiple nodes. Convenience function.
func RegisterEmitter ¶
func RegisterEmitter(t reflect.Type, maker func(ElementProcessor) ReusableEmitter)
RegisterEmitter registers an emitter for the given type, such as "func(int)". If multiple emitters are registered for the same type, the last registration wins.
func RegisterInput ¶
func RegisterInput(t reflect.Type, maker func(ReStream) ReusableInput)
RegisterInput registers an input handler for the given type, such as "func(*int)bool". If multiple input handlers are registered for the same type, the last registration wins.
Types ¶
type Checkpoint ¶
type Checkpoint struct { SR SplitResult Reapply time.Duration }
type Combine ¶
type Combine struct { UID UnitID Fn *graph.CombineFn UsesKey bool Out Node PID string // contains filtered or unexported fields }
Combine is a Combine executor. Combiners do not have side inputs (or output).
func (*Combine) FinishBundle ¶
FinishBundle completes this node's processing of a bundle.
func (*Combine) ProcessElement ¶
ProcessElement combines elements grouped by key using the CombineFn's AddInput, MergeAccumulators, and ExtractOutput functions.
func (*Combine) StartBundle ¶
StartBundle initializes processing this bundle for combines.
type ConvertToAccumulators ¶
type ConvertToAccumulators struct {
*Combine
}
ConvertToAccumulators is an executor for converting an input value to an accumulator value.
func (*ConvertToAccumulators) ProcessElement ¶
func (n *ConvertToAccumulators) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
ProcessElement accepts an input value and returns an accumulator containing that one value.
func (*ConvertToAccumulators) String ¶
func (n *ConvertToAccumulators) String() string
type DataContext ¶
type DataContext struct { Data DataManager State StateReader }
DataContext holds connectors to various data connections, incl. state and side input.
type DataManager ¶
type DataManager interface { // OpenElementChan opens a channel for data and timers. OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error) // OpenWrite opens a closable byte stream for data writing. OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error) // OpenTimerWrite opens a byte stream for writing timers OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error) }
DataManager manages external data byte streams. Each data stream can be opened by one consumer only.
type DataSample ¶
DataSample contains the properties of a sampled element.
type DataSampler ¶
type DataSampler struct {
// contains filtered or unexported fields
}
DataSampler manages sampled elements based on PCollectionID
func NewDataSampler ¶
func NewDataSampler(ctx context.Context) *DataSampler
NewDataSampler initializes a new DataSampler and returns a pointer to it.
func (*DataSampler) GetSamples ¶
func (d *DataSampler) GetSamples(pids []string) map[string][]*DataSample
GetSamples returns the samples for the given PCollection IDs. If no PCollection ID is provided, it returns all available samples.
func (*DataSampler) SendSample ¶
func (d *DataSampler) SendSample(pCollectionID string, element []byte, timestamp time.Time)
SendSample is called by a PCollection node to send a sampled element to the DataSampler asynchronously.
type DataSink ¶
type DataSink struct { UID UnitID SID StreamID Coder *coder.Coder PCol *PCollection // Handles size metrics. // contains filtered or unexported fields }
DataSink is a Node that writes element data to the data service.
func (*DataSink) FinishBundle ¶
FinishBundle closes the write to the data channel.
func (*DataSink) ProcessElement ¶
ProcessElement encodes the windowed value header for the element, followed by the element, emitting it to the data service.
func (*DataSink) StartBundle ¶
StartBundle opens the writer to the data service.
type DataSource ¶
type DataSource struct { UID UnitID SID StreamID Name string Coder *coder.Coder Out Node PCol PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests. // OnTimerTransforms maps PtransformIDs to their execution nodes that handle OnTimer callbacks. OnTimerTransforms map[string]*ParDo // contains filtered or unexported fields }
DataSource is a Root execution unit.
func (*DataSource) Down ¶
func (n *DataSource) Down(ctx context.Context) error
Down resets the source.
func (*DataSource) FinishBundle ¶
func (n *DataSource) FinishBundle(ctx context.Context) error
FinishBundle resets the source.
func (*DataSource) InitSplittable ¶
func (n *DataSource) InitSplittable()
InitSplittable initializes the SplittableUnit channel from the output unit, if it provides one.
func (*DataSource) Process ¶
func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error)
Process opens the data source, reads and decodes data, kicking off element processing.
func (*DataSource) Progress ¶
func (n *DataSource) Progress() ProgressReportSnapshot
Progress returns a snapshot of the source's progress.
func (*DataSource) Split ¶
func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error)
Split takes a sorted set of potential split indices and a fraction of the remainder to split at, selects and actuates a split on an appropriate split index, and returns the selected split index in a SplitResult if successful or an error when unsuccessful.
If the following transform is splittable, and the split indices and fraction allow for splitting on the currently processing element, then a sub-element split is performed, and the appropriate information is returned in the SplitResult.
The bufSize param specifies the estimated number of elements that will be sent to this DataSource, and is used to be able to perform accurate splits even if the DataSource has not yet received all its elements. A bufSize of 0 or less indicates that it's unknown, and so uses the current known size.
func (*DataSource) StartBundle ¶
func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle initializes this datasource for the bundle.
func (*DataSource) String ¶
func (n *DataSource) String() string
type Decoder ¶
type Decoder interface { // Decode decodes the []byte in to a value of the given type. Decode(reflect.Type, []byte) (any, error) }
Decoder is a uniform custom decoder interface. It wraps various forms of reflectx.Funcs.
type Discard ¶
type Discard struct { // UID is the unit identifier. UID UnitID }
Discard silently discards all elements. It is implicitly inserted for any loose ends in the pipeline.
func (*Discard) ProcessElement ¶
func (*Discard) StartBundle ¶
type ElementDecoder ¶
type ElementDecoder interface { // Decode deserializes a value from the given reader. Decode(io.Reader) (*FullValue, error) // DecodeTo deserializes a value from the given reader into the provided FullValue. DecodeTo(io.Reader, *FullValue) error }
ElementDecoder handles FullValue deserialization from a byte stream. The decoder can be reused, even if an error is encountered.
func MakeElementDecoder ¶
func MakeElementDecoder(c *coder.Coder) ElementDecoder
MakeElementDecoder returns an ElementDecoder for the given coder. It panics if given an unknown coder, or a coder with stream types, such as GBK.
type ElementEncoder ¶
type ElementEncoder interface { // Encode serializes the given value to the writer. Encode(*FullValue, io.Writer) error }
ElementEncoder handles FullValue serialization to a byte stream. The encoder can be reused, even if an error is encountered.
func MakeElementEncoder ¶
func MakeElementEncoder(c *coder.Coder) ElementEncoder
MakeElementEncoder returns an ElementEncoder for the given coder. It panics if given an unknown coder, or a coder with stream types, such as GBK.
type ElementProcessor ¶
type ElementProcessor interface { // Call processes a single element. If GBK or CoGBK result, the values // are populated. Otherwise, they're empty. // The *FullValue is owned by the caller, and is not safe to cache. ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error }
ElementProcessor presents a component that can process an element.
type Elements ¶
Elements holds data or timers sent across the data channel. If TimerFamilyID is populated, it's a timer, otherwise it's data elements.
type Encoder ¶
type Encoder interface { // Encode encodes the given value (of the given type). Encode(reflect.Type, any) ([]byte, error) }
Encoder is a uniform custom encoder interface. It wraps various forms of reflectx.Funcs.
type Expand ¶
type Expand struct { // UID is the unit identifier. UID UnitID ValueDecoders []ElementDecoder Out Node }
func (*Expand) ProcessElement ¶
func (*Expand) StartBundle ¶
type ExtractOutput ¶
type ExtractOutput struct {
*Combine
}
ExtractOutput is an executor for extracting output from a lifted combine.
func (*ExtractOutput) ProcessElement ¶
func (n *ExtractOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
ProcessElement accepts an accumulator value, and extracts the final return type from it.
func (*ExtractOutput) String ¶
func (n *ExtractOutput) String() string
type FixedKey ¶
type FixedKey struct { // UID is the unit identifier. UID UnitID // Key is the given key Key any // Out is the successor node. Out Node }
FixedKey transforms any value into KV<K, V> for a fixed K.
func (*FixedKey) ProcessElement ¶
func (*FixedKey) StartBundle ¶
type FixedReStream ¶
type FixedReStream struct {
Buf []FullValue
}
FixedReStream is a simple in-memory ReStream.
func (*FixedReStream) Open ¶
func (n *FixedReStream) Open() (Stream, error)
Open returns the Stream from the start of the in-memory ReStream.
type FixedStream ¶
type FixedStream struct { Buf []FullValue // contains filtered or unexported fields }
FixedStream is a simple in-memory Stream from a fixed array.
func (*FixedStream) Close ¶
func (s *FixedStream) Close() error
Close releases the buffer, closing the stream.
func (*FixedStream) Read ¶
func (s *FixedStream) Read() (*FullValue, error)
Read produces the next value in the stream.
type Flatten ¶
type Flatten struct { // UID is the unit identifier. UID UnitID // N is the number of incoming edges. N int // Out is the output node. Out Node // contains filtered or unexported fields }
Flatten is a fan-in node. It ensures that Start/FinishBundle are only called once downstream.
func (*Flatten) ProcessElement ¶
func (*Flatten) StartBundle ¶
type FullValue ¶
type FullValue struct { Elm any // Element or KV key. Elm2 any // KV value, if not invalid Timestamp typex.EventTime Windows []typex.Window Pane typex.PaneInfo Continuation sdf.ProcessContinuation }
FullValue represents the full runtime value for a data element, incl. the implicit context. The result of a GBK or CoGBK is not a single FullValue. The consumer is responsible for converting the values to the correct type. To represent a nested KV with FullValues, assign a *FullValue to Elm/Elm2.
func Invoke
deprecated
func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, sr StateReader, extra ...any) (*FullValue, error)
Invoke invokes the fn with the given values. The extra values must match the non-main side input and emitters. It returns the direct output, if any.
Deprecated: prefer InvokeWithOpts
func InvokeWithOpts ¶
func InvokeWithOpts(ctx context.Context, fn *funcx.Fn, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opts InvokeOpts) (*FullValue, error)
InvokeWithOpts invokes the fn with the given values. The extra values must match the non-main side input and emitters. It returns the direct output, if any.
func InvokeWithOptsWithoutEventTime ¶
func InvokeWithOptsWithoutEventTime(ctx context.Context, fn *funcx.Fn, opts InvokeOpts) (*FullValue, error)
InvokeWithOptsWithoutEventTime runs the given function at time 0 in the global window.
func InvokeWithoutEventTime
deprecated
func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) (*FullValue, error)
InvokeWithoutEventTime runs the given function at time 0 in the global window.
Deprecated: prefer InvokeWithOptsWithoutEventTime
type GenID ¶
type GenID struct {
// contains filtered or unexported fields
}
GenID is a simple UnitID generator.
type Inject ¶
type Inject struct { // UID is the unit identifier. UID UnitID // N is the index (tag) in the union. N int // ValueCoder is the encoder for the value part of the incoming KV<K,V>. ValueEncoder ElementEncoder // Out is the successor node. Out Node }
Inject injects the predecessor index into each FullValue and encodes the value, effectively converting KV<X,Y> into KV<X,KV<int,[]byte>>. Used in combination with Expand.
func (*Inject) ProcessElement ¶
func (*Inject) StartBundle ¶
type InvokeOpts ¶
type InvokeOpts struct {
// contains filtered or unexported fields
}
InvokeOpts are optional parameters to invoke a Fn.
type LiftedCombine ¶
type LiftedCombine struct { *Combine KeyCoder *coder.Coder WindowCoder *coder.WindowCoder // contains filtered or unexported fields }
LiftedCombine is an executor for combining values before grouping by keys for a lifted combine. Partially groups values by key within a bundle, accumulating them in an in memory cache, before emitting them in the FinishBundle step.
func (*LiftedCombine) Down ¶
func (n *LiftedCombine) Down(ctx context.Context) error
Down tears down the cache.
func (*LiftedCombine) FinishBundle ¶
func (n *LiftedCombine) FinishBundle(ctx context.Context) error
FinishBundle iterates through the cached (key, accumulator) pairs, and then processes the value in the bundle as normal.
func (*LiftedCombine) ProcessElement ¶
func (n *LiftedCombine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
ProcessElement takes a KV pair and combines values with the same key into an accumulator, caching them until the bundle is complete. If the cache grows too large, a random eviction policy is used.
func (*LiftedCombine) StartBundle ¶
func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle initializes the in memory cache of keys to accumulators.
func (*LiftedCombine) String ¶
func (n *LiftedCombine) String() string
type MapWindows ¶
type MapWindows struct { UID UnitID Fn WindowMapper Out Node FnUrn string // Keep the urn for debugging purposes. }
MapWindows maps each element window from a main input window space to a window from a side input window space.
func (*MapWindows) FinishBundle ¶
func (m *MapWindows) FinishBundle(ctx context.Context) error
FinishBundle propagates finish bundle to downstream nodes.
func (*MapWindows) ProcessElement ¶
func (*MapWindows) StartBundle ¶
func (m *MapWindows) StartBundle(ctx context.Context, id string, data DataContext) error
func (*MapWindows) String ¶
func (m *MapWindows) String() string
type MergeAccumulators ¶
type MergeAccumulators struct {
*Combine
}
MergeAccumulators is an executor for merging accumulators from a lifted combine.
func (*MergeAccumulators) ProcessElement ¶
func (n *MergeAccumulators) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error
ProcessElement accepts a stream of accumulator values with the same key and runs the MergeAccumulatorsFn over them repeatedly.
func (*MergeAccumulators) String ¶
func (n *MergeAccumulators) String() string
type Multiplex ¶
type Multiplex struct { // UID is the unit identifier. UID UnitID // Out is a list of output nodes. Out []Node }
Multiplex is a fan-out node. It simply forwards any call to all downstream nodes.
func (*Multiplex) ProcessElement ¶
func (*Multiplex) StartBundle ¶
type Node ¶
type Node interface { Unit ElementProcessor }
Node represents a single-bundle processing unit. Each node contains its processing continuation, notably other nodes.
type PCollection ¶
type PCollection struct { UID UnitID PColID string Out Node // Out is the consumer of this PCollection. Coder *coder.Coder WindowCoder *coder.WindowCoder Seed int64 // contains filtered or unexported fields }
PCollection is a passthrough node to collect PCollection metrics, and must be placed as the Out node of any producer of a PCollection.
In particular, must not be placed after a Multiplex, and must be placed after a Flatten.
func (*PCollection) FinishBundle ¶
func (p *PCollection) FinishBundle(ctx context.Context) error
FinishBundle propagates bundle termination.
func (*PCollection) ProcessElement ¶
ProcessElement increments the element count and sometimes takes size samples of the elements.
func (*PCollection) StartBundle ¶
func (p *PCollection) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle resets collected metrics for this PCollection, and propagates bundle start.
func (*PCollection) String ¶
func (p *PCollection) String() string
type PCollectionSnapshot ¶
type PCollectionSnapshot struct { ID string ElementCount int64 // If SizeCount is zero, then no size metrics should be exported. SizeCount, SizeSum, SizeMin, SizeMax int64 }
PCollectionSnapshot contains the PCollection ID together with its element count and size metrics.
type PairWithRestriction ¶
type PairWithRestriction struct { UID UnitID Fn *graph.DoFn Out Node // contains filtered or unexported fields }
PairWithRestriction is an executor for the expanded SDF step of the same name. This is the first step of an expanded SDF. It pairs each main input element with a restriction via the SDF's associated sdf.RestrictionProvider. This step is followed by SplitAndSizeRestrictions.
func (*PairWithRestriction) Down ¶
func (n *PairWithRestriction) Down(_ context.Context) error
Down currently does nothing.
func (*PairWithRestriction) FinishBundle ¶
func (n *PairWithRestriction) FinishBundle(ctx context.Context) error
FinishBundle resets the invokers.
func (*PairWithRestriction) ID ¶
func (n *PairWithRestriction) ID() UnitID
ID returns the UnitID for this unit.
func (*PairWithRestriction) ProcessElement ¶
func (n *PairWithRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
ProcessElement expects elm to be the main input to the ParDo. See exec.FullValue for more details on the expected input.
ProcessElement creates an initial restriction representing the entire input. The output is in the structure <elem, restriction>, where elem is the main input originally passed in (i.e. the parameter elm). Windows and Timestamp are copied to the outer *FullValue. They can be left within the original element, but won't be used by later SDF steps.
Output Diagram:
*FullValue { Elm: *FullValue (original input) Elm2: *FullValue { Elm: Restriction Elm2: Watermark estimator state } Windows Timestamps }
func (*PairWithRestriction) StartBundle ¶
func (n *PairWithRestriction) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle currently does nothing.
func (*PairWithRestriction) String ¶
func (n *PairWithRestriction) String() string
String outputs a human-readable description of this transform.
type ParDo ¶
type ParDo struct { UID UnitID Fn *graph.DoFn Inbound []*graph.Inbound Side []SideInputAdapter UState UserStateAdapter TimerTracker *userTimerAdapter Out []Node PID string // contains filtered or unexported fields }
ParDo is a DoFn executor.
func (*ParDo) AttachFinalizer ¶
func (n *ParDo) AttachFinalizer(bf *bundleFinalizer)
func (*ParDo) FinishBundle ¶
FinishBundle does post-bundle processing operations for the DoFn. Note: This is not a "FinalizeBundle" operation. Data is not yet durably persisted at this point.
func (*ParDo) HasOnTimer ¶
HasOnTimer returns whether this ParDo wraps a DoFn that has an OnTimer method.
func (*ParDo) ProcessElement ¶
ProcessElement processes each parallel element with the DoFn.
func (*ParDo) ProcessTimers ¶
ProcessTimers processes all timers in firing order from the runner for a timer family ID.
A timer refers to a specific combination of Key + Window + Family + Tag. Each timer also has a firing time and a data watermark hold time. The SDK doesn't determine whether a timer is ready to fire; that's up to the runner.
This method fires timers in the order from the runner. During this process, the user code may set additional firings for one or more timers, which may overwrite orderings from the runner.
In particular, if a runner-sent timer produces a new firing that is earlier than a second runner-sent timer, then it is processed before that second timer. This will override any subsequent firing of the same timer, and as a result, must add a clear to the set of timer modifications.
func (*ParDo) StartBundle ¶
StartBundle does pre-bundle processing operation for the DoFn.
type Plan ¶
type Plan struct {
// contains filtered or unexported fields
}
Plan represents the bundle execution plan. It will generally be constructed from a part of a pipeline. A plan can be used to process multiple bundles serially.
func UnmarshalPlan ¶
func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor, dataSampler *DataSampler) (*Plan, error)
UnmarshalPlan converts a model bundle descriptor into an execution Plan.
func (*Plan) Checkpoint ¶
func (p *Plan) Checkpoint() []*Checkpoint
Checkpoint attempts to split an SDF if the DoFn self-checkpointed.
func (*Plan) Execute ¶
Execute executes the plan with the given data context and bundle id. Units are brought up on the first execution. If a bundle fails, the plan cannot be reused for further bundles. Does not panic. Blocking.
func (*Plan) Finalize ¶
Finalize runs any callbacks registered by the bundleFinalizer. Should be run on bundle finalization.
func (*Plan) GetExpirationTime ¶
GetExpirationTime returns the last expiration time of any of the callbacks registered by the bundleFinalizer. Once we have passed this time, it is safe to move this plan to inactive without missing any valid callbacks.
func (*Plan) Progress ¶
func (p *Plan) Progress() (PlanSnapshot, bool)
Progress returns a snapshot of progress of the plan, and associated metrics. The returned boolean indicates whether the plan includes a DataSource, which is important for handling legacy metrics. This boolean will be removed once we no longer return legacy metrics.
func (*Plan) SourcePTransformID ¶
SourcePTransformID returns the ID of the data's origin PTransform.
func (*Plan) Split ¶
func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error)
Split takes a set of potential split indexes, and if successful returns the split result. Returns an error when unable to split.
type PlanSnapshot ¶
type PlanSnapshot struct { Source ProgressReportSnapshot PCols []PCollectionSnapshot }
PlanSnapshot contains system metrics for the current run of the plan.
type Port ¶
type Port struct {
URL string
}
Port represents the connection port of external operations.
type ProcessSizedElementsAndRestrictions ¶
type ProcessSizedElementsAndRestrictions struct { PDo *ParDo TfId string // Transform ID. Needed for splitting. // SU is a buffered channel for indicating when this unit is splittable. // When this unit is processing an element, it sends a SplittableUnit // interface through the channel. That interface can be received on other // threads and used to perform splitting or other related operation. // // This channel should be received on in a non-blocking manner, to avoid // hanging if no element is processing. // // Receiving the SplittableUnit prevents the current element from finishing // processing, so the element does not unexpectedly change during a split. // Therefore, receivers of the SplittableUnit must send it back through the // channel once finished with it, or it will block indefinitely. SU chan SplittableUnit // contains filtered or unexported fields }
ProcessSizedElementsAndRestrictions is an executor for the expanded SDF step of the same name. It is the final step of the expanded SDF. It sets up and invokes the user's SDF methods, similar to exec.ParDo but with slight changes to support the SDF's method signatures and the expected structure of the FullValue being received.
func (*ProcessSizedElementsAndRestrictions) AttachFinalizer ¶
func (n *ProcessSizedElementsAndRestrictions) AttachFinalizer(bf *bundleFinalizer)
func (*ProcessSizedElementsAndRestrictions) Checkpoint ¶
func (n *ProcessSizedElementsAndRestrictions) Checkpoint(ctx context.Context) ([]*FullValue, error)
Checkpoint splits the remaining work in a restriction into residuals to be resumed later by the runner. This is done iff the underlying Splittable DoFn returns a resuming ProcessContinuation. If the split occurs and the primary restriction is not marked as done by the RTracker, the Checkpoint fails as this is a potential data-loss case.
func (*ProcessSizedElementsAndRestrictions) Down ¶
func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error
Down calls the ParDo's Down method.
func (*ProcessSizedElementsAndRestrictions) FinishBundle ¶
func (n *ProcessSizedElementsAndRestrictions) FinishBundle(ctx context.Context) error
FinishBundle resets the invokers and then calls the ParDo's FinishBundle method.
func (*ProcessSizedElementsAndRestrictions) GetInputId ¶
func (n *ProcessSizedElementsAndRestrictions) GetInputId() string
GetInputId returns the main input ID, since main input elements are being split.
func (*ProcessSizedElementsAndRestrictions) GetOutputWatermark ¶
func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() map[string]*timestamppb.Timestamp
GetOutputWatermark gets the current output watermark of the splittable unit if one is defined, or returns nil otherwise.
func (*ProcessSizedElementsAndRestrictions) GetProgress ¶
func (n *ProcessSizedElementsAndRestrictions) GetProgress() float64
GetProgress returns the current restriction tracker's progress as a fraction. This implementation accounts for progress across windows in window-observing DoFns, so 1.0 is only returned once all windows have been processed.
func (*ProcessSizedElementsAndRestrictions) GetTransformId ¶
func (n *ProcessSizedElementsAndRestrictions) GetTransformId() string
GetTransformId returns this transform's transform ID.
func (*ProcessSizedElementsAndRestrictions) ID ¶
func (n *ProcessSizedElementsAndRestrictions) ID() UnitID
ID calls the ParDo's ID method.
func (*ProcessSizedElementsAndRestrictions) ProcessElement ¶
func (n *ProcessSizedElementsAndRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
ProcessElement expects the same structure as the output of SplitAndSizeRestrictions, approximately <<elem, <restriction,watermark estimator state>>, size>. The only difference is that if the input was decoded in between the two steps, then single-element inputs were lifted from the *FullValue they were stored in.
Input Diagram:
*FullValue { Elm: *FullValue { Elm: *FullValue (KV input) or InputType (single-element input) Elm2: *FullValue { Elm: Restriction Elm2: Watermark estimator state } } Elm2: float64 (size) Windows Timestamps }
ProcessElement then creates a restriction tracker from the stored restriction and processes each element using the underlying ParDo and adding the restriction tracker to the normal invocation. Sizing information is present but currently ignored. Output is forwarded to the underlying ParDo's outputs.
func (*ProcessSizedElementsAndRestrictions) Split ¶
func (n *ProcessSizedElementsAndRestrictions) Split(ctx context.Context, f float64) ([]*FullValue, []*FullValue, error)
Split splits the currently processing element using its restriction tracker. Then it returns zero or more primaries and residuals, following the expected input structure to this unit, including updating the size of the split elements.
This implementation of Split considers whether windows are being exploded for window-observing DoFns, and has significantly different behavior if windows need to be taken into account. For details on when each case occurs and how it is implemented, see the documentation for the singleWindowSplit and multiWindowSplit methods.
func (*ProcessSizedElementsAndRestrictions) StartBundle ¶
func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle calls the ParDo's StartBundle method.
func (*ProcessSizedElementsAndRestrictions) String ¶
func (n *ProcessSizedElementsAndRestrictions) String() string
String outputs a human-readable description of this transform.
type ProgressReportSnapshot ¶
type ProgressReportSnapshot struct {
ID, Name string
Count int64
ConsumingReceivedData bool
// contains filtered or unexported fields
}
ProgressReportSnapshot captures the progress reading an input source.
type ReshuffleInput ¶
type ReshuffleInput struct { UID UnitID SID StreamID Coder *coder.Coder // Coder for the input PCollection. Seed int64 Out Node // contains filtered or unexported fields }
ReshuffleInput is a Node.
func (*ReshuffleInput) Down ¶
func (n *ReshuffleInput) Down(ctx context.Context) error
Down is a no-op.
func (*ReshuffleInput) FinishBundle ¶
func (n *ReshuffleInput) FinishBundle(ctx context.Context) error
FinishBundle propagates finish bundle, and clears cached state.
func (*ReshuffleInput) ProcessElement ¶
func (*ReshuffleInput) StartBundle ¶
func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle is a no-op.
func (*ReshuffleInput) String ¶
func (n *ReshuffleInput) String() string
type ReshuffleOutput ¶
type ReshuffleOutput struct { UID UnitID SID StreamID Coder *coder.Coder // Coder for the receiving PCollection. Out Node // contains filtered or unexported fields }
ReshuffleOutput is a Node.
func (*ReshuffleOutput) Down ¶
func (n *ReshuffleOutput) Down(ctx context.Context) error
Down is a no-op.
func (*ReshuffleOutput) FinishBundle ¶
func (n *ReshuffleOutput) FinishBundle(ctx context.Context) error
FinishBundle propagates finish bundle to downstream nodes.
func (*ReshuffleOutput) ProcessElement ¶
func (*ReshuffleOutput) StartBundle ¶
func (n *ReshuffleOutput) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle is a no-op.
func (*ReshuffleOutput) String ¶
func (n *ReshuffleOutput) String() string
type ReusableEmitter ¶
type ReusableEmitter interface { // Init resets the value. Can be called multiple times. Init(ctx context.Context, ws []typex.Window, t typex.EventTime) error // Value returns the side input value. Constant value. Value() any }
ReusableEmitter is a resettable value needed to hold the implicit context and emit event time.
type ReusableInput ¶
type ReusableInput interface { // Init initializes the value before use. Init() error // Value returns the side input value. Value() any // Reset resets the value after use. Reset() error }
ReusableInput is a resettable value, notably used to unwind iterators cheaply and cache materialized side input across invocations.
type ReusableTimestampObservingWatermarkEmitter ¶
type ReusableTimestampObservingWatermarkEmitter interface { ReusableEmitter AttachEstimator(est *sdf.WatermarkEstimator) }
ReusableTimestampObservingWatermarkEmitter is a resettable value needed to hold the implicit context and emit event time. It also has the ability to have a watermark estimator attached.
type Root ¶
type Root interface { Unit // Process processes the entire source, notably emitting elements to // downstream nodes. Process(ctx context.Context) ([]*Checkpoint, error) }
Root represents a root processing unit. It contains its processing continuation, notably other nodes.
type SdfFallback ¶
type SdfFallback struct { PDo *ParDo // contains filtered or unexported fields }
SdfFallback is an executor used when an SDF isn't expanded into steps by the runner, indicating that the runner doesn't support splitting. It executes all the SDF steps together in one unit.
func (*SdfFallback) AttachFinalizer ¶
func (n *SdfFallback) AttachFinalizer(bf *bundleFinalizer)
func (*SdfFallback) Down ¶
func (n *SdfFallback) Down(ctx context.Context) error
Down calls the ParDo's Down method.
func (*SdfFallback) FinishBundle ¶
func (n *SdfFallback) FinishBundle(ctx context.Context) error
FinishBundle resets the invokers and then calls the ParDo's FinishBundle method.
func (*SdfFallback) ProcessElement ¶
ProcessElement performs all the work from the steps above in one transform. This means creating initial restrictions, performing initial splits on those restrictions, and then creating restriction trackers and processing each restriction with the underlying ParDo. This executor skips the sizing step because sizing information is unnecessary for unexpanded SDFs.
func (*SdfFallback) StartBundle ¶
func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle calls the ParDo's StartBundle method.
func (*SdfFallback) String ¶
func (n *SdfFallback) String() string
String outputs a human-readable description of this transform.
type SideCache ¶
type SideCache interface { // QueryCache checks the cache for a ReStream corresponding to the transform and // side input being used. QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) ReStream // SetCache places a ReStream into the cache for a transform and side input. SetCache(ctx context.Context, transformID, sideInputID string, win, key []byte, input ReStream) ReStream }
SideCache manages cached ReStream values for side inputs that can be re-used across bundles.
type SideInputAdapter ¶
type SideInputAdapter interface { NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error) NewKeyedIterable(ctx context.Context, reader StateReader, w typex.Window, iterKey any) (ReStream, error) }
SideInputAdapter provides a concrete ReStream from a low-level side input reader. It encapsulates StreamID and coding as needed.
func NewSideInputAdapter ¶
func NewSideInputAdapter(sid StreamID, sideInputID string, c *coder.Coder, wm WindowMapper) SideInputAdapter
NewSideInputAdapter returns a side input adapter for the given StreamID and coder. It expects a W<V> or W<KV<K,V>> coder, because the protocol requires windowing information.
type SplitAndSizeRestrictions ¶
type SplitAndSizeRestrictions struct { UID UnitID Fn *graph.DoFn Out Node // contains filtered or unexported fields }
SplitAndSizeRestrictions is an executor for the expanded SDF step of the same name. It is the second step of the expanded SDF, occurring after PairWithRestriction. It performs initial splits on the initial restrictions and adds sizing information, producing one or more output elements per input element. This step is followed by ProcessSizedElementsAndRestrictions.
func (*SplitAndSizeRestrictions) Down ¶
func (n *SplitAndSizeRestrictions) Down(_ context.Context) error
Down currently does nothing.
func (*SplitAndSizeRestrictions) FinishBundle ¶
func (n *SplitAndSizeRestrictions) FinishBundle(ctx context.Context) error
FinishBundle resets the invokers.
func (*SplitAndSizeRestrictions) ID ¶
func (n *SplitAndSizeRestrictions) ID() UnitID
ID returns the UnitID for this unit.
func (*SplitAndSizeRestrictions) ProcessElement ¶
func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
ProcessElement expects elm.Elm to hold the original input while elm.Elm2 contains the restriction.
Input Diagram:
*FullValue { Elm: *FullValue (original input) Elm2: *FullValue { Elm: Restriction Elm2: Watermark estimator state } Windows Timestamps }
ProcessElement splits the given restriction into one or more restrictions and then sizes each. The outputs are in the structure <<elem, <restriction, watermark estimator state>>, size> where elem is the original main input to the unexpanded SDF. Windows and Timestamps are copied to each split output.
Output Diagram:
*FullValue { Elm: *FullValue { Elm: *FullValue (original input) Elm2: *FullValue { Elm: Restriction Elm2: Watermark estimator state } } Elm2: float64 (size) Windows Timestamps }
func (*SplitAndSizeRestrictions) StartBundle ¶
func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle currently does nothing.
func (*SplitAndSizeRestrictions) String ¶
func (n *SplitAndSizeRestrictions) String() string
String outputs a human-readable description of this transform.
type SplitPoints ¶
type SplitPoints struct { // Splits is a list of desired split indices. Splits []int64 Frac float64 // Estimated total number of elements (including unsent) for the source. // A zero value indicates unknown, instead use locally known size. BufSize int64 }
SplitPoints captures the split requested by the Runner.
type SplitResult ¶
type SplitResult struct { Unsuccessful bool // Indicates the split was unsuccessful. // Indices are always included, for both channel and sub-element splits. PI int64 // Primary index, last element of the primary. RI int64 // Residual index, first element of the residual. // Extra information included for sub-element splits. If PS and RS are // present then a sub-element split occurred. PS [][]byte // Primary splits. If an element is split, these are the encoded primaries. RS [][]byte // Residual splits. If an element is split, these are the encoded residuals. TId string // Transform ID of the transform receiving the split elements. InId string // Input ID of the input the split elements are received from. OW map[string]*timestamppb.Timestamp // Map of outputs to output watermark for the plan being split }
SplitResult contains the result of performing a split on a Plan.
type SplittableUnit ¶
type SplittableUnit interface { // Split performs a split on a fraction of a currently processing element // and returns zero or more primaries and residuals resulting from it, or an // error if the split failed. // // Zero primaries/residuals can be returned if the split succeeded but // resulted in no change. In this case, an empty slice is returned. // // More than one primary/residual can happen if the split result cannot be // fully represented in just one. Split(ctx context.Context, fraction float64) (primaries, residuals []*FullValue, err error) // Checkpoint performs a split at fraction 0.0 of an element that has stopped // processing and has work that needs to be resumed later. This function will // check that the produced primary restriction from the split represents // completed work to avoid data loss and will error if work remains. Checkpoint(ctx context.Context) (residuals []*FullValue, err error) // GetProgress returns the fraction of progress the current element has // made in processing. (ex. 0.0 means no progress, and 1.0 means fully // processed.) GetProgress() float64 // GetTransformId returns the transform ID of the splittable unit. GetTransformId() string // GetInputId returns the local input ID of the input that the element being // split was received from. GetInputId() string // GetOutputWatermark gets the current output watermark of the splittable unit // if one is defined, or nil otherwise. GetOutputWatermark() map[string]*timestamppb.Timestamp }
SplittableUnit is an interface that defines sub-element splitting operations for a unit, and provides access to them on other threads.
type StateReader ¶
type StateReader interface { // OpenIterableSideInput opens a byte stream for reading iterable side input. OpenIterableSideInput(ctx context.Context, id StreamID, sideInputID string, w []byte) (io.ReadCloser, error) // OpenMultiMapSideInput opens a byte stream for reading multimap side input. OpenMultiMapSideInput(ctx context.Context, id StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error) // OpenIterable opens a byte stream for reading unwindowed iterables from the runner. OpenIterable(ctx context.Context, id StreamID, key []byte) (io.ReadCloser, error) // OpenBagUserStateReader opens a byte stream for reading user bag state. OpenBagUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error) // OpenBagUserStateAppender opens a byte stream for appending user bag state. OpenBagUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error) // OpenBagUserStateClearer opens a byte stream for clearing user bag state. OpenBagUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error) // OpenMultimapUserStateReader opens a byte stream for reading user multimap state. OpenMultimapUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.ReadCloser, error) // OpenMultimapUserStateAppender opens a byte stream for appending user multimap state. OpenMultimapUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error) // OpenMultimapUserStateClearer opens a byte stream for clearing user multimap state by key. OpenMultimapUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error) // OpenMultimapKeysUserStateReader opens a byte stream for reading the keys of user multimap state. 
OpenMultimapKeysUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error) // OpenMultimapKeysUserStateClearer opens a byte stream for clearing all keys of user multimap state. OpenMultimapKeysUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error) // GetSideInputCache returns the SideInputCache being used at the harness level. GetSideInputCache() SideCache }
StateReader is the interface for reading side input data.
type Stream ¶
Stream is a FullValue reader. It returns io.EOF when complete, but can be prematurely closed.
type StreamID ¶
StreamID represents the static information needed to identify a data stream. Dynamic information, notably bundleID, is provided implicitly by the managers.
type TimerRecv ¶
type TimerRecv struct { Key *FullValue KeyString string // The bytes for the key to avoid re-encoding key for lookups. Windows []typex.Window Pane typex.PaneInfo timers.TimerMap // embed common information from set parameter. }
TimerRecv holds the timer metadata while encoding and decoding timers in exec unit.
For SDK internal use, and subject to change.
type ToString ¶
type ToString struct { // UID is the unit identifier. UID UnitID // Out is the output node. Out Node }
func (*ToString) ProcessElement ¶
func (*ToString) StartBundle ¶
type TruncateSizedRestriction ¶
type TruncateSizedRestriction struct { UID UnitID Fn *graph.DoFn Out Node // contains filtered or unexported fields }
TruncateSizedRestriction is an executor for the expanded SDF step of the same name. This step is added to the expanded SDF when the runner signals to drain the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
func (*TruncateSizedRestriction) Down ¶
func (n *TruncateSizedRestriction) Down(_ context.Context) error
Down currently does nothing.
func (*TruncateSizedRestriction) FinishBundle ¶
func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error
FinishBundle resets the invokers.
func (*TruncateSizedRestriction) ID ¶
func (n *TruncateSizedRestriction) ID() UnitID
ID returns the UnitID for this unit.
func (*TruncateSizedRestriction) ProcessElement ¶
func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error
ProcessElement gets input elm as: Input Diagram:
*FullValue { Elm: *FullValue { -- mainElm Elm: *FullValue (original input) -- inp Elm2: *FullValue { Elm: Restriction -- rest Elm2: Watermark estimator state } } Elm2: float64 (size) Windows Timestamps }
Output Diagram:
*FullValue { Elm: *FullValue { Elm: *FullValue (original input) Elm2: *FullValue { Elm: Restriction Elm2: Watermark estimator state } } Elm2: float64 (size) Windows Timestamps }
func (*TruncateSizedRestriction) StartBundle ¶
func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle currently does nothing.
func (*TruncateSizedRestriction) String ¶
func (n *TruncateSizedRestriction) String() string
String outputs a human-readable description of this transform.
type Unit ¶
type Unit interface { // ID returns the unit ID. ID() UnitID // Up initializes the unit. It is separate from Unit construction to // make panic/error handling easier. Up(ctx context.Context) error // StartBundle signals that processing preconditions, such as availability // of side input, are met and starts the given bundle. StartBundle(ctx context.Context, id string, data DataContext) error // FinishBundle signals end of input and thus finishes the bundle. Any // data connections must be closed. FinishBundle(ctx context.Context) error // Down tears down the processing node. It is notably called if the unit // or plan encounters an error and must thus robustly handle cleanup of // unfinished bundles. If a unit itself (as opposed to downstream units) // is the cause of breakage, the error returned should indicate the root // cause. Down(ctx context.Context) error }
Unit represents a processing unit capable of processing multiple bundles serially. Units are not required to be concurrency-safe. Each unit is responsible for propagating each data processing call downstream, i.e., all calls except Up/Down, as appropriate.
type UserStateAdapter ¶
type UserStateAdapter interface {
NewStateProvider(ctx context.Context, reader StateReader, w typex.Window, element any) (stateProvider, error)
}
UserStateAdapter provides a state provider to be used for user state.
func NewUserStateAdapter ¶
func NewUserStateAdapter(sid StreamID, c *coder.Coder, stateIDToCoder map[string]*coder.Coder, stateIDToKeyCoder map[string]*coder.Coder, stateIDToCombineFn map[string]*graph.CombineFn) UserStateAdapter
NewUserStateAdapter returns a user state adapter for the given StreamID and coder. It expects a W<V> or W<KV<K,V>> coder, because the protocol requires windowing information.
type WindowDecoder ¶
type WindowDecoder interface { // Decode deserializes a value from the given reader. Decode(io.Reader) ([]typex.Window, error) // DecodeSingle decodes a single window from the given reader. DecodeSingle(io.Reader) (typex.Window, error) }
WindowDecoder handles Window deserialization from a byte stream. The decoder can be reused, even if an error is encountered. Concurrency-safe.
func MakeWindowDecoder ¶
func MakeWindowDecoder(c *coder.WindowCoder) WindowDecoder
MakeWindowDecoder returns a WindowDecoder for the given window coder.
type WindowEncoder ¶
type WindowEncoder interface { // Encode serializes the given value to the writer. Encode([]typex.Window, io.Writer) error EncodeSingle(typex.Window, io.Writer) error }
WindowEncoder handles Window serialization to a byte stream. The encoder can be reused, even if an error is encountered. Concurrency-safe.
func MakeWindowEncoder ¶
func MakeWindowEncoder(c *coder.WindowCoder) WindowEncoder
MakeWindowEncoder returns a WindowEncoder for the given window coder.
type WindowInto ¶
WindowInto places each element in one or more windows.
func (*WindowInto) FinishBundle ¶
func (w *WindowInto) FinishBundle(ctx context.Context) error
func (*WindowInto) ProcessElement ¶
func (*WindowInto) StartBundle ¶
func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataContext) error
func (*WindowInto) String ¶
func (w *WindowInto) String() string
Source Files ¶
- coder.go
- cogbk.go
- combine.go
- data.go
- datasampler.go
- datasink.go
- datasource.go
- decode.go
- discard.go
- emit.go
- encode.go
- flatten.go
- fn.go
- fn_arity.go
- fullvalue.go
- hash.go
- input.go
- multiplex.go
- pardo.go
- pcollection.go
- plan.go
- reshuffle.go
- sdf.go
- sdf_invokers.go
- sdf_invokers_arity.go
- sideinput.go
- status.go
- timers.go
- to_string.go
- translate.go
- unit.go
- userstate.go
- util.go
- window.go