pglogrepl

package module
v0.0.0-...-13cd64d
Published: Jul 23, 2024 License: MIT Imports: 11 Imported by: 0

README

pglogrepl

pglogrepl is a Go package for PostgreSQL logical replication.

pglogrepl uses package github.com/jackc/pgx/v5/pgconn as its underlying PostgreSQL connection.

Proper use of this package requires understanding the underlying PostgreSQL concepts. See https://www.postgresql.org/docs/current/protocol-replication.html.

Example

The example/pglogrepl_demo directory contains a demo program that connects to a database and logs all messages sent over logical replication. The example/pgphysrepl_demo directory contains a demo program that does the same for physical replication.

Testing

Testing requires a user with replication permission, a database to replicate, access allowed in pg_hba.conf, and logical replication enabled in postgresql.conf.

Create a database:

create database pglogrepl;

Create a user:

create user pglogrepl with replication password 'secret';

If you're using PostgreSQL 15 or newer, grant access to the public schema, just for these tests:

grant all on schema public to pglogrepl;

Add a replication line to your pg_hba.conf:

host replication pglogrepl 127.0.0.1/32 md5

Change the following settings in your postgresql.conf:

wal_level=logical
max_wal_senders=5
max_replication_slots=5

To run the tests, set the PGLOGREPL_TEST_CONN_STRING environment variable to a replication connection string (URL or DSN).

Because the base backup test asks PostgreSQL to create a backup tar and stream it, that test can be disabled with

PGLOGREPL_SKIP_BASE_BACKUP=true

Example:

PGLOGREPL_TEST_CONN_STRING=postgres://pglogrepl:secret@127.0.0.1/pglogrepl?replication=database go test

Documentation

Overview

Package pglogrepl implements PostgreSQL logical replication client functionality.

pglogrepl uses package github.com/jackc/pgx/v5/pgconn as its underlying PostgreSQL connection. Use pgconn to establish a connection to PostgreSQL and then use the pglogrepl functions on that connection.

Proper use of this package requires understanding the underlying PostgreSQL concepts. See https://www.postgresql.org/docs/current/protocol-replication.html.

Index

Constants

const (
	TupleDataTypeNull   = uint8('n')
	TupleDataTypeToast  = uint8('u')
	TupleDataTypeText   = uint8('t')
	TupleDataTypeBinary = uint8('b')
)

List of types of data in a tuple.

const (
	UpdateMessageTupleTypeNone = uint8(0)
	UpdateMessageTupleTypeKey  = uint8('K')
	UpdateMessageTupleTypeOld  = uint8('O')
	UpdateMessageTupleTypeNew  = uint8('N')
)

List of types of UpdateMessage tuples.

const (
	DeleteMessageTupleTypeKey = uint8('K')
	DeleteMessageTupleTypeOld = uint8('O')
)

List of types of DeleteMessage tuples.

const (
	TruncateOptionCascade = uint8(1) << iota
	TruncateOptionRestartIdentity
)

List of truncate options.
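Because the two options are defined with a shift on iota, they are bit flags (1 and 2) that can be combined in a single byte. The following is a minimal sketch of testing them. The constants are copied locally so the snippet has no dependencies, and the helper name truncateFlags is hypothetical, not part of the package.

```go
package main

import "fmt"

// Local copies of the constants listed above, so the sketch runs on its own.
const (
	TruncateOptionCascade = uint8(1) << iota
	TruncateOptionRestartIdentity
)

// truncateFlags is a hypothetical helper that names the bits set in option.
func truncateFlags(option uint8) []string {
	var flags []string
	if option&TruncateOptionCascade != 0 {
		flags = append(flags, "CASCADE")
	}
	if option&TruncateOptionRestartIdentity != 0 {
		flags = append(flags, "RESTART IDENTITY")
	}
	return flags
}

func main() {
	both := TruncateOptionCascade | TruncateOptionRestartIdentity
	fmt.Println(truncateFlags(both))
}
```

The same bit test would apply to the Option field of a TruncateMessage.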

const (
	XLogDataByteID                = 'w'
	PrimaryKeepaliveMessageByteID = 'k'
	StandbyStatusUpdateByteID     = 'r'
)
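In the PostgreSQL streaming replication protocol, these byte IDs are the first byte of a CopyData payload: 'w' and 'k' arrive from the server, and 'r' is sent by the client. The sketch below shows one way to dispatch on that byte. The constants are copied locally and the helper name classify is hypothetical.

```go
package main

import "fmt"

// Local copies of the byte IDs listed above.
const (
	XLogDataByteID                = 'w'
	PrimaryKeepaliveMessageByteID = 'k'
	StandbyStatusUpdateByteID     = 'r'
)

// classify is a hypothetical helper that names a CopyData payload
// by looking at its first byte.
func classify(payload []byte) string {
	if len(payload) == 0 {
		return "empty"
	}
	switch payload[0] {
	case XLogDataByteID:
		return "XLogData"
	case PrimaryKeepaliveMessageByteID:
		return "PrimaryKeepaliveMessage"
	case StandbyStatusUpdateByteID:
		return "StandbyStatusUpdate"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify([]byte{'k', 0, 0}))
}
```

In a receive loop, the bytes after the ID would then be handed to a parser for that message kind.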

Variables

This section is empty.

Functions

func DropReplicationSlot

func DropReplicationSlot(ctx context.Context, conn *pgconn.PgConn, slotName string, options DropReplicationSlotOptions) error

DropReplicationSlot drops a logical replication slot.

func NextTableSpace

func NextTableSpace(ctx context.Context, conn *pgconn.PgConn) (err error)

NextTableSpace consumes messages until the connection is at the start of CopyData.

func SendStandbyStatusUpdate

func SendStandbyStatusUpdate(_ context.Context, conn *pgconn.PgConn, ssu StandbyStatusUpdate) error

SendStandbyStatusUpdate sends a StandbyStatusUpdate to the PostgreSQL server.

The only required field in ssu is WALWritePosition. If WALFlushPosition is 0 then WALWritePosition will be assigned to it. If WALApplyPosition is 0 then WALWritePosition will be assigned to it. If ClientTime is the zero value then the current time will be assigned to it.
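The defaulting rules above can be sketched as follows, together with the 34-byte standby status update layout from the PostgreSQL protocol documentation ('r', three 8-byte positions, an 8-byte timestamp in microseconds since 2000-01-01, and a reply flag). This is an illustration with local stand-in types, not the package's own code. The names applyDefaults and encode are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// LSN and StandbyStatusUpdate are local stand-ins for the package types.
type LSN uint64

type StandbyStatusUpdate struct {
	WALWritePosition LSN
	WALFlushPosition LSN
	WALApplyPosition LSN
	ClientTime       time.Time
	ReplyRequested   bool
}

// pgEpoch is the PostgreSQL timestamp epoch, 2000-01-01 00:00:00 UTC.
var pgEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// applyDefaults fills zero fields as described in the text above.
func applyDefaults(ssu StandbyStatusUpdate, now time.Time) StandbyStatusUpdate {
	if ssu.WALFlushPosition == 0 {
		ssu.WALFlushPosition = ssu.WALWritePosition
	}
	if ssu.WALApplyPosition == 0 {
		ssu.WALApplyPosition = ssu.WALWritePosition
	}
	if ssu.ClientTime.IsZero() {
		ssu.ClientTime = now
	}
	return ssu
}

// encode builds the CopyData payload of a standby status update.
func encode(ssu StandbyStatusUpdate) []byte {
	buf := make([]byte, 0, 34)
	buf = append(buf, 'r')
	buf = binary.BigEndian.AppendUint64(buf, uint64(ssu.WALWritePosition))
	buf = binary.BigEndian.AppendUint64(buf, uint64(ssu.WALFlushPosition))
	buf = binary.BigEndian.AppendUint64(buf, uint64(ssu.WALApplyPosition))
	micros := ssu.ClientTime.Sub(pgEpoch).Microseconds()
	buf = binary.BigEndian.AppendUint64(buf, uint64(micros))
	if ssu.ReplyRequested {
		return append(buf, 1)
	}
	return append(buf, 0)
}

func main() {
	ssu := applyDefaults(StandbyStatusUpdate{WALWritePosition: 42}, time.Now())
	fmt.Println(len(encode(ssu)), ssu.WALFlushPosition, ssu.WALApplyPosition)
}
```

Passing the current time in as a parameter keeps the defaulting step deterministic, which is convenient in tests.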

func StartReplication

func StartReplication(ctx context.Context, conn *pgconn.PgConn, slotName string, startLSN LSN, options StartReplicationOptions) error

StartReplication begins the replication process by executing the START_REPLICATION command.

Types

type BaseBackupOptions

type BaseBackupOptions struct {
	// Request the information required to generate a progress report. This may have a negative impact on performance.
	Progress bool
	// Sets the label of the backup. If none is specified, a backup label of 'wal-g' will be used.
	Label string
	// Request a fast checkpoint.
	Fast bool
	// Include the necessary WAL segments in the backup. This will include all the files between start and stop backup in the pg_wal directory of the base directory tar file.
	WAL bool
	// By default, the backup will wait until the last required WAL segment has been archived, or emit a warning if log archiving is not enabled.
	// Specifying NOWAIT disables both the waiting and the warning, leaving the client responsible for ensuring the required log is available.
	NoWait bool
	// Limit (throttle) the maximum amount of data transferred from server to client per unit of time (kb/s).
	MaxRate int32
	// Include information about symbolic links present in the directory pg_tblspc in a file named tablespace_map.
	TablespaceMap bool
	// Disable checksums being verified during a base backup.
	// Note that NoVerifyChecksums=true is only supported since PG11
	NoVerifyChecksums bool
}

type BaseBackupResult

type BaseBackupResult struct {
	LSN         LSN
	TimelineID  int32
	Tablespaces []BaseBackupTablespace
}

BaseBackupResult holds the return values of the BASE_BACKUP command.

func FinishBaseBackup

func FinishBaseBackup(ctx context.Context, conn *pgconn.PgConn) (result BaseBackupResult, err error)

FinishBaseBackup wraps up a backup after copying all results from the BASE_BACKUP command.

func StartBaseBackup

func StartBaseBackup(ctx context.Context, conn *pgconn.PgConn, options BaseBackupOptions) (result BaseBackupResult, err error)

StartBaseBackup begins the process for copying a basebackup by executing the BASE_BACKUP command.

type BaseBackupTablespace

type BaseBackupTablespace struct {
	OID      int32
	Location string
	Size     int8
}

BaseBackupTablespace represents a tablespace in the backup

type BeginMessage

type BeginMessage struct {

	// FinalLSN is the final LSN of the transaction.
	FinalLSN LSN
	// CommitTime is the commit timestamp of the transaction.
	CommitTime time.Time
	// Xid of the transaction.
	Xid uint32
	// contains filtered or unexported fields
}

BeginMessage is a begin message.

func (*BeginMessage) Decode

func (m *BeginMessage) Decode(src []byte) error

Decode decodes the message from src.
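To make the decoding step concrete, here is a minimal sketch of reading a Begin message body as laid out in the PostgreSQL logical replication message formats: an 8-byte final LSN, an 8-byte commit timestamp in microseconds since 2000-01-01, and a 4-byte transaction ID. It assumes the leading 'B' type byte has already been stripped. The type beginMessage and the function decodeBegin are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// pgEpoch is the PostgreSQL timestamp epoch, 2000-01-01 00:00:00 UTC.
var pgEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// beginMessage mirrors the exported fields of BeginMessage, with the LSN
// kept as a plain uint64 so the sketch has no dependencies.
type beginMessage struct {
	FinalLSN   uint64
	CommitTime time.Time
	Xid        uint32
}

// decodeBegin reads the 20-byte body that follows the 'B' type byte.
func decodeBegin(body []byte) (beginMessage, error) {
	if len(body) < 20 {
		return beginMessage{}, fmt.Errorf("begin body must be at least 20 bytes, got %d", len(body))
	}
	micros := int64(binary.BigEndian.Uint64(body[8:16]))
	return beginMessage{
		FinalLSN:   binary.BigEndian.Uint64(body[0:8]),
		CommitTime: pgEpoch.Add(time.Duration(micros) * time.Microsecond),
		Xid:        binary.BigEndian.Uint32(body[16:20]),
	}, nil
}

func main() {
	body := make([]byte, 20)
	body[7] = 0x10 // FinalLSN = 16
	body[19] = 7   // Xid = 7
	msg, err := decodeBegin(body)
	fmt.Println(msg.FinalLSN, msg.Xid, err)
}
```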

func (*BeginMessage) SetType

func (m *BeginMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*BeginMessage) Type

func (m *BeginMessage) Type() MessageType

Type returns message type.

type CommitMessage

type CommitMessage struct {

	// Flags currently unused (must be 0).
	Flags uint8
	// CommitLSN is the LSN of the commit.
	CommitLSN LSN
	// TransactionEndLSN is the end LSN of the transaction.
	TransactionEndLSN LSN
	// CommitTime is the commit timestamp of the transaction
	CommitTime time.Time
	// contains filtered or unexported fields
}

CommitMessage is a commit message.

func (*CommitMessage) Decode

func (m *CommitMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*CommitMessage) SetType

func (m *CommitMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*CommitMessage) Type

func (m *CommitMessage) Type() MessageType

Type returns message type.

type CopyDoneResult

type CopyDoneResult struct {
	Timeline int32
	LSN      LSN
}

CopyDoneResult is the parsed result as returned by the server after the client sends a CopyDone to the server to confirm ending the copy-both mode.

func SendStandbyCopyDone

func SendStandbyCopyDone(_ context.Context, conn *pgconn.PgConn) (cdr *CopyDoneResult, err error)

SendStandbyCopyDone sends a StandbyCopyDone to the PostgreSQL server to confirm ending the copy-both mode.

type CreateReplicationSlotOptions

type CreateReplicationSlotOptions struct {
	Temporary      bool
	SnapshotAction string
	Mode           ReplicationMode
}

type CreateReplicationSlotResult

type CreateReplicationSlotResult struct {
	SlotName        string
	ConsistentPoint string
	SnapshotName    string
	OutputPlugin    string
}

CreateReplicationSlotResult is the parsed result of the CREATE_REPLICATION_SLOT command.

func CreateReplicationSlot

func CreateReplicationSlot(
	ctx context.Context,
	conn *pgconn.PgConn,
	slotName string,
	outputPlugin string,
	options CreateReplicationSlotOptions,
) (CreateReplicationSlotResult, error)

CreateReplicationSlot creates a logical replication slot.

func ParseCreateReplicationSlot

func ParseCreateReplicationSlot(mrr *pgconn.MultiResultReader) (CreateReplicationSlotResult, error)

ParseCreateReplicationSlot parses the result of the CREATE_REPLICATION_SLOT command.

type DeleteMessage

type DeleteMessage struct {
	RelationID uint32
	// OldTupleType
	//   Byte1('K'):
	//     Identifies the following TupleData submessage as a key.
	//     This field is present if the table in which the delete has happened uses an index
	//     as REPLICA IDENTITY.
	//
	//   Byte1('O')
	//     Identifies the following TupleData message as an old tuple.
	//     This field is present if the table in which the delete has happened has
	//     REPLICA IDENTITY set to FULL.
	//
	// The Delete message may contain either a 'K' message part or an 'O' message part,
	// but never both of them.
	OldTupleType uint8
	OldTuple     *TupleData
	// contains filtered or unexported fields
}

DeleteMessage is a delete message.

func (*DeleteMessage) Decode

func (m *DeleteMessage) Decode(src []byte) (err error)

Decode decodes a message from src.

func (*DeleteMessage) SetType

func (m *DeleteMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*DeleteMessage) Type

func (m *DeleteMessage) Type() MessageType

Type returns message type.

type DropReplicationSlotOptions

type DropReplicationSlotOptions struct {
	Wait bool
}

type IdentifySystemResult

type IdentifySystemResult struct {
	SystemID string
	Timeline int32
	XLogPos  LSN
	DBName   string
}

IdentifySystemResult is the parsed result of the IDENTIFY_SYSTEM command.

func IdentifySystem

func IdentifySystem(ctx context.Context, conn *pgconn.PgConn) (IdentifySystemResult, error)

IdentifySystem executes the IDENTIFY_SYSTEM command.

func ParseIdentifySystem

func ParseIdentifySystem(mrr *pgconn.MultiResultReader) (IdentifySystemResult, error)

ParseIdentifySystem parses the result of the IDENTIFY_SYSTEM command.

type InsertMessage

type InsertMessage struct {

	// RelationID is the ID of the relation corresponding to the ID in the relation message.
	RelationID uint32
	Tuple      *TupleData
	// contains filtered or unexported fields
}

InsertMessage is an insert message.

func (*InsertMessage) Decode

func (m *InsertMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*InsertMessage) SetType

func (m *InsertMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*InsertMessage) Type

func (m *InsertMessage) Type() MessageType

Type returns message type.

type LSN

type LSN uint64

LSN is a PostgreSQL Log Sequence Number. See https://www.postgresql.org/docs/current/datatype-pg-lsn.html.

func ParseLSN

func ParseLSN(s string) (LSN, error)

ParseLSN parses the given XXX/XXX text format LSN used by PostgreSQL.

func (*LSN) Scan

func (lsn *LSN) Scan(src interface{}) error

Scan implements the Scanner interface.

func (LSN) String

func (lsn LSN) String() string

String formats the LSN value into the XXX/XXX format which is the text format used by PostgreSQL.
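The XXX/XXX text form is the upper and lower 32 bits of the 64-bit position, each written in hexadecimal. A minimal sketch of the round trip, using a local stand-in type and hypothetical function names rather than the package's own ParseLSN and String:

```go
package main

import "fmt"

// lsn is a local stand-in for the LSN type.
type lsn uint64

// parseLSN converts the XXX/XXX text form into a 64-bit position.
func parseLSN(s string) (lsn, error) {
	var hi, lo uint32
	n, err := fmt.Sscanf(s, "%X/%X", &hi, &lo)
	if err != nil {
		return 0, fmt.Errorf("parse LSN %q: %w", s, err)
	}
	if n != 2 {
		return 0, fmt.Errorf("parse LSN %q: expected 2 parts, got %d", s, n)
	}
	return lsn(hi)<<32 | lsn(lo), nil
}

// formatLSN converts a 64-bit position back into the XXX/XXX text form.
func formatLSN(v lsn) string {
	return fmt.Sprintf("%X/%X", uint32(v>>32), uint32(v))
}

func main() {
	v, err := parseLSN("16/B374D848")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(uint64(v), formatLSN(v))
}
```

Because an LSN is an ordinary unsigned integer underneath, two positions can be compared or subtracted directly once parsed.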

func (LSN) Value

func (lsn LSN) Value() (driver.Value, error)

Value implements the Valuer interface.

type LogicalDecodingMessage

type LogicalDecodingMessage struct {
	LSN           LSN
	Transactional bool
	Prefix        string
	Content       []byte
	// contains filtered or unexported fields
}

LogicalDecodingMessage is a logical decoding message.

func (*LogicalDecodingMessage) Decode

func (m *LogicalDecodingMessage) Decode(src []byte) (err error)

Decode decodes a message from src.

func (*LogicalDecodingMessage) SetType

func (m *LogicalDecodingMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*LogicalDecodingMessage) Type

func (m *LogicalDecodingMessage) Type() MessageType

Type returns message type.

type Message

type Message interface {
	Type() MessageType
}

Message is a message received from server.

func Parse

func Parse(data []byte) (m Message, err error)

Parse parses a logical replication message.

type MessageDecoder

type MessageDecoder interface {
	Decode([]byte) error
}

MessageDecoder decodes message into struct.

type MessageType

type MessageType uint8

MessageType indicates the type of a logical replication message.

const (
	MessageTypeBegin    MessageType = 'B'
	MessageTypeMessage  MessageType = 'M'
	MessageTypeCommit   MessageType = 'C'
	MessageTypeOrigin   MessageType = 'O'
	MessageTypeRelation MessageType = 'R'
	MessageTypeType     MessageType = 'Y'
	MessageTypeInsert   MessageType = 'I'
	MessageTypeUpdate   MessageType = 'U'
	MessageTypeDelete   MessageType = 'D'
	MessageTypeTruncate MessageType = 'T'
)

List of types of logical replication messages.

func (MessageType) String

func (t MessageType) String() string

type OriginMessage

type OriginMessage struct {

	// CommitLSN is the LSN of the commit on the origin server.
	CommitLSN LSN
	Name      string
	// contains filtered or unexported fields
}

OriginMessage is an origin message.

func (*OriginMessage) Decode

func (m *OriginMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*OriginMessage) SetType

func (m *OriginMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*OriginMessage) Type

func (m *OriginMessage) Type() MessageType

Type returns message type.

type PrimaryKeepaliveMessage

type PrimaryKeepaliveMessage struct {
	ServerWALEnd   LSN
	ServerTime     time.Time
	ReplyRequested bool
}

func ParsePrimaryKeepaliveMessage

func ParsePrimaryKeepaliveMessage(buf []byte) (PrimaryKeepaliveMessage, error)

ParsePrimaryKeepaliveMessage parses a Primary keepalive message from the server.
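For orientation, the keepalive body defined by the PostgreSQL replication protocol is 17 bytes after the 'k' byte ID: an 8-byte WAL end position, an 8-byte server timestamp in microseconds since 2000-01-01, and a 1-byte reply flag. The sketch below parses that layout with local stand-in types. The names keepalive and parseKeepalive are hypothetical, and the sketch assumes the byte ID has already been removed.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// pgEpoch is the PostgreSQL timestamp epoch, 2000-01-01 00:00:00 UTC.
var pgEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// keepalive mirrors the fields of PrimaryKeepaliveMessage, with the LSN
// kept as a plain uint64 so the sketch has no dependencies.
type keepalive struct {
	ServerWALEnd   uint64
	ServerTime     time.Time
	ReplyRequested bool
}

// parseKeepalive reads the 17-byte body that follows the 'k' byte ID.
func parseKeepalive(body []byte) (keepalive, error) {
	if len(body) != 17 {
		return keepalive{}, fmt.Errorf("keepalive body must be 17 bytes, got %d", len(body))
	}
	micros := int64(binary.BigEndian.Uint64(body[8:16]))
	return keepalive{
		ServerWALEnd:   binary.BigEndian.Uint64(body[0:8]),
		ServerTime:     pgEpoch.Add(time.Duration(micros) * time.Microsecond),
		ReplyRequested: body[16] != 0,
	}, nil
}

func main() {
	body := make([]byte, 17)
	body[7] = 0x2A // ServerWALEnd = 42
	body[16] = 1   // the server asks for an immediate reply
	ka, err := parseKeepalive(body)
	fmt.Println(ka.ServerWALEnd, ka.ReplyRequested, err)
}
```

When ReplyRequested is true, a client would normally answer with a standby status update right away to avoid a server-side timeout.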

type RelationMessage

type RelationMessage struct {
	RelationID      uint32
	Namespace       string
	RelationName    string
	ReplicaIdentity uint8
	ColumnNum       uint16
	Columns         []*RelationMessageColumn
	// contains filtered or unexported fields
}

RelationMessage is a relation message.

func (*RelationMessage) Decode

func (m *RelationMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*RelationMessage) SetType

func (m *RelationMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*RelationMessage) Type

func (m *RelationMessage) Type() MessageType

Type returns message type.

type RelationMessageColumn

type RelationMessageColumn struct {
	// Flags for the column. Currently, it can be either 0 for no flags or 1 which marks the column as part of the key.
	Flags uint8

	Name string

	// DataType is the ID of the column's data type.
	DataType uint32

	// TypeModifier is type modifier of the column (atttypmod).
	TypeModifier int32
}

RelationMessageColumn is one column in a RelationMessage.

type ReplicationMode

type ReplicationMode int

const (
	LogicalReplication ReplicationMode = iota
	PhysicalReplication
)

func (ReplicationMode) String

func (mode ReplicationMode) String() string

String formats the mode into a string that is valid for PostgreSQL.

type StandbyStatusUpdate

type StandbyStatusUpdate struct {
	WALWritePosition LSN       // The WAL position that's been locally written
	WALFlushPosition LSN       // The WAL position that's been locally flushed
	WALApplyPosition LSN       // The WAL position that's been locally applied
	ClientTime       time.Time // Client system clock time
	ReplyRequested   bool      // Request server to reply immediately.
}

StandbyStatusUpdate is a message sent from the client that acknowledges receipt of WAL records.

type StartReplicationOptions

type StartReplicationOptions struct {
	Timeline   int32 // 0 means current server timeline
	Mode       ReplicationMode
	PluginArgs []string
}
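To show how these options relate to the protocol, the following sketch assembles a logical START_REPLICATION command following the grammar in the PostgreSQL documentation: START_REPLICATION SLOT slot_name LOGICAL XXX/XXX, optionally followed by a parenthesized option list. It assumes each plugin argument is already formatted as a name and quoted value, such as the proto_version and publication_names options of the pgoutput plugin. The helper name, slot name, and publication name are hypothetical, and this is not necessarily how the package builds its command.

```go
package main

import (
	"fmt"
	"strings"
)

// startLogicalReplicationSQL is a hypothetical helper that assembles a
// START_REPLICATION command for logical replication. startLSN is the
// XXX/XXX text form of the start position. The slot name is inserted
// unquoted, so it must be a plain identifier.
func startLogicalReplicationSQL(slotName, startLSN string, pluginArgs []string) string {
	sql := fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s", slotName, startLSN)
	if len(pluginArgs) > 0 {
		sql += " (" + strings.Join(pluginArgs, ", ") + ")"
	}
	return sql
}

func main() {
	args := []string{"proto_version '1'", "publication_names 'my_publication'"}
	fmt.Println(startLogicalReplicationSQL("my_slot", "0/0", args))
}
```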

type TimelineHistoryResult

type TimelineHistoryResult struct {
	FileName string
	Content  []byte
}

TimelineHistoryResult is the parsed result of the TIMELINE_HISTORY command.

func ParseTimelineHistory

func ParseTimelineHistory(mrr *pgconn.MultiResultReader) (TimelineHistoryResult, error)

ParseTimelineHistory parses the result of the TIMELINE_HISTORY command.

func TimelineHistory

func TimelineHistory(ctx context.Context, conn *pgconn.PgConn, timeline int32) (TimelineHistoryResult, error)

TimelineHistory executes the TIMELINE_HISTORY command.

type TruncateMessage

type TruncateMessage struct {
	RelationNum uint32
	Option      uint8
	RelationIDs []uint32
	// contains filtered or unexported fields
}

TruncateMessage is a truncate message.

func (*TruncateMessage) Decode

func (m *TruncateMessage) Decode(src []byte) (err error)

Decode decodes the message from src.

func (*TruncateMessage) SetType

func (m *TruncateMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*TruncateMessage) Type

func (m *TruncateMessage) Type() MessageType

Type returns message type.

type TupleData

type TupleData struct {
	ColumnNum uint16
	Columns   []*TupleDataColumn
	// contains filtered or unexported fields
}

TupleData contains row change information.

func (*TupleData) Decode

func (m *TupleData) Decode(src []byte) (int, error)

Decode decodes the message from src.

func (*TupleData) SetType

func (m *TupleData) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*TupleData) Type

func (m *TupleData) Type() MessageType

Type returns message type.

type TupleDataColumn

type TupleDataColumn struct {
	// DataType indicates how the data is stored.
	//	 Byte1('n') Identifies the data as NULL value.
	//	 Or
	//	 Byte1('u') Identifies unchanged TOASTed value (the actual value is not sent).
	//	 Or
	//	 Byte1('t') Identifies the data as text formatted value.
	//	 Or
	//	 Byte1('b') Identifies the data as binary value.
	DataType uint8
	Length   uint32
	// Data is the value of the column, in text format. (A future release might support additional formats.) Its size is given by Length above.
	Data []byte
}

TupleDataColumn is a column in a TupleData.
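As an illustration of how the DataType byte drives decoding, here is a minimal sketch of walking a TupleData payload as described in the PostgreSQL logical replication message formats: a 2-byte column count, then per column a kind byte, and for 't' and 'b' columns a 4-byte length followed by that many bytes. The type column and the function decodeTuple are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// column is a local stand-in for TupleDataColumn.
type column struct {
	DataType uint8
	Data     []byte
}

// decodeTuple walks a TupleData payload and returns its columns.
func decodeTuple(src []byte) ([]column, error) {
	if len(src) < 2 {
		return nil, fmt.Errorf("tuple data too short: %d bytes", len(src))
	}
	count := int(binary.BigEndian.Uint16(src))
	pos := 2
	cols := make([]column, 0, count)
	for i := 0; i < count; i++ {
		if pos >= len(src) {
			return nil, fmt.Errorf("column %d: missing kind byte", i)
		}
		col := column{DataType: src[pos]}
		pos++
		switch col.DataType {
		case 'n', 'u':
			// NULL and unchanged TOAST values carry no payload.
		case 't', 'b':
			if len(src)-pos < 4 {
				return nil, fmt.Errorf("column %d: missing length", i)
			}
			size := int(binary.BigEndian.Uint32(src[pos:]))
			pos += 4
			if size < 0 || len(src)-pos < size {
				return nil, fmt.Errorf("column %d: truncated value", i)
			}
			col.Data = src[pos : pos+size]
			pos += size
		default:
			return nil, fmt.Errorf("column %d: unknown kind %q", i, col.DataType)
		}
		cols = append(cols, col)
	}
	return cols, nil
}

func main() {
	src := []byte{0, 2, 'n', 't', 0, 0, 0, 2, '4', '2'}
	cols, err := decodeTuple(src)
	fmt.Println(len(cols), string(cols[1].Data), err)
}
```

Note that an unchanged TOAST column ('u') has no data at all, so a consumer that needs the value has to keep it from an earlier message or read it from the source table.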

func (*TupleDataColumn) Int64

func (c *TupleDataColumn) Int64() (int64, error)

Int64 parses the column data as an int64 integer.
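Since text-format column data is the decimal rendering of the value, converting it to an integer comes down to a base-10 parse. The sketch below assumes that only text columns ('t') are accepted. The type column and the function int64Value are hypothetical stand-ins, and the package's own method may behave differently in edge cases.

```go
package main

import (
	"fmt"
	"strconv"
)

// column is a local stand-in for TupleDataColumn.
type column struct {
	DataType uint8
	Data     []byte
}

// int64Value parses a text-format column as a base-10 int64.
func int64Value(c column) (int64, error) {
	if c.DataType != 't' {
		return 0, fmt.Errorf("expected text column ('t'), got %q", c.DataType)
	}
	return strconv.ParseInt(string(c.Data), 10, 64)
}

func main() {
	v, err := int64Value(column{DataType: 't', Data: []byte("-1234")})
	fmt.Println(v, err)
}
```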

type TypeMessage

type TypeMessage struct {
	DataType  uint32
	Namespace string
	Name      string
	// contains filtered or unexported fields
}

TypeMessage is a type message.

func (*TypeMessage) Decode

func (m *TypeMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*TypeMessage) SetType

func (m *TypeMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*TypeMessage) Type

func (m *TypeMessage) Type() MessageType

Type returns message type.

type UpdateMessage

type UpdateMessage struct {
	RelationID uint32

	// OldTupleType
	//   Byte1('K'):
	//     Identifies the following TupleData submessage as a key.
	//     This field is optional and is only present if the update changed data
	//     in any of the column(s) that are part of the REPLICA IDENTITY index.
	//
	//   Byte1('O'):
	//     Identifies the following TupleData submessage as an old tuple.
	//     This field is optional and is only present if table in which the update happened
	//     has REPLICA IDENTITY set to FULL.
	//
	//   The Update message may contain either a 'K' message part or an 'O' message part
	//   or neither of them, but never both of them.
	OldTupleType uint8
	OldTuple     *TupleData

	// NewTuple is the contents of a new tuple.
	//   Byte1('N'): Identifies the following TupleData message as a new tuple.
	NewTuple *TupleData
	// contains filtered or unexported fields
}

UpdateMessage is an update message.

func (*UpdateMessage) Decode

func (m *UpdateMessage) Decode(src []byte) (err error)

Decode decodes the message from src.

func (*UpdateMessage) SetType

func (m *UpdateMessage) SetType(t MessageType)

SetType sets the message type. This method exists to help write test code in applications. The message type is still defined by the message data.

func (*UpdateMessage) Type

func (m *UpdateMessage) Type() MessageType

Type returns message type.

type XLogData

type XLogData struct {
	WALStart     LSN
	ServerWALEnd LSN
	ServerTime   time.Time
	WALData      []byte
}

func ParseXLogData

func ParseXLogData(buf []byte) (XLogData, error)

ParseXLogData parses an XLogData message from the server.
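The XLogData body defined by the PostgreSQL replication protocol is a 24-byte header (WAL start, server WAL end, and server time, 8 bytes each) followed by the WAL data itself. A minimal sketch of splitting it, assuming the 'w' byte ID has already been removed. The names xlogData and parseXLogData are hypothetical local stand-ins.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// pgEpoch is the PostgreSQL timestamp epoch, 2000-01-01 00:00:00 UTC.
var pgEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// xlogData mirrors the fields of XLogData, with LSNs kept as plain uint64
// values so the sketch has no dependencies.
type xlogData struct {
	WALStart     uint64
	ServerWALEnd uint64
	ServerTime   time.Time
	WALData      []byte
}

// parseXLogData splits the body that follows the 'w' byte ID into its
// fixed header and the trailing WAL data.
func parseXLogData(body []byte) (xlogData, error) {
	if len(body) < 24 {
		return xlogData{}, fmt.Errorf("XLogData header must be 24 bytes, got %d", len(body))
	}
	micros := int64(binary.BigEndian.Uint64(body[16:24]))
	return xlogData{
		WALStart:     binary.BigEndian.Uint64(body[0:8]),
		ServerWALEnd: binary.BigEndian.Uint64(body[8:16]),
		ServerTime:   pgEpoch.Add(time.Duration(micros) * time.Microsecond),
		WALData:      body[24:],
	}, nil
}

func main() {
	body := append(make([]byte, 24), 'B') // header of zeros, then one data byte
	body[7] = 1                           // WALStart = 1
	xld, err := parseXLogData(body)
	fmt.Println(xld.WALStart, string(xld.WALData), err)
}
```

With logical replication, the first byte of WALData is the message type ('B', 'C', 'I', and so on), which is what Parse consumes.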

Directories

Path Synopsis
example
