Documentation ¶
Index ¶
- func MustRegisterGlobalSinkCreator(typeName string, c SinkCreator)
- func MustRegisterGlobalSourceCreator(typeName string, c SourceCreator)
- func NewBQLBox(stmt *parser.SelectStmt, reg udf.FunctionRegistry) *bqlBox
- func RegisterGlobalSinkCreator(typeName string, c SinkCreator) error
- func RegisterGlobalSourceCreator(typeName string, c SourceCreator) error
- type IOParams
- type SinkCreator
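- type SinkCreatorRegistry
- func CopyGlobalSinkCreatorRegistry() (SinkCreatorRegistry, error)
- func NewDefaultSinkCreatorRegistry() SinkCreatorRegistry
- type SourceCreator
- type SourceCreatorRegistry
- func CopyGlobalSourceCreatorRegistry() (SourceCreatorRegistry, error)
- func NewDefaultSourceCreatorRegistry() SourceCreatorRegistry
- type TopologyBuilder
- func NewTopologyBuilder(t core.Topology) (*TopologyBuilder, error)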
- func (tb *TopologyBuilder) AddSelectStmt(stmt *parser.SelectStmt) (core.SinkNode, <-chan *core.Tuple, error)
- func (tb *TopologyBuilder) AddSelectUnionStmt(stmts *parser.SelectUnionStmt) (core.SinkNode, <-chan *core.Tuple, error)
- func (tb *TopologyBuilder) AddStmt(stmt interface{}) (core.Node, error)
- func (tb *TopologyBuilder) RunEvalStmt(stmt *parser.EvalStmt) (data.Value, error)
- func (tb *TopologyBuilder) Topology() core.Topology
Functions ¶
func MustRegisterGlobalSinkCreator ¶
func MustRegisterGlobalSinkCreator(typeName string, c SinkCreator)
MustRegisterGlobalSinkCreator is like RegisterGlobalSinkCreator but panics if an error occurs.
func MustRegisterGlobalSourceCreator ¶
func MustRegisterGlobalSourceCreator(typeName string, c SourceCreator)
MustRegisterGlobalSourceCreator is like RegisterGlobalSourceCreator but panics if an error occurs.
func NewBQLBox ¶
func NewBQLBox(stmt *parser.SelectStmt, reg udf.FunctionRegistry) *bqlBox
func RegisterGlobalSinkCreator ¶
func RegisterGlobalSinkCreator(typeName string, c SinkCreator) error
RegisterGlobalSinkCreator adds a SinkCreator that can be referenced from all topologies. SinkCreators registered after a topology has started running might not be visible to that topology. Call this function from init functions to avoid that situation.
func RegisterGlobalSourceCreator ¶
func RegisterGlobalSourceCreator(typeName string, c SourceCreator) error
RegisterGlobalSourceCreator adds a SourceCreator that can be referenced from all topologies. SourceCreators registered after a topology has started running might not be visible to that topology. Call this function from init functions to avoid that situation.
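The registration pattern described above can be sketched without importing SensorBee. Everything in the block below is a stand-in written for illustration: the simplified SinkCreator interface, the stdoutCreator type, the "my_stdout" type name, and the lowercase registerSinkCreator and mustRegisterSinkCreator helpers are all hypothetical. The sketch also assumes that the global functions reject a duplicate type name, as SinkCreatorRegistry.Register is documented to do.

```go
package main

import (
	"fmt"
	"sync"
)

// SinkCreator is a simplified stand-in for the package's interface. The real
// CreateSink takes a *core.Context, *IOParams, and data.Map.
type SinkCreator interface {
	CreateSink(name string) (string, error)
}

// stdoutCreator is a hypothetical creator used only in this sketch.
type stdoutCreator struct{}

func (stdoutCreator) CreateSink(name string) (string, error) {
	return "stdout sink " + name, nil
}

var (
	mu       sync.RWMutex
	creators = map[string]SinkCreator{}
)

// registerSinkCreator models the error-returning variant: a duplicate type
// name is reported to the caller instead of overwriting the existing entry.
func registerSinkCreator(typeName string, c SinkCreator) error {
	mu.Lock()
	defer mu.Unlock()
	if _, ok := creators[typeName]; ok {
		return fmt.Errorf("sink type %q is already registered", typeName)
	}
	creators[typeName] = c
	return nil
}

// mustRegisterSinkCreator models the Must variant: any error becomes a panic.
func mustRegisterSinkCreator(typeName string, c SinkCreator) {
	if err := registerSinkCreator(typeName, c); err != nil {
		panic(err)
	}
}

// Registering from init guarantees the creator exists before any topology
// can start running.
func init() {
	mustRegisterSinkCreator("my_stdout", stdoutCreator{})
}

func main() {
	// A second registration under the same name fails with an error.
	err := registerSinkCreator("my_stdout", stdoutCreator{})
	fmt.Println("duplicate rejected:", err != nil)
}
```

The Must variant suits init functions, where a failed registration is a programming error and there is no caller to return an error to.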
Types ¶
type IOParams ¶
type IOParams struct {
	// TypeName is the name of the type registered to SensorBee.
	TypeName string

	// Name is the name of the instance specified in a CREATE statement.
	Name string
}
IOParams has parameters for IO plugins.
type SinkCreator ¶
type SinkCreator interface {
	// CreateSink creates a new Sink instance using given parameters.
	CreateSink(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Sink, error)
}
SinkCreator is an interface which creates instances of a Sink.
type SinkCreatorRegistry ¶
type SinkCreatorRegistry interface {
	// Register adds a Sink creator to the registry. It returns an error if
	// the type name is already registered.
	Register(typeName string, c SinkCreator) error

	// Lookup returns a Sink creator having the type name. It returns
	// core.NotExistError if it doesn't have the creator.
	Lookup(typeName string) (SinkCreator, error)

	// List returns all creators the registry has. The caller can safely modify
	// the map returned from this method.
	List() (map[string]SinkCreator, error)

	// Unregister removes a creator from the registry. It returns core.NotExistError
	// when the registry doesn't have a creator having the type name.
	//
	// The registry itself doesn't support cascading delete. It should be
	// properly done by the caller.
	Unregister(typeName string) error
}
SinkCreatorRegistry manages creators of Sinks.
func CopyGlobalSinkCreatorRegistry ¶
func CopyGlobalSinkCreatorRegistry() (SinkCreatorRegistry, error)
CopyGlobalSinkCreatorRegistry creates a new independent copy of the global SinkCreatorRegistry.
func NewDefaultSinkCreatorRegistry ¶
func NewDefaultSinkCreatorRegistry() SinkCreatorRegistry
NewDefaultSinkCreatorRegistry returns a SinkCreatorRegistry having a default implementation.
type SourceCreator ¶
type SourceCreator interface {
	// CreateSource creates a new Source instance using given parameters.
	CreateSource(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Source, error)
}
SourceCreator is an interface which creates instances of a Source.
type SourceCreatorRegistry ¶
type SourceCreatorRegistry interface {
	// Register adds a Source creator to the registry. It returns an error if
	// the type name is already registered.
	Register(typeName string, c SourceCreator) error

	// Lookup returns a Source creator having the type name. It returns
	// core.NotExistError if it doesn't have the creator.
	Lookup(typeName string) (SourceCreator, error)

	// List returns all creators the registry has. The caller can safely modify
	// the map returned from this method.
	List() (map[string]SourceCreator, error)

	// Unregister removes a creator from the registry. It returns core.NotExistError
	// when the registry doesn't have a creator having the type name.
	//
	// The registry itself doesn't support cascading delete. It should be
	// properly done by the caller.
	Unregister(typeName string) error
}
SourceCreatorRegistry manages creators of Sources.
func CopyGlobalSourceCreatorRegistry ¶
func CopyGlobalSourceCreatorRegistry() (SourceCreatorRegistry, error)
CopyGlobalSourceCreatorRegistry creates a new independent copy of the global SourceCreatorRegistry.
func NewDefaultSourceCreatorRegistry ¶
func NewDefaultSourceCreatorRegistry() SourceCreatorRegistry
NewDefaultSourceCreatorRegistry returns a SourceCreatorRegistry having a default implementation.
type TopologyBuilder ¶
type TopologyBuilder struct {
	Reg            udf.FunctionManager
	UDSFCreators   udf.UDSFCreatorRegistry
	UDSCreators    udf.UDSCreatorRegistry
	SourceCreators SourceCreatorRegistry
	SinkCreators   SinkCreatorRegistry
	UDSStorage     udf.UDSStorage
	// contains filtered or unexported fields
}
func NewTopologyBuilder ¶
func NewTopologyBuilder(t core.Topology) (*TopologyBuilder, error)
NewTopologyBuilder creates a new TopologyBuilder which dynamically creates nodes from BQL statements. The target Topology can be shared by multiple TopologyBuilders.
TopologyBuilder doesn't support atomic topology building. For example, when a user wants to add three statements and the second statement fails, only the node created from the first statement is registered to the topology, and it starts to generate tuples. The others won't be registered.
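The non-atomic behavior can be modeled with a small loop. The sketch below does not use the real TopologyBuilder: the builder type, its string-based AddStmt, and the addAll helper are hypothetical stand-ins, and an empty string plays the role of a statement that fails. It shows one way a caller might learn how many statements were applied before the failure, so that it can clean up afterwards.

```go
package main

import (
	"errors"
	"fmt"
)

// builder is a hypothetical stand-in for TopologyBuilder. It records the
// statements that were successfully turned into nodes.
type builder struct {
	nodes []string
}

// AddStmt fails on an empty statement; anything else becomes a node.
func (b *builder) AddStmt(stmt string) error {
	if stmt == "" {
		return errors.New("invalid statement")
	}
	b.nodes = append(b.nodes, stmt)
	return nil
}

// addAll adds statements in order and stops at the first failure. It returns
// the number of statements applied so the caller knows which nodes already
// exist; nothing is rolled back.
func addAll(b *builder, stmts []string) (int, error) {
	for i, s := range stmts {
		if err := b.AddStmt(s); err != nil {
			return i, fmt.Errorf("statement %d: %w", i+1, err)
		}
	}
	return len(stmts), nil
}

func main() {
	b := &builder{}
	applied, err := addAll(b, []string{"first", "", "third"})
	fmt.Println("applied:", applied)
	fmt.Println("error:", err)
	fmt.Println("nodes left in topology:", b.nodes)
}
```

In this model the first node stays registered after the failure and the third statement is never attempted, which matches the behavior described above.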
func (*TopologyBuilder) AddSelectStmt ¶
func (tb *TopologyBuilder) AddSelectStmt(stmt *parser.SelectStmt) (core.SinkNode, <-chan *core.Tuple, error)
AddSelectStmt creates the nodes that handle a SELECT statement in the topology. It returns the Sink node, the channel that receives tuples from that Sink, and an error if one occurs. The caller must stop the Sink node once it is no longer needed.
func (*TopologyBuilder) AddSelectUnionStmt ¶
func (tb *TopologyBuilder) AddSelectUnionStmt(stmts *parser.SelectUnionStmt) (core.SinkNode, <-chan *core.Tuple, error)
AddSelectUnionStmt creates the nodes that handle a SELECT ... UNION ALL statement in the topology. It returns the Sink node, the channel that receives tuples from that Sink, and an error if one occurs. The caller must stop the Sink node once it is no longer needed.
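The calling pattern shared by AddSelectStmt and AddSelectUnionStmt (receive tuples from the returned channel, then stop the Sink node) might look like the sketch below. It uses stand-ins rather than the real API: the tuple and sinkNode types, the addSelect producer, and the firstN helper are hypothetical, and the sketch assumes that the Sink node exposes a Stop method returning an error.

```go
package main

import (
	"fmt"
	"sync"
)

// tuple is a stand-in for core.Tuple.
type tuple struct {
	Seq int
}

// sinkNode is a stand-in for core.SinkNode; only stopping is modeled.
type sinkNode struct {
	once sync.Once
	done chan struct{}
}

// Stop signals the producer to finish. It is safe to call more than once.
func (s *sinkNode) Stop() error {
	s.once.Do(func() { close(s.done) })
	return nil
}

// addSelect mimics the shape of AddSelectStmt's return values: a sink node,
// a receive-only channel of tuples, and an error.
func addSelect() (*sinkNode, <-chan *tuple, error) {
	s := &sinkNode{done: make(chan struct{})}
	ch := make(chan *tuple)
	go func() {
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case ch <- &tuple{Seq: i}:
			case <-s.done:
				return
			}
		}
	}()
	return s, ch, nil
}

// firstN collects n tuples and then stops the sink, so the producer does not
// keep running after the caller loses interest.
func firstN(n int) ([]int, error) {
	sink, ch, err := addSelect()
	if err != nil {
		return nil, err
	}
	defer sink.Stop()

	seqs := make([]int, 0, n)
	for len(seqs) < n {
		t, ok := <-ch
		if !ok {
			break // the channel was closed before n tuples arrived
		}
		seqs = append(seqs, t.Seq)
	}
	return seqs, nil
}

func main() {
	seqs, err := firstN(3)
	fmt.Println(seqs, err)
}
```

Deferring the stop call right after the error check keeps the Sink from outliving the caller on every return path.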
func (*TopologyBuilder) AddStmt ¶
func (tb *TopologyBuilder) AddStmt(stmt interface{}) (core.Node, error)
AddStmt adds a node created from a statement to the topology and returns the created node. It returns a nil node when the statement is CREATE STATE.
func (*TopologyBuilder) RunEvalStmt ¶
func (tb *TopologyBuilder) RunEvalStmt(stmt *parser.EvalStmt) (data.Value, error)
RunEvalStmt evaluates the expression contained in the given EvalStmt and returns the evaluation result.
func (*TopologyBuilder) Topology ¶
func (tb *TopologyBuilder) Topology() core.Topology