funcx

package
v3.0.0-...-7ba4d6b
Warning

This package is not in the latest version of its module.

Published: Jul 17, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 9 Imported by: 0

Documentation

Overview

Package funcx contains functions and types used to perform type analysis of Beam functions. This package is primarily used during pipeline construction to typecheck pipelines, but it can be used by anyone who wants to perform type analysis of Beam user functions. To typecheck pipeline code, consider the Try* functions in the beam package, which are built on these primitives and may be more convenient.

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsEmit

func IsEmit(t reflect.Type) bool

IsEmit returns true iff the supplied type is an emitter.

func IsEmitWithEventTime

func IsEmitWithEventTime(t reflect.Type) bool

IsEmitWithEventTime returns true iff the supplied type is an emitter and the first argument is the optional EventTime.

func IsIter

func IsIter(t reflect.Type) bool

IsIter returns true iff the supplied type is a "single sweep functional iterator".

A single sweep functional iterator is a function that takes one or more pointers as arguments and returns a single boolean value. When it is called and values remain, they are copied into the supplied pointers. The function returns true if data was copied, and false if no more data is available.

func IsMalformedEmit

func IsMalformedEmit(t reflect.Type) (bool, error)

IsMalformedEmit returns true, together with an error explaining why, iff the supplied type is an illegal emitter. For example, an emitter is illegal if one of its parameters is not concrete. If the type does not have the structure of an emitter, or if it is a legal emitter, IsMalformedEmit returns false and no error.

func IsMalformedIter

func IsMalformedIter(t reflect.Type) (bool, error)

IsMalformedIter returns true, together with an error explaining why, iff the supplied type is an illegal "single sweep functional iterator". For example, an iterator is not legal if one of its parameters is not concrete, universal, or a container type. If the type does not have the structure of an iter, or if it is a legal iter, IsMalformedIter returns false and no error.

func IsMalformedMultiMap

func IsMalformedMultiMap(t reflect.Type) (bool, error)

IsMalformedMultiMap returns true, together with an error explaining why, iff the supplied type is an illegal keyed functional iterator generator. A keyed iterator generator is not legal if its output is not of type iterator. If the type does not have the structure of a keyed iterator generator, or if it is a legal iterator generator, IsMalformedMultiMap returns false and no error.

func IsMalformedReIter

func IsMalformedReIter(t reflect.Type) (bool, error)

IsMalformedReIter returns true, together with an error explaining why, iff the supplied type is an illegal functional iterator generator. An iterator generator is not legal if its output is not of type iterator. If the type does not have the structure of an iterator generator, or if it is a legal iterator generator, IsMalformedReIter returns false and no error.

func IsMultiMap

func IsMultiMap(t reflect.Type) bool

IsMultiMap returns true iff the supplied type is a keyed functional iterator generator.

A keyed functional iterator generator is a function that takes a single parameter (a key) and returns a corresponding single sweep functional iterator.
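A minimal sketch of such a generator, backed by an in-memory map, may make the shape concrete. The names lookup and collect are hypothetical and only illustrate the function shape. They say nothing about how Beam supplies multimap side inputs at runtime.

```go
package main

import "fmt"

// lookup is a hypothetical keyed functional iterator generator: given a
// key, it returns a fresh single sweep iterator over that key's values.
func lookup(data map[string][]int) func(string) func(*int) bool {
	return func(key string) func(*int) bool {
		vals := data[key]
		i := 0
		return func(out *int) bool {
			if i >= len(vals) {
				return false
			}
			*out = vals[i]
			i++
			return true
		}
	}
}

// collect drains an iterator into a slice.
func collect(next func(*int) bool) []int {
	var out []int
	var v int
	for next(&v) {
		out = append(out, v)
	}
	return out
}

func main() {
	mm := lookup(map[string][]int{"a": {1, 2}, "b": {3}})
	fmt.Println(collect(mm("a")), collect(mm("b")), collect(mm("missing")))
}
```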

func IsReIter

func IsReIter(t reflect.Type) bool

IsReIter returns true iff the supplied type is a functional iterator generator.

A functional iterator generator is a parameterless function that returns a single sweep functional iterator each time it is called.
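The difference from a plain iterator is that the data can be swept more than once. The sketch below, with the hypothetical helpers restartable and count, shows a generator whose every call starts a new sweep.

```go
package main

import "fmt"

// restartable is a hypothetical functional iterator generator over a
// fixed slice: each call returns a fresh single sweep iterator.
func restartable(vals []string) func() func(*string) bool {
	return func() func(*string) bool {
		i := 0 // each sweep gets its own cursor
		return func(out *string) bool {
			if i >= len(vals) {
				return false
			}
			*out = vals[i]
			i++
			return true
		}
	}
}

// count drains one iterator and reports how many values it produced.
func count(next func(*string) bool) int {
	n := 0
	var s string
	for next(&s) {
		n++
	}
	return n
}

func main() {
	gen := restartable([]string{"x", "y", "z"})
	// Two independent sweeps over the same data.
	fmt.Println(count(gen()), count(gen()))
}
```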

func MustSatisfy

func MustSatisfy(fn any, sig *Signature)

MustSatisfy panics if the given fn does not satisfy the signature.

func Satisfy

func Satisfy(fn any, sig *Signature) error

Satisfy returns nil iff the fn can satisfy the signature, respecting generics. For example, for

foo : (context.Context, X) -> bool
bar : (int) -> bool

both would satisfy a signature of (context.Context?, int) -> bool. Only "foo" would satisfy (context.Context, string) -> bool and only "bar" would satisfy (int) -> bool.

func UnfoldEmit

func UnfoldEmit(t reflect.Type) ([]reflect.Type, bool)

UnfoldEmit returns the parameter types, if an emitter. For example:

func (int)                  returns {int}
func (string, int)          returns {string, int}
func (typex.EventTime, int) returns {typex.EventTime, int}

func UnfoldIter

func UnfoldIter(t reflect.Type) ([]reflect.Type, bool)

UnfoldIter returns the parameter types, if a single sweep functional iterator. For example:

func (*int) bool                   returns {int}
func (*string, *int) bool          returns {string, int}

EventTimes are not allowed in iterator types as per the Beam model (see https://github.com/apache/beam/issues/22404 for more information).

func UnfoldMultiMap

func UnfoldMultiMap(t reflect.Type) ([]reflect.Type, bool)

UnfoldMultiMap returns the parameter types for the input key and the output values iff the type is a keyed functional iterator generator.

func UnfoldReIter

func UnfoldReIter(t reflect.Type) ([]reflect.Type, bool)

UnfoldReIter returns the parameter types, if a functional iterator generator.

Types

type Fn

type Fn struct {
	Fn reflectx.Func

	Param []FnParam
	Ret   []ReturnParam
}

Fn is the reflected user function or method, preprocessed. This wrapper is useful both at graph construction time as well as execution time.

func New

func New(fn reflectx.Func) (*Fn, error)

New returns a Fn from a user function, if valid. Closures and dynamically created functions are considered valid here, but will be rejected if an attempt is made to serialize them.

func (*Fn) BundleFinalization

func (u *Fn) BundleFinalization() (pos int, exists bool)

BundleFinalization returns (index, true) iff the function expects a parameter that implements typex.BundleFinalization.

func (*Fn) Context

func (u *Fn) Context() (pos int, exists bool)

Context returns (index, true) iff the function expects a context.Context. The context should be the first parameter by convention.

func (*Fn) Emits

func (u *Fn) Emits() (pos int, num int, exists bool)

Emits returns (index, num, true) iff the function expects one or more emitters. The index returned is the index of the first emitter param in the signature. The num return value is the number of emitters in the signature. When there are multiple emitters in the signature, they will all be located contiguously, so the range of emitter params is [index, index+num).
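Because the emitters are contiguous, a caller can slice the parameter list directly with the returned pair. The sketch below mimics the (index, num, exists) convention on a plain slice of flags. emitterRange is a hypothetical stand-in, and its choice of -1 for the index when no emitter exists is an assumption, not documented behavior.

```go
package main

import "fmt"

// emitterRange scans a per-parameter flag slice and reports the index of
// the first emitter, the number of emitters, and whether any exist.
// It assumes, as the text states, that emitters are contiguous.
func emitterRange(isEmitter []bool) (pos, num int, exists bool) {
	pos = -1 // assumption: -1 signals "no emitter"
	for i, e := range isEmitter {
		if !e {
			continue
		}
		if !exists {
			pos, exists = i, true
		}
		num++
	}
	return pos, num, exists
}

func main() {
	// Hypothetical parameter list: (ctx, value, emitA, emitB).
	names := []string{"ctx", "value", "emitA", "emitB"}
	flags := []bool{false, false, true, true}
	if pos, num, ok := emitterRange(flags); ok {
		// The emitter params occupy the half-open range [pos, pos+num).
		fmt.Println(names[pos : pos+num])
	}
}
```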

func (*Fn) Error

func (u *Fn) Error() (pos int, exists bool)

Error returns (index, true) iff the function returns an error.

func (*Fn) EventTime

func (u *Fn) EventTime() (pos int, exists bool)

EventTime returns (index, true) iff the function expects an event timestamp.

func (*Fn) Inputs

func (u *Fn) Inputs() (pos int, num int, exists bool)

Inputs returns (index, num, true) iff the function expects one or more inputs, consisting of the main input followed by any number of side inputs. The index returned is the index of the first input, which is always the main input. The num return value is the total number of inputs in the signature. The main input and all side inputs are located contiguously.

func (*Fn) OutEventTime

func (u *Fn) OutEventTime() (pos int, exists bool)

OutEventTime returns (index, true) iff the function returns an event timestamp.

func (*Fn) Pane

func (u *Fn) Pane() (pos int, exists bool)

Pane returns (index, true) iff the function expects a PaneInfo.

func (*Fn) Params

func (u *Fn) Params(mask FnParamKind) []int

Params returns the parameter indices that match the given mask.
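Since the parameter kinds are distinct bit flags, a mask can combine several kinds with a bitwise OR. The sketch below uses a local paramKind type whose values mirror a few of the FnParamKind constants listed on this page. The type, the constant names, and the indices function are hypothetical stand-ins, not the package's code.

```go
package main

import "fmt"

// paramKind is a local stand-in for a bit-flag parameter kind.
type paramKind int

const (
	kindContext paramKind = 0x1
	kindValue   paramKind = 0x4
	kindIter    paramKind = 0x8
	kindEmit    paramKind = 0x20
)

// indices returns the positions whose kind has any bit in common with
// the mask.
func indices(kinds []paramKind, mask paramKind) []int {
	var ret []int
	for i, k := range kinds {
		if k&mask != 0 {
			ret = append(ret, i)
		}
	}
	return ret
}

func main() {
	// Hypothetical parameter list: (ctx, value, iter, emit, emit).
	kinds := []paramKind{kindContext, kindValue, kindIter, kindEmit, kindEmit}
	fmt.Println(indices(kinds, kindValue|kindIter)) // all inputs
	fmt.Println(indices(kinds, kindEmit))           // all emitters
}
```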

func (*Fn) ProcessContinuation

func (u *Fn) ProcessContinuation() (pos int, exists bool)

ProcessContinuation returns (index, true) iff the function returns a process continuation.

func (*Fn) RTracker

func (u *Fn) RTracker() (pos int, exists bool)

RTracker returns (index, true) iff the function expects an sdf.RTracker.

func (*Fn) Returns

func (u *Fn) Returns(mask ReturnKind) []int

Returns returns the return indices that match the given mask.

func (*Fn) StateProvider

func (u *Fn) StateProvider() (pos int, exists bool)

StateProvider returns (index, true) iff the function expects a parameter that implements state.Provider.

func (*Fn) String

func (u *Fn) String() string

func (*Fn) TimerProvider

func (u *Fn) TimerProvider() (pos int, exists bool)

TimerProvider returns (index, true) iff the function expects a parameter that implements timers.Provider.

func (*Fn) Type

func (u *Fn) Type() (pos int, exists bool)

Type returns (index, true) iff the function expects a reflect.Type.

func (*Fn) WatermarkEstimator

func (u *Fn) WatermarkEstimator() (pos int, exists bool)

WatermarkEstimator returns (index, true) iff the function expects a parameter that implements sdf.WatermarkEstimator.

func (*Fn) Window

func (u *Fn) Window() (pos int, exists bool)

Window returns (index, true) iff the function expects a window.

type FnParam

type FnParam struct {
	Kind FnParamKind
	T    reflect.Type
}

FnParam captures the kind and type of a single user function parameter.

func SubParams

func SubParams(list []FnParam, indices ...int) []FnParam

SubParams returns the subsequence of the given params with the given indices.

type FnParamKind

type FnParamKind int

FnParamKind represents the kinds of parameters a user function may take.

const (
	// FnIllegal is an illegal function input parameter type.
	FnIllegal FnParamKind = 0x0
	// FnContext marks a function input parameter of type context.Context.
	FnContext FnParamKind = 0x1
	// FnEventTime indicates a function input parameter of type typex.EventTime.
	FnEventTime FnParamKind = 0x2
	// FnValue indicates a function input parameter of an ordinary Go type.
	FnValue FnParamKind = 0x4
	// FnIter indicates a function input parameter that is an iterator.
	// Examples of iterators:
	//       "func (*int) bool"
	//       "func (*string, *T) bool"
	// If there are 2 parameters, a KV input is implied.
	FnIter FnParamKind = 0x08
	// FnReIter indicates a function input parameter that is a reiterable
	// iterator.
	// The function signature is a function returning a function matching
	// the iterator signature.
	//   "func() func (*int) bool"
	//   "func() func (*string, *T) bool"
	// are reiterable versions of the FnIter examples.
	FnReIter FnParamKind = 0x10
	// FnEmit indicates a function input parameter that is an emitter.
	// Examples of emitters:
	//       "func (int)"
	//       "func (string, T)"
	//       "func (EventTime, int)"
	//       "func (EventTime, string, T)"
	// If there are 2 regular parameters, a KV output is implied. An optional
	// EventTime is allowed as well. Emitters cannot fail.
	FnEmit FnParamKind = 0x20
	// FnType indicates a function input parameter that is a type for a coder. It
	// is only valid for coders.
	FnType FnParamKind = 0x40
	// FnWindow indicates a function input parameter that implements typex.Window.
	FnWindow FnParamKind = 0x80
	// FnRTracker indicates a function input parameter that implements
	// sdf.RTracker.
	FnRTracker FnParamKind = 0x100
	// FnMultiMap indicates a function input parameter that is a multimap.
	// The function signature is a function taking a value and returning a function
	// matching the iterator signature.
	// Example:
	//       "func(string) func (*int) bool"
	FnMultiMap FnParamKind = 0x200
	// FnPane indicates a function input parameter that is a PaneInfo.
	FnPane FnParamKind = 0x400
	// FnBundleFinalization indicates a function input parameter that implements typex.BundleFinalization.
	FnBundleFinalization FnParamKind = 0x800
	// FnWatermarkEstimator indicates a function input parameter that implements sdf.WatermarkEstimator.
	FnWatermarkEstimator FnParamKind = 0x1000
	// FnStateProvider indicates a function input parameter that implements state.Provider.
	FnStateProvider FnParamKind = 0x2000
	// FnTimerProvider indicates a function input parameter that implements timers.Provider.
	FnTimerProvider FnParamKind = 0x4000
)

func (FnParamKind) String

func (k FnParamKind) String() string

type ReturnKind

type ReturnKind int

ReturnKind represents the kinds of return values a user function may provide.

const (
	RetIllegal             ReturnKind = 0x0
	RetEventTime           ReturnKind = 0x1
	RetValue               ReturnKind = 0x2
	RetError               ReturnKind = 0x4
	RetRTracker            ReturnKind = 0x8
	RetProcessContinuation ReturnKind = 0x10
)

The supported types of ReturnKind.

func (ReturnKind) String

func (k ReturnKind) String() string

type ReturnParam

type ReturnParam struct {
	Kind ReturnKind
	T    reflect.Type
}

ReturnParam captures the kind and type of a single user function return value.

func SubReturns

func SubReturns(list []ReturnParam, indices ...int) []ReturnParam

SubReturns returns the subsequence of the given return params with the given indices.

type Signature

type Signature struct {
	// OptArgs is the optional arguments allowed in order, if any, before any
	// required arguments. Must be concrete types.
	OptArgs []reflect.Type
	// Args is the required arguments allowed in order, if any.
	Args []reflect.Type
	// Return is the required returns allowed in order, if any.
	Return []reflect.Type
	// OptReturn is the optional returns allowed in order, if any, after any
	// required returns. Must be concrete types.
	OptReturn []reflect.Type
}

Signature is a concise representation of a group of function types. The function types in a group can differ in optional leading arguments and trailing returns only. For example, a signature can represent:

(context.Context?, int, string) -> (bool, error?)

where the context argument and the error return are optional. The int and string parameters as well as the bool return are mandatory.

func MakePredicate

func MakePredicate(args ...reflect.Type) *Signature

MakePredicate creates a simple N-ary predicate: <args> -> bool.

func Replace

func Replace(sig *Signature, old, new reflect.Type) *Signature

Replace substitutes the old top-level type for the new one. It is intended to specialize generic signatures to concrete ones.

func (*Signature) String

func (sig *Signature) String() string

type TypeMismatchError

type TypeMismatchError struct {
	Got, Want reflect.Type
}

TypeMismatchError indicates we didn't get the type we expected.

func (*TypeMismatchError) Error

func (e *TypeMismatchError) Error() string
