replicator

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GeneratePublicationName

func GeneratePublicationName(group string) string

GeneratePublicationName generates a deterministic publication name based on the group name

func InitializeOIDMap

func InitializeOIDMap(ctx context.Context, conn StandardConnection) error

InitializeOIDMap initializes the OID to type name map with custom types from the database

Types

type BaseReplicator

type BaseReplicator struct {
	Config          Config
	ReplicationConn ReplicationConnection
	StandardConn    StandardConnection
	Relations       map[uint32]*pglogrepl.RelationMessage
	Logger          zerolog.Logger
	TableDetails    map[string][]string
	LastLSN         pglogrepl.LSN
	NATSClient      NATSClient
}

BaseReplicator provides core functionality for PostgreSQL logical replication

func NewBaseReplicator

func NewBaseReplicator(config Config, replicationConn ReplicationConnection, standardConn StandardConnection, natsClient NATSClient) *BaseReplicator

NewBaseReplicator creates a new BaseReplicator instance

func (*BaseReplicator) AddPrimaryKeyInfo

func (r *BaseReplicator) AddPrimaryKeyInfo(message *utils.CDCMessage, table string)

AddPrimaryKeyInfo adds primary key information to the CDCMessage

func (*BaseReplicator) CheckReplicationSlotExists

func (r *BaseReplicator) CheckReplicationSlotExists(slotName string) (bool, error)

CheckReplicationSlotExists checks if a slot with the given name already exists

func (*BaseReplicator) CheckReplicationSlotStatus

func (r *BaseReplicator) CheckReplicationSlotStatus(ctx context.Context) error

CheckReplicationSlotStatus checks the status of the replication slot

func (*BaseReplicator) CreatePublication

func (r *BaseReplicator) CreatePublication() error

CreatePublication creates a new publication if it doesn't exist

func (*BaseReplicator) CreateReplicationSlot

func (r *BaseReplicator) CreateReplicationSlot(ctx context.Context) error

CreateReplicationSlot ensures that a replication slot exists, creating one if necessary

func (*BaseReplicator) GetLastState

func (r *BaseReplicator) GetLastState() (pglogrepl.LSN, error)

GetLastState retrieves the last saved replication state

func (*BaseReplicator) GracefulShutdown

func (r *BaseReplicator) GracefulShutdown(ctx context.Context) error

GracefulShutdown performs a graceful shutdown of the replicator

func (*BaseReplicator) HandleBeginMessage

func (r *BaseReplicator) HandleBeginMessage(_ *pglogrepl.BeginMessage) error

HandleBeginMessage handles BeginMessage messages

func (*BaseReplicator) HandleCommitMessage

func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error

HandleCommitMessage processes a commit message and publishes it to NATS

func (*BaseReplicator) HandleDeleteMessage

func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn pglogrepl.LSN) error

HandleDeleteMessage handles DeleteMessage messages

func (*BaseReplicator) HandleInsertMessage

func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN) error

HandleInsertMessage handles InsertMessage messages

func (*BaseReplicator) HandleUpdateMessage

func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn pglogrepl.LSN) error

HandleUpdateMessage handles UpdateMessage messages

func (*BaseReplicator) InitializePrimaryKeyInfo

func (r *BaseReplicator) InitializePrimaryKeyInfo() error

InitializePrimaryKeyInfo initializes primary key information for all tables

func (*BaseReplicator) ProcessNextMessage

func (r *BaseReplicator) ProcessNextMessage(ctx context.Context, lastStatusUpdate *time.Time, standbyMessageTimeout time.Duration) error

ProcessNextMessage handles the next replication message

func (*BaseReplicator) PublishToNATS

func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error

PublishToNATS publishes a message to NATS

func (*BaseReplicator) SaveState

func (r *BaseReplicator) SaveState(lsn pglogrepl.LSN) error

SaveState saves the current replication state

func (*BaseReplicator) SendStandbyStatusUpdate

func (r *BaseReplicator) SendStandbyStatusUpdate(ctx context.Context) error

SendStandbyStatusUpdate sends a status update to the primary server

func (*BaseReplicator) StartReplicationFromLSN

func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN, stopChan <-chan struct{}) error

StartReplicationFromLSN initiates the replication process from a given LSN

func (*BaseReplicator) StreamChanges

func (r *BaseReplicator) StreamChanges(ctx context.Context, stopChan <-chan struct{}) error

StreamChanges continuously processes replication messages

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is a structure that holds data to be flushed periodically or when certain conditions are met

func NewBuffer

func NewBuffer(maxRows int, flushTimeout time.Duration) *Buffer

NewBuffer creates a new Buffer instance

func (*Buffer) Add

func (b *Buffer) Add(item interface{}) bool

Add adds an item to the buffer and returns true if the buffer should be flushed

func (*Buffer) Flush

func (b *Buffer) Flush() []interface{}

Flush flushes the buffer and returns the data

type Config

type Config struct {
	Host     string
	Port     uint16
	Database string
	User     string
	Password string
	Group    string
	Schema   string
	Tables   []string
	TrackDDL bool
}

Config holds the configuration for the replicator

func (Config) ConnectionString

func (c Config) ConnectionString() string

ConnectionString generates and returns a PostgreSQL connection string

type CopyAndStreamReplicator

type CopyAndStreamReplicator struct {
	BaseReplicator
	MaxCopyWorkersPerTable int
	DDLReplicator          DDLReplicator
	CopyOnly               bool
}

CopyAndStreamReplicator implements a replication strategy that first copies existing data and then streams changes.

func (*CopyAndStreamReplicator) CopyTable

func (r *CopyAndStreamReplicator) CopyTable(ctx context.Context, tableName, snapshotID string) error

CopyTable copies a single table using multiple workers.

func (*CopyAndStreamReplicator) CopyTableRange

func (r *CopyAndStreamReplicator) CopyTableRange(ctx context.Context, tableName string, startPage, endPage uint32, snapshotID string, workerID int) (int64, error)

CopyTableRange copies a range of pages from a table.

func (*CopyAndStreamReplicator) CopyTableWorker

func (r *CopyAndStreamReplicator) CopyTableWorker(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, rangesChan <-chan [2]uint32, tableName, snapshotID string, workerID int)

CopyTableWorker is a worker function that copies ranges of pages from a table.

func (*CopyAndStreamReplicator) CopyTables

func (r *CopyAndStreamReplicator) CopyTables(ctx context.Context, tables []string, snapshotID string) error

CopyTables copies all specified tables in parallel.

func (*CopyAndStreamReplicator) NewBaseReplicator

func (r *CopyAndStreamReplicator) NewBaseReplicator() *BaseReplicator

func (*CopyAndStreamReplicator) ParallelCopy

func (r *CopyAndStreamReplicator) ParallelCopy(ctx context.Context) error

ParallelCopy performs a parallel copy of all specified tables.

func (*CopyAndStreamReplicator) StartReplication

func (r *CopyAndStreamReplicator) StartReplication() error

StartReplication begins the replication process.

type DDLReplicator

type DDLReplicator struct {
	DDLConn  StandardConnection
	BaseRepl *BaseReplicator
	Config   Config
}

func NewDDLReplicator

func NewDDLReplicator(config Config, BaseRepl *BaseReplicator, ddlConn StandardConnection) (*DDLReplicator, error)

NewDDLReplicator creates a new DDLReplicator instance

func (*DDLReplicator) Close

func (d *DDLReplicator) Close(ctx context.Context) error

Close closes the DDL connection

func (*DDLReplicator) HasPendingDDLEvents

func (d *DDLReplicator) HasPendingDDLEvents(ctx context.Context) (bool, error)

HasPendingDDLEvents checks if there are pending DDL events in the log

func (*DDLReplicator) ProcessDDLEvents

func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error

ProcessDDLEvents processes DDL events from the log table

func (*DDLReplicator) SetupDDLTracking

func (d *DDLReplicator) SetupDDLTracking(ctx context.Context) error

SetupDDLTracking sets up the necessary schema, table, and triggers for DDL tracking

func (*DDLReplicator) Shutdown

func (d *DDLReplicator) Shutdown(ctx context.Context) error

Shutdown performs a graceful shutdown of the DDL replicator

func (*DDLReplicator) StartDDLReplication

func (d *DDLReplicator) StartDDLReplication(ctx context.Context)

StartDDLReplication starts the DDL replication process

type NATSClient

type NATSClient interface {
	PublishMessage(subject string, data []byte) error
	Close() error
	SaveState(state pgflonats.State) error
	GetState() (pgflonats.State, error)
	JetStream() nats.JetStreamContext
}

type PgxPoolConn

type PgxPoolConn interface {
	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	Release()
}

type PgxPoolConnWrapper

type PgxPoolConnWrapper struct {
	*pgxpool.Conn
}

type PostgresReplicationConnection

type PostgresReplicationConnection struct {
	Config Config
	Conn   *pgconn.PgConn
}

PostgresReplicationConnection implements the ReplicationConnection interface for PostgreSQL databases.

func (*PostgresReplicationConnection) Close

Close terminates the connection to the PostgreSQL database.

func (*PostgresReplicationConnection) Connect

Connect establishes a connection to the PostgreSQL database for replication.

func (*PostgresReplicationConnection) CreateReplicationSlot

CreateReplicationSlot creates a new replication slot in the PostgreSQL database.

func (*PostgresReplicationConnection) ReceiveMessage

ReceiveMessage receives a message from the PostgreSQL replication stream.

func (*PostgresReplicationConnection) SendStandbyStatusUpdate

func (rc *PostgresReplicationConnection) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error

SendStandbyStatusUpdate sends a status update to the PostgreSQL server during replication.

func (*PostgresReplicationConnection) StartReplication

func (rc *PostgresReplicationConnection) StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error

StartReplication initiates the replication process from the specified LSN.

type ReplicationConnection

type ReplicationConnection interface {
	Connect(ctx context.Context) error
	Close(ctx context.Context) error
	CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error)
	StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error
	ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)
	SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error
}

func NewReplicationConnection

func NewReplicationConnection(config Config) ReplicationConnection

NewReplicationConnection creates a new PostgresReplicationConnection instance.

type ReplicationError

type ReplicationError struct {
	Op  string // The operation that caused the error
	Err error  // The underlying error
}

ReplicationError represents an error that occurred during replication.

func (*ReplicationError) Error

func (e *ReplicationError) Error() string

Error returns a formatted error message.

type Replicator

type Replicator interface {
	StartReplication() error
}

func NewReplicator

func NewReplicator(config Config, natsClient *pgflonats.NATSClient, copyAndStream bool, copyOnly bool, maxCopyWorkersPerTable int) (Replicator, error)

NewReplicator creates a new Replicator based on the configuration

type StandardConnection

type StandardConnection interface {
	Connect(ctx context.Context) error
	Close(ctx context.Context) error
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
	Acquire(ctx context.Context) (PgxPoolConn, error)
}

type StandardConnectionImpl

type StandardConnectionImpl struct {
	// contains filtered or unexported fields
}

StandardConnectionImpl implements the StandardConnection interface for PostgreSQL databases.

func NewStandardConnection

func NewStandardConnection(config Config) (*StandardConnectionImpl, error)

NewStandardConnection creates a new StandardConnectionImpl instance and establishes a connection.

func (*StandardConnectionImpl) Acquire

Acquire acquires a connection from the pool.

func (*StandardConnectionImpl) BeginTx

func (s *StandardConnectionImpl) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)

BeginTx starts a new transaction with the specified options.

func (*StandardConnectionImpl) Close

Close terminates the connection to the PostgreSQL database.

func (*StandardConnectionImpl) Connect

func (s *StandardConnectionImpl) Connect(ctx context.Context) error

Connect establishes a connection to the PostgreSQL database.

func (*StandardConnectionImpl) Exec

func (s *StandardConnectionImpl) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

Exec executes a SQL query without returning any rows.

func (*StandardConnectionImpl) Query

func (s *StandardConnectionImpl) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query that returns rows, typically a SELECT.

func (*StandardConnectionImpl) QueryRow

func (s *StandardConnectionImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that is expected to return at most one row.

type StreamReplicator

type StreamReplicator struct {
	BaseReplicator
	DDLReplicator DDLReplicator
}

func (*StreamReplicator) StartReplication

func (r *StreamReplicator) StartReplication() error

StartReplication begins the replication process.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL