Documentation ¶
Index ¶
- Constants
- Variables
- func GetFetchTableAndViewsQuery(tableNames []string) (string, error)
- func GetFetchTableQuery(tableNames []string) (string, error)
- func GetFetchViewQuery(viewNames []string) (string, error)
- func MustReloadSchemaOnDDL(sql string, dbname string) bool
- type Engine
- func (se *Engine) Close()
- func (se *Engine) EnableHistorian(enabled bool) error
- func (se *Engine) EnsureConnectionAndDB(tabletType topodatapb.TabletType) error
- func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error)
- func (se *Engine) GetDBConnector() dbconfigs.Connector
- func (se *Engine) GetSchema() map[string]*Table
- func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table
- func (se *Engine) GetTableForPos(tableName sqlparser.IdentifierCS, gtid string) (*binlogdatapb.MinimalTable, error)
- func (se *Engine) InitDBConfig(cp dbconfigs.Connector)
- func (se *Engine) IsOpen() bool
- func (se *Engine) MakeNonPrimary()
- func (se *Engine) MakePrimary(serving bool)
- func (se *Engine) Open() error
- func (se *Engine) RegisterNotifier(name string, f notifier, runNotifier bool)
- func (se *Engine) RegisterVersionEvent() error
- func (se *Engine) Reload(ctx context.Context) error
- func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error
- func (se *Engine) ReloadAtEx(ctx context.Context, pos mysql.Position, includeStats bool) error
- func (se *Engine) SetTableForTests(table *Table)
- func (se *Engine) UnregisterNotifier(name string)
- type MessageInfo
- type SequenceInfo
- type Table
- type Tracker
- type VStreamer
Constants ¶
const (
	NoType = iota
	Sequence
	Message
	View
)
Table types
Variables ¶
var TypeNames = []string{
"none",
"sequence",
"message",
"view",
}
TypeNames maps a table type to its name. Its length must match the number of table types.
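As an illustration of how the table type constants index into TypeNames, here is a minimal standalone sketch. The constants and the slice are copied from the declarations above; `typeName` is a hypothetical helper that is not part of the package, added only to show a bounds-checked lookup.

```go
package main

import "fmt"

// Table type constants, copied from the Constants section.
const (
	NoType = iota
	Sequence
	Message
	View
)

// TypeNames is copied from the Variables section.
var TypeNames = []string{
	"none",
	"sequence",
	"message",
	"view",
}

// typeName is a hypothetical helper (not in the package). It returns
// the name for a table type, or "unknown" if the value is out of range.
func typeName(t int) string {
	if t < 0 || t >= len(TypeNames) {
		return "unknown"
	}
	return TypeNames[t]
}

func main() {
	fmt.Println(typeName(Sequence)) // sequence
	fmt.Println(typeName(View))     // view
	fmt.Println(typeName(42))       // unknown
}
```

Because the slice is indexed by the constant value, adding a table type without adding a name would make the lookup fall out of range, which is why the two counts have to stay in sync.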
Functions ¶
func GetFetchTableAndViewsQuery ¶ added in v0.17.0
GetFetchTableAndViewsQuery gets the fetch query to run for getting the listed tables and views. If no table names are provided, then all the tables and views are fetched.
func GetFetchTableQuery ¶ added in v0.17.0
GetFetchTableQuery gets the fetch query to run for getting the listed tables. If no tables are provided, then all the tables are fetched.
func GetFetchViewQuery ¶ added in v0.17.0
GetFetchViewQuery gets the fetch query to run for getting the listed views. If no views are provided, then all the views are fetched.
func MustReloadSchemaOnDDL ¶ added in v0.11.0
MustReloadSchemaOnDDL returns true if the DDL is for the database that is part of the workflow and is not an online DDL artifact.
Types ¶
type Engine ¶
type Engine struct {
	// SkipMetaCheck skips the metadata about the database and table information
	SkipMetaCheck bool

	SchemaReloadTimings *servenv.TimingsWrapper
	// contains filtered or unexported fields
}
Engine stores the schema info and performs operations that keep itself up-to-date.
func NewEngineForTests ¶
func NewEngineForTests() *Engine
NewEngineForTests creates a new engine that can't query the database and will not send notifications. It starts opened and doesn't reload. Use SetTableForTests to set the table schema.
func (*Engine) Close ¶
func (se *Engine) Close()
Close shuts down Engine and is idempotent. It can be re-opened after Close.
func (*Engine) EnableHistorian ¶
EnableHistorian forces tracking to be on or off. Only used for testing.
func (*Engine) EnsureConnectionAndDB ¶ added in v0.8.0
func (se *Engine) EnsureConnectionAndDB(tabletType topodatapb.TabletType) error
EnsureConnectionAndDB ensures that we can connect to mysql. If tablet type is primary and there is no db, then the database is created. This function can be called before opening the Engine.
func (*Engine) GetConnection ¶
GetConnection returns a connection from the pool
func (*Engine) GetDBConnector ¶ added in v0.17.0
func (*Engine) GetSchema ¶
GetSchema returns the current schema. The Tables are a shared data structure and must be treated as read-only.
func (*Engine) GetTable ¶
func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table
GetTable returns the info for a table.
func (*Engine) GetTableForPos ¶
func (se *Engine) GetTableForPos(tableName sqlparser.IdentifierCS, gtid string) (*binlogdatapb.MinimalTable, error)
GetTableForPos returns a best-effort schema for a specific gtid
func (*Engine) InitDBConfig ¶
InitDBConfig must be called before Open.
func (*Engine) MakeNonPrimary ¶ added in v0.12.0
func (se *Engine) MakeNonPrimary()
MakeNonPrimary clears the sequence caches to make sure that they don't get accidentally reused after losing primaryship.
func (*Engine) MakePrimary ¶ added in v0.17.0
MakePrimary tells the schema engine that the current tablet is now the primary, so it can read and write to the MySQL instance for schema-tracking.
func (*Engine) Open ¶
Open initializes the Engine. Calling Open on an already open engine is a no-op.
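The Open and Close contracts (Open on an open engine is a no-op, Close is idempotent, and the engine can be re-opened) follow a common lifecycle pattern. The sketch below shows that pattern in isolation. The `lifecycle` type and its `opens` counter are hypothetical stand-ins, not the Engine's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// lifecycle is a hypothetical stand-in for the engine's open/closed state.
type lifecycle struct {
	mu     sync.Mutex
	isOpen bool
	opens  int // how many times initialization actually ran
}

// Open runs initialization once; on an already open value it is a no-op.
func (l *lifecycle) Open() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.isOpen {
		return nil
	}
	l.opens++
	l.isOpen = true
	return nil
}

// Close is idempotent; the value can be opened again afterwards.
func (l *lifecycle) Close() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.isOpen = false
}

// IsOpen reports the current state.
func (l *lifecycle) IsOpen() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.isOpen
}

func main() {
	l := &lifecycle{}
	_ = l.Open()
	_ = l.Open() // no-op
	l.Close()
	l.Close() // still fine
	_ = l.Open()
	fmt.Println(l.opens, l.IsOpen()) // 2 true
}
```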
func (*Engine) RegisterNotifier ¶
RegisterNotifier registers the function for schema change notification. It also causes an immediate notification to the caller. The notified function must not change the map or its contents. The only exception is the sequence table where the values can be changed using the lock.
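To make the register, notify-immediately, unregister flow concrete, here is a simplified sketch. Everything in it is hypothetical: the real `notifier` type is unexported and its signature is not shown on this page, so the sketch uses a plain `func(schemaView)` callback and a `map[string]string` in place of the schema map.

```go
package main

import (
	"fmt"
	"sync"
)

// schemaView is a hypothetical, simplified stand-in for the schema map.
type schemaView map[string]string

// notifyFunc is a hypothetical callback type. Callbacks must treat the
// schema as read-only, as the documentation above requires.
type notifyFunc func(schema schemaView)

type registry struct {
	mu        sync.Mutex
	schema    schemaView
	notifiers map[string]notifyFunc
}

func newRegistry(schema schemaView) *registry {
	return &registry{schema: schema, notifiers: make(map[string]notifyFunc)}
}

// register stores f under name and, if runNow is set, notifies it
// immediately with the current schema. The callback runs under the
// lock, so it must not call back into the registry.
func (r *registry) register(name string, f notifyFunc, runNow bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.notifiers[name] = f
	if runNow {
		f(r.schema)
	}
}

// unregister removes the callback stored under name, if any.
func (r *registry) unregister(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.notifiers, name)
}

// broadcast notifies every registered callback.
func (r *registry) broadcast() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, f := range r.notifiers {
		f(r.schema)
	}
}

func main() {
	r := newRegistry(schemaView{"t1": "table"})
	calls := 0
	r.register("demo", func(s schemaView) { calls++ }, true)
	r.broadcast()
	r.unregister("demo")
	r.broadcast()
	fmt.Println(calls) // 2
}
```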
func (*Engine) RegisterVersionEvent ¶
RegisterVersionEvent is called by the vstream when it encounters a version event (an insert into the schema_tracking table). It triggers the historian to load the newer rows from the database to update its cache.
func (*Engine) Reload ¶
Reload reloads the schema info from the db. Any tables that have changed since the last load are updated. Reload itself takes no includeStats argument; ReloadAtEx exposes that option to control whether table size statistics are emitted, as they can be expensive to calculate for a large number of tables.
func (*Engine) ReloadAt ¶
ReloadAt reloads the schema info from the db. Any tables that have changed since the last load are updated. It maintains the position at which the schema was reloaded, and if the same position is provided (say, by multiple vstreams) it returns the cached schema. For a newer or empty position it always reloads the schema.
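The position-based caching rule can be sketched as follows. This is an illustration only: positions are modeled as plain strings rather than `mysql.Position`, the `reloader` type is hypothetical, and any position that differs from the last one is treated as a reason to reload, since the text above does not say how an older position is handled.

```go
package main

import "fmt"

// reloader is a hypothetical stand-in that only tracks the last
// position and how many reloads actually happened.
type reloader struct {
	lastPos string
	reloads int
}

// reloadAt skips the reload only when pos is non-empty and equal to
// the position of the previous reload. An empty pos always reloads.
func (r *reloader) reloadAt(pos string) {
	if pos != "" && pos == r.lastPos {
		return // same position: the cached schema is still valid
	}
	r.reloads++
	r.lastPos = pos
}

func main() {
	r := &reloader{}
	r.reloadAt("pos-1")
	r.reloadAt("pos-1") // cached, e.g. a second vstream at the same position
	r.reloadAt("")      // empty position always reloads
	r.reloadAt("pos-2") // different position reloads
	fmt.Println(r.reloads) // 3
}
```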
func (*Engine) ReloadAtEx ¶ added in v0.16.0
ReloadAtEx reloads the schema info from the db. Any tables that have changed since the last load are updated. It maintains the position at which the schema was reloaded, and if the same position is provided (say, by multiple vstreams) it returns the cached schema. For a newer or empty position it always reloads the schema. The includeStats argument controls whether table size statistics should be emitted, as they can be expensive to calculate for a large number of tables.
func (*Engine) SetTableForTests ¶
SetTableForTests puts a Table in the map directly.
func (*Engine) UnregisterNotifier ¶
UnregisterNotifier unregisters the notifier function.
type MessageInfo ¶
type MessageInfo struct {
	// Fields stores the field info to be
	// returned for subscribers.
	Fields []*querypb.Field

	// AckWaitDuration specifies how long to wait after
	// the message was first sent. The back-off doubles
	// every attempt.
	AckWaitDuration time.Duration

	// PurgeAfterDuration specifies the time after which
	// a successfully acked message can be deleted.
	PurgeAfterDuration time.Duration

	// BatchSize specifies the max number of events to
	// send per response.
	BatchSize int

	// CacheSize specifies the number of messages to keep
	// in cache. Anything that cannot fit in the cache
	// is sent as best effort.
	CacheSize int

	// PollInterval specifies the polling frequency to
	// look for messages to be sent.
	PollInterval time.Duration

	// MinBackoff specifies the shortest duration message manager
	// should wait before rescheduling a message
	MinBackoff time.Duration

	// MaxBackoff specifies the longest duration message manager
	// should wait before rescheduling a message
	MaxBackoff time.Duration
}
MessageInfo contains info specific to message tables.
func (*MessageInfo) CachedSize ¶ added in v0.10.0
func (cached *MessageInfo) CachedSize(alloc bool) int64
type SequenceInfo ¶
SequenceInfo contains info specific to sequence tables. It must be locked before accessing the values inside. If CurVal==LastVal, we have to cache new values. When the schema is first loaded, the values are all 0, which will trigger caching on first use.
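The locking and refill rule can be illustrated with a small sketch. The `sequence` type, its lowercase fields, and the `reserve` callback are hypothetical; the struct definition of SequenceInfo is not shown on this page, and the sketch assumes a reserved block covers the half-open range from the start value up to the last value.

```go
package main

import (
	"fmt"
	"sync"
)

// sequence is a hypothetical stand-in for a cached sequence.
// Both values start at 0, so the first call refills the cache.
type sequence struct {
	mu      sync.Mutex
	curVal  int64
	lastVal int64
}

// next returns the next value, reserving a new block of cacheSize
// values through reserve whenever the cached block is used up.
// reserve is a hypothetical callback returning the block's first value.
func (s *sequence) next(cacheSize int64, reserve func(n int64) int64) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.curVal == s.lastVal {
		start := reserve(cacheSize)
		s.curVal = start
		s.lastVal = start + cacheSize
	}
	v := s.curVal
	s.curVal++
	return v
}

func main() {
	nextFree := int64(1) // simulated backing store
	reserve := func(n int64) int64 {
		start := nextFree
		nextFree += n
		return start
	}
	s := &sequence{}
	fmt.Println(s.next(2, reserve), s.next(2, reserve), s.next(2, reserve)) // 1 2 3
}
```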
type Table ¶
type Table struct {
	Name      sqlparser.IdentifierCS
	Fields    []*querypb.Field
	PKColumns []int
	Type      int

	// SequenceInfo contains info for sequence tables.
	SequenceInfo *SequenceInfo

	// MessageInfo contains info for message tables.
	MessageInfo *MessageInfo

	CreateTime    int64
	FileSize      uint64
	AllocatedSize uint64
}
Table contains info about a table.
func LoadTable ¶
func LoadTable(conn *connpool.DBConn, databaseName, tableName, tableType string, comment string) (*Table, error)
LoadTable creates a Table from the schema info in the database.
func (*Table) CachedSize ¶ added in v0.10.0
func (*Table) FindColumn ¶
func (ta *Table) FindColumn(name sqlparser.IdentifierCI) int
FindColumn finds a column in the table. It returns the index if found. Otherwise, it returns -1.
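The index-or-minus-one contract looks like this in a standalone sketch. The `findColumn` function below works on plain strings and is hypothetical. It compares names case-insensitively on the assumption that the `sqlparser.IdentifierCI` parameter type denotes a case-insensitive identifier.

```go
package main

import (
	"fmt"
	"strings"
)

// findColumn is a hypothetical, simplified version of the lookup:
// it returns the index of the first column whose name matches
// case-insensitively, or -1 if there is no match.
func findColumn(columns []string, name string) int {
	for i, col := range columns {
		if strings.EqualFold(col, name) {
			return i
		}
	}
	return -1
}

func main() {
	cols := []string{"id", "Name", "created_at"}
	fmt.Println(findColumn(cols, "name"))    // 1
	fmt.Println(findColumn(cols, "missing")) // -1
}
```

Callers should check for -1 before using the result as a slice index.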
func (*Table) GetPKColumn ¶
GetPKColumn returns the pk column specified by the index.
func (*Table) HasPrimary ¶
HasPrimary returns true if the table has a primary key.
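The relationship between Fields, PKColumns, GetPKColumn, and HasPrimary can be sketched as below. The sketch assumes that each PKColumns entry is an index into Fields, which the page does not state explicitly. The `field` and `table` types are hypothetical simplifications of `querypb.Field` and `Table`.

```go
package main

import "fmt"

// field is a hypothetical, simplified stand-in for querypb.Field.
type field struct {
	Name string
}

// table is a hypothetical, simplified stand-in for Table.
type table struct {
	Fields    []field
	PKColumns []int // assumed: each entry is an index into Fields
}

// hasPrimary reports whether any primary key columns are recorded.
func (t *table) hasPrimary() bool {
	return len(t.PKColumns) != 0
}

// getPKColumn returns the field for the index-th primary key column.
func (t *table) getPKColumn(index int) *field {
	return &t.Fields[t.PKColumns[index]]
}

func main() {
	t := &table{
		Fields:    []field{{"id"}, {"name"}, {"region"}},
		PKColumns: []int{2, 0}, // composite key: (region, id)
	}
	fmt.Println(t.hasPrimary(), t.getPKColumn(0).Name, t.getPKColumn(1).Name) // true region id
}
```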
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered.
func NewTracker ¶
NewTracker creates a Tracker. It needs an open schema Engine (which implements the trackerEngine interface).
type VStreamer ¶
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error
}
VStreamer defines the functions of VStreamer that the replicationWatcher needs.
Directories ¶
Path | Synopsis |
---|---|
schematest | Package schematest provides support for testing packages that depend on schema |