Documentation ¶
Overview ¶
The core Relational Algebra schema objects, such as Table, Schema, DataSource, Fields, Headers, and Index.
Package schema is a generated protocol buffer package.

It is generated from these files:
	schema.proto

It has these top-level messages:
	TablePartition
	Partition
Index ¶
- Constants
- Variables
- type ConfigNode
- type ConfigSchema
- type ConfigSource
- type Conn
- type ConnAll
- type ConnColumns
- type ConnDeletion
- type ConnMutation
- type ConnMutator
- type ConnPatchWhere
- type ConnScanner
- type ConnScannerIterator
- type ConnSeeker
- type ConnUpsert
- type DialectWriter
- type Field
- type FieldData
- type Index
- type Iterator
- type Key
- type KeyUint
- type Message
- type Partition
- type Schema
- func (m *Schema) AddSourceSchema(ss *SchemaSource)
- func (m *Schema) AddTableName(tableName string, ss *SchemaSource)
- func (m *Schema) Current() bool
- func (m *Schema) Open(tableName string) (Conn, error)
- func (m *Schema) RefreshSchema()
- func (m *Schema) Since(dur time.Duration) bool
- func (m *Schema) Source(tableName string) (*SchemaSource, error)
- func (m *Schema) Table(tableName string) (*Table, error)
- func (m *Schema) Tables() []string
- type SchemaSource
- type Source
- type SourceAll
- type SourcePartitionable
- type SourceSetup
- type SourceTableSchema
- type Table
- func (m *Table) AddField(fld *Field)
- func (m *Table) AddFieldType(name string, valType value.ValueType)
- func (m *Table) AsRows() [][]driver.Value
- func (m *Table) Body() interface{}
- func (m *Table) Columns() []string
- func (m *Table) Current() bool
- func (m *Table) FieldNamesPositions() map[string]int
- func (m *Table) FieldsAsMessages() []Message
- func (m *Table) HasField(name string) bool
- func (m *Table) Id() uint64
- func (m *Table) SetColumns(cols []string)
- func (m *Table) SetRefreshed()
- func (m *Table) SetRows(rows [][]driver.Value)
- func (m *Table) Since(dur time.Duration) bool
- type TablePartition
- func (m *TablePartition) Marshal() (data []byte, err error)
- func (m *TablePartition) MarshalTo(data []byte) (int, error)
- func (*TablePartition) ProtoMessage()
- func (m *TablePartition) Reset()
- func (m *TablePartition) Size() (n int)
- func (m *TablePartition) String() string
- func (m *TablePartition) Unmarshal(data []byte) error
Constants ¶
const (
	NoNulls    = false
	AllowNulls = true
)
Variables ¶
var (
	// ErrNotFound is a common error for not found
	ErrNotFound = fmt.Errorf("Not Found")
	// ErrNotImplemented Common error for not implemented type errors
	ErrNotImplemented = fmt.Errorf("Not Implemented")
)
var (
	// default schema Refresh Interval
	SchemaRefreshInterval = -time.Minute * 5

	DescribeFullCols     = []string{"Field", "Type", "Collation", "Null", "Key", "Default", "Extra", "Privileges", "Comment"}
	DescribeFullColMap   = map[string]int{"Field": 0, "Type": 1, "Collation": 2, "Null": 3, "Key": 4, "Default": 5, "Extra": 6, "Privileges": 7, "Comment": 8}
	DescribeCols         = []string{"Field", "Type", "Null", "Key", "Default", "Extra"}
	DescribeColMap       = map[string]int{"Field": 0, "Type": 1, "Null": 2, "Key": 3, "Default": 4, "Extra": 5}
	ShowTableColumns     = []string{"Table", "Table_Type"}
	ShowVariablesColumns = []string{"Variable_name", "Value"}
	ShowDatabasesColumns = []string{"Database"}
	//columnColumns = []string{"Field", "Type", "Null", "Key", "Default", "Extra"}
	ShowTableColumnMap = map[string]int{"Table": 0}
	//columnsColumnMap = map[string]int{"Field": 0, "Type": 1, "Null": 2, "Key": 3, "Default": 4, "Extra": 5}
	ShowIndexCols = []string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name", "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Index_comment"}

	DescribeFullHeaders = NewDescribeFullHeaders()
	DescribeHeaders     = NewDescribeHeaders()
)
var (
	ErrInvalidLengthSchema = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowSchema   = fmt.Errorf("proto: integer overflow")
)
Functions ¶
This section is empty.
Types ¶
type ConfigNode ¶
type ConfigNode struct {
	Name     string       `json:"name"`     // Name of this Node optional
	Source   string       `json:"source"`   // Name of source this node belongs to
	Address  string       `json:"address"`  // host/ip
	Settings u.JsonHelper `json:"settings"` // Arbitrary settings
}
Nodes are Servers/Services, ie a running instance of said Source
- each must represent a single source type
- may have arbitrary config info in Settings such as
- user = username
- password = password
- # connections
type ConfigSchema ¶
type ConfigSchema struct {
	Name       string   `json:"name"`    // Virtual Schema Name, must be unique
	Sources    []string `json:"sources"` // List of sources, the names of the "Db" in source
	ConfigNode []string `json:"-"`       // List of backend Servers
}
ConfigSchema is the json/config block for a Schema: the data sources that make up this Virtual Schema
- config to map name to multiple sources
- connection info
type ConfigSource ¶
type ConfigSource struct {
	Name         string            `json:"name"`            // Name
	SourceType   string            `json:"type"`            // [mysql,elasticsearch,csv,etc] Name in DataSource Registry
	TablesToLoad []string          `json:"tables_to_load"`  // if non empty, only load these tables
	Nodes        []*ConfigNode     `json:"nodes"`           // List of nodes
	Settings     u.JsonHelper      `json:"settings"`        // Arbitrary settings specific to each source type
	Partitions   []*TablePartition `json:"partitions"`      // List of partitions per table (optional)
	PartitionCt  int               `json:"partition_count"` // Instead of array of per table partitions, raw partition count
}
ConfigSource is the config for a Source: storage/database/csv files
- this represents a single source type
- may have more than one node
- belongs to one or more virtual schemas
func NewSourceConfig ¶
func NewSourceConfig(name, sourceType string) *ConfigSource
func (*ConfigSource) String ¶
func (m *ConfigSource) String() string
type Conn ¶
type Conn interface {
Close() error
}
Conn is a Connection/Session to a file, api, or backend database. It provides DML operations.
Minimum Read Features to provide Sql Select:
- Scanning: iterate through messages/rows
- Schema Tables: at a minimum, a list of tables available; the column-level data can be introspected, so it is optional
Planning:
- CreateMutator(ctx *plan.Context) : execute a mutation task insert, delete, update
Non Select based Sql DML Operations for Mutator:
- Deletion: (sql delete) Delete() DeleteExpression()
- Upsert Interface (sql Update, Upsert, Insert) Put() PutMulti()
A DataSource Connection/Session is stateful. Conn itself is a generic interface; implementations will typically also provide the features below: SchemaColumns, Scanner, Seeker, Mutator.
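Since Conn's only required method is Close(), the minimal contract is easy to satisfy. A sketch with a hypothetical in-memory connection (real implementations would wrap a file handle, api client, or database session):

```go
package main

import "fmt"

// Conn, re-declared locally so this sketch compiles on its own;
// it matches the minimal interface above.
type Conn interface {
	Close() error
}

// memConn is a hypothetical in-memory connection.
type memConn struct {
	closed bool
}

// Close marks the connection closed; a real source would release
// file handles, sockets, or sessions here.
func (c *memConn) Close() error {
	c.closed = true
	return nil
}

func open() (Conn, error) {
	return &memConn{}, nil
}

func main() {
	c, err := open()
	if err != nil {
		panic(err)
	}
	defer c.Close() // callers own the connection lifecycle
	fmt.Println("connection open")
}
```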
type ConnAll ¶
type ConnAll interface {
	Close() error
	ConnColumns
	Iterator
	ConnSeeker
	ConnUpsert
	ConnDeletion
}
ConnAll interface
type ConnColumns ¶
type ConnColumns interface {
	//Conn
	Columns() []string
}
ConnColumns Interface for a data source connection exposing column positions for []driver.Value iteration
type ConnDeletion ¶
type ConnDeletion interface {
	// Delete using this key
	Delete(driver.Value) (int, error)
	// Delete with given expression
	DeleteExpression(expr.Node) (int, error)
}
ConnDeletion deletion interface for data sources
type ConnMutation ¶
type ConnMutation interface {
CreateMutator(pc interface{}) (ConnMutator, error)
}
ConnMutation creates a Mutator connection similar to Open() connection for select
- accepts the plan context used in this upsert/insert/update
- returns a connection which must be closed
type ConnMutator ¶
type ConnMutator interface {
	ConnUpsert
	ConnDeletion
}
ConnMutator Mutator Connection
type ConnPatchWhere ¶
type ConnPatchWhere interface {
PatchWhere(ctx context.Context, where expr.Node, patch interface{}) (int64, error)
}
ConnPatchWhere passes a where expression through to the underlying datasource.
Used for update statements with a WHERE clause, eg WHERE x = y.
type ConnScanner ¶
ConnScanner is the most basic of data sources: it simply iterates through
rows without any optimizations. Suited to csv files and key-value stores like redis, cassandra.
type ConnScannerIterator ¶
type ConnScannerIterator interface {
	//Conn

	// create a new iterator to scan through row by row
	CreateIterator() Iterator
	MesgChan() <-chan Message
}
ConnScannerIterator is another, more advanced iterator (a candidate for deprecation?)
type ConnSeeker ¶
type ConnSeeker interface {
	//Conn

	// Just because we have Get, Multi-Get, doesn't mean we can seek all
	// expressions, find out with CanSeek for given expression
	CanSeek(*rel.SqlSelect) bool
	Get(key driver.Value) (Message, error)
	MultiGet(keys []driver.Value) ([]Message, error)
}
ConnSeeker is a datasource that is a Key-Value store, allowing the relational
implementation to seek row values directly instead of scanning
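The Get path can be sketched with a hypothetical in-memory, key-value backed source. The Message type is re-declared locally so the sketch compiles on its own, and only uint64 keys are supported in this toy version:

```go
package main

import (
	"database/sql/driver"
	"fmt"
)

// Message re-declared locally to keep the sketch self-contained.
type Message interface {
	Id() uint64
	Body() interface{}
}

type row struct {
	id   uint64
	vals []driver.Value
}

func (r *row) Id() uint64        { return r.id }
func (r *row) Body() interface{} { return r.vals }

// memSeeker is a hypothetical key-value backed source exposing Get,
// the core of the ConnSeeker contract: O(1) lookup instead of a scan.
type memSeeker struct {
	rows map[uint64]*row
}

func (m *memSeeker) Get(key driver.Value) (Message, error) {
	id, ok := key.(uint64)
	if !ok {
		return nil, fmt.Errorf("unsupported key type %T", key)
	}
	r, found := m.rows[id]
	if !found {
		return nil, fmt.Errorf("Not Found")
	}
	return r, nil
}

func main() {
	s := &memSeeker{rows: map[uint64]*row{
		7: {id: 7, vals: []driver.Value{"aaron", int64(22)}},
	}}
	msg, err := s.Get(uint64(7))
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Id())
}
```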
type ConnUpsert ¶
type ConnUpsert interface {
	Put(ctx context.Context, key Key, value interface{}) (Key, error)
	PutMulti(ctx context.Context, keys []Key, src interface{}) ([]Key, error)
}
ConnUpsert Mutation interface for Put
- assumes datasource understands key(s?)
type DialectWriter ¶
type DialectWriter interface {
	Dialect() string
	Table(tbl *Table) string
	FieldType(t value.ValueType) string
}
DialectWriter knows how to format the schema output
specific to a dialect like mysql
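The FieldType method is the dialect-specific piece: mapping generic value types to a backend's column types. A sketch of a mysql-like writer, using a local stand-in for value.ValueType (the type names and mappings here are illustrative assumptions, not the package's actual ones):

```go
package main

import "fmt"

// ValueType stands in for value.ValueType in this sketch.
type ValueType int

const (
	IntType ValueType = iota
	StringType
	BoolType
)

// mysqlWriter sketches a DialectWriter for a mysql-like dialect.
type mysqlWriter struct{}

func (w mysqlWriter) Dialect() string { return "mysql" }

// FieldType maps a generic value type onto a mysql column type.
func (w mysqlWriter) FieldType(t ValueType) string {
	switch t {
	case IntType:
		return "bigint"
	case StringType:
		return "varchar(255)"
	case BoolType:
		return "tinyint(1)"
	}
	return "text"
}

func main() {
	w := mysqlWriter{}
	fmt.Println(w.Dialect(), w.FieldType(StringType)) // mysql varchar(255)
}
```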
type Field ¶
type Field struct {
	Name               string          // Column Name
	Description        string          // Comment/Description
	Key                string          // Key info (primary, etc) should be stored in indexes
	Extra              string          // no idea difference with Description
	Data               FieldData       // Pre-generated dialect specific data???
	Length             uint32          // field-size, ie varchar(20)
	Type               value.ValueType // Value type, there needs to be dialect specific converters
	DefaultValueLength uint64          // Default
	DefaultValue       driver.Value    // Default value
	Indexed            bool            // Is this indexed, if so we will have a list of indexes
	NoNulls            bool            // Do we allow nulls?  default = false = yes allow nulls
	Collation          string          // ie, utf8, none
	Roles              []string        // ie, {select,insert,update,delete}
	Indexes            []*Index        // Indexes this participates in
	// contains filtered or unexported fields
}
Field Describes the column info, name, data type, defaults, index, null
- dialects (mysql, mongo, cassandra) have their own descriptors for these, so this is generic meant to be converted to Frontend at runtime
func NewDescribeFullHeaders ¶
func NewDescribeFullHeaders() []*Field
func NewDescribeHeaders ¶
func NewDescribeHeaders() []*Field
func NewFieldBase ¶
type Index ¶
type Index struct {
	Name          string
	Fields        []string
	PrimaryKey    bool
	HashPartition []string
	PartitionSize int
}
Index a description of how data is/should be indexed
type Iterator ¶
type Iterator interface {
Next() Message
}
Iterator is simple iterator for paging through a datastore Messages/rows
- used for scanning
- for datasources that implement exec.Visitor() (ie, select) this represents the already filtered, calculated rows
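The scanning loop above can be sketched with a slice-backed iterator; Next() returning nil signals the end of the stream. Both interfaces are re-declared locally so the sketch compiles on its own:

```go
package main

import "fmt"

// Message and Iterator re-declared locally, matching the interfaces above.
type Message interface {
	Id() uint64
	Body() interface{}
}

type Iterator interface {
	Next() Message
}

type sliceMsg struct {
	id   uint64
	body interface{}
}

func (m sliceMsg) Id() uint64        { return m.id }
func (m sliceMsg) Body() interface{} { return m.body }

// sliceIterator pages through an in-memory slice of messages;
// Next returns nil when the stream is exhausted.
type sliceIterator struct {
	msgs []Message
	pos  int
}

func (it *sliceIterator) Next() Message {
	if it.pos >= len(it.msgs) {
		return nil
	}
	m := it.msgs[it.pos]
	it.pos++
	return m
}

func main() {
	it := &sliceIterator{msgs: []Message{sliceMsg{1, "a"}, sliceMsg{2, "b"}}}
	// the canonical scan loop
	for msg := it.Next(); msg != nil; msg = it.Next() {
		fmt.Println(msg.Id(), msg.Body())
	}
}
```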
type KeyUint ¶
type KeyUint struct {
ID uint64
}
KeyUint implements Key interface and is simple uint64 key
type Message ¶
type Message interface {
	Id() uint64
	Body() interface{}
}
Message is a row/event, the Id() method provides a consistent uint64 which can be used by consistent-hash algorithms for topologies that split messages up amongst multiple machines
Body() returns an interface{}, allowing this to be a generic structure for routing.
see https://github.com/mdmarek/topo and http://github.com/lytics/grid
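The consistent-hash use of Id() can be sketched with the simplest possible routing function: because Id() is stable for a given message, the same message always lands on the same worker. The event type and modulo routing here are illustrative, not from the package:

```go
package main

import "fmt"

// Message re-declared locally to keep the sketch self-contained.
type Message interface {
	Id() uint64
	Body() interface{}
}

// event is a hypothetical message implementation.
type event struct {
	id   uint64
	data map[string]interface{}
}

func (e event) Id() uint64        { return e.id }
func (e event) Body() interface{} { return e.data }

// route picks a worker index for a message; a stable Id() means a
// stable assignment, which is what topology splitting relies on.
func route(m Message, workers uint64) uint64 {
	return m.Id() % workers
}

func main() {
	e := event{id: 12345, data: map[string]interface{}{"user": "aaron"}}
	fmt.Println(route(e, 4)) // 1
}
```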
type Partition ¶
type Partition struct {
	Id               string `protobuf:"bytes,1,req,name=id" json:"id"`
	Left             string `protobuf:"bytes,2,req,name=left" json:"left"`
	Right            string `protobuf:"bytes,3,req,name=right" json:"right"`
	XXX_unrecognized []byte `json:"-"`
}
Partition describes a range of data.
The left key is contained in this partition; the right key is not, it belongs to the next partition.
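In other words, a partition covers the half-open range [Left, Right). A minimal containment check, sketched over plain string keys:

```go
package main

import "fmt"

// contains reports whether key falls in a partition's half-open
// range [left, right): left is inclusive, right is exclusive.
func contains(key, left, right string) bool {
	return key >= left && key < right
}

func main() {
	fmt.Println(contains("m", "a", "n")) // true: "m" is in [a, n)
	fmt.Println(contains("n", "a", "n")) // false: the right key starts the next partition
}
```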
func (*Partition) ProtoMessage ¶
func (*Partition) ProtoMessage()
type Schema ¶
type Schema struct {
	Name          string                   // Name of schema
	InfoSchema    *Schema                  // represent this Schema as sql schema like "information_schema"
	SchemaSources map[string]*SchemaSource // map[source_name]:Source Schemas
	// contains filtered or unexported fields
}
Schema is a "Virtual" Schema Database.
- Multiple DataSource(s) (each may be discrete source type such as mysql, elasticsearch, etc)
- each datasource supplies tables to the virtual table pool
- each table name across sources for a single schema must be unique (or aliased)
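The uniqueness rule in the last bullet can be sketched as a small registry that rejects a second source claiming an already-registered table name (the types and names here are illustrative, not the package's internals):

```go
package main

import "fmt"

// tableOwners maps a table name to the source that provides it,
// enforcing the virtual-schema rule that each table name is unique.
type tableOwners map[string]string

func (t tableOwners) add(table, source string) error {
	if owner, exists := t[table]; exists && owner != source {
		return fmt.Errorf("table %q already provided by source %q", table, owner)
	}
	t[table] = source
	return nil
}

func main() {
	tables := tableOwners{}
	fmt.Println(tables.add("users", "mysql_users")) // <nil>
	fmt.Println(tables.add("users", "es_events"))   // error: conflict
}
```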
func (*Schema) AddSourceSchema ¶
func (m *Schema) AddSourceSchema(ss *SchemaSource)
func (*Schema) AddTableName ¶
func (m *Schema) AddTableName(tableName string, ss *SchemaSource)
func (*Schema) RefreshSchema ¶
func (m *Schema) RefreshSchema()
type SchemaSource ¶
type SchemaSource struct {
	Name       string            // Source specific Schema name, generally underlying db name
	Conf       *ConfigSource     // source configuration
	Schema     *Schema           // Schema this is participating in
	Nodes      []*ConfigNode     // List of nodes config
	Partitions []*TablePartition // List of partitions per table (optional)
	DS         Source            // This datasource Interface
	// contains filtered or unexported fields
}
SchemaSource is a schema for a single DataSource (elasticsearch, mysql, filesystem, etc);
each DataSource may supply multiple tables
func NewSchemaSource ¶
func NewSchemaSource(name, sourceType string) *SchemaSource
func (*SchemaSource) AddTable ¶
func (m *SchemaSource) AddTable(tbl *Table)
func (*SchemaSource) AddTableName ¶
func (m *SchemaSource) AddTableName(tableName string)
func (*SchemaSource) HasTable ¶
func (m *SchemaSource) HasTable(table string) bool
func (*SchemaSource) Tables ¶
func (m *SchemaSource) Tables() []string
type Source ¶
Source is a datasource factory, registered to create connections to a custom datasource (most likely a database, file, api, or in-memory data). It is a thread-safe singleton, responsible for creating connections and exposing schema and ddl operations.
It also exposes partition information optionally.
DDL/Schema Operations
- schema discovery, tables, columns etc
- create
- index
type SourcePartitionable ¶
type SourcePartitionable interface {
	// Many databases already have internal Partitions; allow those to
	// be exposed for use in our partitioning
	Partitions() []*Partition
	PartitionSource(p *Partition) (Conn, error)
}
SourcePartitionable is a DataSource that can be partitioned into ranges,
for splitting reads and writes onto different nodes.
type SourceSetup ¶
type SourceSetup interface {
Setup(*SchemaSource) error
}
SourceSetup is an optional DataSource interface for getting the SchemaSource
injected during creation.
type SourceTableSchema ¶
SourceTableSchema is a data source that also provides table schema info
type Table ¶
type Table struct {
	Name           string            // Name of table lowercased
	NameOriginal   string            // Name of table
	FieldPositions map[string]int    // Maps name of column to ordinal position in array of []driver.Value's
	Fields         []*Field          // List of Fields, in order
	FieldMap       map[string]*Field // Map of Field-name -> Field
	Schema         *Schema           // The schema this is member of
	SchemaSource   *SchemaSource     // The source schema this is member of
	Charset        uint16            // Character set, default = utf8
	Partition      *TablePartition   // Partitions in this table, optional may be empty
	PartitionCt    int               // Partition Count
	Indexes        []*Index          // List of indexes for this table
	// contains filtered or unexported fields
}
Table represents traditional definition of Database Table. It belongs to a Schema and can be used to create a Datasource used to read this table.
func NewTable ¶
func NewTable(table string, s *SchemaSource) *Table
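The FieldPositions map above (and what FieldNamesPositions returns) is the bridge between column names and a row's []driver.Value slice. A self-contained sketch of how such a map is built and used, independent of the package's actual implementation:

```go
package main

import "fmt"

// fieldPositions builds a map from column name to its ordinal
// position, mirroring the role of Table.FieldPositions: given a
// row as a value slice, a column is read by row[pos[name]].
func fieldPositions(cols []string) map[string]int {
	m := make(map[string]int, len(cols))
	for i, c := range cols {
		m[c] = i
	}
	return m
}

func main() {
	pos := fieldPositions([]string{"id", "name", "email"})
	row := []interface{}{uint64(7), "aaron", "aaron@example.com"} // one row of values
	fmt.Println(row[pos["email"]]) // aaron@example.com
}
```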
func (*Table) Current ¶
Current reports whether this schema object is current, ie whether it has been
refreshed from its source within the refresh interval.
func (*Table) FieldNamesPositions ¶
List of Field Names and ordinal position in Column list
func (*Table) FieldsAsMessages ¶
type TablePartition ¶
type TablePartition struct {
	Table            string       `protobuf:"bytes,1,req,name=table" json:"table"`
	Keys             []string     `protobuf:"bytes,2,rep,name=keys" json:"keys"`
	Partitions       []*Partition `protobuf:"bytes,3,rep,name=partitions" json:"partitions"`
	XXX_unrecognized []byte       `json:"-"`
}
TablePartition describes the partitioning of a table into ranges of data.
For each partition, the left key is contained in that partition; the right key is not, it belongs to the next partition.
func (*TablePartition) Marshal ¶
func (m *TablePartition) Marshal() (data []byte, err error)
func (*TablePartition) ProtoMessage ¶
func (*TablePartition) ProtoMessage()
func (*TablePartition) Reset ¶
func (m *TablePartition) Reset()
func (*TablePartition) Size ¶
func (m *TablePartition) Size() (n int)
func (*TablePartition) String ¶
func (m *TablePartition) String() string
func (*TablePartition) Unmarshal ¶
func (m *TablePartition) Unmarshal(data []byte) error