swosync

package
v0.33.0
Warning

This package is not in the latest version of its module.

Published: Nov 19, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

README

Logical Sync

Package swosync handles the logical replication from the source DB to the destination database during switchover.

Theory of operation

Triggers record all changes (INSERT, UPDATE, DELETE) in the change_log table as (table, row_id) pairs, tracking only the set of changed rows and not their point-in-time data. The changes are then read and applied in batches: at sync time, the CURRENT state of each changed row is read from the source database and written to the destination database.

Replicating the differences between point-in-time "snapshots" syncs only the final result, so concurrent updates and intermediate row states never have to be replayed in sequence. It is also more efficient, because each row is replicated at most once per sync, even when multiple updates occur between sync points.
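To illustrate why only a set of changed rows is needed, here is a minimal sketch in plain Go. The changeKey and changeSet names are hypothetical and are not part of this package; the real change_log is a database table populated by triggers.

```go
package main

import "fmt"

// changeKey identifies a changed row, mirroring the (table, row_id) pairs
// stored in change_log. The type and field names are hypothetical.
type changeKey struct {
	Table string
	RowID string
}

// changeSet collapses repeated changes to the same row into a single entry.
type changeSet map[changeKey]struct{}

// Record notes that a row changed. The kind of change (INSERT, UPDATE,
// DELETE) is deliberately not stored; the current row state is read at sync time.
func (s changeSet) Record(table, rowID string) {
	s[changeKey{Table: table, RowID: rowID}] = struct{}{}
}

func main() {
	s := changeSet{}
	s.Record("users", "1")  // INSERT
	s.Record("users", "1")  // UPDATE
	s.Record("users", "1")  // second UPDATE
	s.Record("alerts", "7") // DELETE
	fmt.Println(len(s))     // prints 2: four logged changes, two rows to sync
}
```

Because the set stores no row data, the sync step always has to read the row from the source database, which is what makes a consistent view of the source necessary.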

The process depends on having a consistent view of the source database, which can be obtained either with a serializable transaction or by holding a stop-the-world lock (during the final sync).

Basic strategy
  1. Read all changes (table and row ids)
  2. Fetch row data for each changed row
  3. Insert rows from old DB that are missing in new DB, in fkey-dependency order
  4. Update rows from old DB that exist in both, in fkey-dependency order
  5. Delete rows missing from the old DB that exist in the new DB, in reverse-fkey-dependency order
  6. Delete synced entries from the change_log table
  7. Repeat until the two DBs are nearly in sync
  8. Obtain a stop-the-world lock
  9. Perform final sync, and update the use_next_db pointer
  10. Release the stop-the-world lock
  11. New DB is used for all future transactions
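Steps 3 to 5 above can be sketched as a classification of each changed row by where it currently exists. This is only an illustration under assumptions: planTable, plan, and reversed are hypothetical names, and row existence is modeled with in-memory maps rather than database queries.

```go
package main

import "fmt"

// plan holds the row IDs to insert, update, and delete for one table.
type plan struct {
	Insert, Update, Delete []string
}

// planTable classifies each changed row ID by comparing its presence in the
// source (old) DB and the destination (new) DB.
func planTable(changed []string, inSrc, inDst map[string]bool) plan {
	var p plan
	for _, id := range changed {
		switch {
		case inSrc[id] && !inDst[id]:
			p.Insert = append(p.Insert, id)
		case inSrc[id] && inDst[id]:
			p.Update = append(p.Update, id)
		case !inSrc[id] && inDst[id]:
			p.Delete = append(p.Delete, id)
		}
		// A row missing from both DBs was created and deleted between
		// syncs, so there is nothing to do for it.
	}
	return p
}

// reversed returns a reversed copy of tables. Inserts and updates run in
// fkey-dependency order; deletes run in the reverse order.
func reversed(tables []string) []string {
	out := make([]string, len(tables))
	for i, t := range tables {
		out[len(tables)-1-i] = t
	}
	return out
}

func main() {
	inSrc := map[string]bool{"a": true, "b": true}
	inDst := map[string]bool{"b": true, "c": true}
	p := planTable([]string{"a", "b", "c", "d"}, inSrc, inDst)
	fmt.Println(p.Insert, p.Update, p.Delete) // prints [a] [b] [c]

	order := []string{"users", "services", "alerts"} // parents first
	fmt.Println(reversed(order))                     // prints [alerts services users]
}
```

Deleting in reverse dependency order removes child rows before the parent rows they reference, which avoids foreign-key violations in the destination database.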

Further Notes

It is essential to keep the sync loop as tight as possible, particularly in "final sync" mode. The final sync will pause all transactions during its synchronization process; this is necessary to ensure that the database is in a consistent state with no leftover changes before setting the use_next_db pointer.

Round Trips
  • 1 to start tx, read all change ids & sequences (also stop-the-world lock in final mode)
  • 1 to fetch row data from each table (single batch, 1 query per table)
  • 1 to apply all updates to the new DB
  • 1 to commit src tx (also updates use_next_db in final mode)
  • 1 to delete all synced change rows from the DB

An extra round-trip for the last delete is a trade-off to favor a shorter stop-the-world time since deleting the previous change records isn't necessary after the switchover.
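The "single batch, 1 query per table" item relies on queueing several queries and sending them together. The sketch below models that idea with stand-in types so it runs without a database; fakeConn, batch, and fetchRows are hypothetical, and the SQL text is illustrative only, not the query this package issues.

```go
package main

import "fmt"

// fakeConn counts round trips; it stands in for a real database connection.
type fakeConn struct{ roundTrips int }

// batch queues queries so they can be sent together.
type batch struct{ queries []string }

// Queue adds a query to the batch without sending it.
func (b *batch) Queue(q string) { b.queries = append(b.queries, q) }

// SendBatch sends every queued query in one round trip and returns
// the number of queries sent.
func (c *fakeConn) SendBatch(b *batch) int {
	c.roundTrips++
	return len(b.queries)
}

// fetchRows queues one row-read query per table and sends them as one batch.
func fetchRows(c *fakeConn, tables []string) int {
	b := &batch{}
	for _, t := range tables {
		b.Queue("SELECT * FROM " + t + " WHERE id = ANY($1)") // illustrative SQL
	}
	return c.SendBatch(b)
}

func main() {
	c := &fakeConn{}
	n := fetchRows(c, []string{"users", "alerts", "services"})
	fmt.Println(n, c.roundTrips) // prints 3 1: three queries, one round trip
}
```

The number of round trips therefore stays constant as the number of tracked tables grows, which matters most while the stop-the-world lock is held.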

Documentation

Constants

const ConnLockQuery = `` /* 375-byte string literal not displayed */

ConnLockQuery will result in a failed assertion if it is unable to get the exec lock or if the switchover state is use_next_db.

const ConnWaitLockQuery = `` /* 336-byte string literal not displayed */

ConnWaitLockQuery will result in a failed assertion if it is unable to get the exec lock or if the switchover state is use_next_db.

Variables

This section is empty.

Functions

This section is empty.

Types

type LogicalReplicator

type LogicalReplicator struct {
	// contains filtered or unexported fields
}

LogicalReplicator manages synchronizing the source database to the destination database.

func NewLogicalReplicator

func NewLogicalReplicator() *LogicalReplicator

NewLogicalReplicator creates a new LogicalReplicator.

func (*LogicalReplicator) FinalSync

func (l *LogicalReplicator) FinalSync(ctx context.Context) error

FinalSync will sync the source database to the destination database, using the stop-the-world lock and updating switchover_state to use_next_db.

func (*LogicalReplicator) FullInitialSync

func (l *LogicalReplicator) FullInitialSync(ctx context.Context) error

FullInitialSync will insert all rows from the source database into the destination database.

While doing so it will update the rowID maps to track the rows that have been inserted.

func (*LogicalReplicator) LogicalSync

func (l *LogicalReplicator) LogicalSync(ctx context.Context) error

LogicalSync will sync the source database to the destination database as fast as possible.

func (*LogicalReplicator) ResetChangeTracking

func (l *LogicalReplicator) ResetChangeTracking(ctx context.Context) error

ResetChangeTracking disables tracking changes and truncates the tables in the destination database.

func (*LogicalReplicator) SetDestinationDB

func (l *LogicalReplicator) SetDestinationDB(db *pgx.Conn)

SetDestinationDB sets the destination database and must be called before Start.

func (*LogicalReplicator) SetProgressFunc

func (l *LogicalReplicator) SetProgressFunc(fn func(ctx context.Context, format string, args ...interface{}))

SetProgressFunc sets the function to call when progress is made, such as the currently syncing table.

func (*LogicalReplicator) SetSourceDB

func (l *LogicalReplicator) SetSourceDB(db *pgx.Conn)

SetSourceDB sets the source database and must be called before Start.

func (*LogicalReplicator) StartTrackingChanges

func (l *LogicalReplicator) StartTrackingChanges(ctx context.Context) error

StartTrackingChanges instruments and begins tracking changes to the DB.

  • Creates the change_log table
  • Gets the list of tables and sequences to track
  • Creates the change trigger for each table
  • Disables triggers in the new DB
  • Waits for any in-flight transactions to finish (since these may not have picked up the change trigger)
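The following sketch shows one plausible order in which a caller might invoke the LogicalReplicator methods. The order is an assumption inferred from the method descriptions, not something this page states. The replicator interface, runSwitchover, and the recorder fake are hypothetical; the interface only repeats method signatures listed on this page so that the flow can run without a database.

```go
package main

import (
	"context"
	"fmt"
)

// replicator lists the LogicalReplicator methods used below.
type replicator interface {
	ResetChangeTracking(ctx context.Context) error
	StartTrackingChanges(ctx context.Context) error
	FullInitialSync(ctx context.Context) error
	LogicalSync(ctx context.Context) error
	FinalSync(ctx context.Context) error
}

// runSwitchover calls the steps in an assumed order: reset, start tracking,
// full initial sync, a fixed number of incremental passes, then the final sync.
func runSwitchover(ctx context.Context, r replicator, passes int) error {
	if err := r.ResetChangeTracking(ctx); err != nil {
		return fmt.Errorf("reset change tracking: %w", err)
	}
	if err := r.StartTrackingChanges(ctx); err != nil {
		return fmt.Errorf("start tracking changes: %w", err)
	}
	if err := r.FullInitialSync(ctx); err != nil {
		return fmt.Errorf("full initial sync: %w", err)
	}
	for i := 0; i < passes; i++ {
		if err := r.LogicalSync(ctx); err != nil {
			return fmt.Errorf("logical sync pass %d: %w", i+1, err)
		}
	}
	return r.FinalSync(ctx)
}

// recorder is a fake replicator that only records the call order.
type recorder struct{ calls []string }

func (r *recorder) log(name string) error {
	r.calls = append(r.calls, name)
	return nil
}

func (r *recorder) ResetChangeTracking(context.Context) error  { return r.log("ResetChangeTracking") }
func (r *recorder) StartTrackingChanges(context.Context) error { return r.log("StartTrackingChanges") }
func (r *recorder) FullInitialSync(context.Context) error      { return r.log("FullInitialSync") }
func (r *recorder) LogicalSync(context.Context) error          { return r.log("LogicalSync") }
func (r *recorder) FinalSync(context.Context) error            { return r.log("FinalSync") }

// demoCalls runs the flow against the fake and returns the recorded calls.
func demoCalls() []string {
	r := &recorder{}
	if err := runSwitchover(context.Background(), r, 2); err != nil {
		return nil
	}
	return r.calls
}

func main() {
	fmt.Println(demoCalls())
}
```

With a real *LogicalReplicator, SetSourceDB and SetDestinationDB would need to be called first. Running several LogicalSync passes before FinalSync keeps the amount of work done under the stop-the-world lock small.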

type SequenceSync

type SequenceSync struct {
	// contains filtered or unexported fields
}

SequenceSync is a helper for synchronizing sequences.

func NewSequenceSync

func NewSequenceSync(names []string) *SequenceSync

NewSequenceSync creates a new SequenceSync for the given sequence names.

func (*SequenceSync) AddBatchReads

func (s *SequenceSync) AddBatchReads(b *pgx.Batch)

AddBatchReads queues up select statements to retrieve the current values of the sequences.

func (*SequenceSync) AddBatchWrites

func (s *SequenceSync) AddBatchWrites(b *pgx.Batch)

AddBatchWrites queues up update statements to set the current values of the sequences.

func (*SequenceSync) ScanBatchReads

func (s *SequenceSync) ScanBatchReads(res pgx.BatchResults) error

ScanBatchReads scans the results of the added batch reads.

type TableSync

type TableSync struct {
	// contains filtered or unexported fields
}

TableSync is a helper for syncing tables from the source database to the destination database.

func NewTableSync

func NewTableSync(tables []swoinfo.Table) *TableSync

NewTableSync creates a new TableSync for the given tables.

func (*TableSync) AddBatchChangeRead

func (c *TableSync) AddBatchChangeRead(b *pgx.Batch)

AddBatchChangeRead adds a query to the batch to read the changes from the source database.

func (*TableSync) AddBatchRowReads

func (c *TableSync) AddBatchRowReads(b *pgx.Batch)

AddBatchRowReads adds a query to the batch to read all changed rows from the source database.

func (*TableSync) AddBatchWrites

func (c *TableSync) AddBatchWrites(b *pgx.Batch)

func (*TableSync) ExecDeleteChanges

func (c *TableSync) ExecDeleteChanges(ctx context.Context, srcConn *pgx.Conn) (int64, error)

ExecDeleteChanges executes a query to delete the change_log entries from the source database.

func (*TableSync) HasChanges

func (c *TableSync) HasChanges() bool

HasChanges reports whether there are changes. It is only meaningful after ScanBatchChangeRead has been called.

func (*TableSync) ScanBatchChangeRead

func (c *TableSync) ScanBatchChangeRead(res pgx.BatchResults) error

ScanBatchChangeRead scans the results of the change read query.

func (*TableSync) ScanBatchRowReads

func (c *TableSync) ScanBatchRowReads(res pgx.BatchResults) error

ScanBatchRowReads scans the results of the row read queries.
