Documentation ¶
Index ¶
- func GeneratePublicationName(group string) string
- func InitializeOIDMap(ctx context.Context, conn StandardConnection) error
- type BaseReplicator
- func (r *BaseReplicator) AddPrimaryKeyInfo(message *utils.CDCMessage, table string)
- func (r *BaseReplicator) CheckReplicationSlotExists(slotName string) (bool, error)
- func (r *BaseReplicator) CheckReplicationSlotStatus(ctx context.Context) error
- func (r *BaseReplicator) CreatePublication() error
- func (r *BaseReplicator) CreateReplicationSlot(ctx context.Context) error
- func (r *BaseReplicator) GetLastState() (pglogrepl.LSN, error)
- func (r *BaseReplicator) GracefulShutdown(ctx context.Context) error
- func (r *BaseReplicator) HandleBeginMessage(_ *pglogrepl.BeginMessage) error
- func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error
- func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn pglogrepl.LSN) error
- func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN) error
- func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn pglogrepl.LSN) error
- func (r *BaseReplicator) InitializePrimaryKeyInfo() error
- func (r *BaseReplicator) ProcessNextMessage(ctx context.Context, lastStatusUpdate *time.Time, ...) error
- func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error
- func (r *BaseReplicator) SaveState(lsn pglogrepl.LSN) error
- func (r *BaseReplicator) SendStandbyStatusUpdate(ctx context.Context) error
- func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN, stopChan <-chan struct{}) error
- func (r *BaseReplicator) StreamChanges(ctx context.Context, stopChan <-chan struct{}) error
- type Buffer
- type Config
- type CopyAndStreamReplicator
- func (r *CopyAndStreamReplicator) CopyTable(ctx context.Context, tableName, snapshotID string) error
- func (r *CopyAndStreamReplicator) CopyTableRange(ctx context.Context, tableName string, startPage, endPage uint32, ...) (int64, error)
- func (r *CopyAndStreamReplicator) CopyTableWorker(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, ...)
- func (r *CopyAndStreamReplicator) CopyTables(ctx context.Context, tables []string, snapshotID string) error
- func (r *CopyAndStreamReplicator) NewBaseReplicator() *BaseReplicator
- func (r *CopyAndStreamReplicator) ParallelCopy(ctx context.Context) error
- func (r *CopyAndStreamReplicator) StartReplication() error
- type DDLReplicator
- func (d *DDLReplicator) Close(ctx context.Context) error
- func (d *DDLReplicator) HasPendingDDLEvents(ctx context.Context) (bool, error)
- func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error
- func (d *DDLReplicator) SetupDDLTracking(ctx context.Context) error
- func (d *DDLReplicator) Shutdown(ctx context.Context) error
- func (d *DDLReplicator) StartDDLReplication(ctx context.Context)
- type NATSClient
- type PgxPoolConn
- type PgxPoolConnWrapper
- type PostgresReplicationConnection
- func (rc *PostgresReplicationConnection) Close(ctx context.Context) error
- func (rc *PostgresReplicationConnection) Connect(ctx context.Context) error
- func (rc *PostgresReplicationConnection) CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error)
- func (rc *PostgresReplicationConnection) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)
- func (rc *PostgresReplicationConnection) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error
- func (rc *PostgresReplicationConnection) StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, ...) error
- type ReplicationConnection
- type ReplicationError
- type Replicator
- type StandardConnection
- type StandardConnectionImpl
- func (s *StandardConnectionImpl) Acquire(ctx context.Context) (PgxPoolConn, error)
- func (s *StandardConnectionImpl) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
- func (s *StandardConnectionImpl) Close(_ context.Context) error
- func (s *StandardConnectionImpl) Connect(ctx context.Context) error
- func (s *StandardConnectionImpl) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
- func (s *StandardConnectionImpl) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (s *StandardConnectionImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- type StreamReplicator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GeneratePublicationName ¶
GeneratePublicationName generates a deterministic publication name based on the group name
func InitializeOIDMap ¶
func InitializeOIDMap(ctx context.Context, conn StandardConnection) error
InitializeOIDMap initializes the OID to type name map with custom types from the database
Types ¶
type BaseReplicator ¶
type BaseReplicator struct { Config Config ReplicationConn ReplicationConnection StandardConn StandardConnection Relations map[uint32]*pglogrepl.RelationMessage Logger zerolog.Logger TableDetails map[string][]string LastLSN pglogrepl.LSN NATSClient NATSClient }
BaseReplicator provides core functionality for PostgreSQL logical replication
func NewBaseReplicator ¶
func NewBaseReplicator(config Config, replicationConn ReplicationConnection, standardConn StandardConnection, natsClient NATSClient) *BaseReplicator
NewBaseReplicator creates a new BaseReplicator instance
func (*BaseReplicator) AddPrimaryKeyInfo ¶
func (r *BaseReplicator) AddPrimaryKeyInfo(message *utils.CDCMessage, table string)
AddPrimaryKeyInfo adds primary key information to the CDCMessage
func (*BaseReplicator) CheckReplicationSlotExists ¶
func (r *BaseReplicator) CheckReplicationSlotExists(slotName string) (bool, error)
CheckReplicationSlotExists checks if a slot with the given name already exists
func (*BaseReplicator) CheckReplicationSlotStatus ¶
func (r *BaseReplicator) CheckReplicationSlotStatus(ctx context.Context) error
CheckReplicationSlotStatus checks the status of the replication slot
func (*BaseReplicator) CreatePublication ¶
func (r *BaseReplicator) CreatePublication() error
CreatePublication creates a new publication if it doesn't exist
func (*BaseReplicator) CreateReplicationSlot ¶
func (r *BaseReplicator) CreateReplicationSlot(ctx context.Context) error
CreateReplicationSlot ensures that a replication slot exists, creating one if necessary
func (*BaseReplicator) GetLastState ¶
func (r *BaseReplicator) GetLastState() (pglogrepl.LSN, error)
GetLastState retrieves the last saved replication state
func (*BaseReplicator) GracefulShutdown ¶
func (r *BaseReplicator) GracefulShutdown(ctx context.Context) error
GracefulShutdown performs a graceful shutdown of the replicator
func (*BaseReplicator) HandleBeginMessage ¶
func (r *BaseReplicator) HandleBeginMessage(_ *pglogrepl.BeginMessage) error
HandleBeginMessage handles BeginMessage messages
func (*BaseReplicator) HandleCommitMessage ¶
func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error
HandleCommitMessage processes a commit message and publishes it to NATS
func (*BaseReplicator) HandleDeleteMessage ¶
func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn pglogrepl.LSN) error
HandleDeleteMessage handles DeleteMessage messages
func (*BaseReplicator) HandleInsertMessage ¶
func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN) error
HandleInsertMessage handles InsertMessage messages
func (*BaseReplicator) HandleUpdateMessage ¶
func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn pglogrepl.LSN) error
HandleUpdateMessage handles UpdateMessage messages
func (*BaseReplicator) InitializePrimaryKeyInfo ¶
func (r *BaseReplicator) InitializePrimaryKeyInfo() error
InitializePrimaryKeyInfo initializes primary key information for all tables
func (*BaseReplicator) ProcessNextMessage ¶
func (r *BaseReplicator) ProcessNextMessage(ctx context.Context, lastStatusUpdate *time.Time, standbyMessageTimeout time.Duration) error
ProcessNextMessage handles the next replication message
func (*BaseReplicator) PublishToNATS ¶
func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error
PublishToNATS publishes a message to NATS
func (*BaseReplicator) SaveState ¶
func (r *BaseReplicator) SaveState(lsn pglogrepl.LSN) error
SaveState saves the current replication state
func (*BaseReplicator) SendStandbyStatusUpdate ¶
func (r *BaseReplicator) SendStandbyStatusUpdate(ctx context.Context) error
SendStandbyStatusUpdate sends a status update to the primary server
func (*BaseReplicator) StartReplicationFromLSN ¶
func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN, stopChan <-chan struct{}) error
StartReplicationFromLSN initiates the replication process from a given LSN
func (*BaseReplicator) StreamChanges ¶
func (r *BaseReplicator) StreamChanges(ctx context.Context, stopChan <-chan struct{}) error
StreamChanges continuously processes replication messages
type Buffer ¶
type Buffer struct {
// contains filtered or unexported fields
}
Buffer is a structure that holds data to be flushed periodically or when certain conditions are met
type Config ¶
type Config struct { Host string Port uint16 Database string User string Password string Group string Schema string Tables []string TrackDDL bool }
Config holds the configuration for the replicator
func (Config) ConnectionString ¶
ConnectionString generates and returns a PostgreSQL connection string
type CopyAndStreamReplicator ¶
type CopyAndStreamReplicator struct { BaseReplicator MaxCopyWorkersPerTable int DDLReplicator DDLReplicator CopyOnly bool }
CopyAndStreamReplicator implements a replication strategy that first copies existing data and then streams changes.
func (*CopyAndStreamReplicator) CopyTable ¶
func (r *CopyAndStreamReplicator) CopyTable(ctx context.Context, tableName, snapshotID string) error
CopyTable copies a single table using multiple workers.
func (*CopyAndStreamReplicator) CopyTableRange ¶
func (r *CopyAndStreamReplicator) CopyTableRange(ctx context.Context, tableName string, startPage, endPage uint32, snapshotID string, workerID int) (int64, error)
CopyTableRange copies a range of pages from a table.
func (*CopyAndStreamReplicator) CopyTableWorker ¶
func (r *CopyAndStreamReplicator) CopyTableWorker(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, rangesChan <-chan [2]uint32, tableName, snapshotID string, workerID int)
CopyTableWorker is a worker function that copies ranges of pages from a table.
func (*CopyAndStreamReplicator) CopyTables ¶
func (r *CopyAndStreamReplicator) CopyTables(ctx context.Context, tables []string, snapshotID string) error
CopyTables copies all specified tables in parallel.
func (*CopyAndStreamReplicator) NewBaseReplicator ¶
func (r *CopyAndStreamReplicator) NewBaseReplicator() *BaseReplicator
func (*CopyAndStreamReplicator) ParallelCopy ¶
func (r *CopyAndStreamReplicator) ParallelCopy(ctx context.Context) error
ParallelCopy performs a parallel copy of all specified tables.
func (*CopyAndStreamReplicator) StartReplication ¶
func (r *CopyAndStreamReplicator) StartReplication() error
StartReplication begins the replication process.
type DDLReplicator ¶
type DDLReplicator struct { DDLConn StandardConnection BaseRepl *BaseReplicator Config Config }
func NewDDLReplicator ¶
func NewDDLReplicator(config Config, BaseRepl *BaseReplicator, ddlConn StandardConnection) (*DDLReplicator, error)
NewDDLReplicator creates a new DDLReplicator instance
func (*DDLReplicator) Close ¶
func (d *DDLReplicator) Close(ctx context.Context) error
Close closes the DDL connection
func (*DDLReplicator) HasPendingDDLEvents ¶
func (d *DDLReplicator) HasPendingDDLEvents(ctx context.Context) (bool, error)
HasPendingDDLEvents checks if there are pending DDL events in the log
func (*DDLReplicator) ProcessDDLEvents ¶
func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error
ProcessDDLEvents processes DDL events from the log table
func (*DDLReplicator) SetupDDLTracking ¶
func (d *DDLReplicator) SetupDDLTracking(ctx context.Context) error
SetupDDLTracking sets up the necessary schema, table, and triggers for DDL tracking
func (*DDLReplicator) Shutdown ¶
func (d *DDLReplicator) Shutdown(ctx context.Context) error
Shutdown performs a graceful shutdown of the DDL replicator
func (*DDLReplicator) StartDDLReplication ¶
func (d *DDLReplicator) StartDDLReplication(ctx context.Context)
StartDDLReplication starts the DDL replication process
type NATSClient ¶
type PgxPoolConn ¶
type PgxPoolConn interface { BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row Release() }
type PgxPoolConnWrapper ¶
type PostgresReplicationConnection ¶
PostgresReplicationConnection implements the ReplicationConnection interface for PostgreSQL databases.
func (*PostgresReplicationConnection) Close ¶
func (rc *PostgresReplicationConnection) Close(ctx context.Context) error
Close terminates the connection to the PostgreSQL database.
func (*PostgresReplicationConnection) Connect ¶
func (rc *PostgresReplicationConnection) Connect(ctx context.Context) error
Connect establishes a connection to the PostgreSQL database for replication.
func (*PostgresReplicationConnection) CreateReplicationSlot ¶
func (rc *PostgresReplicationConnection) CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error)
CreateReplicationSlot creates a new replication slot in the PostgreSQL database.
func (*PostgresReplicationConnection) ReceiveMessage ¶
func (rc *PostgresReplicationConnection) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)
ReceiveMessage receives a message from the PostgreSQL replication stream.
func (*PostgresReplicationConnection) SendStandbyStatusUpdate ¶
func (rc *PostgresReplicationConnection) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error
SendStandbyStatusUpdate sends a status update to the PostgreSQL server during replication.
func (*PostgresReplicationConnection) StartReplication ¶
func (rc *PostgresReplicationConnection) StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error
StartReplication initiates the replication process from the specified LSN.
type ReplicationConnection ¶
type ReplicationConnection interface { Connect(ctx context.Context) error Close(ctx context.Context) error CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error) StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error }
func NewReplicationConnection ¶
func NewReplicationConnection(config Config) ReplicationConnection
NewReplicationConnection creates a new PostgresReplicationConnection instance.
type ReplicationError ¶
type ReplicationError struct { Op string // The operation that caused the error Err error // The underlying error }
ReplicationError represents an error that occurred during replication.
func (*ReplicationError) Error ¶
func (e *ReplicationError) Error() string
Error returns a formatted error message.
type Replicator ¶
type Replicator interface {
StartReplication() error
}
func NewReplicator ¶
func NewReplicator(config Config, natsClient *pgflonats.NATSClient, copyAndStream bool, copyOnly bool, maxCopyWorkersPerTable int) (Replicator, error)
NewReplicator creates a new Replicator based on the configuration
type StandardConnection ¶
type StandardConnection interface { Connect(ctx context.Context) error Close(ctx context.Context) error Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) Acquire(ctx context.Context) (PgxPoolConn, error) }
type StandardConnectionImpl ¶
type StandardConnectionImpl struct {
// contains filtered or unexported fields
}
StandardConnectionImpl implements the StandardConnection interface for PostgreSQL databases.
func NewStandardConnection ¶
func NewStandardConnection(config Config) (*StandardConnectionImpl, error)
NewStandardConnection creates a new StandardConnectionImpl instance and establishes a connection.
func (*StandardConnectionImpl) Acquire ¶
func (s *StandardConnectionImpl) Acquire(ctx context.Context) (PgxPoolConn, error)
Acquire acquires a connection from the pool.
func (*StandardConnectionImpl) BeginTx ¶
func (s *StandardConnectionImpl) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
BeginTx starts a new transaction with the specified options.
func (*StandardConnectionImpl) Close ¶
func (s *StandardConnectionImpl) Close(_ context.Context) error
Close terminates the connection to the PostgreSQL database.
func (*StandardConnectionImpl) Connect ¶
func (s *StandardConnectionImpl) Connect(ctx context.Context) error
Connect establishes a connection to the PostgreSQL database.
func (*StandardConnectionImpl) Exec ¶
func (s *StandardConnectionImpl) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
Exec executes a SQL query without returning any rows.
type StreamReplicator ¶
type StreamReplicator struct { BaseReplicator DDLReplicator DDLReplicator }
func (*StreamReplicator) StartReplication ¶
func (r *StreamReplicator) StartReplication() error
StartReplication begins the replication process.