schema

package v0.0.0-...-261b5b0
Published: May 12, 2016 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

The core Relational Algebra schema objects such as Table, Schema, DataSource, Fields, Headers, and Index.

Package schema is a generated protocol buffer package.

It is generated from these files:
	schema.proto

It has these top-level messages:
	TablePartition
	Partition

Index

Constants

const (
	NoNulls    = false
	AllowNulls = true
)

Variables

var (
	// ErrNotFound is a common error for not found
	ErrNotFound = fmt.Errorf("Not Found")
	// ErrNotImplemented Common error for not implemented type errors
	ErrNotImplemented = fmt.Errorf("Not Implemented")
)
var (

	// default schema Refresh Interval
	SchemaRefreshInterval = -time.Minute * 5

	DescribeFullCols     = []string{"Field", "Type", "Collation", "Null", "Key", "Default", "Extra", "Privileges", "Comment"}
	DescribeFullColMap   = map[string]int{"Field": 0, "Type": 1, "Collation": 2, "Null": 3, "Key": 4, "Default": 5, "Extra": 6, "Privileges": 7, "Comment": 8}
	DescribeCols         = []string{"Field", "Type", "Null", "Key", "Default", "Extra"}
	DescribeColMap       = map[string]int{"Field": 0, "Type": 1, "Null": 2, "Key": 3, "Default": 4, "Extra": 5}
	ShowTableColumns     = []string{"Table", "Table_Type"}
	ShowVariablesColumns = []string{"Variable_name", "Value"}
	ShowDatabasesColumns = []string{"Database"}
	//columnColumns       = []string{"Field", "Type", "Null", "Key", "Default", "Extra"}
	ShowTableColumnMap = map[string]int{"Table": 0}
	//columnsColumnMap = map[string]int{"Field": 0, "Type": 1, "Null": 2, "Key": 3, "Default": 4, "Extra": 5}
	ShowIndexCols       = []string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name", "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Index_comment"}
	DescribeFullHeaders = NewDescribeFullHeaders()
	DescribeHeaders     = NewDescribeHeaders()
)
var (
	ErrInvalidLengthSchema = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowSchema   = fmt.Errorf("proto: integer overflow")
)

Functions

This section is empty.

Types

type ConfigNode

type ConfigNode struct {
	Name     string       `json:"name"`     // Name of this Node optional
	Source   string       `json:"source"`   // Name of source this node belongs to
	Address  string       `json:"address"`  // host/ip
	Settings u.JsonHelper `json:"settings"` // Arbitrary settings
}

Nodes are servers/services, i.e. a running instance of the given Source.

  • each node must represent a single source type
  • each may have arbitrary config info in Settings, such as user = username, password = password, or the number of connections

type ConfigSchema

type ConfigSchema struct {
	Name       string   `json:"name"`    // Virtual Schema Name, must be unique
	Sources    []string `json:"sources"` // List of sources , the names of the "Db" in source
	ConfigNode []string `json:"-"`       // List of backend Servers
}

ConfigSchema is the json/config block for a Schema: the data sources that make up this Virtual Schema.

  • config to map a schema name to multiple sources
  • connection info

type ConfigSource

type ConfigSource struct {
	Name         string            `json:"name"`            // Name
	SourceType   string            `json:"type"`            // [mysql,elasticsearch,csv,etc] Name in DataSource Registry
	TablesToLoad []string          `json:"tables_to_load"`  // if non empty, only load these tables
	Nodes        []*ConfigNode     `json:"nodes"`           // List of nodes
	Settings     u.JsonHelper      `json:"settings"`        // Arbitrary settings specific to each source type
	Partitions   []*TablePartition `json:"partitions"`      // List of partitions per table (optional)
	PartitionCt  int               `json:"partition_count"` // Instead of array of per table partitions, raw partition count
}

ConfigSource is the config for a single Source: storage, a database, CSV files, etc.

  • this represents a single source type
  • it may have more than one node
  • it may belong to one or more virtual schemas
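
As a hedged sketch of how these config types might be wired together in Go; the import path is an assumption (it is not stated on this page), and the source/node names are illustrative only:

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/schema" // assumed import path for this package
)

func main() {
	// A source config for a hypothetical csv source, restricted to two tables.
	conf := schema.NewSourceConfig("mycsvfiles", "csv")
	conf.TablesToLoad = []string{"users", "orders"}

	// Attach one node: a running instance of this source type.
	conf.Nodes = []*schema.ConfigNode{
		{Name: "csv1", Source: "mycsvfiles", Address: "/data/csv"},
	}

	fmt.Println(conf.String())
}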

func NewSourceConfig

func NewSourceConfig(name, sourceType string) *ConfigSource

func (*ConfigSource) String

func (m *ConfigSource) String() string

type Conn

type Conn interface {
	Close() error
}

Conn is a Connection/Session to a file, api, or backend database. It provides DML operations.

Minimum Read Features to provide Sql Select:

  • Scanning: iterate through messages/rows
  • Schema Tables: at a minimum, a list of tables available; the column-level data can be introspected and so is optional

Planning:

  • CreateMutator(ctx *plan.Context) : execute a mutation task insert, delete, update

Non Select based Sql DML Operations for Mutator:

  • Deletion: (sql delete) Delete() DeleteExpression()
  • Upsert Interface (sql Update, Upsert, Insert) Put() PutMulti()

A Conn is a stateful DataSource Connection/Session. This is really a generic interface; implementations will typically also implement the feature interfaces below, such as ConnColumns, ConnScanner, ConnSeeker, and ConnMutator.

type ConnAll

type ConnAll interface {
	Close() error
	ConnColumns
	Iterator
	ConnSeeker
	ConnUpsert
	ConnDeletion
}

ConnAll interface

type ConnColumns

type ConnColumns interface {
	//Conn
	Columns() []string
}

ConnColumns Interface for a data source connection exposing column positions for []driver.Value iteration

type ConnDeletion

type ConnDeletion interface {
	// Delete using this key
	Delete(driver.Value) (int, error)
	// Delete with given expression
	DeleteExpression(expr.Node) (int, error)
}

ConnDeletion deletion interface for data sources

type ConnMutation

type ConnMutation interface {
	CreateMutator(pc interface{}) (ConnMutator, error)
}

ConnMutation creates a Mutator connection similar to Open() connection for select

  • accepts the plan context used in this upsert/insert/update
  • returns a connection which must be closed

type ConnMutator

type ConnMutator interface {
	ConnUpsert
	ConnDeletion
}

ConnMutator Mutator Connection

type ConnPatchWhere

type ConnPatchWhere interface {
	PatchWhere(ctx context.Context, where expr.Node, patch interface{}) (int64, error)
}

ConnPatchWhere pass through where expression to underlying datasource

Used for update statements WHERE x = y

type ConnScanner

type ConnScanner interface {
	Conn
	Iterator
}

ConnScanner is the most basic of data source connections: it just iterates through rows without any optimizations. Typical of key-value stores and simple sources such as csv, redis, cassandra.
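
A minimal sketch of an in-memory connection satisfying ConnScanner (Conn plus Iterator); the rowMsg/memConn types, the nil-on-exhaustion convention, and the import path are assumptions for illustration:

package memsource

import (
	"github.com/araddon/qlbridge/schema" // assumed import path
)

// rowMsg is a hypothetical Message carrying one row.
type rowMsg struct {
	id  uint64
	row []interface{}
}

func (m *rowMsg) Id() uint64        { return m.id }
func (m *rowMsg) Body() interface{} { return m.row }

// memConn iterates over a fixed slice of rows.
type memConn struct {
	rows []*rowMsg
	pos  int
}

// Next returns the next row, or nil when the scan is exhausted
// (a common convention, assumed here).
func (c *memConn) Next() schema.Message {
	if c.pos >= len(c.rows) {
		return nil
	}
	m := c.rows[c.pos]
	c.pos++
	return m
}

func (c *memConn) Close() error { return nil }

// compile-time check that memConn satisfies ConnScanner
var _ schema.ConnScanner = (*memConn)(nil)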

type ConnScannerIterator

type ConnScannerIterator interface {
	//Conn
	// create a new iterator to scan through row by row
	CreateIterator() Iterator
	MesgChan() <-chan Message
}

ConnScannerIterator is another advanced iterator, likely to be deprecated.

type ConnSeeker

type ConnSeeker interface {
	//Conn
	// Just because we have Get, Multi-Get, doesn't mean we can seek all
	// expressions, find out with CanSeek for given expression
	CanSeek(*rel.SqlSelect) bool
	Get(key driver.Value) (Message, error)
	MultiGet(keys []driver.Value) ([]Message, error)
}

ConnSeeker is a datasource that is a key-value store; it allows the relational implementation to be faster by seeking row values instead of scanning.

type ConnUpsert

type ConnUpsert interface {
	Put(ctx context.Context, key Key, value interface{}) (Key, error)
	PutMulti(ctx context.Context, keys []Key, src interface{}) ([]Key, error)
}

ConnUpsert Mutation interface for Put

  • assumes datasource understands key(s?)
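
A hedged sketch of a map-backed ConnUpsert; the kvConn type is hypothetical, the import path is assumed, and older versions of this package may expect golang.org/x/net/context rather than the standard library context:

package kvsource

import (
	"context" // possibly golang.org/x/net/context for a package of this vintage
	"fmt"

	"github.com/araddon/qlbridge/schema" // assumed import path
)

// kvConn is a hypothetical map-backed store keyed by uint64.
type kvConn struct {
	data map[uint64]interface{}
}

// Put stores value under key; generating a key when none is given
// is omitted in this sketch.
func (c *kvConn) Put(ctx context.Context, key schema.Key, value interface{}) (schema.Key, error) {
	k, ok := key.(*schema.KeyUint)
	if !ok {
		return nil, fmt.Errorf("expected *schema.KeyUint, got %T", key)
	}
	c.data[k.ID] = value
	return k, nil
}

// PutMulti stores a batch of rows, one per key.
func (c *kvConn) PutMulti(ctx context.Context, keys []schema.Key, src interface{}) ([]schema.Key, error) {
	rows, ok := src.([]interface{})
	if !ok || len(rows) != len(keys) {
		return nil, fmt.Errorf("PutMulti expects matching keys and rows")
	}
	for i, k := range keys {
		if _, err := c.Put(ctx, k, rows[i]); err != nil {
			return nil, err
		}
	}
	return keys, nil
}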

type DialectWriter

type DialectWriter interface {
	Dialect() string
	Table(tbl *Table) string
	FieldType(t value.ValueType) string
}

DialectWriter knows how to format schema output specific to a dialect such as mysql.
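
A toy DialectWriter sketch for a mysql-flavored dialect; the import paths and the value.* type constants are assumptions based on the qlbridge value package, and the generated DDL is illustrative only:

package mysqldialect

import (
	"fmt"
	"strings"

	"github.com/araddon/qlbridge/schema" // assumed import paths
	"github.com/araddon/qlbridge/value"
)

// writer is a hypothetical mysql-flavored DialectWriter.
type writer struct{}

func (w *writer) Dialect() string { return "mysql" }

// FieldType maps generic value types onto mysql column types.
// The value.* constants are assumed from qlbridge's value package.
func (w *writer) FieldType(t value.ValueType) string {
	switch t {
	case value.IntType:
		return "bigint"
	case value.NumberType:
		return "float"
	case value.BoolType:
		return "tinyint(1)"
	case value.TimeType:
		return "datetime"
	default:
		return "varchar(255)"
	}
}

// Table renders a simple CREATE TABLE statement from the table's fields.
func (w *writer) Table(tbl *schema.Table) string {
	cols := make([]string, 0, len(tbl.Fields))
	for _, f := range tbl.Fields {
		cols = append(cols, fmt.Sprintf("`%s` %s", f.Name, w.FieldType(f.Type)))
	}
	return fmt.Sprintf("CREATE TABLE `%s` (%s);", tbl.Name, strings.Join(cols, ", "))
}

var _ schema.DialectWriter = (*writer)(nil)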

type Field

type Field struct {
	Name               string          // Column Name
	Description        string          // Comment/Description
	Key                string          // Key info (primary, etc) should be stored in indexes
	Extra              string          // no idea difference with Description
	Data               FieldData       // Pre-generated dialect specific data???
	Length             uint32          // field-size, ie varchar(20)
	Type               value.ValueType // Value type, there needs to be dialect specific converters
	DefaultValueLength uint64          // Default
	DefaultValue       driver.Value    // Default value
	Indexed            bool            // Is this indexed, if so we will have a list of indexes
	NoNulls            bool            // Do we allow nulls?  default = false = yes allow nulls
	Collation          string          // ie, utf8, none
	Roles              []string        // ie, {select,insert,update,delete}
	Indexes            []*Index        // Indexes this participates in
	// contains filtered or unexported fields
}

Field describes the column info: name, data type, defaults, index, null.

  • dialects (mysql, mongo, cassandra) have their own descriptors for these, so this is a generic representation meant to be converted to the frontend dialect at runtime
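
A short sketch of building fields with the constructors below, using the AllowNulls constant; the import paths and the value.* constants are assumptions:

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/schema" // assumed import paths
	"github.com/araddon/qlbridge/value"
)

func main() {
	// A simple field: name, type, size, extra description.
	email := schema.NewFieldBase("email", value.StringType, 255, "user email address")

	// A fuller definition: the AllowNulls constant and a nil default value,
	// with empty key and collation.
	age := schema.NewField("age", value.IntType, 8, schema.AllowNulls, nil, "", "", "age in years")

	fmt.Println(email.Name, age.Name)
}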

func NewDescribeFullHeaders

func NewDescribeFullHeaders() []*Field

func NewDescribeHeaders

func NewDescribeHeaders() []*Field

func NewField

func NewField(name string, valType value.ValueType, size int, allowNulls bool, defaultVal driver.Value, key, collation, description string) *Field

func NewFieldBase

func NewFieldBase(name string, valType value.ValueType, size int, extra string) *Field

func (*Field) AsRow

func (m *Field) AsRow() []driver.Value

func (*Field) Body

func (m *Field) Body() interface{}

func (*Field) Id

func (m *Field) Id() uint64

type FieldData

type FieldData []byte

type Index

type Index struct {
	Name          string
	Fields        []string
	PrimaryKey    bool
	HashPartition []string
	PartitionSize int
}

Index a description of how data is/should be indexed

type Iterator

type Iterator interface {
	Next() Message
}

Iterator is a simple iterator for paging through a datastore's Messages/rows.

  • used for scanning
  • for datasources that implement exec.Visitor() (i.e. select), this represents the already filtered, calculated rows
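
A typical consumption loop might look like the sketch below, assuming the common convention that Next() returns nil once the source is exhausted (not guaranteed by this page):

package example

import "github.com/araddon/qlbridge/schema" // assumed import path

// drain collects all remaining messages from an Iterator, stopping
// when Next() returns nil.
func drain(it schema.Iterator) []schema.Message {
	var msgs []schema.Message
	for msg := it.Next(); msg != nil; msg = it.Next() {
		msgs = append(msgs, msg)
	}
	return msgs
}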

type Key

type Key interface {
	Key() driver.Value
}

Key interface is the Unique Key identifying a row

type KeyUint

type KeyUint struct {
	ID uint64
}

KeyUint implements the Key interface as a simple uint64 key.

func NewKeyUint

func NewKeyUint(key uint64) *KeyUint

NewKeyUint creates a new simple uint64 key.

func (*KeyUint) Key

func (m *KeyUint) Key() driver.Value

Key returns the key as a driver.Value.

type Message

type Message interface {
	Id() uint64
	Body() interface{}
}

Message is a row/event. The Id() method provides a consistent uint64 which can be used by consistent-hash algorithms for topologies that split messages up amongst multiple machines.

Body() returns an interface{}, allowing Message to act as a generic structure for routing.

See https://github.com/mdmarek/topo and http://github.com/lytics/grid.
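
A hedged example of a Message implementation whose Id() is derived from a natural key via an FNV hash, so consistent-hash routing keeps related events on the same machine; the userMsg type and import path are assumptions:

package example

import (
	"hash/fnv"

	"github.com/araddon/qlbridge/schema" // assumed import path
)

// userMsg is a hypothetical event type carrying a user record.
type userMsg struct {
	userID string
	data   map[string]interface{}
}

// Id derives a consistent uint64 from the natural key so that the
// same user always hashes to the same machine in a topology.
func (m *userMsg) Id() uint64 {
	h := fnv.New64a()
	h.Write([]byte(m.userID))
	return h.Sum64()
}

func (m *userMsg) Body() interface{} { return m.data }

var _ schema.Message = (*userMsg)(nil)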

type Partition

type Partition struct {
	Id               string `protobuf:"bytes,1,req,name=id" json:"id"`
	Left             string `protobuf:"bytes,2,req,name=left" json:"left"`
	Right            string `protobuf:"bytes,3,req,name=right" json:"right"`
	XXX_unrecognized []byte `json:"-"`
}

Partition describes a range of data: the left key is contained in this partition; the right key is not contained in this partition (it belongs to the next one).
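
A small sketch of the half-open range check implied above; treating an empty Right as an unbounded upper edge is an assumption made here for the last partition:

package example

import "github.com/araddon/qlbridge/schema" // assumed import path

// contains reports whether key falls into p, following the
// half-open convention described above: Left <= key < Right.
func contains(p *schema.Partition, key string) bool {
	if key < p.Left {
		return false
	}
	return p.Right == "" || key < p.Right
}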

func (*Partition) Marshal

func (m *Partition) Marshal() (data []byte, err error)

func (*Partition) MarshalTo

func (m *Partition) MarshalTo(data []byte) (int, error)

func (*Partition) ProtoMessage

func (*Partition) ProtoMessage()

func (*Partition) Reset

func (m *Partition) Reset()

func (*Partition) Size

func (m *Partition) Size() (n int)

func (*Partition) String

func (m *Partition) String() string

func (*Partition) Unmarshal

func (m *Partition) Unmarshal(data []byte) error

type Schema

type Schema struct {
	Name          string                   // Name of schema
	InfoSchema    *Schema                  // represent this Schema as sql schema like "information_schema"
	SchemaSources map[string]*SchemaSource // map[source_name]:Source Schemas
	// contains filtered or unexported fields
}

Schema is a "Virtual" Schema Database.

  • Multiple DataSource(s) (each may be a discrete source type such as mysql, elasticsearch, etc)
  • each datasource supplies tables to the virtual table pool
  • each table name across sources for a single schema must be unique (or aliased)
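
A sketch of assembling a virtual schema from one source using the constructors documented below; the import paths and the value.* constants are assumptions:

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/schema" // assumed import paths
	"github.com/araddon/qlbridge/value"
)

func main() {
	// A virtual schema composed of one csv-backed source.
	s := schema.NewSchema("myapp")
	ss := schema.NewSchemaSource("mycsvfiles", "csv")

	// Describe one table on that source.
	tbl := schema.NewTable("users", ss)
	tbl.AddFieldType("user_id", value.IntType)
	tbl.AddFieldType("email", value.StringType)
	tbl.SetColumns([]string{"user_id", "email"})
	ss.AddTable(tbl)

	// Register the source and its table name with the virtual schema.
	s.AddSourceSchema(ss)
	s.AddTableName("users", ss)

	fmt.Println(s.Tables())
}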

func NewSchema

func NewSchema(schemaName string) *Schema

func (*Schema) AddSourceSchema

func (m *Schema) AddSourceSchema(ss *SchemaSource)

func (*Schema) AddTableName

func (m *Schema) AddTableName(tableName string, ss *SchemaSource)

func (*Schema) Current

func (m *Schema) Current() bool

Current reports whether this schema is up to date.

func (*Schema) Open

func (m *Schema) Open(tableName string) (Conn, error)

Get a connection from this source via table name

func (*Schema) RefreshSchema

func (m *Schema) RefreshSchema()

func (*Schema) Since

func (m *Schema) Since(dur time.Duration) bool

Since reports whether this schema object has been refreshed within the time window described by dur.

func (*Schema) Source

func (m *Schema) Source(tableName string) (*SchemaSource, error)

Find a SchemaSource for this Table

func (*Schema) Table

func (m *Schema) Table(tableName string) (*Table, error)

func (*Schema) Tables

func (m *Schema) Tables() []string

type SchemaSource

type SchemaSource struct {
	Name       string            // Source specific Schema name, generally underlying db name
	Conf       *ConfigSource     // source configuration
	Schema     *Schema           // Schema this is participating in
	Nodes      []*ConfigNode     // List of nodes config
	Partitions []*TablePartition // List of partitions per table (optional)
	DS         Source            // This datasource Interface
	// contains filtered or unexported fields
}

SchemaSource is a schema for a single DataSource (elasticsearch, mysql, filesystem, etc.); each DataSource may have multiple tables.

func NewSchemaSource

func NewSchemaSource(name, sourceType string) *SchemaSource

func (*SchemaSource) AddTable

func (m *SchemaSource) AddTable(tbl *Table)

func (*SchemaSource) AddTableName

func (m *SchemaSource) AddTableName(tableName string)

func (*SchemaSource) HasTable

func (m *SchemaSource) HasTable(table string) bool

func (*SchemaSource) Table

func (m *SchemaSource) Table(tableName string) (*Table, error)

func (*SchemaSource) Tables

func (m *SchemaSource) Tables() []string

type Source

type Source interface {
	Tables() []string
	Open(source string) (Conn, error)
	Close() error
}

Source: a datasource is a factory registered to create connections to a custom datasource (most likely a database, file, api, in-memory data, etc). It is thread-safe and a singleton, responsible for creating connections and exposing schema and DDL operations.

It also exposes partition information optionally.

DDL/Schema Operations

  • schema discovery, tables, columns etc
  • create
  • index
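
A minimal, hypothetical Source implementation with a fixed table list; the staticSource/noopConn types and the import path are assumptions, and a real implementation would return a scanning connection rather than a no-op one:

package staticsource

import (
	"github.com/araddon/qlbridge/schema" // assumed import path
)

// staticSource is a hypothetical read-only datasource with a
// fixed set of tables.
type staticSource struct {
	tables map[string][]interface{}
}

// Tables lists the table names this source exposes.
func (s *staticSource) Tables() []string {
	names := make([]string, 0, len(s.tables))
	for name := range s.tables {
		names = append(names, name)
	}
	return names
}

// Open returns a connection scoped to the named table.
func (s *staticSource) Open(source string) (schema.Conn, error) {
	rows, ok := s.tables[source]
	if !ok {
		return nil, schema.ErrNotFound
	}
	_ = rows // a real implementation would hand rows to a scanner Conn
	return &noopConn{}, nil
}

func (s *staticSource) Close() error { return nil }

type noopConn struct{}

func (c *noopConn) Close() error { return nil }

var (
	_ schema.Source = (*staticSource)(nil)
	_ schema.Conn   = (*noopConn)(nil)
)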

type SourceAll

type SourceAll interface {
	Source
	SourceTableSchema
}

SourceAll combo interface

type SourcePartitionable

type SourcePartitionable interface {
	// Many databases's already have internal Partitions, allow those to
	// be exposed for use in our partitioning
	Partitions() []*Partition
	PartitionSource(p *Partition) (Conn, error)
}

SourcePartitionable is a DataSource that can be partitioned into ranges for splitting reads and writes onto different nodes.

type SourceSetup

type SourceSetup interface {
	Setup(*SchemaSource) error
}

SourceSetup is an optional datasource interface for getting the SchemaSource injected during creation.

type SourceTableSchema

type SourceTableSchema interface {
	Table(table string) (*Table, error)
}

SourceTableSchema A data source provider that also provides table schema info

type Table

type Table struct {
	Name           string            // Name of table lowercased
	NameOriginal   string            // Name of table
	FieldPositions map[string]int    // Maps name of column to ordinal position in array of []driver.Value's
	Fields         []*Field          // List of Fields, in order
	FieldMap       map[string]*Field // Map of Field-name -> Field
	Schema         *Schema           // The schema this is member of
	SchemaSource   *SchemaSource     // The source schema this is member of
	Charset        uint16            // Character set, default = utf8
	Partition      *TablePartition   // Partitions in this table, optional may be empty
	PartitionCt    int               // Partition Count
	Indexes        []*Index          // List of indexes for this table
	// contains filtered or unexported fields
}

Table represents traditional definition of Database Table. It belongs to a Schema and can be used to create a Datasource used to read this table.
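
As a small usage sketch, FieldPositions maps column names to their ordinal position in a []driver.Value row, which makes column lookups like the hypothetical helper below straightforward (import path assumed):

package example

import (
	"database/sql/driver"
	"fmt"

	"github.com/araddon/qlbridge/schema" // assumed import path
)

// columnValue looks up a single column from a positional row using
// the table's FieldPositions map.
func columnValue(tbl *schema.Table, row []driver.Value, col string) (driver.Value, error) {
	pos, ok := tbl.FieldPositions[col]
	if !ok || pos >= len(row) {
		return nil, fmt.Errorf("column %q not found in table %s", col, tbl.Name)
	}
	return row[pos], nil
}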

func NewTable

func NewTable(table string, s *SchemaSource) *Table

func (*Table) AddField

func (m *Table) AddField(fld *Field)

func (*Table) AddFieldType

func (m *Table) AddFieldType(name string, valType value.ValueType)

func (*Table) AsRows

func (m *Table) AsRows() [][]driver.Value

func (*Table) Body

func (m *Table) Body() interface{}

func (*Table) Columns

func (m *Table) Columns() []string

func (*Table) Current

func (m *Table) Current() bool

Current reports whether this schema object is current, i.e. whether it has been refreshed from its source within the refresh interval.

func (*Table) FieldNamesPositions

func (m *Table) FieldNamesPositions() map[string]int

List of Field Names and ordinal position in Column list

func (*Table) FieldsAsMessages

func (m *Table) FieldsAsMessages() []Message

func (*Table) HasField

func (m *Table) HasField(name string) bool

func (*Table) Id

func (m *Table) Id() uint64

func (*Table) SetColumns

func (m *Table) SetColumns(cols []string)

Explicitly set column names.

func (*Table) SetRefreshed

func (m *Table) SetRefreshed()

update the refreshed date to now

func (*Table) SetRows

func (m *Table) SetRows(rows [][]driver.Value)

func (*Table) Since

func (m *Table) Since(dur time.Duration) bool

Since reports whether this schema object has been refreshed within the time window described by dur.

type TablePartition

type TablePartition struct {
	Table            string       `protobuf:"bytes,1,req,name=table" json:"table"`
	Keys             []string     `protobuf:"bytes,2,rep,name=keys" json:"keys"`
	Partitions       []*Partition `protobuf:"bytes,3,rep,name=partitions" json:"partitions"`
	XXX_unrecognized []byte       `json:"-"`
}

TablePartition describes the partitioning of a table: the key columns used for partitioning and the list of partitions.

func (*TablePartition) Marshal

func (m *TablePartition) Marshal() (data []byte, err error)

func (*TablePartition) MarshalTo

func (m *TablePartition) MarshalTo(data []byte) (int, error)

func (*TablePartition) ProtoMessage

func (*TablePartition) ProtoMessage()

func (*TablePartition) Reset

func (m *TablePartition) Reset()

func (*TablePartition) Size

func (m *TablePartition) Size() (n int)

func (*TablePartition) String

func (m *TablePartition) String() string

func (*TablePartition) Unmarshal

func (m *TablePartition) Unmarshal(data []byte) error
