logrepl

package
v0.0.0-...-be912e8 Latest
Published: Dec 20, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

README

The code in this directory was copied and modified from the DoltgreSQL project (as of 2024-11-08, https://github.com/dolthub/doltgresql/blob/main/server). The original code is licensed under the Apache License, Version 2.0. The modifications are also licensed under the Apache License, Version 2.0.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreatePublicationIfNotExists

func CreatePublicationIfNotExists(primaryDns, publicationName string) error

CreatePublicationIfNotExists creates a publication with the given name if it does not already exist. Mostly useful for testing. Customers should run the CREATE PUBLICATION command on their primary server manually, specifying whichever tables they want to replicate.
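
As a rough illustration of the manual step described above, the sketch below builds the CREATE PUBLICATION statement an operator might run on the primary. The helper names (quoteIdent, createPublicationSQL) and the example publication and table names are hypothetical and are not part of this package; table names are assumed to be unqualified.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent double-quotes a PostgreSQL identifier and doubles embedded quotes.
// Hypothetical helper, not part of logrepl.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// createPublicationSQL builds a CREATE PUBLICATION statement for the given
// (unqualified) table names. With no tables it falls back to FOR ALL TABLES.
// Hypothetical helper, not part of logrepl.
func createPublicationSQL(publication string, tables ...string) string {
	if len(tables) == 0 {
		return fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES;", quoteIdent(publication))
	}
	quoted := make([]string, len(tables))
	for i, t := range tables {
		quoted[i] = quoteIdent(t)
	}
	return fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s;",
		quoteIdent(publication), strings.Join(quoted, ", "))
}

func main() {
	// The publication and table names here are placeholders.
	fmt.Println(createPublicationSQL("my_pub", "orders", "customers"))
}
```

Listing tables explicitly keeps the publication limited to what should be replicated, which is the reason the documentation recommends running the command by hand rather than relying on this helper function.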

func CreateSubscription

func CreateSubscription(ctx *sql.Context, name, conn, pub, lsn string, enabled bool) error

func DeleteSubscription

func DeleteSubscription(ctx *sql.Context, name string) error

func DropPublication

func DropPublication(primaryDns, slotName string) error

DropPublication drops the publication with the given name if it exists. Note that the name is passed in the parameter called slotName. Mostly useful for testing.

func SelectSubscriptionLsn

func SelectSubscriptionLsn(ctx *sql.Context, subscription string) (pglogrepl.LSN, error)

func UpdateAllSubscriptionStatus

func UpdateAllSubscriptionStatus(ctx *sql.Context, enabled bool) error

func UpdateSubscriptionLsn

func UpdateSubscriptionLsn(ctx *sql.Context, lsn, name string) error

func UpdateSubscriptionStatus

func UpdateSubscriptionStatus(ctx *sql.Context, enabled bool, name string) error

func UpdateSubscriptions

func UpdateSubscriptions(ctx *sql.Context) error

Types

type LogicalReplicator

type LogicalReplicator struct {
	// contains filtered or unexported fields
}

func NewLogicalReplicator

func NewLogicalReplicator(subscription, primaryDns string) (*LogicalReplicator, error)

NewLogicalReplicator creates a new logical replicator instance which connects to the primary and replica databases using the connection strings provided. The connection to the replica is established immediately, and the connection to the primary is established when StartReplication is called.

func (*LogicalReplicator) CaughtUp

func (r *LogicalReplicator) CaughtUp(threshold int) (bool, error)

CaughtUp reports whether the replication slot is caught up to the primary. It only works when the primary has a single replication slot, so it is suitable only for testing. The threshold is the maximum number of bytes by which the primary may be ahead of the replica's last flush position while the replica still counts as caught up. The gap is rarely zero even when caught up, because the primary often sends additional WAL records after the last WAL location that was flushed to the replica. Those additional WAL locations cannot be recorded as flushed: they do not result in writes to the replica, and recording them could cause the primary to withhold necessary records after a shutdown and restart.
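
The threshold comparison can be sketched as follows. This is a minimal illustration, not the package's implementation: parseLSN and withinThreshold are hypothetical helpers, and the LSN values are made up. It relies only on PostgreSQL's textual LSN format, two hexadecimal numbers separated by a slash, where the first is the high 32 bits and the second the low 32 bits of a byte position.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLSN converts a textual LSN such as "0/16B3748" into a byte position.
// Hypothetical helper, not part of logrepl.
func parseLSN(s string) (uint64, error) {
	hi, lo, ok := strings.Cut(s, "/")
	if !ok {
		return 0, fmt.Errorf("invalid LSN %q: missing '/'", s)
	}
	h, err := strconv.ParseUint(hi, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	l, err := strconv.ParseUint(lo, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	return h<<32 | l, nil
}

// withinThreshold reports whether the flushed position trails the primary's
// position by at most threshold bytes. Negative thresholds are treated as 0.
func withinThreshold(primary, flushed uint64, threshold int) bool {
	if threshold < 0 {
		threshold = 0
	}
	if flushed >= primary {
		return true
	}
	return primary-flushed <= uint64(threshold)
}

func main() {
	// Made-up positions: the primary is 0x48 (72) bytes ahead of the replica.
	primary, _ := parseLSN("0/16B3748")
	flushed, _ := parseLSN("0/16B3700")
	fmt.Println(withinThreshold(primary, flushed, 100))
	fmt.Println(withinThreshold(primary, flushed, 0))
}
```

With a threshold of zero the check would fail in this example even though no replicated writes are outstanding, which is why a small nonzero threshold is the practical choice in tests.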

func (*LogicalReplicator) CreateReplicationSlotIfNotExists

func (r *LogicalReplicator) CreateReplicationSlotIfNotExists(slotName string) error

CreateReplicationSlotIfNotExists creates the replication slot with the given name if it doesn't already exist.

func (*LogicalReplicator) DropReplicationSlotIfExists

func (r *LogicalReplicator) DropReplicationSlotIfExists(slotName string) error

DropReplicationSlotIfExists drops the replication slot with the given name. Any error from the slot not existing is ignored.

func (*LogicalReplicator) PrimaryDns

func (r *LogicalReplicator) PrimaryDns() string

PrimaryDns returns the connection string (DSN) for the primary database. It is not suitable for RPCs used in replication, e.g. StartReplication; see ReplicationDns.

func (*LogicalReplicator) ReplicationDns

func (r *LogicalReplicator) ReplicationDns() string

ReplicationDns returns the connection string (DSN) for the primary database with the replication query parameter appended. It is not suitable for normal query RPCs.
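
To show what appending a replication query parameter can look like, here is a small sketch using net/url. It assumes a URL-style connection string and the parameter replication=database, which is the setting PostgreSQL accepts for logical replication connections; whether this package produces exactly this form is an assumption. The function name replicationDSN and the example URL are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// replicationDSN returns a copy of a URL-style connection string with
// replication=database set in its query. Hypothetical helper, not part of
// logrepl; key=value style connection strings are not handled.
func replicationDSN(primary string) (string, error) {
	u, err := url.Parse(primary)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("replication", "database")
	u.RawQuery = q.Encode() // Encode sorts parameters by key
	return u.String(), nil
}

func main() {
	// Placeholder connection string.
	dsn, err := replicationDSN("postgres://user:secret@localhost:5432/postgres?sslmode=disable")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(dsn)
}
```

A connection opened with the replication parameter speaks the replication protocol, which is why the two connection strings are kept separate and are not interchangeable.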

func (*LogicalReplicator) Running

func (r *LogicalReplicator) Running() bool

Running returns whether replication is currently running.

func (*LogicalReplicator) StartReplication

func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName string) error

StartReplication starts the replication process for the given slot name. This function blocks until replication is stopped via the Stop method, or an error occurs.

func (*LogicalReplicator) Stop

func (r *LogicalReplicator) Stop()

Stop stops the replication process and blocks until clean shutdown occurs.
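
Because StartReplication blocks until Stop is called and Stop blocks until shutdown completes, callers typically run the former in its own goroutine. The sketch below illustrates that calling pattern with a stand-in type; fakeReplicator, its Start method, and runAndStop are hypothetical and only mimic the documented blocking behavior, they are not this package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeReplicator is a hypothetical one-shot stand-in: Start blocks until Stop
// is called, and Stop blocks until Start has returned.
type fakeReplicator struct {
	stopOnce sync.Once
	stop     chan struct{}
	done     chan struct{}
}

func newFakeReplicator() *fakeReplicator {
	return &fakeReplicator{stop: make(chan struct{}), done: make(chan struct{})}
}

// Start blocks until Stop is requested. It must be called exactly once.
func (r *fakeReplicator) Start(slotName string) error {
	defer close(r.done)
	<-r.stop // a real replicator would receive and apply WAL messages here
	return nil
}

// Stop requests shutdown and waits for Start to finish. Safe to call repeatedly.
func (r *fakeReplicator) Stop() {
	r.stopOnce.Do(func() { close(r.stop) })
	<-r.done
}

// runAndStop runs Start in a goroutine, stops it, and returns Start's error.
func runAndStop(r *fakeReplicator, slotName string) error {
	errCh := make(chan error, 1)
	go func() { errCh <- r.Start(slotName) }()
	r.Stop() // returns only after Start has finished
	return <-errCh
}

func main() {
	err := runAndStop(newFakeReplicator(), "example_slot")
	fmt.Println("replication stopped, err:", err)
}
```

Collecting the error through a buffered channel lets the goroutine exit even if the caller never reads it, and gives the caller a single place to observe why replication ended.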

type Subscription

type Subscription struct {
	Subscription string
	Conn         string
	Publication  string
	LsnStr       string
	Enabled      bool
	Replicator   *LogicalReplicator
}
