Documentation ¶
Overview ¶
Package graph is the internal representation of the Beam execution plan. This package is used by the public-facing Beam package to organize the user's pipeline into a connected graph structure. This graph is a precise, strongly-typed representation of the user's intent, and allows the runtime to verify typing of collections, and tracks the data dependency relationships to allow an optimizer to schedule the work.
Index ¶
- Constants
- func Bounded(ns []*Node) bool
- func NodeTypes(list []*Node) []typex.FullType
- func NumMainInputs(num mainInputs) func(*config)
- type CombineFn
- func (f *CombineFn) AddInputFn() *funcx.Fn
- func (f *CombineFn) CompactFn() *funcx.Fn
- func (f *CombineFn) CreateAccumulatorFn() *funcx.Fn
- func (f *CombineFn) ExtractOutputFn() *funcx.Fn
- func (f *CombineFn) MergeAccumulatorsFn() *funcx.Fn
- func (f *CombineFn) Name() string
- func (f *CombineFn) SetupFn() *funcx.Fn
- func (f *CombineFn) TeardownFn() *funcx.Fn
- type DoFn
- type DynFn
- type Fn
- type Graph
- func (g *Graph) Build() ([]*MultiEdge, []*Node, error)
- func (g *Graph) NewEdge(parent *Scope) *MultiEdge
- func (g *Graph) NewNode(t typex.FullType, w *window.WindowingStrategy, bounded bool) *Node
- func (g *Graph) NewScope(parent *Scope, name string) *Scope
- func (g *Graph) Root() *Scope
- func (g *Graph) String() string
- type Inbound
- type InputKind
- type MultiEdge
- func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error)
- func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder, ...) (*MultiEdge, error)
- func NewExternal(g *Graph, s *Scope, payload *Payload, in []*Node, out []typex.FullType, ...) *MultiEdge
- func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error)
- func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge
- func NewParDo(g *Graph, s *Scope, u *DoFn, in []*Node, rc *coder.Coder, ...) (*MultiEdge, error)
- func NewReshuffle(g *Graph, s *Scope, in *Node) (*MultiEdge, error)
- func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge
- type Node
- type Opcode
- type Outbound
- type Payload
- type Scope
- type SplittableDoFn
- func (f *SplittableDoFn) CreateInitialRestrictionFn() *funcx.Fn
- func (f *SplittableDoFn) CreateTrackerFn() *funcx.Fn
- func (f *SplittableDoFn) Name() string
- func (f *SplittableDoFn) RestrictionSizeFn() *funcx.Fn
- func (f *SplittableDoFn) RestrictionT() reflect.Type
- func (f *SplittableDoFn) SplitRestrictionFn() *funcx.Fn
Constants ¶
const ( MainUnknown mainInputs = -1 // Number of inputs is unknown for DoFn validation. MainSingle mainInputs = 1 // Number of inputs for single value elements. MainKv mainInputs = 2 // Number of inputs for KV elements. )
The following constants prefixed with "Main" represent valid numbers of DoFn main inputs for DoFn construction and validation.
const CombinePerKeyScope = "CombinePerKey"
CombinePerKeyScope is the Go SDK canonical name for the combine composite scope. With Beam Portability, "primitive" composite transforms like combine have their URNs & payloads attached to a high level scope, with a default representation beneath. The use of this const permits the translation layer to confirm the SDK expects this combine to be liftable by a runner and should set this scope's URN and Payload accordingly.
Variables ¶
This section is empty.
Functions ¶
func NumMainInputs ¶
func NumMainInputs(num mainInputs) func(*config)
NumMainInputs is an optional config to NewDoFn which specifies the number of main inputs to the DoFn being created, allowing for more complete validation. Valid inputs are the package constants of type mainInputs.
Example usage:
graph.NewDoFn(fn, graph.NumMainInputs(graph.MainKv))
Types ¶
type CombineFn ¶
type CombineFn Fn
CombineFn represents a CombineFn.
func AsCombineFn ¶
AsCombineFn converts a Fn to a CombineFn, if possible.
func NewCombineFn ¶
NewCombineFn constructs a CombineFn from the given value, if possible.
func (*CombineFn) AddInputFn ¶
AddInputFn returns the "AddInput" function, if present.
func (*CombineFn) CreateAccumulatorFn ¶
CreateAccumulatorFn returns the "CreateAccumulator" function, if present.
func (*CombineFn) ExtractOutputFn ¶
ExtractOutputFn returns the "ExtractOutput" function, if present.
func (*CombineFn) MergeAccumulatorsFn ¶
MergeAccumulatorsFn returns the "MergeAccumulators" function. If it is the only method present, then InputType == AccumulatorType == OutputType.
func (*CombineFn) TeardownFn ¶
TeardownFn returns the "Teardown" function, if present.
type DoFn ¶
type DoFn Fn
DoFn represents a DoFn.
func AsDoFn ¶
AsDoFn converts a Fn to a DoFn, if possible. numMainIn specifies how many main inputs are expected in the DoFn's method signatures. Valid inputs are the package constants of type mainInputs. If that number is MainUnknown then validation is done by best effort and may miss some edge cases.
func (*DoFn) FinishBundleFn ¶
FinishBundleFn returns the "FinishBundle" function, if present.
func (*DoFn) IsSplittable ¶
IsSplittable returns whether the DoFn is a valid Splittable DoFn.
func (*DoFn) ProcessElementFn ¶
ProcessElementFn returns the "ProcessElement" function.
func (*DoFn) StartBundleFn ¶
StartBundleFn returns the "StartBundle" function, if present.
func (*DoFn) TeardownFn ¶
TeardownFn returns the "Teardown" function, if present.
type DynFn ¶
type DynFn struct { // Name is the name of the function. It does not have to be a valid symbol. Name string // T is the type of the generated function T reflect.Type // Data holds the data, if any, for the generator. Each function // generator typically needs some configuration data, which is // required by the DynFn to be encoded. Data []byte // Gen is the function generator. The function generator itself must be a // function with a unique symbol. Gen func(string, reflect.Type, []byte) reflectx.Func }
DynFn is a generator for dynamically-created functions:
gen: (name string, t reflect.Type, []byte) -> func : T
where the generated function, fn : T, is re-created at runtime. This concept allows serialization of dynamically-generated functions, which do not have a valid (unique) symbol such as one created via reflect.MakeFunc.
type Fn ¶
type Fn struct { // Fn holds the function, if present. If Fn is nil, Recv must be // non-nil. Fn *funcx.Fn // Recv hold the struct receiver, if present. If Recv is nil, Fn // must be non-nil. Recv interface{} // DynFn holds the function-generator, if dynamic. If not nil, Fn // holds the generated function. DynFn *DynFn // contains filtered or unexported fields }
Fn holds either a function or struct receiver.
type Graph ¶
type Graph struct {
// contains filtered or unexported fields
}
Graph represents an in-progress deferred execution graph and is easily translatable to the model graph. This graph representation allows precise control over scope and connectivity.
func (*Graph) Build ¶
Build performs finalization on the graph. It verifies the correctness of the graph structure, typechecks the plan and returns a slice of the edges in the graph.
type Inbound ¶
type Inbound struct { // Kind presents the form of the data that the edge expects. Main input // must be processed element-wise, but side input may take several // convenient forms. For example, a DoFn that processes ints may choose // among the following parameter types: // // * Main: int // * Singleton: int // * Slice: []int // * Iter: func(*int) bool // * ReIter: func() func(*int) bool // // If the DoFn is generic then int may be replaced by any of the type // variables. For example, // // * Slice: []typex.T // * Iter: func(*typex.X) bool // // If the input type is KV<int,string>, say, then the options are: // // * Main: int, string (as two separate parameters) // * Map: map[int]string // * MultiMap: map[int][]string // * Iter: func(*int, *string) bool // * ReIter: func() func(*int, *string) bool // // As above, either int, string, or both can be replaced with type // variables. For example, // // * Map: map[typex.X]typex.Y // * MultiMap: map[typex.T][]string // * Iter: func(*typex.Z, *typex.Z) bool // // Note that in the last case the parameter type requires that both // the key and value types are identical. Bind enforces such constraints. Kind InputKind // From is the incoming node in the graph. From *Node // Type is the fulltype matching the actual type used by the transform. // Due to the loose signatures of DoFns, we can only determine the // inbound structure when the fulltypes of the incoming links are present. // For example, // // func (ctx context.Context, key int, value typex.X) error // // is a generic DoFn that if bound to KV<int,string> would have one // Inbound link with type KV<int, X>. Type typex.FullType }
Inbound represents an inbound data link from a Node.
type InputKind ¶
type InputKind string
InputKind represents the role of the input and its shape.
const ( Main InputKind = "Main" Singleton InputKind = "Singleton" Slice InputKind = "Slice" Map InputKind = "Map" // TODO: allow? MultiMap InputKind = "MultiMap" // TODO: allow? Iter InputKind = "Iter" ReIter InputKind = "ReIter" )
Valid input kinds.
func Bind ¶
func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in ...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error)
Bind returns the inbound, outbound and underlying output types for a Fn, when bound to the underlying input types. The complication of bind is primarily that UserFns have loose signatures and bind must produce valid type information for the execution plan.
For example,
func (t EventTime, k typex.X, v int, emit func(string, typex.X))
or
func (context.Context, k typex.X, v int) (string, typex.X, error)
are UserFns that may take one or two incoming fulltypes: either KV<X,int> or X with a singleton side input of type int. For the purpose of the shape of data processing, the two forms are equivalent. The non-data types, context.Context and error, are not part of the data signature, but in play only at runtime.
If either was bound to the input type [KV<string,int>], bind would return:
inbound: [Main: KV<X,int>] outbound: [KV<string,X>] output: [KV<string,string>]
Note that it propagates the assignment of X to string in the output type.
If either was instead bound to the input fulltypes [float, int], the result would be:
inbound: [Main: X, Singleton: int] outbound: [KV<string,X>] output: [KV<string, float>]
Here, the inbound shape and output types are different from before.
type MultiEdge ¶
type MultiEdge struct { Op Opcode DoFn *DoFn // ParDo RestrictionCoder *coder.Coder // SplittableParDo CombineFn *CombineFn // Combine AccumCoder *coder.Coder // Combine Value []byte // Impulse Payload *Payload // External WindowFn *window.Fn // WindowInto Input []*Inbound Output []*Outbound // contains filtered or unexported fields }
MultiEdge represents a primitive data processing operation. Each non-user code operation may be implemented by either the harness or the runner.
func NewCombine ¶
func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder, typedefs map[string]reflect.Type) (*MultiEdge, error)
NewCombine inserts a new Combine edge into the graph. Combines cannot have side input.
func NewExternal ¶
func NewExternal(g *Graph, s *Scope, payload *Payload, in []*Node, out []typex.FullType, bounded bool) *MultiEdge
NewExternal inserts an External transform. The system makes no assumptions about what this transform might do.
func NewFlatten ¶
NewFlatten inserts a new Flatten edge in the graph. Flatten output type is the shared input type.
func NewImpulse ¶
NewImpulse inserts a new Impulse edge into the graph. It must use the built-in bytes coder.
func NewParDo ¶
func NewParDo(g *Graph, s *Scope, u *DoFn, in []*Node, rc *coder.Coder, typedefs map[string]reflect.Type) (*MultiEdge, error)
NewParDo inserts a new ParDo edge into the graph.
func NewReshuffle ¶
NewReshuffle inserts a new Reshuffle edge into the graph.
func NewWindowInto ¶
NewWindowInto inserts a new WindowInto edge into the graph.
type Node ¶
type Node struct { // Coder defines the data encoding. It can be changed, but must be of // the underlying type, t. Coder *coder.Coder // contains filtered or unexported fields }
Node is a typed connector describing the data type and encoding. A node may have multiple inbound and outbound connections. The underlying type must be a complete type, i.e., not include any type variables.
func (*Node) WindowingStrategy ¶
func (n *Node) WindowingStrategy() *window.WindowingStrategy
WindowingStrategy returns the window applied to the data.
type Outbound ¶
type Outbound struct { // To is the outgoing node in the graph. To *Node // Type is the fulltype matching the actual type used by the transform. // For DoFns, unlike inbound, the outbound types closely mimic the type // signature. For example, // // func (ctx context.Context, emit func (key int, value typex.X)) error // // is a generic DoFn that produces one Outbound link of type KV<int,X>. Type typex.FullType // representation type of data }
Outbound represents an outbound data link to a Node.
type Scope ¶
type Scope struct { // Label is the human-visible label for this scope. Label string // Parent is the parent scope, if nested. Parent *Scope // contains filtered or unexported fields }
Scope is a syntactic Scope, such as arising from a composite Transform. It has no semantic meaning at execution time. Used by monitoring.
type SplittableDoFn ¶
type SplittableDoFn DoFn
SplittableDoFn represents a DoFn implementing SDF methods.
func (*SplittableDoFn) CreateInitialRestrictionFn ¶
func (f *SplittableDoFn) CreateInitialRestrictionFn() *funcx.Fn
CreateInitialRestrictionFn returns the "CreateInitialRestriction" function, if present.
func (*SplittableDoFn) CreateTrackerFn ¶
func (f *SplittableDoFn) CreateTrackerFn() *funcx.Fn
CreateTrackerFn returns the "CreateTracker" function, if present.
func (*SplittableDoFn) Name ¶
func (f *SplittableDoFn) Name() string
Name returns the name of the function or struct.
func (*SplittableDoFn) RestrictionSizeFn ¶
func (f *SplittableDoFn) RestrictionSizeFn() *funcx.Fn
RestrictionSizeFn returns the "RestrictionSize" function, if present.
func (*SplittableDoFn) RestrictionT ¶
func (f *SplittableDoFn) RestrictionT() reflect.Type
RestrictionT returns the restriction type from the SDF.
func (*SplittableDoFn) SplitRestrictionFn ¶
func (f *SplittableDoFn) SplitRestrictionFn() *funcx.Fn
SplitRestrictionFn returns the "SplitRestriction" function, if present.
Directories ¶
Path | Synopsis |
---|---|
Package coder contains coder representation and utilities.
|
Package coder contains coder representation and utilities. |
Package mtime contains a millisecond representation of time.
|
Package mtime contains a millisecond representation of time. |
Package window contains window representation, windowing strategies and utilities.
|
Package window contains window representation, windowing strategies and utilities. |