notifier

package
v1.1.0-beta.0...-35f329d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNotReadyRetryLater = errors.New("not ready, retry later")

ErrNotReadyRetryLater should be returned by a registered handler that is not ready to process the events.

View Source
var ProcessEventsBatchSize = 1024

ProcessEventsBatchSize is the number of events to process in a SQL query. It's exposed for testing.

Functions

func PubSchemeChangeToStore

func PubSchemeChangeToStore(
	ctx context.Context,
	se *sess.Session,
	ddlJobID int64,
	subJobID int64,
	event *SchemaChangeEvent,
	store Store,
) error

PubSchemeChangeToStore publishes schema changes to the store to notify subscribers on the Store. It stages changes in given `se` so they will be visible when `se` further commits. When the DDL contains only one schema change, `subJobID` is -1. Otherwise, `subJobID` is the sub-job index of the DDL, like multi-schema change or batched create table.

Types

type CloseFn

type CloseFn func()

CloseFn is the function to release the resource.

type DDLNotifier

type DDLNotifier struct {
	// contains filtered or unexported fields
}

DDLNotifier implements the subscription on DDL events.

func NewDDLNotifier

func NewDDLNotifier(
	sysSessionPool util.SessionPool,
	store Store,
	pollInterval time.Duration,
) *DDLNotifier

NewDDLNotifier initializes the global DDLNotifier.

func (*DDLNotifier) OnBecomeOwner

func (n *DDLNotifier) OnBecomeOwner()

OnBecomeOwner implements the owner.Listener interface. We need to make sure only one DDLNotifier is running at any time.

func (*DDLNotifier) OnRetireOwner

func (n *DDLNotifier) OnRetireOwner()

OnRetireOwner implements the owner.Listener interface. After the owner is retired, we need to stop the DDLNotifier.

func (*DDLNotifier) RegisterHandler

func (n *DDLNotifier) RegisterHandler(id HandlerID, handler SchemaChangeHandler)

RegisterHandler must be called with an exclusive and fixed HandlerID for each handler to register the handler. Illegal ID will panic. RegisterHandler should not be called after the global DDLNotifier is started.

RegisterHandler is not concurrency-safe.

func (*DDLNotifier) Stop

func (n *DDLNotifier) Stop()

Stop stops the background loop. Exposed for testing. Do not call this function directly. Use owner.Listener interface instead.

type HandlerID

type HandlerID int

HandlerID is the type of the persistent ID used to register a handler. Every ID occupies a bit in a BIGINT column, so at most we can only have 64 IDs. To avoid duplicate IDs, all IDs should be defined in below declaration.

const (
	// TestHandlerID is used for testing only.
	TestHandlerID HandlerID = 0
	// StatsMetaHandlerID is used to update statistics system table.
	StatsMetaHandlerID HandlerID = 1
	// PriorityQueueHandlerID is used to update the priority queue.
	PriorityQueueHandlerID HandlerID = 2
)

func (HandlerID) String

func (id HandlerID) String() string

String implements fmt.Stringer interface.

type ListResult

type ListResult interface {
	// Read tries to decode at most `len(changes)` SchemaChange into given slices. It
	// returns the number of schemaChanges decoded, 0 means no more schemaChanges.
	//
	// Note that the previous SchemaChange in the slice will be overwritten when call
	// Read.
	Read(changes []*SchemaChange) (int, error)
}

ListResult is the result stream of a List operation.

type MiniDBInfoForSchemaEvent

type MiniDBInfoForSchemaEvent struct {
	ID     int64                          `json:"id"`
	Name   pmodel.CIStr                   `json:"name"`
	Tables []*MiniTableInfoForSchemaEvent `json:"tables,omitempty"`
}

MiniDBInfoForSchemaEvent is a mini version of DBInfo for DropSchemaEvent only.

type MiniPartitionInfoForSchemaEvent

type MiniPartitionInfoForSchemaEvent struct {
	ID   int64        `json:"id"`
	Name pmodel.CIStr `json:"name"`
}

MiniPartitionInfoForSchemaEvent is a mini version of PartitionInfo for DropSchemaEvent only. Note: Usually we encourage to use PartitionInfo instead of this mini version, but for DropSchemaEvent, it's more efficient to use this mini version. So please do not use this mini version in other places.

type MiniTableInfoForSchemaEvent

type MiniTableInfoForSchemaEvent struct {
	ID         int64                              `json:"id"`
	Name       pmodel.CIStr                       `json:"name"`
	Partitions []*MiniPartitionInfoForSchemaEvent `json:"partitions,omitempty"`
}

MiniTableInfoForSchemaEvent is a mini version of TableInfo for DropSchemaEvent only. Note: Usually we encourage to use TableInfo instead of this mini version, but for DropSchemaEvent, it's more efficient to use this mini version. So please do not use this mini version in other places.

type SchemaChange

type SchemaChange struct {
	// contains filtered or unexported fields
}

SchemaChange is the Golang representation of the persistent data. (ddlJobID, subJobID) should be unique in the cluster.

type SchemaChangeEvent

type SchemaChangeEvent struct {
	// contains filtered or unexported fields
}

SchemaChangeEvent stands for a schema change event. DDL will generate one event or multiple events (only for multi-schema change DDL or merged DDL). The caller should check the GetType of SchemaChange and call the corresponding getter function to retrieve the needed information.

func NewAddColumnEvent

func NewAddColumnEvent(
	tableInfo *model.TableInfo,
	newColumns []*model.ColumnInfo,
) *SchemaChangeEvent

NewAddColumnEvent creates a SchemaChangeEvent whose type is ActionAddColumn.

func NewAddIndexEvent

func NewAddIndexEvent(
	tableInfo *model.TableInfo,
	newIndexes []*model.IndexInfo,
) *SchemaChangeEvent

NewAddIndexEvent creates a schema change event whose type is ActionAddIndex.

func NewAddPartitionEvent

func NewAddPartitionEvent(
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent

NewAddPartitionEvent creates a SchemaChangeEvent whose type is ActionAddPartition.

func NewAddPartitioningEvent

func NewAddPartitioningEvent(
	nonPartTableID int64,
	newGlobalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent

NewAddPartitioningEvent creates a SchemaChangeEvent whose type is ActionAlterTablePartitioning. It means that a non-partitioned table is converted to a partitioned table. For example, `alter table t partition by range (c1) (partition p1 values less than (10))`.

func NewCreateTableEvent

func NewCreateTableEvent(
	newTableInfo *model.TableInfo,
) *SchemaChangeEvent

NewCreateTableEvent creates a SchemaChangeEvent whose type is ActionCreateTable.

func NewDropPartitionEvent

func NewDropPartitionEvent(
	globalTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent

NewDropPartitionEvent creates a SchemaChangeEvent whose type is ActionDropTablePartition.

func NewDropSchemaEvent

func NewDropSchemaEvent(dbInfo *model.DBInfo, tables []*model.TableInfo) *SchemaChangeEvent

NewDropSchemaEvent creates a schema change event whose type is ActionDropSchema.

func NewDropTableEvent

func NewDropTableEvent(
	droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent

NewDropTableEvent creates a SchemaChangeEvent whose type is ActionDropTable.

func NewExchangePartitionEvent

func NewExchangePartitionEvent(
	globalTableInfo *model.TableInfo,
	partInfo *model.PartitionInfo,
	nonPartTableInfo *model.TableInfo,
) *SchemaChangeEvent

NewExchangePartitionEvent creates a SchemaChangeEvent whose type is ActionExchangeTablePartition.

func NewFlashbackClusterEvent

func NewFlashbackClusterEvent() *SchemaChangeEvent

NewFlashbackClusterEvent creates a schema change event whose type is ActionFlashbackCluster.

func NewModifyColumnEvent

func NewModifyColumnEvent(
	tableInfo *model.TableInfo,
	modifiedColumns []*model.ColumnInfo,
) *SchemaChangeEvent

NewModifyColumnEvent creates a SchemaChangeEvent whose type is ActionModifyColumn.

func NewRemovePartitioningEvent

func NewRemovePartitioningEvent(
	oldPartitionedTableID int64,
	nonPartitionTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent

NewRemovePartitioningEvent creates a schema change event whose type is ActionRemovePartitioning.

func NewReorganizePartitionEvent

func NewReorganizePartitionEvent(
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent

NewReorganizePartitionEvent creates a SchemaChangeEvent whose type is ActionReorganizePartition.

func NewTruncatePartitionEvent

func NewTruncatePartitionEvent(
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
) *SchemaChangeEvent

NewTruncatePartitionEvent creates a SchemaChangeEvent whose type is ActionTruncateTablePartition.

func NewTruncateTableEvent

func NewTruncateTableEvent(
	newTableInfo *model.TableInfo,
	droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent

NewTruncateTableEvent creates a SchemaChangeEvent whose type is ActionTruncateTable.

func (*SchemaChangeEvent) GetAddColumnInfo

func (s *SchemaChangeEvent) GetAddColumnInfo() (
	tableInfo *model.TableInfo,
	columnInfos []*model.ColumnInfo,
)

GetAddColumnInfo returns the table info of the SchemaChangeEvent whose type is ActionAddColumn.

func (*SchemaChangeEvent) GetAddIndexInfo

func (s *SchemaChangeEvent) GetAddIndexInfo() (
	tableInfo *model.TableInfo,
	indexes []*model.IndexInfo,
)

GetAddIndexInfo returns the table info and added index info of the SchemaChangeEvent whose type is ActionAddIndex.

func (*SchemaChangeEvent) GetAddPartitionInfo

func (s *SchemaChangeEvent) GetAddPartitionInfo() (
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
)

GetAddPartitionInfo returns the global table info and partition info of the SchemaChangeEvent whose type is ActionAddPartition.

func (*SchemaChangeEvent) GetAddPartitioningInfo

func (s *SchemaChangeEvent) GetAddPartitioningInfo() (
	nonPartTableID int64,
	newGlobalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
)

GetAddPartitioningInfo returns the old table ID of non-partitioned table, new global table info and added partition info of the SchemaChangeEvent whose type is ActionAlterTablePartitioning.

func (*SchemaChangeEvent) GetCreateTableInfo

func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo

GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type is ActionCreateTable.

func (*SchemaChangeEvent) GetDropPartitionInfo

func (s *SchemaChangeEvent) GetDropPartitionInfo() (
	globalTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
)

GetDropPartitionInfo returns the global table info and dropped partition info of the SchemaChangeEvent whose type is ActionDropTablePartition.

func (*SchemaChangeEvent) GetDropSchemaInfo

func (s *SchemaChangeEvent) GetDropSchemaInfo() (miniDBInfo *MiniDBInfoForSchemaEvent)

GetDropSchemaInfo returns the database info and tables of the SchemaChangeEvent whose type is ActionDropSchema.

func (*SchemaChangeEvent) GetDropTableInfo

func (s *SchemaChangeEvent) GetDropTableInfo() (droppedTableInfo *model.TableInfo)

GetDropTableInfo returns the table info of the SchemaChangeEvent whose type is ActionDropTable.

func (*SchemaChangeEvent) GetExchangePartitionInfo

func (s *SchemaChangeEvent) GetExchangePartitionInfo() (
	globalTableInfo *model.TableInfo,
	partInfo *model.PartitionInfo,
	nonPartTableInfo *model.TableInfo,
)

GetExchangePartitionInfo returns the global table info, exchanged partition info and original non-partitioned table info of the SchemaChangeEvent whose type is ActionExchangeTablePartition.

func (*SchemaChangeEvent) GetModifyColumnInfo

func (s *SchemaChangeEvent) GetModifyColumnInfo() (
	newTableInfo *model.TableInfo,
	modifiedColumns []*model.ColumnInfo,
)

GetModifyColumnInfo returns the table info and modified column info the SchemaChangeEvent whose type is ActionModifyColumn.

func (*SchemaChangeEvent) GetRemovePartitioningInfo

func (s *SchemaChangeEvent) GetRemovePartitioningInfo() (
	oldPartitionedTableID int64,
	newSingleTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
)

GetRemovePartitioningInfo returns the table info and partition info of the SchemaChangeEvent whose type is ActionRemovePartitioning.

func (*SchemaChangeEvent) GetReorganizePartitionInfo

func (s *SchemaChangeEvent) GetReorganizePartitionInfo() (
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
)

GetReorganizePartitionInfo returns the global table info, added partition info and deleted partition info of the SchemaChangeEvent whose type is ActionReorganizePartition.

func (*SchemaChangeEvent) GetTruncatePartitionInfo

func (s *SchemaChangeEvent) GetTruncatePartitionInfo() (
	globalTableInfo *model.TableInfo,
	addedPartInfo *model.PartitionInfo,
	droppedPartInfo *model.PartitionInfo,
)

GetTruncatePartitionInfo returns the global table info, added partition info and deleted partition info of the SchemaChangeEvent whose type is ActionTruncateTablePartition.

func (*SchemaChangeEvent) GetTruncateTableInfo

func (s *SchemaChangeEvent) GetTruncateTableInfo() (
	newTableInfo *model.TableInfo,
	droppedTableInfo *model.TableInfo,
)

GetTruncateTableInfo returns the new and old table info of the SchemaChangeEvent whose type is ActionTruncateTable.

func (*SchemaChangeEvent) GetType

func (s *SchemaChangeEvent) GetType() model.ActionType

GetType returns the type of the schema change event.

func (*SchemaChangeEvent) MarshalJSON

func (s *SchemaChangeEvent) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface.

func (*SchemaChangeEvent) String

func (s *SchemaChangeEvent) String() string

String implements fmt.Stringer interface.

func (*SchemaChangeEvent) UnmarshalJSON

func (s *SchemaChangeEvent) UnmarshalJSON(b []byte) error

UnmarshalJSON implements the json.Unmarshaler interface.

type SchemaChangeHandler

type SchemaChangeHandler func(
	ctx context.Context,
	sctx sessionctx.Context,
	change *SchemaChangeEvent,
) error

SchemaChangeHandler function is used by subscribers to handle the SchemaChangeEvent generated by the publisher (DDL module currently). It will be called at least once for every SchemaChange. The sctx has already started a pessimistic transaction and handler should execute exactly once SQL modification logic with it. After the function is returned, subscribing framework will commit the whole transaction with internal flag modification to provide exactly-once delivery. The handler will be called periodically, with no guarantee about the latency between the execution time and SchemaChangeEvent happening time.

The handler function must be registered by RegisterHandler before the DDLNotifier is started. If the handler can't immediately serve the handling after registering, it can return nil to tell the DDLNotifier to act like the change has been handled, or return ErrNotReadyRetryLater to hold the change and re-handle later.

type Store

type Store interface {
	Insert(context.Context, *sess.Session, *SchemaChange) error
	UpdateProcessed(
		ctx context.Context,
		se *sess.Session,
		ddlJobID int64,
		multiSchemaChangeID int64,
		processedBy uint64,
	) error
	DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
	// List will start a transaction of given session and read all schema changes
	// through a ListResult. The ownership of session is occupied by Store until
	// CloseFn is called.
	List(ctx context.Context, se *sess.Session) (ListResult, CloseFn)
}

Store is the (de)serialization and persistent layer.

func OpenTableStore

func OpenTableStore(db, table string) Store

OpenTableStore opens a store on a created table `db`.`table`. The table should be created with the table structure:

ddl_job_id BIGINT,
sub_job_id BIGINT COMMENT '-1 if the schema change does not belong to a multi-schema change DDL or a merged DDL. 0 or positive numbers representing the sub-job index of a multi-schema change DDL or a merged DDL',
schema_change JSON COMMENT 'SchemaChange at rest',
processed_by_flag BIGINT UNSIGNED DEFAULT 0 COMMENT 'flag to mark which subscriber has processed the event',
PRIMARY KEY(ddl_job_id, multi_schema_change_id)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL