Documentation ¶
Index ¶
- Variables
- func PubSchemeChangeToStore(ctx context.Context, se *sess.Session, ddlJobID int64, subJobID int64, ...) error
- type CloseFn
- type DDLNotifier
- type HandlerID
- type ListResult
- type MiniDBInfoForSchemaEvent
- type MiniPartitionInfoForSchemaEvent
- type MiniTableInfoForSchemaEvent
- type SchemaChange
- type SchemaChangeEvent
- func NewAddColumnEvent(tableInfo *model.TableInfo, newColumns []*model.ColumnInfo) *SchemaChangeEvent
- func NewAddIndexEvent(tableInfo *model.TableInfo, newIndexes []*model.IndexInfo) *SchemaChangeEvent
- func NewAddPartitionEvent(globalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo) *SchemaChangeEvent
- func NewAddPartitioningEvent(nonPartTableID int64, newGlobalTableInfo *model.TableInfo, ...) *SchemaChangeEvent
- func NewCreateTableEvent(newTableInfo *model.TableInfo) *SchemaChangeEvent
- func NewDropPartitionEvent(globalTableInfo *model.TableInfo, droppedPartInfo *model.PartitionInfo) *SchemaChangeEvent
- func NewDropSchemaEvent(dbInfo *model.DBInfo, tables []*model.TableInfo) *SchemaChangeEvent
- func NewDropTableEvent(droppedTableInfo *model.TableInfo) *SchemaChangeEvent
- func NewExchangePartitionEvent(globalTableInfo *model.TableInfo, partInfo *model.PartitionInfo, ...) *SchemaChangeEvent
- func NewFlashbackClusterEvent() *SchemaChangeEvent
- func NewModifyColumnEvent(tableInfo *model.TableInfo, modifiedColumns []*model.ColumnInfo) *SchemaChangeEvent
- func NewRemovePartitioningEvent(oldPartitionedTableID int64, nonPartitionTableInfo *model.TableInfo, ...) *SchemaChangeEvent
- func NewReorganizePartitionEvent(globalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo, ...) *SchemaChangeEvent
- func NewTruncatePartitionEvent(globalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo, ...) *SchemaChangeEvent
- func NewTruncateTableEvent(newTableInfo *model.TableInfo, droppedTableInfo *model.TableInfo) *SchemaChangeEvent
- func (s *SchemaChangeEvent) GetAddColumnInfo() (tableInfo *model.TableInfo, columnInfos []*model.ColumnInfo)
- func (s *SchemaChangeEvent) GetAddIndexInfo() (tableInfo *model.TableInfo, indexes []*model.IndexInfo)
- func (s *SchemaChangeEvent) GetAddPartitionInfo() (globalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo)
- func (s *SchemaChangeEvent) GetAddPartitioningInfo() (nonPartTableID int64, newGlobalTableInfo *model.TableInfo, ...)
- func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo
- func (s *SchemaChangeEvent) GetDropPartitionInfo() (globalTableInfo *model.TableInfo, droppedPartInfo *model.PartitionInfo)
- func (s *SchemaChangeEvent) GetDropSchemaInfo() (miniDBInfo *MiniDBInfoForSchemaEvent)
- func (s *SchemaChangeEvent) GetDropTableInfo() (droppedTableInfo *model.TableInfo)
- func (s *SchemaChangeEvent) GetExchangePartitionInfo() (globalTableInfo *model.TableInfo, partInfo *model.PartitionInfo, ...)
- func (s *SchemaChangeEvent) GetModifyColumnInfo() (newTableInfo *model.TableInfo, modifiedColumns []*model.ColumnInfo)
- func (s *SchemaChangeEvent) GetRemovePartitioningInfo() (oldPartitionedTableID int64, newSingleTableInfo *model.TableInfo, ...)
- func (s *SchemaChangeEvent) GetReorganizePartitionInfo() (globalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo, ...)
- func (s *SchemaChangeEvent) GetTruncatePartitionInfo() (globalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo, ...)
- func (s *SchemaChangeEvent) GetTruncateTableInfo() (newTableInfo *model.TableInfo, droppedTableInfo *model.TableInfo)
- func (s *SchemaChangeEvent) GetType() model.ActionType
- func (s *SchemaChangeEvent) MarshalJSON() ([]byte, error)
- func (s *SchemaChangeEvent) String() string
- func (s *SchemaChangeEvent) UnmarshalJSON(b []byte) error
- type SchemaChangeHandler
- type Store
Constants ¶
This section is empty.
Variables ¶
var ErrNotReadyRetryLater = errors.New("not ready, retry later")
ErrNotReadyRetryLater should be returned by a registered handler that is not ready to process the events.
var ProcessEventsBatchSize = 1024
ProcessEventsBatchSize is the number of events to process in a SQL query. It's exposed for testing.
Functions ¶
func PubSchemeChangeToStore ¶
func PubSchemeChangeToStore(
	ctx context.Context,
	se *sess.Session,
	ddlJobID int64,
	subJobID int64,
	event *SchemaChangeEvent,
	store Store,
) error
PubSchemeChangeToStore publishes a schema change to the Store so that subscribers on the Store are notified. It stages the change in the given `se`, so the change becomes visible only when `se` commits. When the DDL contains only one schema change, `subJobID` is -1. Otherwise, `subJobID` is the index of the sub-job within the DDL, as in a multi-schema change or a batched create table.
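The `subJobID` convention can be illustrated with a small, self-contained sketch. The `changeKey` type and `keysForJob` helper are hypothetical names introduced here for illustration and are not part of this package; the sketch also assumes sub-job indexes start at 0.

```go
package main

import "fmt"

// changeKey is a hypothetical stand-in for the (ddlJobID, subJobID) pair
// that identifies one published schema change.
type changeKey struct {
	ddlJobID int64
	subJobID int64
}

// keysForJob returns the keys a DDL job carrying n schema changes would use
// under the convention described above: -1 for a single change, otherwise
// the sub-job index (assumed here to start at 0).
func keysForJob(ddlJobID int64, n int) []changeKey {
	if n == 1 {
		return []changeKey{{ddlJobID: ddlJobID, subJobID: -1}}
	}
	keys := make([]changeKey, 0, n)
	for i := 0; i < n; i++ {
		keys = append(keys, changeKey{ddlJobID: ddlJobID, subJobID: int64(i)})
	}
	return keys
}

func main() {
	fmt.Println(keysForJob(100, 1)) // a plain DDL with one schema change
	fmt.Println(keysForJob(101, 3)) // e.g. a multi-schema change with three sub-jobs
}
```

Because each key is distinct, the pair can serve as a unique identifier for a schema change across the cluster.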
Types ¶
type DDLNotifier ¶
type DDLNotifier struct {
// contains filtered or unexported fields
}
DDLNotifier implements the subscription on DDL events.
func NewDDLNotifier ¶
func NewDDLNotifier(
	sysSessionPool util.SessionPool,
	store Store,
	pollInterval time.Duration,
) *DDLNotifier
NewDDLNotifier initializes the global DDLNotifier.
func (*DDLNotifier) OnBecomeOwner ¶
func (n *DDLNotifier) OnBecomeOwner()
OnBecomeOwner implements the owner.Listener interface. We need to make sure only one DDLNotifier is running at any time.
func (*DDLNotifier) OnRetireOwner ¶
func (n *DDLNotifier) OnRetireOwner()
OnRetireOwner implements the owner.Listener interface. After the owner is retired, we need to stop the DDLNotifier.
func (*DDLNotifier) RegisterHandler ¶
func (n *DDLNotifier) RegisterHandler(id HandlerID, handler SchemaChangeHandler)
RegisterHandler registers a handler. Each handler must be registered with its own exclusive, fixed HandlerID. Passing an illegal ID causes a panic. RegisterHandler should not be called after the global DDLNotifier is started.
RegisterHandler is not concurrency-safe.
func (*DDLNotifier) Stop ¶
func (n *DDLNotifier) Stop()
Stop stops the background loop. Exposed for testing. Do not call this function directly. Use owner.Listener interface instead.
type HandlerID ¶
type HandlerID int
HandlerID is the type of the persistent ID used to register a handler. Every ID occupies one bit in a BIGINT column, so there can be at most 64 IDs. To avoid duplicate IDs, all IDs should be defined in the declaration below.
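To see why the limit is 64, here is a minimal sketch of one-bit-per-ID bookkeeping in a 64-bit flag. It is not the package's ID declaration or implementation: `handlerID`, `bit`, `markProcessed`, `processedBy`, and `allProcessed` are hypothetical names, and the mapping "bit index equals ID value" is an assumption.

```go
package main

import "fmt"

// handlerID is a hypothetical stand-in for HandlerID.
type handlerID int

// bit returns the flag bit for an ID. It panics for IDs that do not fit
// into a 64-bit flag. The mapping "bit index == ID value" is an assumption.
func bit(id handlerID) uint64 {
	if id < 0 || id >= 64 {
		panic(fmt.Sprintf("illegal handler ID %d", id))
	}
	return uint64(1) << uint(id)
}

// markProcessed sets the bit of the given handler in the flag.
func markProcessed(flag uint64, id handlerID) uint64 { return flag | bit(id) }

// processedBy reports whether the given handler's bit is set.
func processedBy(flag uint64, id handlerID) bool { return flag&bit(id) != 0 }

// allProcessed reports whether every listed handler's bit is set.
func allProcessed(flag uint64, ids []handlerID) bool {
	for _, id := range ids {
		if !processedBy(flag, id) {
			return false
		}
	}
	return true
}

func main() {
	var flag uint64
	flag = markProcessed(flag, 0)
	flag = markProcessed(flag, 3)
	fmt.Println(flag)                                  // 9 (bits 0 and 3)
	fmt.Println(processedBy(flag, 1))                  // false
	fmt.Println(allProcessed(flag, []handlerID{0, 3})) // true
}
```

A single unsigned 64-bit value can therefore record, per schema change, which of up to 64 handlers have already processed it.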
type ListResult ¶
type ListResult interface {
	// Read tries to decode at most `len(changes)` SchemaChange into the given
	// slice. It returns the number of SchemaChange decoded; 0 means there are
	// no more.
	//
	// Note that the previous SchemaChange in the slice will be overwritten when
	// Read is called.
	Read(changes []*SchemaChange) (int, error)
}
ListResult is the result stream of a List operation.
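The Read contract suggests a loop that reuses one buffer and stops when 0 is returned. The following is a sketch against stand-in types: `schemaChange`, `listResult`, `sliceResult`, and `drain` are hypothetical and only mirror the shape of the interface above.

```go
package main

import "fmt"

// schemaChange is a hypothetical stand-in for SchemaChange.
type schemaChange struct{ ddlJobID int64 }

// listResult mirrors the shape of the ListResult interface.
type listResult interface {
	Read(changes []*schemaChange) (int, error)
}

// sliceResult is an in-memory listResult used only for this sketch.
type sliceResult struct{ data []*schemaChange }

// Read copies at most len(changes) items into changes and returns the count.
func (r *sliceResult) Read(changes []*schemaChange) (int, error) {
	n := copy(changes, r.data)
	r.data = r.data[n:]
	return n, nil
}

// drain reads all changes in batches, reusing one buffer between reads.
func drain(r listResult, batch int) ([]int64, error) {
	buf := make([]*schemaChange, batch)
	var ids []int64
	for {
		n, err := r.Read(buf)
		if err != nil {
			return ids, err
		}
		if n == 0 {
			return ids, nil // no more schema changes
		}
		// Consume buf[:n] before the next Read, which overwrites the buffer.
		for _, c := range buf[:n] {
			ids = append(ids, c.ddlJobID)
		}
	}
}

func main() {
	r := &sliceResult{}
	for i := int64(1); i <= 5; i++ {
		r.data = append(r.data, &schemaChange{ddlJobID: i})
	}
	ids, err := drain(r, 2)
	fmt.Println(ids, err) // [1 2 3 4 5] <nil>
}
```

The important point is that anything needed from a batch is extracted before Read is called again.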
type MiniDBInfoForSchemaEvent ¶
type MiniDBInfoForSchemaEvent struct {
	ID     int64                          `json:"id"`
	Name   pmodel.CIStr                   `json:"name"`
	Tables []*MiniTableInfoForSchemaEvent `json:"tables,omitempty"`
}
MiniDBInfoForSchemaEvent is a mini version of DBInfo for DropSchemaEvent only.
type MiniPartitionInfoForSchemaEvent ¶
type MiniPartitionInfoForSchemaEvent struct {
	ID   int64        `json:"id"`
	Name pmodel.CIStr `json:"name"`
}
MiniPartitionInfoForSchemaEvent is a mini version of PartitionInfo for DropSchemaEvent only. Note: Usually we encourage using PartitionInfo instead of this mini version, but for DropSchemaEvent the mini version is more efficient. Please do not use this mini version in other places.
type MiniTableInfoForSchemaEvent ¶
type MiniTableInfoForSchemaEvent struct {
	ID         int64                              `json:"id"`
	Name       pmodel.CIStr                       `json:"name"`
	Partitions []*MiniPartitionInfoForSchemaEvent `json:"partitions,omitempty"`
}
MiniTableInfoForSchemaEvent is a mini version of TableInfo for DropSchemaEvent only. Note: Usually we encourage using TableInfo instead of this mini version, but for DropSchemaEvent the mini version is more efficient. Please do not use this mini version in other places.
type SchemaChange ¶
type SchemaChange struct {
// contains filtered or unexported fields
}
SchemaChange is the Golang representation of the persistent data. (ddlJobID, subJobID) should be unique in the cluster.
type SchemaChangeEvent ¶
type SchemaChangeEvent struct {
// contains filtered or unexported fields
}
SchemaChangeEvent stands for a schema change event. A DDL generates one event, or multiple events in the case of a multi-schema change DDL or a merged DDL. The caller should check GetType of the SchemaChangeEvent and call the corresponding getter function to retrieve the needed information.
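The "check GetType, then call the matching getter" pattern can be sketched as a switch. Everything below is a stand-in: `actionType`, `tableInfo`, `schemaChangeEvent`, and `describe` are hypothetical types and names that only imitate the shape of this API, not the real `model` package.

```go
package main

import "fmt"

// actionType is a hypothetical stand-in for model.ActionType.
type actionType int

const (
	actionCreateTable actionType = iota + 1
	actionDropTable
	actionAddColumn
)

// tableInfo is a hypothetical stand-in for model.TableInfo.
type tableInfo struct{ Name string }

// schemaChangeEvent imitates an event with unexported fields and getters.
type schemaChangeEvent struct {
	tp      actionType
	table   *tableInfo
	columns []string
}

func (e *schemaChangeEvent) GetType() actionType            { return e.tp }
func (e *schemaChangeEvent) GetCreateTableInfo() *tableInfo { return e.table }
func (e *schemaChangeEvent) GetAddColumnInfo() (*tableInfo, []string) {
	return e.table, e.columns
}

// describe dispatches on the event type and calls only the matching getter.
func describe(e *schemaChangeEvent) string {
	switch e.GetType() {
	case actionCreateTable:
		return "create table " + e.GetCreateTableInfo().Name
	case actionAddColumn:
		t, cols := e.GetAddColumnInfo()
		return fmt.Sprintf("add %d column(s) to %s", len(cols), t.Name)
	default:
		// Event types this subscriber does not care about are skipped.
		return "unhandled event type"
	}
}

func main() {
	fmt.Println(describe(&schemaChangeEvent{tp: actionCreateTable, table: &tableInfo{Name: "t"}}))
	fmt.Println(describe(&schemaChangeEvent{tp: actionAddColumn, table: &tableInfo{Name: "t"}, columns: []string{"c1", "c2"}}))
	fmt.Println(describe(&schemaChangeEvent{tp: actionDropTable}))
}
```

Keeping a default branch lets a subscriber ignore event types it has no use for instead of calling a getter that does not match the event.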
func NewAddColumnEvent ¶
func NewAddColumnEvent(
	tableInfo *model.TableInfo,
	newColumns []*model.ColumnInfo,
) *SchemaChangeEvent
NewAddColumnEvent creates a SchemaChangeEvent whose type is ActionAddColumn.
func NewAddIndexEvent ¶
func NewAddIndexEvent(
	tableInfo *model.TableInfo,
	newIndexes []*model.IndexInfo,
) *SchemaChangeEvent
NewAddIndexEvent creates a schema change event whose type is ActionAddIndex.
func NewAddPartitionEvent ¶
func NewAddPartitionEvent(
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent
NewAddPartitionEvent creates a SchemaChangeEvent whose type is ActionAddPartition.
func NewAddPartitioningEvent ¶
func NewAddPartitioningEvent(
	nonPartTableID int64,
	newGlobalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent
NewAddPartitioningEvent creates a SchemaChangeEvent whose type is ActionAlterTablePartitioning. It means that a non-partitioned table is converted to a partitioned table. For example, `alter table t partition by range (c1) (partition p1 values less than (10))`.
func NewCreateTableEvent ¶
func NewCreateTableEvent(
	newTableInfo *model.TableInfo,
) *SchemaChangeEvent
NewCreateTableEvent creates a SchemaChangeEvent whose type is ActionCreateTable.
func NewDropPartitionEvent ¶
func NewDropPartitionEvent(
	globalTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent
NewDropPartitionEvent creates a SchemaChangeEvent whose type is ActionDropTablePartition.
func NewDropSchemaEvent ¶
func NewDropSchemaEvent(dbInfo *model.DBInfo, tables []*model.TableInfo) *SchemaChangeEvent
NewDropSchemaEvent creates a schema change event whose type is ActionDropSchema.
func NewDropTableEvent ¶
func NewDropTableEvent(
	droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent
NewDropTableEvent creates a SchemaChangeEvent whose type is ActionDropTable.
func NewExchangePartitionEvent ¶
func NewExchangePartitionEvent(
	globalTableInfo *model.TableInfo,
	partInfo *model.PartitionInfo,
	nonPartTableInfo *model.TableInfo,
) *SchemaChangeEvent
NewExchangePartitionEvent creates a SchemaChangeEvent whose type is ActionExchangeTablePartition.
func NewFlashbackClusterEvent ¶
func NewFlashbackClusterEvent() *SchemaChangeEvent
NewFlashbackClusterEvent creates a schema change event whose type is ActionFlashbackCluster.
func NewModifyColumnEvent ¶
func NewModifyColumnEvent(
	tableInfo *model.TableInfo,
	modifiedColumns []*model.ColumnInfo,
) *SchemaChangeEvent
NewModifyColumnEvent creates a SchemaChangeEvent whose type is ActionModifyColumn.
func NewRemovePartitioningEvent ¶
func NewRemovePartitioningEvent(
	oldPartitionedTableID int64,
	nonPartitionTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent
NewRemovePartitioningEvent creates a schema change event whose type is ActionRemovePartitioning.
func NewReorganizePartitionEvent ¶
func NewReorganizePartitionEvent(
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent
NewReorganizePartitionEvent creates a SchemaChangeEvent whose type is ActionReorganizePartition.
func NewTruncatePartitionEvent ¶
func NewTruncatePartitionEvent(
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent
NewTruncatePartitionEvent creates a SchemaChangeEvent whose type is ActionTruncateTablePartition.
func NewTruncateTableEvent ¶
func NewTruncateTableEvent(
	newTableInfo *model.TableInfo,
	droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent
NewTruncateTableEvent creates a SchemaChangeEvent whose type is ActionTruncateTable.
func (*SchemaChangeEvent) GetAddColumnInfo ¶
func (s *SchemaChangeEvent) GetAddColumnInfo() (
	tableInfo *model.TableInfo,
	columnInfos []*model.ColumnInfo,
)
GetAddColumnInfo returns the table info and added column info of the SchemaChangeEvent whose type is ActionAddColumn.
func (*SchemaChangeEvent) GetAddIndexInfo ¶
func (s *SchemaChangeEvent) GetAddIndexInfo() (
	tableInfo *model.TableInfo,
	indexes []*model.IndexInfo,
)
GetAddIndexInfo returns the table info and added index info of the SchemaChangeEvent whose type is ActionAddIndex.
func (*SchemaChangeEvent) GetAddPartitionInfo ¶
func (s *SchemaChangeEvent) GetAddPartitionInfo() (
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
)
GetAddPartitionInfo returns the global table info and partition info of the SchemaChangeEvent whose type is ActionAddPartition.
func (*SchemaChangeEvent) GetAddPartitioningInfo ¶
func (s *SchemaChangeEvent) GetAddPartitioningInfo() (
	nonPartTableID int64,
	newGlobalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
)
GetAddPartitioningInfo returns the old table ID of non-partitioned table, new global table info and added partition info of the SchemaChangeEvent whose type is ActionAlterTablePartitioning.
func (*SchemaChangeEvent) GetCreateTableInfo ¶
func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo
GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type is ActionCreateTable.
func (*SchemaChangeEvent) GetDropPartitionInfo ¶
func (s *SchemaChangeEvent) GetDropPartitionInfo() (
	globalTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
)
GetDropPartitionInfo returns the global table info and dropped partition info of the SchemaChangeEvent whose type is ActionDropTablePartition.
func (*SchemaChangeEvent) GetDropSchemaInfo ¶
func (s *SchemaChangeEvent) GetDropSchemaInfo() (miniDBInfo *MiniDBInfoForSchemaEvent)
GetDropSchemaInfo returns the database info and tables of the SchemaChangeEvent whose type is ActionDropSchema.
func (*SchemaChangeEvent) GetDropTableInfo ¶
func (s *SchemaChangeEvent) GetDropTableInfo() (droppedTableInfo *model.TableInfo)
GetDropTableInfo returns the table info of the SchemaChangeEvent whose type is ActionDropTable.
func (*SchemaChangeEvent) GetExchangePartitionInfo ¶
func (s *SchemaChangeEvent) GetExchangePartitionInfo() (
	globalTableInfo *model.TableInfo,
	partInfo *model.PartitionInfo,
	nonPartTableInfo *model.TableInfo,
)
GetExchangePartitionInfo returns the global table info, exchanged partition info and original non-partitioned table info of the SchemaChangeEvent whose type is ActionExchangeTablePartition.
func (*SchemaChangeEvent) GetModifyColumnInfo ¶
func (s *SchemaChangeEvent) GetModifyColumnInfo() (
	newTableInfo *model.TableInfo,
	modifiedColumns []*model.ColumnInfo,
)
GetModifyColumnInfo returns the table info and modified column info of the SchemaChangeEvent whose type is ActionModifyColumn.
func (*SchemaChangeEvent) GetRemovePartitioningInfo ¶
func (s *SchemaChangeEvent) GetRemovePartitioningInfo() (
	oldPartitionedTableID int64,
	newSingleTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
)
GetRemovePartitioningInfo returns the old partitioned table ID, the new non-partitioned table info and the dropped partition info of the SchemaChangeEvent whose type is ActionRemovePartitioning.
func (*SchemaChangeEvent) GetReorganizePartitionInfo ¶
func (s *SchemaChangeEvent) GetReorganizePartitionInfo() (
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
)
GetReorganizePartitionInfo returns the global table info, added partition info and deleted partition info of the SchemaChangeEvent whose type is ActionReorganizePartition.
func (*SchemaChangeEvent) GetTruncatePartitionInfo ¶
func (s *SchemaChangeEvent) GetTruncatePartitionInfo() (
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
)
GetTruncatePartitionInfo returns the global table info, added partition info and deleted partition info of the SchemaChangeEvent whose type is ActionTruncateTablePartition.
func (*SchemaChangeEvent) GetTruncateTableInfo ¶
func (s *SchemaChangeEvent) GetTruncateTableInfo() (
	newTableInfo *model.TableInfo,
	droppedTableInfo *model.TableInfo,
)
GetTruncateTableInfo returns the new and old table info of the SchemaChangeEvent whose type is ActionTruncateTable.
func (*SchemaChangeEvent) GetType ¶
func (s *SchemaChangeEvent) GetType() model.ActionType
GetType returns the type of the schema change event.
func (*SchemaChangeEvent) MarshalJSON ¶
func (s *SchemaChangeEvent) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface.
func (*SchemaChangeEvent) String ¶
func (s *SchemaChangeEvent) String() string
String implements fmt.Stringer interface.
func (*SchemaChangeEvent) UnmarshalJSON ¶
func (s *SchemaChangeEvent) UnmarshalJSON(b []byte) error
UnmarshalJSON implements the json.Unmarshaler interface.
type SchemaChangeHandler ¶
type SchemaChangeHandler func(
	ctx context.Context,
	sctx sessionctx.Context,
	change *SchemaChangeEvent,
) error
SchemaChangeHandler is the function subscribers use to handle a SchemaChangeEvent generated by the publisher (currently the DDL module). It is called at least once for every SchemaChange. The sctx has already started a pessimistic transaction, and the handler should use it to run any SQL modification that must take effect exactly once. After the handler returns, the subscribing framework commits the whole transaction together with its internal flag modification, which provides exactly-once delivery. The handler is called periodically, with no guarantee on the latency between the time the SchemaChangeEvent happened and the time the handler runs.
The handler function must be registered with RegisterHandler before the DDLNotifier is started. If the handler cannot serve events immediately after registration, it can return nil to tell the DDLNotifier to treat the change as handled, or return ErrNotReadyRetryLater to hold the change and handle it again later.
type Store ¶
type Store interface {
	Insert(context.Context, *sess.Session, *SchemaChange) error
	UpdateProcessed(
		ctx context.Context,
		se *sess.Session,
		ddlJobID int64,
		multiSchemaChangeID int64,
		processedBy uint64,
	) error
	DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
	// List will start a transaction of given session and read all schema changes
	// through a ListResult. The ownership of session is occupied by Store until
	// CloseFn is called.
	List(ctx context.Context, se *sess.Session) (ListResult, CloseFn)
}
Store is the (de)serialization and persistent layer.
func OpenTableStore ¶
OpenTableStore opens a store on a created table `db`.`table`. The table should be created with the table structure:
ddl_job_id BIGINT,
sub_job_id BIGINT COMMENT '-1 if the schema change does not belong to a multi-schema change DDL or a merged DDL. 0 or positive numbers representing the sub-job index of a multi-schema change DDL or a merged DDL',
schema_change JSON COMMENT 'SchemaChange at rest',
processed_by_flag BIGINT UNSIGNED DEFAULT 0 COMMENT 'flag to mark which subscriber has processed the event',
PRIMARY KEY(ddl_job_id, sub_job_id)