schema

package
v0.18.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2023 License: Apache-2.0 Imports: 42 Imported by: 14

Documentation

Index

Constants

View Source
const (
	NoType = iota
	Sequence
	Message
	View
)

Table types

Variables

View Source
var TypeNames = []string{
	"none",
	"sequence",
	"message",
	"view",
}

TypeNames allows to fetch a the type name for a table. Count must match the number of table types.

Functions

func GetFetchTableAndViewsQuery added in v0.17.0

func GetFetchTableAndViewsQuery(tableNames []string) (string, error)

GetFetchTableAndViewsQuery gets the fetch query to run for getting the listed tables and views. If no table names are provided, then all the tables and views are fetched.

func GetFetchTableQuery added in v0.17.0

func GetFetchTableQuery(tableNames []string) (string, error)

GetFetchTableQuery gets the fetch query to run for getting the listed tables. If no tables are provided, then all the tables are fetched.

func GetFetchViewQuery added in v0.17.0

func GetFetchViewQuery(viewNames []string) (string, error)

GetFetchViewQuery gets the fetch query to run for getting the listed views. If no views are provided, then all the views are fetched.

func MustReloadSchemaOnDDL added in v0.11.0

func MustReloadSchemaOnDDL(sql string, dbname string) bool

MustReloadSchemaOnDDL returns true if the ddl is for the db which is part of the workflow and is not an online ddl artifact

Types

type Engine

type Engine struct {

	// SkipMetaCheck skips the metadata about the database and table information
	SkipMetaCheck bool

	SchemaReloadTimings *servenv.TimingsWrapper
	// contains filtered or unexported fields
}

Engine stores the schema info and performs operations that keep itself up-to-date.

func NewEngine

func NewEngine(env tabletenv.Env) *Engine

NewEngine creates a new Engine.

func NewEngineForTests

func NewEngineForTests() *Engine

NewEngineForTests creates a new engine, that can't query the database, and will not send notifications. It starts opened, and doesn't reload. Use SetTableForTests to set table schema.

func (*Engine) Close

func (se *Engine) Close()

Close shuts down Engine and is idempotent. It can be re-opened after Close.

func (*Engine) EnableHistorian

func (se *Engine) EnableHistorian(enabled bool) error

EnableHistorian forces tracking to be on or off. Only used for testing.

func (*Engine) EnsureConnectionAndDB added in v0.8.0

func (se *Engine) EnsureConnectionAndDB(tabletType topodatapb.TabletType) error

EnsureConnectionAndDB ensures that we can connect to mysql. If tablet type is primary and there is no db, then the database is created. This function can be called before opening the Engine.

func (*Engine) GetConnection

func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error)

GetConnection returns a connection from the pool

func (*Engine) GetDBConnector added in v0.17.0

func (se *Engine) GetDBConnector() dbconfigs.Connector

func (*Engine) GetSchema

func (se *Engine) GetSchema() map[string]*Table

GetSchema returns the current schema. The Tables are a shared data structure and must be treated as read-only.

func (*Engine) GetTable

func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table

GetTable returns the info for a table.

func (*Engine) GetTableForPos

func (se *Engine) GetTableForPos(tableName sqlparser.IdentifierCS, gtid string) (*binlogdatapb.MinimalTable, error)

GetTableForPos returns a best-effort schema for a specific gtid

func (*Engine) InitDBConfig

func (se *Engine) InitDBConfig(cp dbconfigs.Connector)

InitDBConfig must be called before Open.

func (*Engine) IsOpen

func (se *Engine) IsOpen() bool

IsOpen checks if engine is open

func (*Engine) MakeNonPrimary added in v0.12.0

func (se *Engine) MakeNonPrimary()

MakeNonPrimary clears the sequence caches to make sure that they don't get accidentally reused after losing primaryship.

func (*Engine) MakePrimary added in v0.17.0

func (se *Engine) MakePrimary(serving bool)

MakePrimary tells the schema engine that the current tablet is now the primary, so it can read and write to the MySQL instance for schema-tracking.

func (*Engine) MarshalMinimalSchema added in v0.15.5

func (se *Engine) MarshalMinimalSchema() ([]byte, error)

MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema

func (*Engine) Open

func (se *Engine) Open() error

Open initializes the Engine. Calling Open on an already open engine is a no-op.

func (*Engine) RegisterNotifier

func (se *Engine) RegisterNotifier(name string, f notifier, runNotifier bool)

RegisterNotifier registers the function for schema change notification. It also causes an immediate notification to the caller. The notified function must not change the map or its contents. The only exception is the sequence table where the values can be changed using the lock.

func (*Engine) RegisterVersionEvent

func (se *Engine) RegisterVersionEvent() error

RegisterVersionEvent is called by the vstream when it encounters a version event (an insert into the schema_tracking table). It triggers the historian to load the newer rows from the database to update its cache.

func (*Engine) Reload

func (se *Engine) Reload(ctx context.Context) error

Reload reloads the schema info from the db. Any tables that have changed since the last load are updated. The includeStats argument controls whether table size statistics should be emitted, as they can be expensive to calculate for a large number of tables

func (*Engine) ReloadAt

func (se *Engine) ReloadAt(ctx context.Context, pos replication.Position) error

ReloadAt reloads the schema info from the db. Any tables that have changed since the last load are updated. It maintains the position at which the schema was reloaded and if the same position is provided (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema

func (*Engine) ReloadAtEx added in v0.16.0

func (se *Engine) ReloadAtEx(ctx context.Context, pos replication.Position, includeStats bool) error

ReloadAtEx reloads the schema info from the db. Any tables that have changed since the last load are updated. It maintains the position at which the schema was reloaded and if the same position is provided (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema The includeStats argument controls whether table size statistics should be emitted, as they can be expensive to calculate for a large number of tables

func (*Engine) ResetSequences added in v0.18.0

func (se *Engine) ResetSequences(tables []string) error

func (*Engine) SetTableForTests

func (se *Engine) SetTableForTests(table *Table)

SetTableForTests puts a Table in the map directly.

func (*Engine) UnregisterNotifier

func (se *Engine) UnregisterNotifier(name string)

UnregisterNotifier unregisters the notifier function.

type MessageInfo

type MessageInfo struct {
	// Fields stores the field info to be
	// returned for subscribers.
	Fields []*querypb.Field

	// AckWaitDuration specifies how long to wait after
	// the message was first sent. The back-off doubles
	// every attempt.
	AckWaitDuration time.Duration

	// PurgeAfterDuration specifies the time after which
	// a successfully acked message can be deleted.
	PurgeAfterDuration time.Duration

	// BatchSize specifies the max number of events to
	// send per response.
	BatchSize int

	// CacheSize specifies the number of messages to keep
	// in cache. Anything that cannot fit in the cache
	// is sent as best effort.
	CacheSize int

	// PollInterval specifies the polling frequency to
	// look for messages to be sent.
	PollInterval time.Duration

	// MinBackoff specifies the shortest duration message manager
	// should wait before rescheduling a message
	MinBackoff time.Duration

	// MaxBackoff specifies the longest duration message manager
	// should wait before rescheduling a message
	MaxBackoff time.Duration
}

MessageInfo contains info specific to message tables.

func (*MessageInfo) CachedSize added in v0.10.0

func (cached *MessageInfo) CachedSize(alloc bool) int64

type SequenceInfo

type SequenceInfo struct {
	sync.Mutex
	NextVal int64
	LastVal int64
}

SequenceInfo contains info specific to sequence tabels. It must be locked before accessing the values inside. If CurVal==LastVal, we have to cache new values. When the schema is first loaded, the values are all 0, which will trigger caching on first use.

func (*SequenceInfo) Reset added in v0.18.0

func (seq *SequenceInfo) Reset()

Reset clears the cache for the sequence. This is called to ensure that we always start with a fresh cache, when a new primary is elected, and, when a table is moved into a new keyspace. When we first need a new value from a sequence, i.e. when the schema engine sees a uninitialized sequence, it will get the next set of values from the backing sequence table and cache them.

func (*SequenceInfo) String added in v0.18.0

func (seq *SequenceInfo) String()

type Table

type Table struct {
	Name      sqlparser.IdentifierCS
	Fields    []*querypb.Field
	PKColumns []int
	Type      int

	// SequenceInfo contains info for sequence tables.
	SequenceInfo *SequenceInfo

	// MessageInfo contains info for message tables.
	MessageInfo *MessageInfo

	CreateTime    int64
	FileSize      uint64
	AllocatedSize uint64
}

Table contains info about a table.

func LoadTable

func LoadTable(conn *connpool.DBConn, databaseName, tableName, tableType string, comment string) (*Table, error)

LoadTable creates a Table from the schema info in the database.

func NewTable

func NewTable(name string, tableType int) *Table

NewTable creates a new Table.

func (*Table) CachedSize added in v0.10.0

func (cached *Table) CachedSize(alloc bool) int64

func (*Table) FindColumn

func (ta *Table) FindColumn(name sqlparser.IdentifierCI) int

FindColumn finds a column in the table. It returns the index if found. Otherwise, it returns -1.

func (*Table) GetPKColumn

func (ta *Table) GetPKColumn(index int) *querypb.Field

GetPKColumn returns the pk column specified by the index.

func (*Table) HasPrimary

func (ta *Table) HasPrimary() bool

HasPrimary returns true if the table has a primary key.

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered.

func NewTracker

func NewTracker(env tabletenv.Env, vs VStreamer, engine *Engine) *Tracker

NewTracker creates a Tracker, needs an Open SchemaEngine (which implements the trackerEngine interface)

func (*Tracker) Close

func (tr *Tracker) Close()

Close disables the tracker functionality

func (*Tracker) Enable

func (tr *Tracker) Enable(enabled bool)

Enable forces tracking to be on or off. Only used for testing.

func (*Tracker) Open

func (tr *Tracker) Open()

Open enables the tracker functionality

type VStreamer

type VStreamer interface {
	Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error
}

VStreamer defines the functions of VStreamer that the replicationWatcher needs.

Directories

Path Synopsis
Package schematest provides support for testing packages that depend on schema
Package schematest provides support for testing packages that depend on schema

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL