Documentation ¶
Index ¶
- Variables
- func NewNestedContextReader(readers []expr.ContextReader, ts time.Time) expr.ContextReader
- func Register(name string, source DataSource)
- func SourceIterChannel(iter Iterator, filter expr.Node, sigCh <-chan bool) <-chan Message
- type Aggregations
- type ContextSimple
- func (m *ContextSimple) All() map[string]value.Value
- func (m *ContextSimple) Body() interface{}
- func (m *ContextSimple) Commit(rowInfo []expr.SchemaInfo, row expr.RowWriter) error
- func (m *ContextSimple) Delete(row map[string]value.Value) error
- func (m ContextSimple) Get(key string) (value.Value, bool)
- func (m *ContextSimple) Id() uint64
- func (m *ContextSimple) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
- func (m *ContextSimple) Row() map[string]value.Value
- func (m *ContextSimple) Ts() time.Time
- type ContextUrlValues
- func (m *ContextUrlValues) Body() interface{}
- func (m *ContextUrlValues) Delete(delRow map[string]value.Value) error
- func (m ContextUrlValues) Get(key string) (value.Value, bool)
- func (m *ContextUrlValues) Id() uint64
- func (m ContextUrlValues) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
- func (m ContextUrlValues) Row() map[string]value.Value
- func (m *ContextUrlValues) String() string
- func (m ContextUrlValues) Ts() time.Time
- type ContextWriterEmpty
- type CsvDataSource
- func (m *CsvDataSource) Close() error
- func (m *CsvDataSource) Columns() []string
- func (m *CsvDataSource) CreateIterator(filter expr.Node) Iterator
- func (m *CsvDataSource) MesgChan(filter expr.Node) <-chan Message
- func (m *CsvDataSource) Next() Message
- func (m *CsvDataSource) Open(connInfo string) (SourceConn, error)
- func (m *CsvDataSource) Tables() []string
- type DataSource
- type DataSourceFeatures
- type DataSources
- type Deletion
- type Features
- type Field
- type FieldData
- type GroupBy
- type Iterator
- type JsonHelperScannable
- type JsonWrapper
- type Key
- type KeyCol
- type KeyInt
- type KeyInt64
- type Message
- type Mutator
- type NestedContextReader
- type NodeConfig
- type PatchWhere
- type Projection
- type RuntimeSchema
- type Scanner
- type Schema
- func (m *Schema) AddSourceSchema(ss *SourceSchema)
- func (m *Schema) AddTableName(tableName string, ss *SourceSchema)
- func (m *Schema) Current() bool
- func (m *Schema) RefreshSchema()
- func (m *Schema) Since(dur time.Duration) bool
- func (m *Schema) Table(tableName string) (*Table, error)
- func (m *Schema) Tables() []string
- type SchemaColumns
- type SchemaConfig
- type SchemaProvider
- type Seeker
- type Sort
- type SourceConfig
- type SourceConn
- type SourceMutation
- type SourcePlanner
- type SourceSchema
- type SourceSelectPlanner
- type SqlDriverMessage
- type SqlDriverMessageMap
- func (m *SqlDriverMessageMap) Body() interface{}
- func (m *SqlDriverMessageMap) Copy() *SqlDriverMessageMap
- func (m *SqlDriverMessageMap) Get(key string) (value.Value, bool)
- func (m *SqlDriverMessageMap) Id() uint64
- func (m *SqlDriverMessageMap) Key() string
- func (m *SqlDriverMessageMap) Row() map[string]value.Value
- func (m *SqlDriverMessageMap) SetKey(key string)
- func (m *SqlDriverMessageMap) SetKeyHashed(key string)
- func (m *SqlDriverMessageMap) SetRow(row []driver.Value)
- func (m *SqlDriverMessageMap) Ts() time.Time
- func (m *SqlDriverMessageMap) Values() []driver.Value
- type StringArray
- type Table
- func (m *Table) AddField(fld *Field)
- func (m *Table) AddFieldType(name string, valType value.ValueType)
- func (m *Table) AddValues(values []driver.Value)
- func (m *Table) Columns() []string
- func (m *Table) Current() bool
- func (m *Table) FieldNamesPositions() map[string]int
- func (m *Table) HasField(name string) bool
- func (m *Table) SetColumns(cols []string)
- func (m *Table) SetRefreshed()
- func (m *Table) Since(dur time.Duration) bool
- type TimeValue
- type Upsert
- type UrlValuesMsg
- type ValueContextWrapper
- type WhereFilter
Constants ¶
This section is empty.
Variables ¶
var (
    // default schema Refresh Interval
    SchemaRefreshInterval = -time.Minute * 5

    // Static list of common field names for describe header
    DescribeCols    = []string{"Field", "Type", "Null", "Key", "Default", "Extra"}
    DescribeHeaders = NewDescribeHeaders()
)
var (
    // Some common errors
    ErrNotFound = fmt.Errorf("Not Found")
)
Functions ¶
func NewNestedContextReader ¶
func NewNestedContextReader(readers []expr.ContextReader, ts time.Time) expr.ContextReader
NewNestedContextReader provides a context reader that is a composite of ordered child readers; the first reader containing a key is used.
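A minimal usage sketch (the reader contents and import paths are assumptions for illustration, not part of the package's documented examples):

    import (
        "time"

        "github.com/araddon/qlbridge/datasource"
        "github.com/araddon/qlbridge/expr"
        "github.com/araddon/qlbridge/value"
    )

    func nestedReaderExample() {
        // Two simple readers; a key found in the first shadows the second.
        r1 := datasource.NewContextSimpleData(map[string]value.Value{
            "user": value.NewStringValue("aaron"),
        })
        r2 := datasource.NewContextSimpleData(map[string]value.Value{
            "user": value.NewStringValue("bob"),
            "city": value.NewStringValue("Denver"),
        })

        reader := datasource.NewNestedContextReader([]expr.ContextReader{r1, r2}, time.Now())

        u, _ := reader.Get("user") // "aaron": the first reader with the key wins
        c, _ := reader.Get("city") // "Denver": falls through to the second reader
        _, _ = u, c
    }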
func Register ¶
func Register(name string, source DataSource)
Register makes a datasource available by the provided name. If Register is called twice with the same name or if source is nil, it panics.
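A hedged sketch of registering a custom source at init time; stubSource and the name "stub" are hypothetical:

    import "github.com/araddon/qlbridge/datasource"

    // stubSource is a hypothetical, do-nothing implementation of DataSource.
    type stubSource struct{}

    func (s *stubSource) Tables() []string { return []string{"events"} }
    func (s *stubSource) Open(connInfo string) (datasource.SourceConn, error) { return nil, nil }
    func (s *stubSource) Close() error { return nil }

    func init() {
        // Panics if "stub" is already registered or if the source is nil.
        datasource.Register("stub", &stubSource{})
    }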
Types ¶
type Aggregations ¶
type Aggregations interface {
    DataSource
    Aggregate(expr.SqlStatement) error
}
type ContextSimple ¶
func NewContextSimple ¶
func NewContextSimple() *ContextSimple
func NewContextSimpleData ¶
func NewContextSimpleData(data map[string]value.Value) *ContextSimple
func NewContextSimpleTs ¶
func (*ContextSimple) Body ¶
func (m *ContextSimple) Body() interface{}
func (*ContextSimple) Commit ¶
func (m *ContextSimple) Commit(rowInfo []expr.SchemaInfo, row expr.RowWriter) error
func (*ContextSimple) Id ¶
func (m *ContextSimple) Id() uint64
func (*ContextSimple) Put ¶
func (m *ContextSimple) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
func (*ContextSimple) Ts ¶
func (m *ContextSimple) Ts() time.Time
type ContextUrlValues ¶
func NewContextUrlValues ¶
func NewContextUrlValues(uv url.Values) *ContextUrlValues
func NewContextUrlValuesTs ¶
func NewContextUrlValuesTs(uv url.Values, ts time.Time) *ContextUrlValues
func (*ContextUrlValues) Body ¶
func (m *ContextUrlValues) Body() interface{}
func (*ContextUrlValues) Delete ¶
func (m *ContextUrlValues) Delete(delRow map[string]value.Value) error
func (*ContextUrlValues) Id ¶
func (m *ContextUrlValues) Id() uint64
func (ContextUrlValues) Put ¶
func (m ContextUrlValues) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
func (*ContextUrlValues) String ¶
func (m *ContextUrlValues) String() string
func (ContextUrlValues) Ts ¶
func (m ContextUrlValues) Ts() time.Time
type ContextWriterEmpty ¶
type ContextWriterEmpty struct{}
func (*ContextWriterEmpty) Delete ¶
func (m *ContextWriterEmpty) Delete(delRow map[string]value.Value) error
func (*ContextWriterEmpty) Put ¶
func (m *ContextWriterEmpty) Put(col expr.SchemaInfo, rctx expr.ContextReader, v value.Value) error
type CsvDataSource ¶
type CsvDataSource struct {
// contains filtered or unexported fields
}
CsvDataSource implements the qlbridge DataSource interface to scan through data
- very naive scanner: forward-only, single pass
- can open a file with .Open()
- if the FROM name in the SQL is "stdin" or "stdio", it will read from stdin
- assumes comma-delimited input
func NewCsvSource ¶
func NewCsvSource(table string, indexCol int, ior io.Reader, exit <-chan bool) (*CsvDataSource, error)
NewCsvSource creates a CSV reader; it assumes the first row contains the column headers.
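A usage sketch, assuming Next() returns nil once the input is exhausted and that indexCol selects the column used as the row key:

    import (
        "strings"

        "github.com/araddon/qlbridge/datasource"
    )

    func csvExample() error {
        in := strings.NewReader("user_id,name\n1,aaron\n2,bob\n")
        exit := make(chan bool)

        // table "users", key column 0 (user_id), reading from in
        src, err := datasource.NewCsvSource("users", 0, in, exit)
        if err != nil {
            return err
        }
        defer src.Close()

        // Forward-only, single-pass iteration.
        for msg := src.Next(); msg != nil; msg = src.Next() {
            _ = msg.Body() // row values keyed by the header columns
        }
        return nil
    }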
func (*CsvDataSource) Close ¶
func (m *CsvDataSource) Close() error
func (*CsvDataSource) Columns ¶
func (m *CsvDataSource) Columns() []string
func (*CsvDataSource) CreateIterator ¶
func (m *CsvDataSource) CreateIterator(filter expr.Node) Iterator
func (*CsvDataSource) Next ¶
func (m *CsvDataSource) Next() Message
func (*CsvDataSource) Open ¶
func (m *CsvDataSource) Open(connInfo string) (SourceConn, error)
func (*CsvDataSource) Tables ¶
func (m *CsvDataSource) Tables() []string
type DataSource ¶
type DataSource interface {
    Tables() []string
    Open(connInfo string) (SourceConn, error)
    Close() error
}
A DataSource is most likely a database, file, API, or in-memory data store: something that provides rows of data. If the source is a SQL database, it can do its own planning/implementation.
However, sources do not have to implement all the features of a database (scan/seek/sort/filter/group/aggregate); for anything missing, our own execution engine will "polyfill" the features (a feature-detection sketch follows the lists below).
Minimum Features:
- Scanning: iterate through messages/rows and use expr to evaluate them; this is the minimum we need to implement sql select
- Schema Tables: at a minimum the tables available; the column-level data can be introspected, so it is optional
Planning:
- ?? Accept() or VisitSelect() not yet implemented
Optional Select Features:
- Seek: ie, key-value lookup or indexed rows
- Projection: ie, selecting specific fields
- Where: filtering the response
- GroupBy
- Aggregations: ie, count(*), avg(), etc
- Sort: sort the response, very important for fast joins
Non-SELECT SQL DML Operations:
- Deletion (sql delete): Delete(), DeleteExpression()
- Upsert interface (sql Update, Upsert, Insert): Put(), PutMulti()
DDL/Schema Operations
- schema discovery
- create
- index
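As a hedged illustration of the polyfill decision, the registered feature flags can be inspected before planning (the field names come from the Features struct documented below; the printed labels are illustrative):

    import (
        "fmt"

        "github.com/araddon/qlbridge/datasource"
    )

    // reportFeatures shows which optional capabilities a source advertises;
    // anything not supported is polyfilled by the qlbridge execution engine.
    func reportFeatures(src datasource.DataSource) {
        feats := datasource.NewFeatures(src)
        fmt.Println("scanner:     ", feats.Scanner)
        fmt.Println("seeker:      ", feats.Seeker)
        fmt.Println("sort:        ", feats.Sort)
        fmt.Println("groupby:     ", feats.GroupBy)
        fmt.Println("aggregations:", feats.Aggregations)
    }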
type DataSourceFeatures ¶
type DataSourceFeatures struct {
    Features *Features
    DataSource
}
func NewFeaturedSource ¶
func NewFeaturedSource(src DataSource) *DataSourceFeatures
type DataSources ¶
type DataSources struct {
// contains filtered or unexported fields
}
Our internal map of different types of datasources that are registered for our runtime system to use
func DataSourcesRegistry ¶
func DataSourcesRegistry() *DataSources
DataSourcesRegistry returns the registry of all datasource types.
func (*DataSources) Get ¶
func (m *DataSources) Get(sourceType string) *DataSourceFeatures
func (*DataSources) String ¶
func (m *DataSources) String() string
type Features ¶
type Features struct {
    SourcePlanner  bool
    Scanner        bool
    Seeker         bool
    WhereFilter    bool
    GroupBy        bool
    Sort           bool
    Aggregations   bool
    Projection     bool
    SourceMutation bool
    Upsert         bool
    PatchWhere     bool
    Deletion       bool
}
We do type introspection in advance to speed up runtime feature detection for datasources
func NewFeatures ¶
func NewFeatures(src DataSource) *Features
type Field ¶
type Field struct {
    Name               string
    Description        string
    Data               FieldData
    Length             uint32
    Type               value.ValueType
    DefaultValueLength uint64
    DefaultValue       driver.Value
    Indexed            bool
}
Field describes column info: name, data type, defaults, index.
func NewDescribeHeaders ¶
func NewDescribeHeaders() []*Field
type GroupBy ¶
type GroupBy interface {
    DataSource
    GroupBy(expr.SqlStatement) error
}
type Iterator ¶
type Iterator interface {
Next() Message
}
Simple iterator interface for paging through a datastore's Messages/rows
- used for scanning
- for datasources that implement exec.Visitor() (ie, select) this represents the already filtered, calculated rows
type JsonHelperScannable ¶
type JsonHelperScannable u.JsonHelper
func (*JsonHelperScannable) MarshalJSON ¶
func (m *JsonHelperScannable) MarshalJSON() ([]byte, error)
func (*JsonHelperScannable) Scan ¶
func (m *JsonHelperScannable) Scan(src interface{}) error
func (*JsonHelperScannable) UnmarshalJSON ¶
func (m *JsonHelperScannable) UnmarshalJSON(data []byte) error
Unmarshal bytes into this typed struct
type JsonWrapper ¶
type JsonWrapper json.RawMessage
func (*JsonWrapper) MarshalJSON ¶
func (m *JsonWrapper) MarshalJSON() ([]byte, error)
func (*JsonWrapper) Scan ¶
func (m *JsonWrapper) Scan(src interface{}) error
func (*JsonWrapper) Unmarshal ¶
func (m *JsonWrapper) Unmarshal(v interface{}) error
func (*JsonWrapper) UnmarshalJSON ¶
func (m *JsonWrapper) UnmarshalJSON(data []byte) error
Unmarshal bytes into this typed struct
type Key ¶
Key interface is the Unique Key identifying a row
func KeyFromWhere ¶
Given a Where expression, try to create a key, which
requires the form `identity = "value"`
type Message ¶
type Message interface {
    Id() uint64
    Body() interface{}
}
Message represents a message. The Id() method provides a consistent uint64 that can be used by consistent-hash algorithms for topologies that split messages up amongst multiple machines.
Body() returns an interface{}, allowing this to be a generic structure for routing.
see https://github.com/mdmarek/topo and http://github.com/lytics/grid
type NestedContextReader ¶
type NestedContextReader struct {
// contains filtered or unexported fields
}
func (*NestedContextReader) Ts ¶
func (n *NestedContextReader) Ts() time.Time
type NodeConfig ¶
type NodeConfig struct {
    Name     string       `json:"name"`     // Name of this Node optional
    Source   string       `json:"source"`   // Name of source this node belongs to
    Address  string       `json:"address"`  // host/ip
    Settings u.JsonHelper `json:"settings"` // Arbitrary settings
}
Nodes are Servers
- this represents a single source type
- may have config info in Settings
- user = username
- password = password
- idleconns = # of idle connections
type PatchWhere ¶
type PatchWhere interface {
PatchWhere(ctx context.Context, where expr.Node, patch interface{}) (int64, error)
}
PatchWhere passes a WHERE expression through to the underlying datasource.
Used for UPDATE statements with a WHERE clause (WHERE x = y).
type Projection ¶
type Projection interface {
    // Describe the Columns etc
    Projection() (*expr.Projection, error)
}
Some data sources that implement more features can provide their own projection.
type RuntimeSchema ¶
type RuntimeSchema struct {
    Sources        *DataSources // All registered DataSources
    DisableRecover bool         // If disableRecover=true, we will not capture/suppress panics
    // contains filtered or unexported fields
}
The RuntimeSchema provides info on available datasources; given connection info, it returns a datasource.
func NewRuntimeSchema ¶
func NewRuntimeSchema() *RuntimeSchema
func (*RuntimeSchema) Conn ¶
func (m *RuntimeSchema) Conn(db string) SourceConn
Get connection for given Database
@db database name
func (*RuntimeSchema) DataSource ¶
func (m *RuntimeSchema) DataSource(connInfo string) DataSource
given connection info, get datasource
@connInfo = csv:///dev/stdin mockcsv
func (*RuntimeSchema) SetConnInfo ¶
func (m *RuntimeSchema) SetConnInfo(connInfo string)
Our runtime configuration may support only a single schema/connection info (for example, the sql/driver interface), so it is set here.
@connInfo = csv:///dev/stdin
type Scanner ¶
type Scanner interface {
    SchemaColumns
    SourceConn
    // create a new iterator for underlying datasource
    CreateIterator(filter expr.Node) Iterator
    MesgChan(filter expr.Node) <-chan Message
}
A Scanner is the most basic of data sources: it just iterates through rows without any optimizations.
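A sketch of draining a Scanner via its message channel, assuming a nil filter means no filtering and the channel is closed when the source is exhausted:

    import "github.com/araddon/qlbridge/datasource"

    func drainScanner(s datasource.Scanner) []datasource.Message {
        var msgs []datasource.Message
        for msg := range s.MesgChan(nil) {
            msgs = append(msgs, msg)
        }
        return msgs
    }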
type Schema ¶
type Schema struct {
    Name          string                   `json:"name"`
    SourceSchemas map[string]*SourceSchema // map[source_name]:Source Schemas
    // contains filtered or unexported fields
}
Schema is a "Virtual" Schema Database. Made up of
- Multiple DataSource(s) (each may be a discrete source type)
- each datasource supplies tables to the virtual table pool
- each table from each source must be unique (or aliased)
func (*Schema) AddSourceSchema ¶
func (m *Schema) AddSourceSchema(ss *SourceSchema)
func (*Schema) AddTableName ¶
func (m *Schema) AddTableName(tableName string, ss *SourceSchema)
func (*Schema) RefreshSchema ¶
func (m *Schema) RefreshSchema()
type SchemaColumns ¶
type SchemaColumns interface {
Columns() []string
}
Interface for a data source exposing column positions for []driver.Value iteration
type SchemaConfig ¶
type SchemaConfig struct {
    Name       string   `json:"name"`    // Virtual Schema Name, must be unique
    Sources    []string `json:"sources"` // List of sources, the names of the "Db" in source
    NodeConfig []string `json:"-"`       // List of backend Servers
}
A Schema is a virtual schema and may have multiple backends.
type SchemaProvider ¶
type SchemaProvider interface {
    DataSource
    Table(table string) (*Table, error)
}
A backend data source provider that also provides schema
type Seeker ¶
type Seeker interface {
    DataSource
    // Just because we have Get, Multi-Get, doesn't mean we can seek all
    // expressions, find out with CanSeek for given expression
    CanSeek(*expr.SqlSelect) bool
    Get(key driver.Value) (Message, error)
    MultiGet(keys []driver.Value) ([]Message, error)
}
Interface for Seeking row values instead of scanning (ie, Indexed)
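A hedged sketch of pushing a key lookup down to a Seeker; the fallback behavior is an assumption about how a caller might use it:

    import (
        "database/sql/driver"

        "github.com/araddon/qlbridge/datasource"
        "github.com/araddon/qlbridge/expr"
    )

    func seekIfPossible(src datasource.DataSource, sel *expr.SqlSelect, key driver.Value) (datasource.Message, error) {
        seeker, ok := src.(datasource.Seeker)
        if !ok || !seeker.CanSeek(sel) {
            return nil, nil // caller falls back to a full scan
        }
        return seeker.Get(key)
    }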
type Sort ¶
type Sort interface {
    DataSource
    Sort(expr.SqlStatement) error
}
type SourceConfig ¶
type SourceConfig struct {
    Name         string        `json:"name"`           // Name
    SourceType   string        `json:"type"`           // [mysql,elasticsearch,csv,etc] Name in DataSource Registry
    TablesToLoad []string      `json:"tables_to_load"` // if non empty, only load these tables
    Nodes        []*NodeConfig `json:"nodes"`          // List of nodes
    Settings     u.JsonHelper  `json:"settings"`       // Arbitrary settings specific to each source type
}
Config for a Source: storage/database/csv files
- this represents a single source type
- may have more than one node
- belongs to a Schema ( or schemas)
func NewSourceConfig ¶
func NewSourceConfig(name, sourceType string) *SourceConfig
func (*SourceConfig) Init ¶
func (m *SourceConfig) Init()
func (*SourceConfig) String ¶
func (m *SourceConfig) String() string
type SourceConn ¶
type SourceConn interface {
Close() error
}
A DataSource connection; Close() is the only guaranteed feature, although implementations should provide many more (scan, seek, etc.).
func OpenConn ¶
func OpenConn(sourceName, sourceConfig string) (SourceConn, error)
OpenConn opens a datasource connection.
sourceName = "csv", "elasticsearch"
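A minimal sketch; the source name and config string are illustrative:

    import "github.com/araddon/qlbridge/datasource"

    func openExample() error {
        // "csv" must already be registered; the config string is source-specific.
        conn, err := datasource.OpenConn("csv", "/dev/stdin")
        if err != nil {
            return err
        }
        return conn.Close()
    }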
type SourceMutation ¶
type SourceMutation interface {
Create(tbl *Table, stmt expr.SqlStatement) (Mutator, error)
}
SourceMutation is a stateful connection, similar to an Open() connection for SELECT
- accepts the table used in this upsert/insert/update
type SourcePlanner ¶
type SourcePlanner interface {
    // Accept a sql statement to plan the execution. Ideally this would be done
    // by the planner, but we need source-specific planners, as each backend has different features.
    Accept(expr.SubVisitor) (Scanner, error)
}
Some sources can do their own planning for sub-select statements
type SourceSchema ¶
type SourceSchema struct {
    Name       string              // Source specific Schema name, generally underlying db name
    Conf       *SourceConfig       // source configuration
    Schema     *Schema             // Schema this is participating in
    Nodes      []*NodeConfig       // List of nodes config
    DSFeatures *DataSourceFeatures // The datasource Interface
    DS         DataSource          // This datasource Interface
    // contains filtered or unexported fields
}
SourceSchema is a schema for a single DataSource (elasticsearch, mysql, filesystem, etc.); each DataSource may have multiple tables.
func NewSourceSchema ¶
func NewSourceSchema(name, sourceType string) *SourceSchema
func (*SourceSchema) AddTable ¶
func (m *SourceSchema) AddTable(tbl *Table)
func (*SourceSchema) AddTableName ¶
func (m *SourceSchema) AddTableName(tableName string)
func (*SourceSchema) Tables ¶
func (m *SourceSchema) Tables() []string
type SourceSelectPlanner ¶
type SourceSelectPlanner interface {
    // Accept a sql statement to plan the execution. Ideally this would be done
    // by the planner, but we need source-specific planners, as each backend has different features.
    //Accept(expr.Visitor) (Scanner, error)
    VisitSelect(stmt *expr.SqlSelect) (interface{}, error)
}
Some sources can do their own planning
type SqlDriverMessage ¶
func (*SqlDriverMessage) Body ¶
func (m *SqlDriverMessage) Body() interface{}
func (*SqlDriverMessage) Id ¶
func (m *SqlDriverMessage) Id() uint64
type SqlDriverMessageMap ¶
type SqlDriverMessageMap struct {
    IdVal uint64 // id()
    // contains filtered or unexported fields
}
func NewSqlDriverMessageMap ¶
func NewSqlDriverMessageMapEmpty ¶
func NewSqlDriverMessageMapEmpty() *SqlDriverMessageMap
func NewSqlDriverMessageMapVals ¶
func NewSqlDriverMessageMapVals(id uint64, row []driver.Value, cols []string) *SqlDriverMessageMap
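A usage sketch building a message from driver values and reading a column back; the data is illustrative:

    import (
        "database/sql/driver"
        "fmt"

        "github.com/araddon/qlbridge/datasource"
    )

    func messageMapExample() {
        cols := []string{"user_id", "name"}
        row := []driver.Value{int64(1), "aaron"}

        msg := datasource.NewSqlDriverMessageMapVals(1, row, cols)

        if v, ok := msg.Get("name"); ok {
            fmt.Println(v) // value.Value wrapping "aaron"
        }
        fmt.Println(msg.Values()) // [1 aaron]
    }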
func (*SqlDriverMessageMap) Body ¶
func (m *SqlDriverMessageMap) Body() interface{}
func (*SqlDriverMessageMap) Copy ¶
func (m *SqlDriverMessageMap) Copy() *SqlDriverMessageMap
func (*SqlDriverMessageMap) Id ¶
func (m *SqlDriverMessageMap) Id() uint64
func (*SqlDriverMessageMap) Key ¶
func (m *SqlDriverMessageMap) Key() string
func (*SqlDriverMessageMap) SetKey ¶
func (m *SqlDriverMessageMap) SetKey(key string)
func (*SqlDriverMessageMap) SetKeyHashed ¶
func (m *SqlDriverMessageMap) SetKeyHashed(key string)
func (*SqlDriverMessageMap) SetRow ¶
func (m *SqlDriverMessageMap) SetRow(row []driver.Value)
func (*SqlDriverMessageMap) Ts ¶
func (m *SqlDriverMessageMap) Ts() time.Time
func (*SqlDriverMessageMap) Values ¶
func (m *SqlDriverMessageMap) Values() []driver.Value
type StringArray ¶
type StringArray []string
func (*StringArray) MarshalJSON ¶
func (m *StringArray) MarshalJSON() ([]byte, error)
func (*StringArray) Scan ¶
func (m *StringArray) Scan(src interface{}) error
func (*StringArray) UnmarshalJSON ¶
func (m *StringArray) UnmarshalJSON(data []byte) error
type Table ¶
type Table struct {
    Name           string            // Name of table lowercased
    NameOriginal   string            // Name of table
    FieldPositions map[string]int    // Maps name of column to ordinal position in array of []driver.Value's
    Fields         []*Field          // List of Fields, in order
    FieldMap       map[string]*Field // List of Fields, in order
    DescribeValues [][]driver.Value  // The Values that will be output for Describe
    Schema         *Schema           // The schema this is member of
    SourceSchema   *SourceSchema     // The source schema this is member of
    Charset        uint16            // Character set, default = utf8
    // contains filtered or unexported fields
}
Table represents the traditional definition of a database table.
It belongs to a Schema and can be used to create a DataSource used to read this table.
func NewTable ¶
func NewTable(table string, s *SourceSchema) *Table
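A sketch of hand-building a table description for a source schema; the names and value types are illustrative:

    import (
        "github.com/araddon/qlbridge/datasource"
        "github.com/araddon/qlbridge/value"
    )

    func buildTable() *datasource.Table {
        ss := datasource.NewSourceSchema("mydb", "csv")

        tbl := datasource.NewTable("users", ss)
        tbl.AddFieldType("user_id", value.IntType)
        tbl.AddFieldType("name", value.StringType)
        tbl.SetColumns([]string{"user_id", "name"})

        ss.AddTable(tbl)
        return tbl
    }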
func (*Table) FieldNamesPositions ¶
func (m *Table) FieldNamesPositions() map[string]int
List of field names and their ordinal positions in the column list
func (*Table) SetColumns ¶
func (m *Table) SetColumns(cols []string)
type Upsert ¶
type Upsert interface {
    Put(ctx context.Context, key Key, value interface{}) (Key, error)
    PutMulti(ctx context.Context, keys []Key, src interface{}) ([]Key, error)
}
Mutation interface for Put
- assumes datasource understands key(s?)
type UrlValuesMsg ¶
type UrlValuesMsg struct {
// contains filtered or unexported fields
}
func NewUrlValuesMsg ¶
func NewUrlValuesMsg(id uint64, body *ContextUrlValues) *UrlValuesMsg
func (*UrlValuesMsg) Body ¶
func (m *UrlValuesMsg) Body() interface{}
func (*UrlValuesMsg) Id ¶
func (m *UrlValuesMsg) Id() uint64
func (*UrlValuesMsg) String ¶
func (m *UrlValuesMsg) String() string
type ValueContextWrapper ¶
type ValueContextWrapper struct {
    *SqlDriverMessage
    // contains filtered or unexported fields
}
func NewValueContextWrapper ¶
func NewValueContextWrapper(msg *SqlDriverMessage, cols map[string]*expr.Column) *ValueContextWrapper
func (*ValueContextWrapper) Ts ¶
func (m *ValueContextWrapper) Ts() time.Time
type WhereFilter ¶
type WhereFilter interface {
    DataSource
    Filter(expr.SqlStatement) error
}