Documentation ¶
Index ¶
- func CreatePublicationIfNotExists(primaryDns, publicationName string) error
- func CreateSubscription(ctx *sql.Context, name, conn, pub, lsn string, enabled bool) error
- func DeleteSubscription(ctx *sql.Context, name string) error
- func DropPublication(primaryDns, slotName string) error
- func SelectSubscriptionLsn(ctx *sql.Context, subscription string) (pglogrepl.LSN, error)
- func UpdateAllSubscriptionStatus(ctx *sql.Context, enabled bool) error
- func UpdateSubscriptionLsn(ctx *sql.Context, lsn, name string) error
- func UpdateSubscriptionStatus(ctx *sql.Context, enabled bool, name string) error
- func UpdateSubscriptions(ctx *sql.Context) error
- type LogicalReplicator
- func (r *LogicalReplicator) CaughtUp(threshold int) (bool, error)
- func (r *LogicalReplicator) CreateReplicationSlotIfNotExists(slotName string) error
- func (r *LogicalReplicator) DropReplicationSlotIfExists(slotName string) error
- func (r *LogicalReplicator) PrimaryDns() string
- func (r *LogicalReplicator) ReplicationDns() string
- func (r *LogicalReplicator) Running() bool
- func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName string) error
- func (r *LogicalReplicator) Stop()
- type Subscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreatePublicationIfNotExists ¶
CreatePublicationIfNotExists creates a publication with the given name if it does not already exist. Mostly useful for testing. Customers should run the CREATE PUBLICATION command on their primary server manually, specifying whichever tables they want to replicate.
func CreateSubscription ¶
func DropPublication ¶
DropPublication drops the publication with the given name if it exists. Mostly useful for testing.
func SelectSubscriptionLsn ¶
func UpdateSubscriptions ¶
Types ¶
type LogicalReplicator ¶
type LogicalReplicator struct {
// contains filtered or unexported fields
}
func NewLogicalReplicator ¶
func NewLogicalReplicator(subscription, primaryDns string) (*LogicalReplicator, error)
NewLogicalReplicator creates a new logical replicator instance which connects to the primary and replication databases using the connection strings provided. The connection to the replica is established immediately, and the connection to the primary is established when StartReplication is called.
func (*LogicalReplicator) CaughtUp ¶
func (r *LogicalReplicator) CaughtUp(threshold int) (bool, error)
CaughtUp returns true if the replication slot is caught up to the primary, and false otherwise. This only works if there is only a single replication slot on the primary, so it's only suitable for testing. This method uses a threshold value to determine if the primary considers us caught up. The threshold is the maximum number of bytes that the primary may be ahead of the replica's last flush position. This difference is rarely zero when caught up, since the primary often sends additional WAL records after the last WAL location that was flushed to the replica. These additional WAL locations cannot be recorded as flushed, since they don't result in writes to the replica, and recording them as flushed could result in the primary not sending us necessary records after a shutdown and restart.
func (*LogicalReplicator) CreateReplicationSlotIfNotExists ¶
func (r *LogicalReplicator) CreateReplicationSlotIfNotExists(slotName string) error
CreateReplicationSlotIfNotExists creates the replication slot with the given name if it doesn't already exist.
func (*LogicalReplicator) DropReplicationSlotIfExists ¶
func (r *LogicalReplicator) DropReplicationSlotIfExists(slotName string) error
DropReplicationSlotIfExists drops the replication slot with the given name. Any error from the slot not existing is ignored.
func (*LogicalReplicator) PrimaryDns ¶
func (r *LogicalReplicator) PrimaryDns() string
PrimaryDns returns the connection string (DSN) for the primary database. Not suitable for RPCs used in replication, e.g. StartReplication. See ReplicationDns.
func (*LogicalReplicator) ReplicationDns ¶
func (r *LogicalReplicator) ReplicationDns() string
ReplicationDns returns the connection string (DSN) for the primary database with the replication query parameter appended. Not suitable for normal query RPCs.
func (*LogicalReplicator) Running ¶
func (r *LogicalReplicator) Running() bool
Running returns whether replication is currently running.
func (*LogicalReplicator) StartReplication ¶
func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName string) error
StartReplication starts the replication process for the given slot name. This function blocks until replication is stopped via the Stop method, or an error occurs.
func (*LogicalReplicator) Stop ¶
func (r *LogicalReplicator) Stop()
Stop stops the replication process and blocks until clean shutdown occurs.