Documentation ¶
Overview ¶
Package exec contains runtime plan representation and execution. A pipeline must be translated to a runtime plan to be executed.
Index ¶
- func Convert(v interface{}, to reflect.Type) interface{}
- func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, error)
- func EncodeElement(c ElementEncoder, val interface{}) ([]byte, error)
- func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error)
- func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.EventTime, w io.Writer) error
- func MultiFinishBundle(ctx context.Context, list ...Node) error
- func MultiStartBundle(ctx context.Context, id string, data DataContext, list ...Node) error
- func RegisterEmitter(t reflect.Type, maker func(ElementProcessor) ReusableEmitter)
- func RegisterInput(t reflect.Type, maker func(ReStream) ReusableInput)
- type Combine
- func (n *Combine) Down(ctx context.Context) error
- func (n *Combine) FinishBundle(ctx context.Context) error
- func (n *Combine) ID() UnitID
- func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error
- func (n *Combine) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *Combine) String() string
- func (n *Combine) Up(ctx context.Context) error
- type DataContext
- type DataManager
- type DataSink
- func (n *DataSink) Down(ctx context.Context) error
- func (n *DataSink) FinishBundle(ctx context.Context) error
- func (n *DataSink) ID() UnitID
- func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error
- func (n *DataSink) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *DataSink) String() string
- func (n *DataSink) Up(ctx context.Context) error
- type DataSource
- func (n *DataSource) Down(ctx context.Context) error
- func (n *DataSource) FinishBundle(ctx context.Context) error
- func (n *DataSource) ID() UnitID
- func (n *DataSource) Process(ctx context.Context) error
- func (n *DataSource) Progress() ProgressReportSnapshot
- func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *DataSource) String() string
- func (n *DataSource) Up(ctx context.Context) error
- type Decoder
- type Discard
- func (d *Discard) Down(ctx context.Context) error
- func (d *Discard) FinishBundle(ctx context.Context) error
- func (d *Discard) ID() UnitID
- func (d *Discard) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error
- func (d *Discard) StartBundle(ctx context.Context, id string, data DataContext) error
- func (d *Discard) String() string
- func (d *Discard) Up(ctx context.Context) error
- type ElementDecoder
- type ElementEncoder
- type ElementProcessor
- type Encoder
- type Expand
- func (n *Expand) Down(ctx context.Context) error
- func (n *Expand) FinishBundle(ctx context.Context) error
- func (n *Expand) ID() UnitID
- func (n *Expand) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
- func (n *Expand) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *Expand) String() string
- func (n *Expand) Up(ctx context.Context) error
- type ExtractOutput
- type FixedKey
- func (n *FixedKey) Down(ctx context.Context) error
- func (n *FixedKey) FinishBundle(ctx context.Context) error
- func (n *FixedKey) ID() UnitID
- func (n *FixedKey) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
- func (n *FixedKey) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *FixedKey) String() string
- func (n *FixedKey) Up(ctx context.Context) error
- type FixedReStream
- type FixedStream
- type Flatten
- func (m *Flatten) Down(ctx context.Context) error
- func (m *Flatten) FinishBundle(ctx context.Context) error
- func (m *Flatten) ID() UnitID
- func (m *Flatten) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
- func (m *Flatten) StartBundle(ctx context.Context, id string, data DataContext) error
- func (m *Flatten) String() string
- func (m *Flatten) Up(ctx context.Context) error
- type FullValue
- type GenID
- type Inject
- func (n *Inject) Down(ctx context.Context) error
- func (n *Inject) FinishBundle(ctx context.Context) error
- func (n *Inject) ID() UnitID
- func (n *Inject) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
- func (n *Inject) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *Inject) String() string
- func (n *Inject) Up(ctx context.Context) error
- type LiftedCombine
- func (n *LiftedCombine) Down(ctx context.Context) error
- func (n *LiftedCombine) FinishBundle(ctx context.Context) error
- func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error
- func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *LiftedCombine) String() string
- type MainInput
- type MergeAccumulators
- type Multiplex
- func (m *Multiplex) Down(ctx context.Context) error
- func (m *Multiplex) FinishBundle(ctx context.Context) error
- func (m *Multiplex) ID() UnitID
- func (m *Multiplex) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
- func (m *Multiplex) StartBundle(ctx context.Context, id string, data DataContext) error
- func (m *Multiplex) String() string
- func (m *Multiplex) Up(ctx context.Context) error
- type Node
- type ParDo
- func (n *ParDo) Down(ctx context.Context) error
- func (n *ParDo) FinishBundle(ctx context.Context) error
- func (n *ParDo) ID() UnitID
- func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
- func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error
- func (n *ParDo) String() string
- func (n *ParDo) Up(ctx context.Context) error
- type Plan
- type Port
- type ProgressReportSnapshot
- type ReStream
- type ReusableEmitter
- type ReusableInput
- type Root
- type SideInputAdapter
- type SideInputReader
- type Status
- type Stream
- type StreamID
- type Target
- type Unit
- type UnitID
- type WindowDecoder
- type WindowEncoder
- type WindowInto
- func (w *WindowInto) Down(ctx context.Context) error
- func (w *WindowInto) FinishBundle(ctx context.Context) error
- func (w *WindowInto) ID() UnitID
- func (w *WindowInto) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
- func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataContext) error
- func (w *WindowInto) String() string
- func (w *WindowInto) Up(ctx context.Context) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Convert ¶
Convert converts the type of the runtime value to the desired one. It is needed to drop the universal type and convert Aggregate types.
func DecodeWindowedValueHeader ¶
func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, error)
DecodeWindowedValueHeader deserializes a windowed value header.
func EncodeElement ¶
func EncodeElement(c ElementEncoder, val interface{}) ([]byte, error)
EncodeElement is a convenience function for encoding a single element into a byte slice.
func EncodeWindow ¶
func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error)
EncodeWindow is a convenience function for encoding a single window into a byte slice.
func EncodeWindowedValueHeader ¶
func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.EventTime, w io.Writer) error
EncodeWindowedValueHeader serializes a windowed value header.
func MultiFinishBundle ¶
MultiFinishBundle calls FinishBundle on multiple nodes. Convenience function.
func MultiStartBundle ¶
MultiStartBundle calls StartBundle on multiple nodes. Convenience function.
func RegisterEmitter ¶
func RegisterEmitter(t reflect.Type, maker func(ElementProcessor) ReusableEmitter)
RegisterEmitter registers an emitter for the given type, such as "func(int)". If multiple emitters are registered for the same type, the last registration wins.
func RegisterInput ¶
func RegisterInput(t reflect.Type, maker func(ReStream) ReusableInput)
RegisterInput registers an input handler for the given type, such as "func(*int)bool". If multiple input handlers are registered for the same type, the last registration wins.
Types ¶
type Combine ¶
type Combine struct { UID UnitID Fn *graph.CombineFn UsesKey bool Out Node // contains filtered or unexported fields }
Combine is a Combine executor. Combiners do not have side inputs (or output).
func (*Combine) FinishBundle ¶
FinishBundle completes this node's processing of a bundle.
func (*Combine) ProcessElement ¶
ProcessElement combines elements grouped by key using the CombineFn's AddInput, MergeAccumulators, and ExtractOutput functions.
func (*Combine) StartBundle ¶
StartBundle initializes processing this bundle for combines.
type DataContext ¶
type DataContext struct { Data DataManager SideInput SideInputReader }
DataContext holds connectors to various data connections, incl. state and side input.
type DataManager ¶
type DataManager interface { // OpenRead opens a closable byte stream for reading. OpenRead(ctx context.Context, id StreamID) (io.ReadCloser, error) // OpenWrite opens a closable byte stream for writing. OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error) }
DataManager manages external data byte streams. Each data stream can be opened by one consumer only.
type DataSink ¶
type DataSink struct { UID UnitID SID StreamID Coder *coder.Coder // contains filtered or unexported fields }
DataSink is a Node.
func (*DataSink) ProcessElement ¶
func (*DataSink) StartBundle ¶
type DataSource ¶
type DataSource struct { UID UnitID SID StreamID Coder *coder.Coder Out Node // contains filtered or unexported fields }
DataSource is a Root execution unit.
func (*DataSource) FinishBundle ¶
func (n *DataSource) FinishBundle(ctx context.Context) error
func (*DataSource) ID ¶
func (n *DataSource) ID() UnitID
func (*DataSource) Progress ¶
func (n *DataSource) Progress() ProgressReportSnapshot
Progress returns a snapshot of the source's progress.
func (*DataSource) StartBundle ¶
func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error
func (*DataSource) String ¶
func (n *DataSource) String() string
type Decoder ¶
type Decoder interface { // Decode decodes the []byte in to a value of the given type. Decode(reflect.Type, []byte) (interface{}, error) }
Decoder is a uniform custom decoder interface. It wraps various forms of reflectx.Funcs.
type Discard ¶
type Discard struct { // UID is the unit identifier. UID UnitID }
Discard silently discards all elements. It is implicitly inserted for any loose ends in the pipeline.
func (*Discard) ProcessElement ¶
func (*Discard) StartBundle ¶
type ElementDecoder ¶
type ElementDecoder interface { // Decode deserializes a value from the given reader. Decode(io.Reader) (FullValue, error) }
ElementDecoder handles FullValue deserialization from a byte stream. The decoder can be reused, even if an error is encountered. Concurrency-safe.
func MakeElementDecoder ¶
func MakeElementDecoder(c *coder.Coder) ElementDecoder
MakeElementDecoder returns an ElementDecoder for the given coder. It panics if given a coder with stream types, such as GBK.
type ElementEncoder ¶
type ElementEncoder interface { // Encode serializes the given value to the writer. Encode(FullValue, io.Writer) error }
ElementEncoder handles FullValue serialization to a byte stream. The encoder can be reused, even if an error is encountered. Concurrency-safe.
func MakeElementEncoder ¶
func MakeElementEncoder(c *coder.Coder) ElementEncoder
MakeElementEncoder returns an ElementEncoder for the given coder. It panics if given a coder with stream types, such as GBK.
type ElementProcessor ¶
type ElementProcessor interface { // Call processes a single element. If GBK or CoGBK result, the values // are populated. Otherwise, they're empty. ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error }
ElementProcessor presents a component that can process an element.
type Encoder ¶
type Encoder interface { // Encode encodes the given value (of the given type). Encode(reflect.Type, interface{}) ([]byte, error) }
Encoder is a uniform custom encoder interface. It wraps various forms of reflectx.Funcs.
type Expand ¶
type Expand struct { // UID is the unit identifier. UID UnitID ValueDecoders []ElementDecoder Out Node }
func (*Expand) ProcessElement ¶
func (*Expand) StartBundle ¶
type ExtractOutput ¶
type ExtractOutput struct {
*Combine
}
ExtractOutput is an executor for extracting output from a lifted combine.
func (*ExtractOutput) ProcessElement ¶
func (n *ExtractOutput) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error
ProcessElement accepts an accumulator value, and extracts the final return type from it.
func (*ExtractOutput) String ¶
func (n *ExtractOutput) String() string
type FixedKey ¶
type FixedKey struct { // UID is the unit identifier. UID UnitID // Key is the given key Key interface{} // Out is the successor node. Out Node }
FixedKey transforms any value into KV<K, V> for a fixed K.
func (*FixedKey) ProcessElement ¶
func (*FixedKey) StartBundle ¶
type FixedReStream ¶
type FixedReStream struct {
Buf []FullValue
}
FixedReStream is a simple in-memory ReStream.
func (*FixedReStream) Open ¶
func (n *FixedReStream) Open() (Stream, error)
type FixedStream ¶
type FixedStream struct { Buf []FullValue // contains filtered or unexported fields }
FixedStream is a simple in-memory Stream from a fixed array.
func (*FixedStream) Close ¶
func (s *FixedStream) Close() error
func (*FixedStream) Read ¶
func (s *FixedStream) Read() (FullValue, error)
type Flatten ¶
type Flatten struct { // UID is the unit identifier. UID UnitID // N is the number of incoming edges. N int // Out is the output node. Out Node // contains filtered or unexported fields }
Flatten is a fan-in node. It ensures that Start/FinishBundle are only called once downstream.
func (*Flatten) ProcessElement ¶
func (*Flatten) StartBundle ¶
type FullValue ¶
type FullValue struct { Elm interface{} // Element or KV key. Elm2 interface{} // KV value, if not invalid Timestamp typex.EventTime Windows []typex.Window }
FullValue represents the full runtime value for a data element, incl. the implicit context. The result of a GBK or CoGBK is not a single FullValue. The consumer is responsible for converting the values to the correct type.
func Invoke ¶
func Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error)
Invoke invokes the fn with the given values. The extra values must match the non-main side input and emitters. It returns the direct output, if any.
func InvokeWithoutEventTime ¶
func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error)
InvokeWithoutEventTime runs the given function at time 0 in the global window.
type GenID ¶
type GenID struct {
// contains filtered or unexported fields
}
GenID is a simple UnitID generator.
type Inject ¶
type Inject struct { // UID is the unit identifier. UID UnitID // N is the index (tag) in the union. N int // ValueCoder is the encoder for the value part of the incoming KV<K,V>. ValueEncoder ElementEncoder // Out is the successor node. Out Node }
Inject injects the predecessor index into each FullValue and encodes the value, effectively converting KV<X,Y> into KV<X,KV<int,[]byte>>. Used in combination with Expand.
func (*Inject) ProcessElement ¶
func (*Inject) StartBundle ¶
type LiftedCombine ¶
type LiftedCombine struct { *Combine // contains filtered or unexported fields }
LiftedCombine is an executor for combining values before grouping by keys for a lifted combine. Partially groups values by key within a bundle, accumulating them in an in-memory cache, before emitting them in the FinishBundle step.
func (*LiftedCombine) Down ¶
func (n *LiftedCombine) Down(ctx context.Context) error
Down tears down the cache.
func (*LiftedCombine) FinishBundle ¶
func (n *LiftedCombine) FinishBundle(ctx context.Context) error
FinishBundle iterates through the cached (key, accumulator) pairs, and then processes the value in the bundle as normal.
func (*LiftedCombine) ProcessElement ¶
func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error
ProcessElement takes a KV pair and combines values with the same key into an accumulator, caching them until the bundle is complete.
func (*LiftedCombine) StartBundle ¶
func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataContext) error
StartBundle initializes the in-memory cache of keys to accumulators.
func (*LiftedCombine) String ¶
func (n *LiftedCombine) String() string
type MergeAccumulators ¶
type MergeAccumulators struct {
*Combine
}
MergeAccumulators is an executor for merging accumulators from a lifted combine.
func (*MergeAccumulators) ProcessElement ¶
func (n *MergeAccumulators) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error
ProcessElement accepts a stream of accumulator values with the same key and runs the MergeAccumulatorsFn over them repeatedly.
func (*MergeAccumulators) String ¶
func (n *MergeAccumulators) String() string
type Multiplex ¶
type Multiplex struct { // UID is the unit identifier. UID UnitID // Out is a list of output nodes. Out []Node }
Multiplex is a fan-out node. It simply forwards any call to all downstream nodes.
func (*Multiplex) ProcessElement ¶
func (*Multiplex) StartBundle ¶
type Node ¶
type Node interface { Unit ElementProcessor }
Node represents a single-bundle processing unit. Each node contains its processing continuation, notably other nodes.
type ParDo ¶
type ParDo struct { UID UnitID Fn *graph.DoFn Inbound []*graph.Inbound Side []SideInputAdapter Out []Node PID string // contains filtered or unexported fields }
ParDo is a DoFn executor.
func (*ParDo) ProcessElement ¶
func (*ParDo) StartBundle ¶
type Plan ¶
type Plan struct {
// contains filtered or unexported fields
}
Plan represents the bundle execution plan. It will generally be constructed from a part of a pipeline. A plan can be used to process multiple bundles serially.
func UnmarshalPlan ¶
func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error)
UnmarshalPlan converts a model bundle descriptor into an execution Plan.
func (*Plan) Execute ¶
Execute executes the plan with the given data context and bundle id. Units are brought up on the first execution. If a bundle fails, the plan cannot be reused for further bundles. Does not panic. Blocking.
type Port ¶
type Port struct {
URL string
}
Port represents the connection port of external operations.
type ProgressReportSnapshot ¶
ProgressReportSnapshot captures the progress of reading an input source.
type ReusableEmitter ¶
type ReusableEmitter interface { // Init resets the value. Can be called multiple times. Init(ctx context.Context, ws []typex.Window, t typex.EventTime) error // Value returns the side input value. Constant value. Value() interface{} }
ReusableEmitter is a resettable value needed to hold the implicit context and emit event time.
type ReusableInput ¶
type ReusableInput interface { // Init initializes the value before use. Init() error // Value returns the side input value. Value() interface{} // Reset resets the value after use. Reset() error }
ReusableInput is a resettable value, notably used to unwind iterators cheaply and cache materialized side input across invocations.
type Root ¶
type Root interface { Unit // Process processes the entire source, notably emitting elements to // downstream nodes. Process(ctx context.Context) error }
Root represents a root processing unit. It contains its processing continuation, notably other nodes.
type SideInputAdapter ¶
type SideInputAdapter interface {
NewIterable(ctx context.Context, reader SideInputReader, w typex.Window) (ReStream, error)
}
SideInputAdapter provides a concrete ReStream from a low-level side input reader. It encapsulates StreamID and coding as needed.
func NewSideInputAdapter ¶
func NewSideInputAdapter(sid StreamID, c *coder.Coder) SideInputAdapter
NewSideInputAdapter returns a side input adapter for the given StreamID and coder. It expects a W<KV<K,V>> coder, because the protocol supports MultiSet access only.
type SideInputReader ¶
type SideInputReader interface { // Open opens a byte stream for reading iterable side input. Open(ctx context.Context, id StreamID, key, w []byte) (io.ReadCloser, error) }
SideInputReader is the interface for reading side input data.
type Stream ¶
Stream is a FullValue reader. It returns io.EOF when complete, but can be prematurely closed.
type StreamID ¶
StreamID represents the static information needed to identify a data stream. Dynamic information, notably bundleID, is provided implicitly by the managers.
type Target ¶
type Target struct { // ID is the transform ID. ID string // Name is a local name in the context of the transform. Name string }
Target represents the static target of external operations.
type Unit ¶
type Unit interface { // ID returns the unit ID. ID() UnitID // Up initializes the unit. It is separate from Unit construction to // make panic/error handling easier. Up(ctx context.Context) error // StartBundle signals that processing preconditions, such as availability // of side input, are met and starts the given bundle. StartBundle(ctx context.Context, id string, data DataContext) error // FinishBundle signals end of input and thus finishes the bundle. Any // data connections must be closed. FinishBundle(ctx context.Context) error // Down tears down the processing node. It is notably called if the unit // or plan encounters an error and must thus robustly handle cleanup of // unfinished bundles. If a unit itself (as opposed to downstream units) // is the cause of breakage, the error returned should indicate the root // cause. Down(ctx context.Context) error }
Unit represents a processing unit capable of processing multiple bundles serially. Units are not required to be concurrency-safe. Each unit is responsible for propagating each data processing call downstream, i.e., all calls except Up/Down, as appropriate.
type WindowDecoder ¶
type WindowDecoder interface { // Decode deserializes a value from the given reader. Decode(io.Reader) ([]typex.Window, error) }
WindowDecoder handles Window deserialization from a byte stream. The decoder can be reused, even if an error is encountered. Concurrency-safe.
func MakeWindowDecoder ¶
func MakeWindowDecoder(c *coder.WindowCoder) WindowDecoder
MakeWindowDecoder returns a WindowDecoder for the given window coder.
type WindowEncoder ¶
type WindowEncoder interface { // Encode serializes the given value to the writer. Encode([]typex.Window, io.Writer) error }
WindowEncoder handles Window serialization to a byte stream. The encoder can be reused, even if an error is encountered. Concurrency-safe.
func MakeWindowEncoder ¶
func MakeWindowEncoder(c *coder.WindowCoder) WindowEncoder
MakeWindowEncoder returns a WindowEncoder for the given window coder.
type WindowInto ¶
WindowInto places each element in one or more windows.
func (*WindowInto) FinishBundle ¶
func (w *WindowInto) FinishBundle(ctx context.Context) error
func (*WindowInto) ID ¶
func (w *WindowInto) ID() UnitID
func (*WindowInto) ProcessElement ¶
func (*WindowInto) StartBundle ¶
func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataContext) error
func (*WindowInto) String ¶
func (w *WindowInto) String() string