graphx

package
v2.5.0-RC1+incompatible
Warning

This package is not in the latest version of its module.

Published: Jun 6, 2018 License: Apache-2.0, BSD-3-Clause, MIT Imports: 18 Imported by: 0

Documentation

Overview

Package graphx provides facilities to help with the serialization of pipelines into a serializable graph structure suitable for the worker.

The registry's Register function is used by transform authors to make their type's methods available for remote invocation. The runner then uses the registry's Key and Lookup methods to access information supplied by transform authors.

The Encode* and Decode* methods are used to handle serialization of both regular Go data and the specific Beam data types. The Encode* methods are used after pipeline construction to turn the plan into a serializable form that can be sent for remote execution. The Decode* methods are used by the runner to recover the execution plan from the serialized form.

Constants

const (
	URNInject = "beam:go:transform:inject:v1"
	URNExpand = "beam:go:transform:expand:v1"
)
const (
	WindowedValueType = "kind:windowed_value"
	BytesType         = "kind:bytes"
	VarIntType        = "kind:varint"

	GlobalWindowType   = "kind:global_window"
	IntervalWindowType = "kind:interval_window"
)

Exported types are used for translation lookup.

const (
	URNImpulse       = "beam:transform:impulse:v1"
	URNParDo         = "urn:beam:transform:pardo:v1"
	URNFlatten       = "beam:transform:flatten:v1"
	URNGBK           = "beam:transform:group_by_key:v1"
	URNCombinePerKey = "beam:transform:combine_per_key:v1"
	URNWindow        = "beam:transform:window:v1"

	URNGlobalWindowsWindowFn  = "beam:windowfn:global_windows:v0.1"
	URNFixedWindowsWindowFn   = "beam:windowfn:fixed_windows:v0.1"
	URNSlidingWindowsWindowFn = "beam:windowfn:sliding_windows:v0.1"
	URNSessionsWindowFn       = "beam:windowfn:session_windows:v0.1"

	// URNJavaDoFn is the legacy constant for marking a DoFn.
	// TODO: remove URNJavaDoFn when the Dataflow runner
	// uses the model pipeline and no longer falls back to Java.
	URNJavaDoFn = "urn:beam:dofn:javasdk:0.1"
	URNDoFn     = "beam:go:transform:dofn:v1"
)

Model constants for interfacing with a Beam runner. TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto.

Variables

This section is empty.

Functions

func DecodeCoder

func DecodeCoder(data string) (*coder.Coder, error)

DecodeCoder decodes a coder. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func DecodeCoderRef

func DecodeCoderRef(c *CoderRef) (*coder.Coder, error)

DecodeCoderRef extracts a usable coder from the encoded runner form.

func DecodeCoderRefs

func DecodeCoderRefs(list []*CoderRef) ([]*coder.Coder, error)

DecodeCoderRefs extracts usable coders from the encoded runner form.

func DecodeFn

func DecodeFn(data string) (reflectx.Func, error)

DecodeFn decodes a function. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The parameter types must be encodable.

func DecodeMultiEdge

func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, []*graph.Inbound, []*graph.Outbound, error)

DecodeMultiEdge converts the wire representation into the preprocessed components representing that edge. We deserialize to components to avoid inserting the edge into a graph or creating a detached edge.

func DecodeType

func DecodeType(data string) (reflect.Type, error)

DecodeType decodes a type. Unless registered, the decoded type is only guaranteed to be isomorphic to the input with regard to data members. The returned type will have no methods.
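
The "isomorphic with regard to data members" guarantee can be illustrated with the standard reflect package alone. The following is a minimal sketch, not this package's implementation: fieldSpec, describe and rebuild are hypothetical names, and only flat structs with int and string fields are handled. It describes a struct type by its fields and rebuilds an anonymous type from that description; the rebuilt type has the same fields but none of the original's methods.

```go
package main

import (
	"fmt"
	"reflect"
)

// Point is a sample user type with one method.
type Point struct {
	X int
	Y int
}

// Sum is a method that a structurally rebuilt type does not carry over.
func (p Point) Sum() int { return p.X + p.Y }

// fieldSpec is a hypothetical serializable description of one struct field.
type fieldSpec struct {
	Name string
	Kind reflect.Kind
}

// describe records the name and kind of each field of a flat struct value.
func describe(v interface{}) []fieldSpec {
	t := reflect.TypeOf(v)
	specs := make([]fieldSpec, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		specs = append(specs, fieldSpec{Name: f.Name, Kind: f.Type.Kind()})
	}
	return specs
}

// rebuild constructs an anonymous struct type from the field descriptions.
// Only int and string kinds are supported in this sketch.
func rebuild(specs []fieldSpec) (reflect.Type, error) {
	fields := make([]reflect.StructField, 0, len(specs))
	for _, s := range specs {
		var ft reflect.Type
		switch s.Kind {
		case reflect.Int:
			ft = reflect.TypeOf(int(0))
		case reflect.String:
			ft = reflect.TypeOf("")
		default:
			return nil, fmt.Errorf("unsupported kind %v for field %q", s.Kind, s.Name)
		}
		fields = append(fields, reflect.StructField{Name: s.Name, Type: ft})
	}
	return reflect.StructOf(fields), nil
}

func main() {
	got, err := rebuild(describe(Point{}))
	if err != nil {
		panic(err)
	}
	orig := reflect.TypeOf(Point{})
	fmt.Println("fields:", orig.NumField(), got.NumField())
	fmt.Println("methods:", orig.NumMethod(), got.NumMethod())
}
```

If methods matter on the decoding side, the sentence above suggests the type has to be registered so that the original type, rather than a structural copy, is recovered.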

func DecodeUserFn

func DecodeUserFn(ref *v1.UserFn) (*funcx.Fn, error)

DecodeUserFn receives the wire representation of a Beam user function, extracting the preprocessed representation, expanding all inputs and outputs of the function.

func EncodeCoder

func EncodeCoder(c *coder.Coder) (string, error)

EncodeCoder encodes a coder as a string. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func EncodeFn

func EncodeFn(fn reflectx.Func) (string, error)

EncodeFn encodes a function and parameter types as a string. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func EncodeMultiEdge

func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error)

EncodeMultiEdge converts the preprocessed representation into the wire representation of the multiedge, capturing input and output type information.

func EncodeType

func EncodeType(t reflect.Type) (string, error)

EncodeType encodes a type as a string. Unless registered, the decoded type is only guaranteed to be isomorphic to the input with regard to data members. The decoded type will have no methods.

func EncodeUserFn

func EncodeUserFn(u *funcx.Fn) (*v1.UserFn, error)

EncodeUserFn translates the preprocessed representation of a Beam user function into the wire representation, capturing all the inputs and outputs needed.

func EnsureUniqueNames

func EnsureUniqueNames(tree *ScopeTree)

EnsureUniqueNames ensures that each name is unique within each ScopeTree recursively. Any conflict is resolved by appending '1, '2, etc. to the name.
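
The renaming rule can be sketched on a simplified tree. This is an illustration under stated assumptions, not the package's code: node is a hypothetical stand-in for ScopeTree that has only named children (the real type also carries named edges), and ensureUnique and findFree are hypothetical helpers.

```go
package main

import "fmt"

// node is a hypothetical stand-in for ScopeTree: a name plus named children.
type node struct {
	Name     string
	Children []*node
}

// ensureUnique renames siblings in place so that no two children of the same
// parent share a name, then recurses into each child.
func ensureUnique(n *node) {
	seen := make(map[string]bool)
	for _, c := range n.Children {
		c.Name = findFree(seen, c.Name)
		seen[c.Name] = true
	}
	for _, c := range n.Children {
		ensureUnique(c)
	}
}

// findFree returns name if it is unused; otherwise it returns the first of
// name'1, name'2, ... that is not yet taken.
func findFree(seen map[string]bool, name string) string {
	if !seen[name] {
		return name
	}
	for i := 1; ; i++ {
		candidate := fmt.Sprintf("%s'%d", name, i)
		if !seen[candidate] {
			return candidate
		}
	}
}

func main() {
	root := &node{Name: "root", Children: []*node{
		{Name: "ParDo"}, {Name: "ParDo"}, {Name: "GBK"}, {Name: "ParDo"},
	}}
	ensureUnique(root)
	for _, c := range root.Children {
		fmt.Println(c.Name)
	}
}
```

Names only need to be unique among siblings in this sketch, which matches the "within each ScopeTree" wording: two different subtrees may each contain a child called ParDo.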

func MakeGBKUnionCoder

func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder

MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.

func MakeKVUnionCoder

func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder

MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.

func Marshal

func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error)

Marshal converts a graph to a model pipeline.

func MarshalCoders

func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pb.Coder)

MarshalCoders marshals a list of coders into model coders.

func MarshalWindowingStrategy

func MarshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) *pb.WindowingStrategy

MarshalWindowingStrategy marshals the given windowing strategy in the given coder context.

func StableCoGBKFlattenID

func StableCoGBKFlattenID(id string) string

func StableCoGBKGBKID

func StableCoGBKGBKID(id string) string

func StableCoGBKInjectID

func StableCoGBKInjectID(id string, i int) string

func StableMultiEdgeID

func StableMultiEdgeID(edge *graph.MultiEdge) string

func UnmarshalCoders

func UnmarshalCoders(ids []string, m map[string]*pb.Coder) ([]*coder.Coder, error)

UnmarshalCoders unmarshals coders.

Types

type CoderMarshaller

type CoderMarshaller struct {
	// contains filtered or unexported fields
}

CoderMarshaller incrementally builds a compact model representation of a set of coders. Identical coders are shared.

func NewCoderMarshaller

func NewCoderMarshaller() *CoderMarshaller

NewCoderMarshaller returns a new CoderMarshaller.

func (*CoderMarshaller) Add

func (b *CoderMarshaller) Add(c *coder.Coder) string

Add adds the given coder to the set and returns its id. Idempotent.

func (*CoderMarshaller) AddMulti

func (b *CoderMarshaller) AddMulti(list []*coder.Coder) []string

AddMulti adds the given coders to the set and returns their ids. Idempotent.

func (*CoderMarshaller) AddWindowCoder

func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) string

AddWindowCoder adds a window coder.

func (*CoderMarshaller) Build

func (b *CoderMarshaller) Build() map[string]*pb.Coder

Build returns the set of model coders. Note that the map may be larger than the number of coders added, because component coders are included.
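
The sharing and idempotence described for CoderMarshaller can be sketched with a small interning table. Everything here is hypothetical and simplified (spec, entry, interner, and the "kind:pair" label are illustrative names, and the real type produces *pb.Coder values); the sketch only shows why adding the same coder twice yields the same id and why the built map can hold more entries than the number of Add calls.

```go
package main

import "fmt"

// spec is a hypothetical, simplified coder: a kind plus component coders.
type spec struct {
	Kind       string
	Components []*spec
}

// entry is the flattened form of a spec: components are referenced by id.
type entry struct {
	Kind         string
	ComponentIDs []string
}

// interner assigns ids to specs and shares ids between identical specs.
type interner struct {
	ids     map[string]string // structural key -> id
	entries map[string]entry  // id -> flattened entry
}

func newInterner() *interner {
	return &interner{ids: map[string]string{}, entries: map[string]entry{}}
}

// Add registers s and all of its components, returning the id of s.
// Adding a structurally identical spec again returns the same id.
func (b *interner) Add(s *spec) string {
	compIDs := make([]string, 0, len(s.Components))
	for _, c := range s.Components {
		compIDs = append(compIDs, b.Add(c))
	}
	key := fmt.Sprintf("%q%v", s.Kind, compIDs)
	if id, ok := b.ids[key]; ok {
		return id
	}
	id := fmt.Sprintf("c%d", len(b.ids))
	b.ids[key] = id
	b.entries[id] = entry{Kind: s.Kind, ComponentIDs: compIDs}
	return id
}

// Build returns every registered entry, including component entries.
func (b *interner) Build() map[string]entry { return b.entries }

// newPair builds a fresh pair-like spec over bytes and varint components.
func newPair() *spec {
	return &spec{Kind: "kind:pair", Components: []*spec{
		{Kind: "kind:bytes"}, {Kind: "kind:varint"},
	}}
}

func main() {
	b := newInterner()
	first := b.Add(newPair())
	second := b.Add(newPair()) // distinct pointers, identical structure
	fmt.Println(first == second, len(b.Build()))
}
```

A single top-level coder was added, yet the built map holds three entries because the two component coders are registered along the way.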

type CoderRef

type CoderRef struct {
	Type         string      `json:"@type,omitempty"`
	Components   []*CoderRef `json:"component_encodings,omitempty"`
	IsWrapper    bool        `json:"is_wrapper,omitempty"`
	IsPairLike   bool        `json:"is_pair_like,omitempty"`
	IsStreamLike bool        `json:"is_stream_like,omitempty"`
}

CoderRef defines the (structured) Coder in serializable form. It is an artifact of the CloudObject encoding.
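
As a quick illustration of the serializable form, the struct above can be passed through encoding/json. The snippet redeclares CoderRef locally so that it is self-contained, and the particular nesting shown (a windowed value wrapping a bytes coder and a global window coder) is an assumed arrangement for illustration, not something this page specifies; toJSON is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CoderRef mirrors the documented struct, including its JSON tags.
type CoderRef struct {
	Type         string      `json:"@type,omitempty"`
	Components   []*CoderRef `json:"component_encodings,omitempty"`
	IsWrapper    bool        `json:"is_wrapper,omitempty"`
	IsPairLike   bool        `json:"is_pair_like,omitempty"`
	IsStreamLike bool        `json:"is_stream_like,omitempty"`
}

// toJSON serializes a CoderRef with the standard library encoder.
func toJSON(c *CoderRef) (string, error) {
	data, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	// Assumed example layout: element coder first, window coder second.
	ref := &CoderRef{
		Type:       "kind:windowed_value",
		Components: []*CoderRef{{Type: "kind:bytes"}, {Type: "kind:global_window"}},
		IsWrapper:  true,
	}
	out, err := toJSON(ref)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because every field is tagged omitempty, false flags and empty component lists are left out of the encoded object.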

func EncodeCoderRef

func EncodeCoderRef(c *coder.Coder) (*CoderRef, error)

EncodeCoderRef returns the encoded form understood by the runner.

func EncodeCoderRefs

func EncodeCoderRefs(list []*coder.Coder) ([]*CoderRef, error)

EncodeCoderRefs returns the encoded forms understood by the runner.

func WrapExtraWindowedValue

func WrapExtraWindowedValue(c *CoderRef) *CoderRef

WrapExtraWindowedValue adds an additional WV needed for side input, which expects the coder to have exactly one component with the element.

type CoderUnmarshaller

type CoderUnmarshaller struct {
	// contains filtered or unexported fields
}

CoderUnmarshaller is an incremental unmarshaller of model coders. Identical coders are shared.

func NewCoderUnmarshaller

func NewCoderUnmarshaller(m map[string]*pb.Coder) *CoderUnmarshaller

NewCoderUnmarshaller returns a new CoderUnmarshaller.

func (*CoderUnmarshaller) Coder

func (b *CoderUnmarshaller) Coder(id string) (*coder.Coder, error)

Coder unmarshals a coder with the given id.

func (*CoderUnmarshaller) Coders

func (b *CoderUnmarshaller) Coders(ids []string) ([]*coder.Coder, error)

func (*CoderUnmarshaller) WindowCoder

func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error)

WindowCoder unmarshals a window coder with the given id.

type NamedEdge

type NamedEdge struct {
	Name string
	Edge *graph.MultiEdge
}

NamedEdge is a named MultiEdge.

type NamedScope

type NamedScope struct {
	Name  string
	Scope *graph.Scope
}

NamedScope is a named Scope.

type Options

type Options struct {
	// ContainerImageURL is the default environment container image.
	ContainerImageURL string
}

Options for marshalling a graph into a model pipeline.

type ScopeTree

type ScopeTree struct {
	// Scope is the named scope at the root of the (sub)tree.
	Scope NamedScope
	// Edges are the named edges directly under this scope.
	Edges []NamedEdge

	// Children are the scopes directly under this scope.
	Children []*ScopeTree
}

ScopeTree is a convenient representation of the Scope-structure as a tree. Each ScopeTree may also be a subtree of another ScopeTree. The tree structure respects the underlying Scope structure, i.e., if Scope 'a' has a parent 'b' then the ScopeTree for 'b' must have the ScopeTree for 'a' as a child.

func NewScopeTree

func NewScopeTree(edges []*graph.MultiEdge) *ScopeTree

NewScopeTree computes the ScopeTree for a set of edges.

Directories

Path Synopsis
Package v1 is a generated protocol buffer package.
