Documentation
¶
Index ¶
- func Register(name string, source DataSource)
- func SourceIterChannel(iter Iterator, filter expr.Node, sigCh <-chan bool) <-chan Message
- type Aggregations
- type ContextSimple
- func (m *ContextSimple) All() map[string]value.Value
- func (m *ContextSimple) Body() interface{}
- func (m *ContextSimple) Commit(rowInfo []expr.SchemaInfo, row expr.RowWriter) error
- func (m *ContextSimple) Delete(row map[string]value.Value) error
- func (m ContextSimple) Get(key string) (value.Value, bool)
- func (m *ContextSimple) Key() uint64
- func (m *ContextSimple) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
- func (m *ContextSimple) Row() map[string]value.Value
- func (m *ContextSimple) Ts() time.Time
- type ContextUrlValues
- func (m *ContextUrlValues) Body() interface{}
- func (m *ContextUrlValues) Delete(delRow map[string]value.Value) error
- func (m ContextUrlValues) Get(key string) (value.Value, bool)
- func (m *ContextUrlValues) Key() uint64
- func (m ContextUrlValues) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
- func (m ContextUrlValues) Row() map[string]value.Value
- func (m *ContextUrlValues) String() string
- func (m ContextUrlValues) Ts() time.Time
- type ContextWriterEmpty
- type CsvDataSource
- func (m *CsvDataSource) Close() error
- func (m *CsvDataSource) Columns() []string
- func (m *CsvDataSource) CreateIterator(filter expr.Node) Iterator
- func (m *CsvDataSource) MesgChan(filter expr.Node) <-chan Message
- func (m *CsvDataSource) Next() Message
- func (m *CsvDataSource) Open(connInfo string) (SourceConn, error)
- func (m *CsvDataSource) Tables() []string
- type DataSource
- type DataSourceFeatures
- type DataSources
- type Features
- type GroupBy
- type Iterator
- type JsonWrapper
- type Message
- type Projection
- type RuntimeConfig
- type Scanner
- type ScannerColumns
- type Seeker
- type Sort
- type SourceConn
- type SourcePlanner
- type SqlDriverMessage
- type SqlDriverMessageMap
- type StaticDataSource
- func (m *StaticDataSource) Close() error
- func (m *StaticDataSource) Columns() []string
- func (m *StaticDataSource) CreateIterator(filter expr.Node) Iterator
- func (m *StaticDataSource) MesgChan(filter expr.Node) <-chan Message
- func (m *StaticDataSource) Next() Message
- func (m *StaticDataSource) Open(connInfo string) (SourceConn, error)
- func (m *StaticDataSource) Tables() []string
- type TimeValue
- type UrlValuesMsg
- type ValueContextWrapper
- type WhereFilter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Register ¶
func Register(name string, source DataSource)
Register makes a datasource available by the provided name. If Register is called twice with the same name or if source is nil, it panics.
Types ¶
type Aggregations ¶
type Aggregations interface { DataSource Aggregate(expr.SqlStatement) error }
type ContextSimple ¶
func NewContextSimple ¶
func NewContextSimple() *ContextSimple
func NewContextSimpleData ¶
func NewContextSimpleData(data map[string]value.Value) *ContextSimple
func NewContextSimpleTs ¶
func (*ContextSimple) Body ¶
func (m *ContextSimple) Body() interface{}
func (*ContextSimple) Commit ¶
func (m *ContextSimple) Commit(rowInfo []expr.SchemaInfo, row expr.RowWriter) error
func (*ContextSimple) Key ¶
func (m *ContextSimple) Key() uint64
func (*ContextSimple) Put ¶
func (m *ContextSimple) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
func (*ContextSimple) Ts ¶
func (m *ContextSimple) Ts() time.Time
type ContextUrlValues ¶
func NewContextUrlValues ¶
func NewContextUrlValues(uv url.Values) *ContextUrlValues
func NewContextUrlValuesTs ¶
func NewContextUrlValuesTs(uv url.Values, ts time.Time) *ContextUrlValues
func (*ContextUrlValues) Body ¶
func (m *ContextUrlValues) Body() interface{}
func (*ContextUrlValues) Delete ¶
func (m *ContextUrlValues) Delete(delRow map[string]value.Value) error
func (*ContextUrlValues) Key ¶
func (m *ContextUrlValues) Key() uint64
func (ContextUrlValues) Put ¶
func (m ContextUrlValues) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
func (*ContextUrlValues) String ¶
func (m *ContextUrlValues) String() string
func (ContextUrlValues) Ts ¶
func (m ContextUrlValues) Ts() time.Time
type ContextWriterEmpty ¶
type ContextWriterEmpty struct{}
func (*ContextWriterEmpty) Delete ¶
func (m *ContextWriterEmpty) Delete(delRow map[string]value.Value) error
func (*ContextWriterEmpty) Put ¶
func (m *ContextWriterEmpty) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
type CsvDataSource ¶
type CsvDataSource struct {
// contains filtered or unexported fields
}
Csv DataStoure, implements qlbridge DataSource to scan through data
see interfaces possible but they are
func NewCsvSource ¶
func NewCsvSource(ior io.Reader, exit <-chan bool) (*CsvDataSource, error)
Csv reader assumes we are getting first row as headers
func (*CsvDataSource) Close ¶
func (m *CsvDataSource) Close() error
func (*CsvDataSource) Columns ¶
func (m *CsvDataSource) Columns() []string
func (*CsvDataSource) CreateIterator ¶
func (m *CsvDataSource) CreateIterator(filter expr.Node) Iterator
func (*CsvDataSource) Next ¶
func (m *CsvDataSource) Next() Message
func (*CsvDataSource) Open ¶
func (m *CsvDataSource) Open(connInfo string) (SourceConn, error)
func (*CsvDataSource) Tables ¶
func (m *CsvDataSource) Tables() []string
type DataSource ¶
type DataSource interface { Tables() []string Open(connInfo string) (SourceConn, error) Close() error }
A datasource is most likely a database, file, api, in-mem data etc something that provides data rows. If the source is a regular database it can do its own Filter, Seek, Sort, etc. It may not implement all features of a database, in which case we will use our own execution engine.
Minimum Features:
- Scanning: iterate through messages/rows, use expr to evaluate this is the minium we need to implement sql select
- Schema Tables: at a minium tables available, the column level data can be introspected so is optional
Optional Features:
- Seek ie, key-value lookup, or indexed rows
- Projection ie, selecting specific fields
- Where filtering response
- GroupBy
- Aggregations ie, count(*), avg() etc
- Sort sort response, very important for fast joins
Non Select based Sql DML Operations:
- Delete
- Update
- Upsert
- Insert
DDL/Schema Operations
- schema discovery
- create
- index
type DataSourceFeatures ¶
type DataSourceFeatures struct { Features Features DataSource }
func NewFeaturedSource ¶
func NewFeaturedSource(src DataSource) *DataSourceFeatures
type DataSources ¶
type DataSources struct {
// contains filtered or unexported fields
}
Our internal map of different types of datasources that are registered for our runtime system to use
func DataSourcesRegistry ¶
func DataSourcesRegistry() *DataSources
get registry of all datasource types
func (*DataSources) Get ¶
func (m *DataSources) Get(sourceType string) *DataSourceFeatures
func (*DataSources) String ¶
func (m *DataSources) String() string
type Features ¶
We do type introspection in advance to speed up runtime feature detection for datasources
type GroupBy ¶
type GroupBy interface { DataSource GroupBy(expr.SqlStatement) error }
type Iterator ¶
type Iterator interface {
Next() Message
}
simple iterator interface for paging through a datastore Messages/rows
- used for scanning
- for datasources that implement exec.Visitor() (ie, select) this represents the alreader filtered, calculated rows
type JsonWrapper ¶
type JsonWrapper json.RawMessage
func (*JsonWrapper) MarshalJSON ¶
func (m *JsonWrapper) MarshalJSON() ([]byte, error)
func (*JsonWrapper) Scan ¶
func (m *JsonWrapper) Scan(src interface{}) error
func (*JsonWrapper) Unmarshal ¶
func (m *JsonWrapper) Unmarshal(v interface{}) error
func (*JsonWrapper) UnmarshalJSON ¶
func (m *JsonWrapper) UnmarshalJSON(data []byte) error
Unmarshall bytes into this typed struct
type Message ¶
type Message interface { Key() uint64 Body() interface{} }
represents a message routable by the topology. The Key() method is used to route the message in certain topologies. Body() is used to express something user specific. see "https://github.com/mdmarek/topo" AND http://github.com/lytics/grid
type Projection ¶
type Projection interface { // Describe the Columns etc Projection() (*expr.Projection, error) }
Some data sources that implement more features, can provide
their own projection.
type RuntimeConfig ¶
type RuntimeConfig struct { Sources *DataSources // All registered DataSources from which we can create connections DisableRecover bool // contains filtered or unexported fields }
The RuntimeSchema config providing access to available datasources
given connection info, get datasource
func NewRuntimeConfig ¶
func NewRuntimeConfig() *RuntimeConfig
func (*RuntimeConfig) Conn ¶
func (m *RuntimeConfig) Conn(db string) SourceConn
Get connection for given Database
@db database name
func (*RuntimeConfig) DataSource ¶
func (m *RuntimeConfig) DataSource(connInfo string) DataSource
given connection info, get datasource
@connInfo = csv:///dev/stdin mockcsv
func (*RuntimeConfig) SetConnInfo ¶
func (m *RuntimeConfig) SetConnInfo(connInfo string)
Our RunTime configuration possibly only supports a single schema/connection info. for example, the sql/driver interface, so will be set here.
@connInfo = csv:///dev/stdin
type Scanner ¶
type Scanner interface { ScannerColumns // create a new iterator for underlying datasource CreateIterator(filter expr.Node) Iterator MesgChan(filter expr.Node) <-chan Message }
A scanner, most basic of data sources, just iterate through
rows without any optimizations
type ScannerColumns ¶
type ScannerColumns interface {
Columns() []string
}
Interface for a data source exposing column positions for []driver.Value iteration
type Seeker ¶
type Seeker interface { DataSource // Just because we have Get, Multi-Get, doesn't mean we can seek all // expressions, find out. CanSeek(*expr.SqlSelect) Get(key string) Message MultiGet(keys []string) []Message }
Interface for Seeking row values instead of scanning (ie, Indexed)
type Sort ¶
type Sort interface { DataSource Sort(expr.SqlStatement) error }
type SourceConn ¶
type SourceConn interface {
Close() error
}
Connection, only one guaranteed feature, although should implement many more (scan, seek, etc)
func OpenConn ¶
func OpenConn(sourceName, sourceConfig string) (SourceConn, error)
Open a datasource
sourcename = "csv", "elasticsearch"
type SourcePlanner ¶
type SourcePlanner interface { // Accept a sql statement, to plan the execution // ideally, this would be done by planner but, we need // source specific planners, as each backend has different features Accept(expr.SubVisitor) (Scanner, error) }
Some sources can do their own planning
type SqlDriverMessage ¶
func (*SqlDriverMessage) Body ¶
func (m *SqlDriverMessage) Body() interface{}
func (*SqlDriverMessage) Key ¶
func (m *SqlDriverMessage) Key() uint64
type SqlDriverMessageMap ¶
func NewSqlDriverMessageMap ¶
func NewSqlDriverMessageMap() *SqlDriverMessageMap
func (*SqlDriverMessageMap) Body ¶
func (m *SqlDriverMessageMap) Body() interface{}
func (*SqlDriverMessageMap) Key ¶
func (m *SqlDriverMessageMap) Key() uint64
func (*SqlDriverMessageMap) Ts ¶
func (m *SqlDriverMessageMap) Ts() time.Time
type StaticDataSource ¶
type StaticDataSource struct {
// contains filtered or unexported fields
}
Static DataSource, implements qlbridge DataSource to allow
in memory native go data to have a Schema and implement other DataSource interfaces such as Open, Close
func NewStaticDataSource ¶
func NewStaticDataSource(name string, data [][]driver.Value, cols []string) *StaticDataSource
func NewStaticDataValue ¶
func NewStaticDataValue(data interface{}, name string) *StaticDataSource
func (*StaticDataSource) Close ¶
func (m *StaticDataSource) Close() error
func (*StaticDataSource) Columns ¶
func (m *StaticDataSource) Columns() []string
func (*StaticDataSource) CreateIterator ¶
func (m *StaticDataSource) CreateIterator(filter expr.Node) Iterator
func (*StaticDataSource) MesgChan ¶
func (m *StaticDataSource) MesgChan(filter expr.Node) <-chan Message
func (*StaticDataSource) Next ¶
func (m *StaticDataSource) Next() Message
func (*StaticDataSource) Open ¶
func (m *StaticDataSource) Open(connInfo string) (SourceConn, error)
func (*StaticDataSource) Tables ¶
func (m *StaticDataSource) Tables() []string
type UrlValuesMsg ¶
type UrlValuesMsg struct {
// contains filtered or unexported fields
}
func NewUrlValuesMsg ¶
func NewUrlValuesMsg(id uint64, body *ContextUrlValues) *UrlValuesMsg
func (*UrlValuesMsg) Body ¶
func (m *UrlValuesMsg) Body() interface{}
func (*UrlValuesMsg) Key ¶
func (m *UrlValuesMsg) Key() uint64
func (*UrlValuesMsg) String ¶
func (m *UrlValuesMsg) String() string
type ValueContextWrapper ¶
type ValueContextWrapper struct { *SqlDriverMessage // contains filtered or unexported fields }
func NewValueContextWrapper ¶
func NewValueContextWrapper(msg *SqlDriverMessage, cols map[string]*expr.Column) *ValueContextWrapper
func (*ValueContextWrapper) Ts ¶
func (m *ValueContextWrapper) Ts() time.Time
type WhereFilter ¶
type WhereFilter interface { DataSource Filter(expr.SqlStatement) error }