exec

package
v2.6.0-RC1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2018 License: Apache-2.0, BSD-3-Clause, MIT Imports: 30 Imported by: 0

Documentation

Overview

Package exec contains runtime plan representation and execution. A pipeline must be translated to a runtime plan to be executed.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Convert

func Convert(v interface{}, to reflect.Type) interface{}

Convert converts type of the runtime value to the desired one. It is needed to drop the universal type and convert Aggregate types.

func DecodeWindowedValueHeader

func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, error)

DecodeWindowedValueHeader deserializes a windowed value header.

func EncodeWindowedValueHeader

func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.EventTime, w io.Writer) error

EncodeWindowedValueHeader serializes a windowed value header.

func MultiFinishBundle

func MultiFinishBundle(ctx context.Context, list ...Node) error

MultiFinishBundle calls StartBundle on multiple nodes. Convenience function.

func MultiStartBundle

func MultiStartBundle(ctx context.Context, id string, data DataManager, list ...Node) error

MultiStartBundle calls StartBundle on multiple nodes. Convenience function.

func RegisterEmitter

func RegisterEmitter(t reflect.Type, maker func(ElementProcessor) ReusableEmitter)

RegisterEmitter registers an emitter for the given type, such as "func(int)". If multiple emitters are registered for the same type, the last registration wins.

func RegisterInput

func RegisterInput(t reflect.Type, maker func(ReStream) ReusableInput)

RegisterInput registers an input handler for the given type, such as "func(*int)bool". If multiple input handlers are registered for the same type, the last registration wins.

Types

type Combine

type Combine struct {
	UID     UnitID
	Fn      *graph.CombineFn
	UsesKey bool
	Out     Node
	// contains filtered or unexported fields
}

Combine is a Combine executor. Combiners do not have side inputs (or output).

func (*Combine) Down

func (n *Combine) Down(ctx context.Context) error

Down runs the ParDo's TeardownFn.

func (*Combine) FinishBundle

func (n *Combine) FinishBundle(ctx context.Context) error

FinishBundle completes this node's processing of a bundle.

func (*Combine) ID

func (n *Combine) ID() UnitID

ID returns the UnitID for this node.

func (*Combine) ProcessElement

func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error

ProcessElement combines elements grouped by key using the CombineFn's AddInput, MergeAccumulators, and ExtractOutput functions.

func (*Combine) StartBundle

func (n *Combine) StartBundle(ctx context.Context, id string, data DataManager) error

StartBundle initializes processing this bundle for combines.

func (*Combine) String

func (n *Combine) String() string

func (*Combine) Up

func (n *Combine) Up(ctx context.Context) error

Up initializes this CombineFn and runs its SetupFn() method.

type DataManager

type DataManager interface {
	DataReader
	DataWriter
}

DataManager manages external data byte streams. Each data stream can be opened by one consumer only.

type DataReader

type DataReader interface {
	OpenRead(ctx context.Context, id StreamID) (io.ReadCloser, error)
}

DataReader is the interface for reading data elements from a particular stream.

type DataSink

type DataSink struct {
	UID    UnitID
	Port   Port
	Target Target
	Coder  *coder.Coder
	// contains filtered or unexported fields
}

DataSink is a Node.

func (*DataSink) Down

func (n *DataSink) Down(ctx context.Context) error

func (*DataSink) FinishBundle

func (n *DataSink) FinishBundle(ctx context.Context) error

func (*DataSink) ID

func (n *DataSink) ID() UnitID

func (*DataSink) ProcessElement

func (n *DataSink) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error

func (*DataSink) StartBundle

func (n *DataSink) StartBundle(ctx context.Context, id string, data DataManager) error

func (*DataSink) String

func (n *DataSink) String() string

func (*DataSink) Up

func (n *DataSink) Up(ctx context.Context) error

type DataSource

type DataSource struct {
	UID    UnitID
	Port   Port
	Target Target
	Coder  *coder.Coder
	Out    Node
	// contains filtered or unexported fields
}

DataSource is a Root execution unit.

func (*DataSource) Down

func (n *DataSource) Down(ctx context.Context) error

func (*DataSource) FinishBundle

func (n *DataSource) FinishBundle(ctx context.Context) error

func (*DataSource) ID

func (n *DataSource) ID() UnitID

func (*DataSource) Process

func (n *DataSource) Process(ctx context.Context) error

func (*DataSource) Progress

func (n *DataSource) Progress() ProgressReportSnapshot

Progress returns a snapshot of the source's progress.

func (*DataSource) StartBundle

func (n *DataSource) StartBundle(ctx context.Context, id string, data DataManager) error

func (*DataSource) String

func (n *DataSource) String() string

func (*DataSource) Up

func (n *DataSource) Up(ctx context.Context) error

type DataWriter

type DataWriter interface {
	OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
}

DataWriter is the interface for writing data elements to a particular stream.

type Decoder

type Decoder interface {
	// Decode decodes the []byte in to a value of the given type.
	Decode(reflect.Type, []byte) (interface{}, error)
}

Decoder is a uniform custom encoder interface. It wraps various forms of reflectx.Funcs.

type Discard

type Discard struct {
	// UID is the unit identifier.
	UID UnitID
}

Discard silently discard all elements. It is implicitly inserted for any loose ends in the pipeline.

func (*Discard) Down

func (d *Discard) Down(ctx context.Context) error

func (*Discard) FinishBundle

func (d *Discard) FinishBundle(ctx context.Context) error

func (*Discard) ID

func (d *Discard) ID() UnitID

func (*Discard) ProcessElement

func (d *Discard) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error

func (*Discard) StartBundle

func (d *Discard) StartBundle(ctx context.Context, id string, data DataManager) error

func (*Discard) String

func (d *Discard) String() string

func (*Discard) Up

func (d *Discard) Up(ctx context.Context) error

type ElementDecoder

type ElementDecoder interface {
	// Decode deserializes a value from the given reader.
	Decode(io.Reader) (FullValue, error)
}

ElementDecoder handles FullValue deserialization from a byte stream. The decoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeElementDecoder

func MakeElementDecoder(c *coder.Coder) ElementDecoder

MakeElementDecoder returns a ElementDecoder for the given coder. It panics if given a coder with stream types, such as GBK.

type ElementEncoder

type ElementEncoder interface {
	// Encode serializes the given value to the writer.
	Encode(FullValue, io.Writer) error
}

ElementEncoder handles FullValue serialization to a byte stream. The encoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeElementEncoder

func MakeElementEncoder(c *coder.Coder) ElementEncoder

MakeElementEncoder returns a ElementCoder for the given coder. It panics if given a coder with stream types, such as GBK.

type ElementProcessor

type ElementProcessor interface {
	// Call processes a single element. If GBK or CoGBK result, the values
	// are populated. Otherwise, they're empty.
	ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error
}

ElementProcessor presents a component that can process an element.

type Encoder

type Encoder interface {
	// Encode encodes the given value (of the given type).
	Encode(reflect.Type, interface{}) ([]byte, error)
}

Encoder is a uniform custom encoder interface. It wraps various forms of reflectx.Funcs.

type Expand

type Expand struct {
	// UID is the unit identifier.
	UID UnitID

	ValueDecoders []ElementDecoder

	Out Node
}

func (*Expand) Down

func (n *Expand) Down(ctx context.Context) error

func (*Expand) FinishBundle

func (n *Expand) FinishBundle(ctx context.Context) error

func (*Expand) ID

func (n *Expand) ID() UnitID

func (*Expand) ProcessElement

func (n *Expand) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error

func (*Expand) StartBundle

func (n *Expand) StartBundle(ctx context.Context, id string, data DataManager) error

func (*Expand) String

func (n *Expand) String() string

func (*Expand) Up

func (n *Expand) Up(ctx context.Context) error

type ExtractOutput

type ExtractOutput struct {
	*Combine
}

ExtractOutput is an executor for extracting output from a lifted combine.

func (*ExtractOutput) ProcessElement

func (n *ExtractOutput) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error

ProcessElement accepts an accumulator value, and extracts the final return type from it.

func (*ExtractOutput) String

func (n *ExtractOutput) String() string

type FixedReStream

type FixedReStream struct {
	Buf []FullValue
}

FixedReStream is a simple in-memory ReSteam.

func (*FixedReStream) Open

func (n *FixedReStream) Open() Stream

type FixedStream

type FixedStream struct {
	Buf []FullValue
	// contains filtered or unexported fields
}

FixedStream is a simple in-memory Stream from a fixed array.

func (*FixedStream) Close

func (s *FixedStream) Close() error

func (*FixedStream) Read

func (s *FixedStream) Read() (FullValue, error)

type Flatten

type Flatten struct {
	// UID is the unit identifier.
	UID UnitID
	// N is the number of incoming edges.
	N int
	// Out is the output node.
	Out Node
	// contains filtered or unexported fields
}

Flatten is a fan-in node. It ensures that Start/FinishBundle are only called once downstream.

func (*Flatten) Down

func (m *Flatten) Down(ctx context.Context) error

func (*Flatten) FinishBundle

func (m *Flatten) FinishBundle(ctx context.Context) error

func (*Flatten) ID

func (m *Flatten) ID() UnitID

func (*Flatten) ProcessElement

func (m *Flatten) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error

func (*Flatten) StartBundle

func (m *Flatten) StartBundle(ctx context.Context, id string, data DataManager) error

func (*Flatten) String

func (m *Flatten) String() string

func (*Flatten) Up

func (m *Flatten) Up(ctx context.Context) error

type FullValue

type FullValue struct {
	Elm  interface{} // Element or KV key.
	Elm2 interface{} // KV value, if not invalid

	Timestamp typex.EventTime
	Windows   []typex.Window
}

FullValue represents the full runtime value for a data element, incl. the implicit context. The result of a GBK or CoGBK is not a single FullValue. The consumer is responsible for converting the values to the correct type.

func Invoke

func Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error)

Invoke invokes the fn with the given values. The extra values must match the non-main side input and emitters. It returns the direct output, if any.

func InvokeWithoutEventTime

func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error)

func ReadAll

func ReadAll(s Stream) ([]FullValue, error)

ReadAll read the full stream and returns the result. It always closes the stream.

func (FullValue) String

func (v FullValue) String() string

type GenID

type GenID struct {
	// contains filtered or unexported fields
}

GenID is a simple UnitID generator.

func (*GenID) New

func (g *GenID) New() UnitID

New returns a fresh ID.

type Inject

type Inject struct {
	// UID is the unit identifier.
	UID UnitID
	// N is the index (tag) in the union.
	N int
	// ValueCoder is the encoder for the value part of the incoming KV<K,V>.
	ValueEncoder ElementEncoder
	// Out is the successor node.
	Out Node
}

Inject injects the predecessor index into each FullValue and encodes the value, effectively converting KV<X,Y> into KV<X,KV<int,[]byte>>. Used in combination with Expand.

func (*Inject) Down

func (n *Inject) Down(ctx context.Context) error

func (*Inject) FinishBundle

func (n *Inject) FinishBundle(ctx context.Context) error

func (*Inject) ID

func (n *Inject) ID() UnitID

func (*Inject) ProcessElement

func (n *Inject) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error

func (*Inject) StartBundle

func (n *Inject) StartBundle(ctx context.Context, id string, data DataManager) error

func (*Inject) String

func (n *Inject) String() string

func (*Inject) Up

func (n *Inject) Up(ctx context.Context) error

type LiftedCombine

type LiftedCombine struct {
	*Combine
	// contains filtered or unexported fields
}

LiftedCombine is an executor for combining values before grouping by keys for a lifted combine. Partially groups values by key within a bundle, accumulating them in an in memory cache, before emitting them in the FinishBundle step.

func (*LiftedCombine) Down

func (n *LiftedCombine) Down(ctx context.Context) error

Down tears down the cache.

func (*LiftedCombine) FinishBundle

func (n *LiftedCombine) FinishBundle(ctx context.Context) error

FinishBundle iterates through the cached (key, accumulator) pairs, and then processes the value in the bundle as normal.

func (*LiftedCombine) ProcessElement

func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error

ProcessElement takes a KV pair and combines values with the same into an accumulator, caching them until the bundle is complete.

func (*LiftedCombine) StartBundle

func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataManager) error

StartBundle initializes the in memory cache of keys to accumulators.

func (*LiftedCombine) String

func (n *LiftedCombine) String() string

type MainInput

type MainInput struct {
	Key    FullValue
	Values []ReStream
}

MainInput is the main input and is unfolded in the invocation, if present.

type MergeAccumulators

type MergeAccumulators struct {
	*Combine
}

MergeAccumulators is an executor for merging accumulators from a lifted combine.

func (*MergeAccumulators) ProcessElement

func (n *MergeAccumulators) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error

ProcessElement accepts a stream of accumulator values with the same key and runs the MergeAccumulatorsFn over them repeatedly.

func (*MergeAccumulators) String

func (n *MergeAccumulators) String() string

func (*MergeAccumulators) Up

Up eagerly gets the optimized binary merge function.

type Multiplex

type Multiplex struct {
	// UID is the unit identifier.
	UID UnitID
	// Out is a list of output nodes.
	Out []Node
}

Multiplex is a fan-out node. It simply forwards any call to all downstream nodes.

func (*Multiplex) Down

func (m *Multiplex) Down(ctx context.Context) error

func (*Multiplex) FinishBundle

func (m *Multiplex) FinishBundle(ctx context.Context) error

func (*Multiplex) ID

func (m *Multiplex) ID() UnitID

func (*Multiplex) ProcessElement

func (m *Multiplex) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error

func (*Multiplex) StartBundle

func (m *Multiplex) StartBundle(ctx context.Context, id string, data DataManager) error

func (*Multiplex) String

func (m *Multiplex) String() string

func (*Multiplex) Up

func (m *Multiplex) Up(ctx context.Context) error

type Node

type Node interface {
	Unit
	ElementProcessor
}

Node represents an single-bundle processing unit. Each node contains its processing continuation, notably other nodes.

type ParDo

type ParDo struct {
	UID     UnitID
	Fn      *graph.DoFn
	Inbound []*graph.Inbound
	Side    []ReStream
	Out     []Node

	PID string
	// contains filtered or unexported fields
}

ParDo is a DoFn executor.

func (*ParDo) Down

func (n *ParDo) Down(ctx context.Context) error

func (*ParDo) FinishBundle

func (n *ParDo) FinishBundle(ctx context.Context) error

func (*ParDo) ID

func (n *ParDo) ID() UnitID

func (*ParDo) ProcessElement

func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error

func (*ParDo) StartBundle

func (n *ParDo) StartBundle(ctx context.Context, id string, data DataManager) error

func (*ParDo) String

func (n *ParDo) String() string

func (*ParDo) Up

func (n *ParDo) Up(ctx context.Context) error

type Plan

type Plan struct {
	// contains filtered or unexported fields
}

Plan represents the bundle execution plan. It will generally be constructed from a part of a pipeline. A plan can be used to process multiple bundles serially.

func NewPlan

func NewPlan(id string, units []Unit) (*Plan, error)

NewPlan returns a new bundle execution plan from the given units.

func UnmarshalPlan

func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error)

UnmarshalPlan converts a model bundle descriptor into an execution Plan.

func (*Plan) Down

func (p *Plan) Down(ctx context.Context) error

Down takes the plan and associated units down. Does not panic.

func (*Plan) Execute

func (p *Plan) Execute(ctx context.Context, id string, manager DataManager) error

Execute executes the plan with the given data context and bundle id. Units are brought up on the first execution. If a bundle fails, the plan cannot be reused for further bundles. Does not panic. Blocking.

func (*Plan) ID

func (p *Plan) ID() string

ID returns the plan identifier.

func (*Plan) Metrics

func (p *Plan) Metrics() *fnpb.Metrics

Metrics returns a snapshot of input progress of the plan, and associated metrics.

func (*Plan) String

func (p *Plan) String() string

type Port

type Port struct {
	URL string
}

Port represents the connection port of external operations.

type ProgressReportSnapshot

type ProgressReportSnapshot struct {
	ID, Name string
	Count    int64
}

ProgressReportSnapshot captures the progress reading an input source.

type ReStream

type ReStream interface {
	Open() Stream
}

ReStream is Stream factory.

type ReusableEmitter

type ReusableEmitter interface {
	// Init resets the value. Can be called multiple times.
	Init(ctx context.Context, ws []typex.Window, t typex.EventTime) error
	// Value returns the side input value. Constant value.
	Value() interface{}
}

ReusableEmitter is a resettable value needed to hold the implicit context and emit event time.

type ReusableInput

type ReusableInput interface {
	// Init initializes the value before use.
	Init() error
	// Value returns the side input value.
	Value() interface{}
	// Reset resets the value after use.
	Reset() error
}

ReusableInput is a resettable value, notably used to unwind iterators cheaply and cache materialized side input across invocations.

type Root

type Root interface {
	Unit

	// Process processes the entire source, notably emitting elements to
	// downstream nodes.
	Process(ctx context.Context) error
}

Root represents a root processing unit. It contains its processing continuation, notably other nodes.

type Status

type Status int

Status is the status of a unit or plan.

const (
	Initializing Status = iota
	Up
	Active
	Broken
	Down
)

type Stream

type Stream interface {
	io.Closer
	Read() (FullValue, error)
}

Stream is a FullValue reader. It returns io.EOF when complete, but can be prematurely closed.

type StreamID

type StreamID struct {
	Port   Port
	Target Target
	InstID string
}

StreamID represents the information needed to identify a data stream.

func (StreamID) String

func (id StreamID) String() string

type Target

type Target struct {
	ID   string
	Name string
}

Target represents the target of external operations.

type Unit

type Unit interface {
	// ID returns the unit ID.
	ID() UnitID

	// Up initializes the unit. It is separate from Unit construction to
	// make panic/error handling easier.
	Up(ctx context.Context) error

	// StartBundle signals that processing preconditions, such as availability
	// of side input, are met and starts the given bundle.
	StartBundle(ctx context.Context, id string, data DataManager) error

	// FinishBundle signals end of input and thus finishes the bundle. Any
	// data connections must be closed.
	FinishBundle(ctx context.Context) error

	// Down tears down the processing node. It is notably called if the unit
	// or plan encounters an error and must thus robustly handle cleanup of
	// unfinished bundles. If a unit itself (as opposed to downstream units)
	// is the cause of breakage, the error returned should indicate the root
	// cause.
	Down(ctx context.Context) error
}

Unit represents a processing unit capable of processing multiple bundles serially. Units are not required to be concurrency-safe. Each unit is responsible for propagating each data processing call downstream, i.e., all calls except Up/Down, as appropriate.

type UnitID

type UnitID int

UnitID is a unit identifier. Used for debugging.

func IDs

func IDs(list ...Node) []UnitID

IDs returns the unit IDs of the given nodes.

type WindowDecoder

type WindowDecoder interface {
	// Decode deserializes a value from the given reader.
	Decode(io.Reader) ([]typex.Window, error)
}

WindowDecoder handles Window deserialization from a byte stream. The decoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeWindowDecoder

func MakeWindowDecoder(c *coder.WindowCoder) WindowDecoder

MakeWindowDecoder returns a WindowDecoder for the given window coder.

type WindowEncoder

type WindowEncoder interface {
	// Encode serializes the given value to the writer.
	Encode([]typex.Window, io.Writer) error
}

WindowEncoder handles Window serialization to a byte stream. The encoder can be reused, even if an error is encountered. Concurrency-safe.

func MakeWindowEncoder

func MakeWindowEncoder(c *coder.WindowCoder) WindowEncoder

MakeWindowEncoder returns a WindowEncoder for the given window coder.

type WindowInto

type WindowInto struct {
	UID UnitID
	Fn  *window.Fn
	Out Node
}

WindowInto places each element in one or more windows.

func (*WindowInto) Down

func (w *WindowInto) Down(ctx context.Context) error

func (*WindowInto) FinishBundle

func (w *WindowInto) FinishBundle(ctx context.Context) error

func (*WindowInto) ID

func (w *WindowInto) ID() UnitID

func (*WindowInto) ProcessElement

func (w *WindowInto) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error

func (*WindowInto) StartBundle

func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataManager) error

func (*WindowInto) String

func (w *WindowInto) String() string

func (*WindowInto) Up

func (w *WindowInto) Up(ctx context.Context) error

Directories

Path Synopsis
Package optimized contains type-specialized shims for faster execution.
Package optimized contains type-specialized shims for faster execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL