xlangx

package
v2.54.3

This package is not in the latest version of its module.
Published: May 18, 2024
License: Apache-2.0, BSD-3-Clause, MIT
Imports: 25
Imported by: 0

Documentation

Overview

Package xlangx contains various low-level utilities needed for adding cross-language transforms to the pipeline.

Index

Constants

const (
	// Separator is the canonical separator between a namespace and optional configuration.
	Separator = ":"
	// ClasspathSeparator is the canonical separator between a classpath configuration string and the rest of a namespace-configuration string.
	ClasspathSeparator = ";"
)

Variables

This section is empty.

Functions

func CreateExternalConfigurationPayload

func CreateExternalConfigurationPayload(pl any) (*pipepb.ExternalConfigurationPayload, error)

CreateExternalConfigurationPayload takes a native Go struct and returns an ExternalConfigurationPayload proto with the struct encoded as a Row and its associated schema.

func DecodeStructPayload

func DecodeStructPayload(plBytes []byte) (any, error)

DecodeStructPayload takes a marshaled ExternalConfigurationPayload proto and returns a native Go struct, with its type converted from the Schema representation and its value decoded from the Row.

func EncodeStructPayload

func EncodeStructPayload(pl any) ([]byte, error)

EncodeStructPayload takes a native Go struct and returns a marshaled ExternalConfigurationPayload proto, containing a Schema representation of the original type and the original value encoded as a Row. This is intended to be used as the expansion payload for an External transform.

func Expand

func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) error

Expand expands an unexpanded graph.ExternalTransform as a graph.ExpandedTransform and assigns it to the ExternalTransform's Expanded field. This requires querying an expansion service based on the configuration details within the ExternalTransform.

For framework use only. Users should call beam.CrossLanguage to access foreign transforms rather than calling this function directly.

func QueryAutomatedExpansionService

func QueryAutomatedExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error)

QueryAutomatedExpansionService submits an external transform to be expanded by the expansion service and then eagerly materializes the artifacts for staging. The given transform should be the external transform, and the components are any additional components necessary for the pipeline snippet.

The address to be queried is determined by the Config field of the HandlerParams after the prefix tag indicating the automated service is in use.

func QueryExpansionService

func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error)

QueryExpansionService submits an external transform to be expanded by the expansion service. The given transform should be the external transform, and the components are any additional components necessary for the pipeline snippet.

The address to be queried is determined by the Config field of HandlerParams.

This HandlerFunc is exported to simplify building custom handler functions that do end up calling a Beam ExpansionService, either as a fallback or as part of normal flow.
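
The fallback pattern mentioned above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `params`, `response`, `handlerFunc`, `errNotHandled`, `localOnly`, `remote`, and `expand` are all hypothetical stand-ins, with `remote` playing the role that a call to QueryExpansionService would play in a real handler.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// params and response are hypothetical stand-ins for xlangx.HandlerParams and
// jobpb.ExpansionResponse so the sketch compiles without the Beam module.
type params struct{ Config string }
type response struct{ Source string }

// handlerFunc mirrors the shape of HandlerFunc using the stand-in types.
type handlerFunc func(context.Context, *params) (*response, error)

// errNotHandled is a hypothetical sentinel a custom handler returns when it
// declines a request.
var errNotHandled = errors.New("transform not handled locally")

// withFallback tries primary first and delegates to fallback only when
// primary reports errNotHandled.
func withFallback(primary, fallback handlerFunc) handlerFunc {
	return func(ctx context.Context, p *params) (*response, error) {
		resp, err := primary(ctx, p)
		if errors.Is(err, errNotHandled) {
			return fallback(ctx, p)
		}
		return resp, err
	}
}

// localOnly handles requests whose Config is "local" and declines the rest.
func localOnly(_ context.Context, p *params) (*response, error) {
	if p.Config == "local" {
		return &response{Source: "local"}, nil
	}
	return nil, errNotHandled
}

// remote stands in for a call to a real expansion service.
func remote(_ context.Context, p *params) (*response, error) {
	return &response{Source: "remote:" + p.Config}, nil
}

// expand runs the combined handler for one configuration string.
func expand(cfg string) (string, error) {
	h := withFallback(localOnly, remote)
	resp, err := h(context.Background(), &params{Config: cfg})
	if err != nil {
		return "", err
	}
	return resp.Source, nil
}

func main() {
	for _, cfg := range []string{"local", "localhost:8097"} {
		src, err := expand(cfg)
		fmt.Println(src, err)
	}
}
```

Using a sentinel error keeps genuine failures in the primary handler from being silently retried against the fallback.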

func QueryPythonExpansionService

func QueryPythonExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error)

QueryPythonExpansionService submits an external Python transform to be expanded by the expansion service and then eagerly materializes the artifacts for staging. The given transform should be the external transform, and the components are any additional components necessary for the pipeline snippet.

The address to be queried is determined by the Config field of the HandlerParams after the prefix tag indicating the automated service is in use.

func RegisterHandler

func RegisterHandler(namespace string, handler HandlerFunc)

RegisterHandler associates a namespace with a HandlerFunc which can be used to replace calls to a Beam ExpansionService.

Then, expansion addresses of the forms

"<namespace>" or
"<namespace>:<configuration>"

can be used with beam.CrossLanguage. Any configuration after the separator is passed to the HandlerFunc when it is called, for the handler to use at its leisure.
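
The two address forms can be illustrated with a small lookup sketch. It assumes the address is split at the first occurrence of the documented Separator; the `handler` type, `registry` map, `register`, and `lookup` are hypothetical simplifications and do not reflect how the package stores or invokes handlers.

```go
package main

import (
	"fmt"
	"strings"
)

// separator mirrors the documented value of xlangx.Separator.
const separator = ":"

// handler is a hypothetical, simplified handler that receives only the
// configuration string.
type handler func(config string) string

// registry maps a namespace to its handler.
var registry = map[string]handler{}

// register associates a namespace with a handler.
func register(namespace string, h handler) { registry[namespace] = h }

// lookup splits addr at the first separator and returns the handler
// registered for the namespace, the trailing configuration, and whether a
// handler was found.
func lookup(addr string) (handler, string, bool) {
	ns, config, _ := strings.Cut(addr, separator)
	h, ok := registry[ns]
	return h, config, ok
}

func main() {
	register("echo", func(config string) string { return "config=" + config })
	for _, addr := range []string{"echo", "echo:a:b", "localhost:8097"} {
		if h, cfg, ok := lookup(addr); ok {
			fmt.Println(addr, "->", h(cfg))
		} else {
			fmt.Println(addr, "-> no handler registered")
		}
	}
}
```

In this sketch an address such as `localhost:8097` simply finds no handler, which is the point at which a caller would treat it as an ordinary service address.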

func RegisterOverrideForUrn

func RegisterOverrideForUrn(urn, expansionAddr string)

RegisterOverrideForUrn overrides which expansion address is used to expand a specific transform URN. The expansion address must be a URL or be a namespaced handler registered with RegisterHandler.

When the expansion address is for a handler, it may take the forms

"<namespace>" or
"<namespace>:<configuration>"

func Require

func Require(expansionAddr string) string

Require takes an expansionAddr and requires cross-language expansion to use it and its associated handler. If the transform's URN has a specific override, that override is ignored.

Intended for use by cross language wrappers to permit per-call overrides of the expansion address within a single pipeline, such as for testing purposes.
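
The precedence described for Require and RegisterOverrideForUrn can be sketched as follows. This is an illustration under stated assumptions: the `requireTag` prefix, the `urnOverrides` map, and the `resolve` function are hypothetical, and the real tag format used by the package is not documented on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// requireTag is a hypothetical marker for a required address; the actual
// format produced by xlangx.Require is an assumption here.
const requireTag = "required:"

// urnOverrides maps a transform URN to the expansion address that replaces
// whatever address the caller supplied.
var urnOverrides = map[string]string{}

// registerOverride records an override for a URN.
func registerOverride(urn, addr string) { urnOverrides[urn] = addr }

// require tags addr so that resolve ignores any URN override.
func require(addr string) string { return requireTag + addr }

// resolve picks the address to use: a required address wins, then a URN
// override, then the address as given.
func resolve(urn, addr string) string {
	if strings.HasPrefix(addr, requireTag) {
		return strings.TrimPrefix(addr, requireTag)
	}
	if override, ok := urnOverrides[urn]; ok {
		return override
	}
	return addr
}

func main() {
	registerOverride("example:urn:v1", "localhost:9000")
	fmt.Println(resolve("example:urn:v1", "localhost:8097"))
	fmt.Println(resolve("example:urn:v1", require("localhost:8097")))
	fmt.Println(resolve("other:urn:v1", "localhost:8097"))
}
```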

func ResolveArtifacts

func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.Pipeline)

ResolveArtifacts acquires all dependencies for a cross-language transform.

func ResolveArtifactsWithConfig

func ResolveArtifactsWithConfig(ctx context.Context, edges []*graph.MultiEdge, cfg ResolveConfig) (paths map[string]string, err error)

ResolveArtifactsWithConfig acquires all dependencies for cross-language transforms, with additional configurable behavior. By default, this function performs the following steps for each cross-language transform in the list of edges:

  1. Retrieves a list of dependencies needed from the expansion service.
  2. Retrieves each dependency as an artifact and stages it to a default local filepath.
  3. Adds the dependencies to the transform's stored environment proto.

The changes that can be configured are documented in ResolveConfig.

This returns a map of "local path" to "sdk path". By default these are identical, unless ResolveConfig.SdkPath has been set.
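
The shape of the returned map can be sketched without the Beam module. The `resolveConfig` type below is a local stand-in that mirrors the two documented fields of ResolveConfig, and `sdkPaths` is a hypothetical helper; treating the base name of the local path as the artifact name is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// resolveConfig is a local stand-in mirroring the documented fields of
// xlangx.ResolveConfig.
type resolveConfig struct {
	SdkPath string
	JoinFn  func(path, name string) string
}

// sdkPaths maps each local staging path to the path recorded for the SDK
// harness. With an empty SdkPath the two are identical.
func sdkPaths(localPaths []string, cfg resolveConfig) map[string]string {
	join := cfg.JoinFn
	if join == nil {
		// Documented default: filepath.Join.
		join = func(path, name string) string { return filepath.Join(path, name) }
	}
	out := make(map[string]string, len(localPaths))
	for _, local := range localPaths {
		if cfg.SdkPath == "" {
			out[local] = local
			continue
		}
		// Assumption: the artifact name is the base name of the local path.
		out[local] = join(cfg.SdkPath, filepath.Base(local))
	}
	return out
}

func main() {
	local := []string{"/tmp/artifacts/beam-io.jar"}
	fmt.Println(sdkPaths(local, resolveConfig{}))
	fmt.Println(sdkPaths(local, resolveConfig{
		SdkPath: "gs://my-bucket/staging",
		JoinFn:  func(path, name string) string { return path + "/" + name },
	}))
}
```

A custom JoinFn is useful when SdkPath is URL-like, because filepath.Join cleans its result and collapses the double slash that follows a scheme such as `gs:`.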

func UpdateArtifactTypeFromFileToURL

func UpdateArtifactTypeFromFileToURL(edges []*graph.MultiEdge)

UpdateArtifactTypeFromFileToURL changes the type of the artifact from FILE to URL when the file path contains the suffix element ("://") of the URI scheme.
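
The check described above amounts to looking for `://` in the path. A minimal sketch, in which the `artifactType` type, its constants, and `classify` are hypothetical names rather than the package's own:

```go
package main

import (
	"fmt"
	"strings"
)

// artifactType is a hypothetical label for how an artifact is referenced.
type artifactType string

const (
	typeFile artifactType = "FILE"
	typeURL  artifactType = "URL"
)

// classify reports URL when the path contains the "://" scheme suffix and
// FILE otherwise.
func classify(path string) artifactType {
	if strings.Contains(path, "://") {
		return typeURL
	}
	return typeFile
}

func main() {
	for _, p := range []string{"/tmp/artifacts/beam-io.jar", "gs://my-bucket/staging/beam-io.jar"} {
		fmt.Println(p, "->", classify(p))
	}
}
```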

func UseAutomatedJavaExpansionService

func UseAutomatedJavaExpansionService(gradleTarget string, opts ...ExpansionServiceOption) string

UseAutomatedJavaExpansionService takes a gradle target and creates a tagged string to indicate that it should be used to start up an automated expansion service for a cross-language expansion.

Intended for use by cross language wrappers to permit spinning up an expansion service for a user if no expansion service address is provided.

func UseAutomatedPythonExpansionService

func UseAutomatedPythonExpansionService(service string, opts ...ExpansionServiceOption) string

UseAutomatedPythonExpansionService takes an expansion service module name and creates a tagged string to indicate that it should be used to start up an automated expansion service for a cross-language expansion.

Intended for use by cross language wrappers to permit spinning up an expansion service for a user if no expansion service address is provided.

Types

type ExpansionServiceOption

type ExpansionServiceOption func(*string)

ExpansionServiceOption provides an option for xlangx.UseAutomatedJavaExpansionService() and xlangx.UseAutomatedPythonExpansionService().

func AddClasspaths

func AddClasspaths(classpaths []string) ExpansionServiceOption

AddClasspaths is an expansion service option for xlangx.UseAutomatedJavaExpansionService that accepts a slice of classpaths and creates a tagged expansion address string suffixed with the classpath separator and the classpaths provided.
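
Because ExpansionServiceOption is declared as `func(*string)`, each option edits the tagged address in place. The sketch below shows that functional-option pattern with local stand-ins. The `autoJavaTag` prefix and the space-joined classpath list are assumptions for illustration; only the two separator values come from this page.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	// separator and classpathSeparator mirror the documented constants.
	separator          = ":"
	classpathSeparator = ";"
	// autoJavaTag is a hypothetical prefix marking an automated service.
	autoJavaTag = "autojava"
)

// option mirrors the shape of ExpansionServiceOption.
type option func(*string)

// addClasspaths appends the classpath separator and the classpaths.
// Joining the entries with a space is an assumption.
func addClasspaths(classpaths []string) option {
	return func(addr *string) {
		*addr += classpathSeparator + strings.Join(classpaths, " ")
	}
}

// useAutomatedJava builds the tagged address and applies each option in order.
func useAutomatedJava(gradleTarget string, opts ...option) string {
	addr := autoJavaTag + separator + gradleTarget
	for _, opt := range opts {
		opt(&addr)
	}
	return addr
}

func main() {
	fmt.Println(useAutomatedJava("example-target"))
	fmt.Println(useAutomatedJava("example-target", addClasspaths([]string{"a.jar", "b.jar"})))
}
```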

func AddExtraPackages

func AddExtraPackages(packages []string) ExpansionServiceOption

AddExtraPackages is an expansion service option for xlangx.UseAutomatedPythonExpansionService that accepts a slice of extra packages and creates a tagged expansion address string suffixed with the classpath separator and the service module provided.

type HandlerFunc

type HandlerFunc func(context.Context, *HandlerParams) (*jobpb.ExpansionResponse, error)

HandlerFunc abstracts making an ExpansionService request.

type HandlerParams

type HandlerParams struct {
	// Additional parameterization string, if any.
	Config string

	Req *jobpb.ExpansionRequest
	// contains filtered or unexported fields
}

HandlerParams is the parameter to an expansion service handler.

func (*HandlerParams) CoderMarshaller

func (p *HandlerParams) CoderMarshaller() *graphx.CoderMarshaller

CoderMarshaller returns a coder marshaller initialized with the request's namespace.

func (*HandlerParams) Inputs

func (p *HandlerParams) Inputs() []PCol

Inputs returns the provided input PCollections, if any, for the PTransform to expand in this expansion service request.

func (*HandlerParams) Outputs

func (p *HandlerParams) Outputs() []PCol

Outputs returns the provided output PCollections, if any, for expected outputs for this expansion service request.

If no collections are returned, none are currently expected, but may be provided by the expansion.

type PCol

type PCol struct {
	Index   int          // Positional index of this input or output
	Local   string       // Local name of the PCollection (may be used in the cross language PTransform)
	Coder   *coder.Coder // Contains the full type and other coder information.
	Bounded pipepb.IsBounded_Enum
	// contains filtered or unexported fields
}

PCol represents input or output pcollections to the cross language transform being expanded.

func (*PCol) ID

func (p *PCol) ID() string

ID produces a standard format globally namespaced id for a PCollection from the local identifier.

func (*PCol) WSID

func (p *PCol) WSID() string

WSID produces a standard format globally namespaced id for a WindowingStrategy from the local identifier.

func (*PCol) WindowingStrategy

func (p *PCol) WindowingStrategy(cm *graphx.CoderMarshaller) (string, *pipepb.WindowingStrategy)

WindowingStrategy returns the id of this PCollection's windowing strategy, and the associated proto.

TODO: intern windowing strategies.

type ResolveConfig

type ResolveConfig struct {
	// SdkPath replaces the default filepath for dependencies, but only in the
	// external environment proto to be used by the SDK Harness during pipeline
	// execution. This is used to specify alternate staging directories, such
	// as for staging artifacts remotely.
	//
	// Setting an SdkPath does not change staging behavior otherwise. All
	// artifacts still get staged to the default local filepath, and it is the
	// user's responsibility to stage those local artifacts to the SdkPath.
	SdkPath string

	// JoinFn is a function for combining SdkPath and individual artifact names.
	// If not specified, it defaults to using filepath.Join.
	JoinFn func(path, name string) string
}

ResolveConfig contains fields for configuring the behavior for resolving artifacts.

Directories

Path Synopsis
Package expansionx contains utilities for starting expansion services for cross-language transforms.