Documentation ¶
Index ¶
- func MustRegisterGlobalUDF(name string, f UDF)
- func MustRegisterGlobalUDSCreator(typeName string, c UDSCreator)
- func MustRegisterGlobalUDSFCreator(typeName string, c UDSFCreator)
- func RegisterGlobalUDF(name string, f UDF) error
- func RegisterGlobalUDSCreator(typeName string, c UDSCreator) error
- func RegisterGlobalUDSFCreator(typeName string, c UDSFCreator) error
- type FunctionManager
- type FunctionRegistry
- type UDF
- func BinaryFunc(f func(*core.Context, data.Value, data.Value) (data.Value, error)) UDF
- func ConvertGeneric(function interface{}) (UDF, error)
- func ConvertGenericAggregate(function interface{}, aggParams []bool) (UDF, error)
- func Func(f func(*core.Context, ...data.Value) (data.Value, error), arity int) UDF
- func MustConvertGeneric(function interface{}) UDF
- func MustConvertGenericAggregate(function interface{}, aggParams []bool) UDF
- func NullaryFunc(f func(*core.Context) (data.Value, error)) UDF
- func TernaryFunc(f func(*core.Context, data.Value, data.Value, data.Value) (data.Value, error)) UDF
- func UnaryFunc(f func(*core.Context, data.Value) (data.Value, error)) UDF
- func VariadicFunc(f func(*core.Context, ...data.Value) (data.Value, error)) UDF
- type UDSCreator
- type UDSCreatorRegistry
- type UDSF
- type UDSFCreator
- type UDSFCreatorRegistry
- type UDSFDeclarer
- type UDSFInputConfig
- type UDSLoader
- type UDSStorage
- type UDSStorageWriter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MustRegisterGlobalUDF ¶
MustRegisterGlobalUDF is like RegisterGlobalUDF but panics if an error occurred.
func MustRegisterGlobalUDSCreator ¶
func MustRegisterGlobalUDSCreator(typeName string, c UDSCreator)
MustRegisterGlobalUDSCreator is like RegisterGlobalUDSCreator but panics if an error occurred.
func MustRegisterGlobalUDSFCreator ¶
func MustRegisterGlobalUDSFCreator(typeName string, c UDSFCreator)
MustRegisterGlobalUDSFCreator is like RegisterGlobalUDSFCreator but panics if an error occurred.
func RegisterGlobalUDF ¶
RegisterGlobalUDF adds a UDF which is visible to all topologies. UDFs registered after running topologies might not be seen by those topologies. Call it from init functions to avoid such conditions.
func RegisterGlobalUDSCreator ¶
func RegisterGlobalUDSCreator(typeName string, c UDSCreator) error
RegisterGlobalUDSCreator adds a UDSCreator which can be referred from all topologies. UDSCreators registered after running topologies might not be seen by those topologies. Call it from init functions to avoid such conditions.
func RegisterGlobalUDSFCreator ¶
func RegisterGlobalUDSFCreator(typeName string, c UDSFCreator) error
RegisterGlobalUDSFCreator adds a UDSFCreator which can be referred from all topologies. UDSFCreators registered after running topologies might not be seen by those topologies. Call it from init functions to avoid such conditions.
Types ¶
type FunctionManager ¶
type FunctionManager interface { FunctionRegistry // Register allows to add a function. Register(name string, f UDF) error }
FunctionManager is a FunctionRegistry that allows to register additional functions.
func CopyGlobalUDFRegistry ¶
func CopyGlobalUDFRegistry(ctx *core.Context) FunctionManager
CopyGlobalUDFRegistry creates a new FunctionManager which has all UDFs registered to the global function manager.
func NewDefaultFunctionRegistry ¶
func NewDefaultFunctionRegistry(ctx *core.Context) FunctionManager
NewDefaultFunctionRegistry returns a new instance of the default FunctionRegistry implementation.
type FunctionRegistry ¶
type FunctionRegistry interface { // Context returns a core.Context associated with the registry. Context() *core.Context // Lookup will return a function with the given name and arity // or core.NotExistError if there is none. Note that this interface allows // multiple functions with the same name but different arity, // and it also allows functions with an arbitrary number of // parameters. However, a function returned must never be used // with a different arity than the one given in the Lookup call. Lookup(name string, arity int) (UDF, error) }
FunctionRegistry is an interface to lookup functions for use in BQL statements by their name.
type UDF ¶
type UDF interface { // Call calls the UDF. data.Values must not be nil. Call(*core.Context, ...data.Value) (data.Value, error) // Accept checks if the function accepts the given number of arguments // excluding core.Context. Accept(arity int) bool // IsAggregationParameter returns true if the k-th parameter expects // aggregated values. A UDF with Accept(n) == true is an aggregate // function if and only if this function returns true for one or more // values of k in the range 0, ..., n-1. IsAggregationParameter(k int) bool }
UDF is an interface having a user defined function.
func BinaryFunc ¶
BinaryFunc creates a UDF that takes two data.Value parameters.
func ConvertGeneric ¶
ConvertGeneric creates a new UDF from various form of functions. Arguments of the function don't have to be tuple types, but some standard types are allowed. The UDF returned provide a weak type conversion, that is it uses data.To{Type} function to convert values. Therefore, a string may be passed as an integer or vice versa. If the function wants to provide strict type conversion, generate UDF by Func function.
Acceptable types:
- bool
- standard integers
- standard floats
- string
- time.Time
- data.Bool, data.Int, data.Float, data.String, data.Blob, data.Timestamp, data.Array, data.Map, data.Value
- a slice of types above
func ConvertGenericAggregate ¶
ConvertGenericAggregate creates a new aggregate UDF from various form of functions. aggParams argument is used to indicate which arguments of the function are aggregation parameter. receives aggregation parameter. Supported and acceptable types are the same as ConvertGeneric.
func MustConvertGeneric ¶
func MustConvertGeneric(function interface{}) UDF
MustConvertGeneric is like ConvertGeneric, but panics on errors.
func MustConvertGenericAggregate ¶
MustConvertGenericAggregate is like ConvertGenericAggregate, but panics on errors.
func NullaryFunc ¶
NullaryFunc creates a UDF that takes no parameters (except the Context).
func TernaryFunc ¶
TernaryFunc creates a UDF that takes three data.Value parameters.
type UDSCreator ¶
type UDSCreator interface { // CreateState creates an instance of the state type. CreateState must not // call core.SharedState.Init. CreateState(ctx *core.Context, params data.Map) (core.SharedState, error) }
UDSCreator creates a User Defined State based on core.SharedState.
func UDSCreatorFunc ¶
func UDSCreatorFunc(f func(*core.Context, data.Map) (core.SharedState, error)) UDSCreator
UDSCreatorFunc creates a UDSCreator from a function.
type UDSCreatorRegistry ¶
type UDSCreatorRegistry interface { // Register adds a UDS creator to the registry. It returns an error if // the type name is already registered. Register(typeName string, c UDSCreator) error // Lookup returns a UDS creator having the type name. It returns // core.NotExistError if it doesn't have the creator. Lookup(typeName string) (UDSCreator, error) // List returns all creators the registry has. The caller can safely modify // the map returned from this method. List() (map[string]UDSCreator, error) // Unregister removes a creator from the registry. It returns core.NotExistError // when the registry doesn't have a creator having the type name. // // The registry itself doesn't support cascading delete. It should properly // done by the caller. Unregister(typeName string) error }
UDSCreatorRegistry manages creators of UDSs.
func CopyGlobalUDSCreatorRegistry ¶
func CopyGlobalUDSCreatorRegistry() (UDSCreatorRegistry, error)
CopyGlobalUDSCreatorRegistry creates a new independent copy of the global UDSCreatorRegistry.
func NewDefaultUDSCreatorRegistry ¶
func NewDefaultUDSCreatorRegistry() UDSCreatorRegistry
NewDefaultUDSCreatorRegistry returns a UDSCreatorRegistry having a default implementation.
type UDSF ¶
type UDSF interface { // Process sends a tuple to the UDSF. // // When the UDSF is running in the stream mode, this method must not block // and return immediately after it finished processing the received tuple. // It is called everytime a tuple is received from streams. It behaves just // like core.Box. // // When the UDSF is running in the source mode, this method is only called // once. It can block until it generates all tuples or Terminate method // is called. A tuple passed to this method in the source mode doesn't // contain anything meaningful. It behaves like core.Source although the // interface is like core.Box. The core.Writer returns core.ErrSourceStopped // when the UDSF running in the source mode is stopped. Therefore, if // Process method returns on that error, the implementation of Terminate // can just be resource deallocation. Process(ctx *core.Context, t *core.Tuple, w core.Writer) error // Terminate terminates the UDSF. Resources allocated when the UDSF is // created by UDFSCreator should be released in this method. Also, when // the UDSF is running in the source mode, Process method must return as // soon as Terminate is called. Terminate(ctx *core.Context) error }
UDSF is a user-defined stream-generating function. While a regular UDF generates a tuple from an input tuple, UDSF can generate multiple tuples from a tuple or even drop tuples. It's also a little different from a "function". UDSF can have a state. Thus, a UDSF is created by UDSFCreator for each invocation in a BQL statement.
UDSF doesn't have Init method because initialization should be done in UDSFCreator.CreateUDSF.
There're two kinds of processing modes of UDSFs: stream mode and source mode. A UDSF is processed in the stream mode when it has at least one input stream. Otherwise, the UDSF is processed in the source mode. Process and Terminate methods change their behavior based on the mode. See documentation of each method for details.
type UDSFCreator ¶
type UDSFCreator interface { // CreateUDSF returns a new UDSF instance. UDSF can be configured (via decl) // to accept inputs from multiple existing streams. It also receives // arguments passed to the UDSF in a BQL statement. The caller will call // UDSF.Terminate when the UDSF becomes unnecessary. // // When a UDSF needs inputs from other sources or streams, call Input method // of UDSFDeclarer. For example let's assume there's a UDSF below: // // func createMyUDSF(decl udf.UDSFDeclarer, input1, input2 string) (udf.UDSF, error) { // ... // decl.Input(input1, &udf.UDSFInputConfig{ // InputName: "custom_input_name_1", // }) // decl.Input(input2, &udf.UDSFInputConfig{ // InputName: "custom_input_name_2", // }) // ... // } // // func init() { // udf.MustRegisterGlobalUDSFCreator("my_udsf", // udf.MustConvertToUDSFCreator(createMyUDSF)) // } // // Then a user can specify input stream by the following statement: // // CREATE STREAM stream1 AS SELECT ...; // CREATE STREAM stream2 AS SELECT ...; // CREATE STREAM join_by_udsf // SELECT RSTREAM * FROM my_udsf("stream1", "stream2") [RANGE 1 TUPLES]; // // In this example, my_udsf receives two streams: stream1 and stream2 // created in advance. // // A UDSF doesn't have to have an input. For example, there can be a UDSF // which generates sequential numbers and doesn't depend on any stream. // Such UDSFs will be run as the source mode. See UDSF for more details. CreateUDSF(ctx *core.Context, decl UDSFDeclarer, args ...data.Value) (UDSF, error) // Accept returns true if the UDSF supports the given arity. Accept(arity int) bool }
UDSFCreator creates a new UDSF instance.
func ConvertToUDSFCreator ¶
func ConvertToUDSFCreator(function interface{}) (UDSFCreator, error)
ConvertToUDSFCreator converts a function to a UDSFCreator.
func MustConvertToUDSFCreator ¶
func MustConvertToUDSFCreator(function interface{}) UDSFCreator
MustConvertToUDSFCreator converts a function to a UDSFCreator. It panics if there is an error during conversion.
type UDSFCreatorRegistry ¶
type UDSFCreatorRegistry interface { // Register adds a UDSF creator to the registry. It returns an error if // the type name is already registered. Register(typeName string, c UDSFCreator) error // Lookup returns a UDSF creator having the type name. It returns // core.NotExistError if it doesn't have the creator. Lookup(typeName string, arity int) (UDSFCreator, error) // List returns all creators the registry has. The caller can safely modify // the map returned from this method. List() (map[string]UDSFCreator, error) // Unregister removes a creator from the registry. It returns core.NotExistError // when the registry doesn't have a creator having the type name. // // The registry itself doesn't support cascading delete. It should properly // done by the caller. Unregister(typeName string) error }
UDSFCreatorRegistry manages creators of UDSFs.
func CopyGlobalUDSFCreatorRegistry ¶
func CopyGlobalUDSFCreatorRegistry() (UDSFCreatorRegistry, error)
CopyGlobalUDSFCreatorRegistry creates a new independent copy of the global UDSFCreatorRegistry.
func NewDefaultUDSFCreatorRegistry ¶
func NewDefaultUDSFCreatorRegistry() UDSFCreatorRegistry
NewDefaultUDSFCreatorRegistry returns a UDSFCreatorRegistry having a default implementation.
type UDSFDeclarer ¶
type UDSFDeclarer interface { // Input adds an input from an existing stream. Input(name string, config *UDSFInputConfig) error // ListInputs returns all inputs declared by a UDSF. The caller can safely // modify the map returned from this method. ListInputs() map[string]*UDSFInputConfig }
UDSFDeclarer allow UDSFs to customize their behavior.
type UDSFInputConfig ¶
type UDSFInputConfig struct { // InputName is a custom name attached to incoming tuples. If this name is // empty, "*" will be used. InputName string }
UDSFInputConfig has input configuration parameters for UDSF.
type UDSLoader ¶
type UDSLoader interface { UDSCreator // LoadState loads a state from saved data. The saved data can be read from // io.Reader. Parameters given by SET clause are passed as params. LoadState(ctx *core.Context, r io.Reader, params data.Map) (core.SharedState, error) }
UDSLoader loads a User Defined State from saved data. A UDS cannot be loaded if a UDSCreator doesn't implement UDSLoader even if the UDS implements core.LoadableSharedState.
When a UDS isn't created yet, UDSLoader.LoadState will be used to load the state and core.LoadableSharedState.Load will not be used.
When a UDS is already created or loaded and it implements core.LoadableSharedState, its Load method is called to load a model and UDSLoader.LoadState will not be called. If a UDS doesn't implement core.LoadableSharedState but UDSLoader is provided for its type, then UDSLoader.LoadState creates a new instance and the previous instance is replaced with it, which means loading the UDS could consume twice as much memory as core.LoadableSharedState.Load does. When a UDS doesn't implement core.LoadableSharedState and its UDSCreator doesn't implement UDSLoader, the UDS cannot be loaded.
type UDSStorage ¶
type UDSStorage interface { // Save returns a writer to write the state data. Save doesn't discard the // previously saved data until UDSStorageWriter.Commit is called. // // Save can be called while a state is being loaded. In such case, behavior // is up to each storage. Some possible implementations are: (1) Save // creates a new entry for the state and conflicting Loads continue to read // the previous data, (2) Save blocks until conflicting Loads finish, and // (3) Save returns an error. Implementation (3) isn't recommended because // it might result in starvation in highly concurrent workload. // // Either Commit or Abort of a UDSStorageWriter returned from this method // has to be called. When Commit is called, the data is persisted. When // Abort is called, the data is discarded and the previous data remains. // // Save can write header information or other data such as a space for // storing checksum later to UDSStorageWriter before returning it. Save can // also manipulate the written data as long as the data can be loaded again. // // A caller can assign a tag to the saved state so that multiple versions of // the UDS can be managed with unique names. When a tag is an empty string, // "default" will be used. The valid format of tags is same as node names, // which is validated by core.ValidateSymbol. Save(topology, state, tag string) (UDSStorageWriter, error) // Load loads the previously saved data of the state. io.ReadCloser.Close // has to be called when it gets unnecessary. // // Load can be called while a state is being saved. In such case, behavior // is up to each storage. A storage's Load can block until Save is done, // can return an error, or can even return a reader of the previously saved // data. // // Load returns core.NotExistError when the state doesn't exist. // // When a tag is an empty string, "default" will be used. Load(topology, state, tag string) (io.ReadCloser, error) // ListTopologies returns a list of topologies that have saved states. ListTopologies() ([]string, error) // List returns a list of names of saved states in a topology as a map // whose key is a name of a UDS. Each value contains tags assigned to // the state as an array. List(topology string) (map[string][]string, error) }
UDSStorage is an interface to support saving and loading UDSs.
func NewInMemoryUDSStorage ¶
func NewInMemoryUDSStorage() UDSStorage
NewInMemoryUDSStorage creates a new UDSStorage which store all data in memory. This storage should only be used for experiment or test purpose.
type UDSStorageWriter ¶
type UDSStorageWriter interface { io.Writer // Commit persists the data written to the writer so far and closes it. // Write cannot be called once the data is committed. Commit() error // Abort discard the data written to the writer. Write cannot be called // after calling Abort. Abort() error }
UDSStorageWriter is used to save a state. An instance of UDSStorageWriter doesn't have to be thread-safe. It means that an instance may not be able to be used from multiple goroutines. However, different instances can be used concurrently so that multiple states can be saved simultaneously.