Documentation ¶
Overview ¶
pglogrepl package implements PostgreSQL logical replication client functionality.
pglogrepl uses package github.com/jackc/pgconn as its underlying PostgreSQL connection. Use pgconn to establish a connection to PostgreSQL and then use the pglogrepl functions on that connection.
Proper use of this package requires understanding the underlying PostgreSQL concepts. See https://www.postgresql.org/docs/current/protocol-replication.html.
Index ¶
- Constants
- func DropReplicationSlot(ctx context.Context, conn *pgconn.PgConn, slotName string, ...) error
- func NextTableSpace(ctx context.Context, conn *pgconn.PgConn) (err error)
- func SendStandbyStatusUpdate(ctx context.Context, conn *pgconn.PgConn, ssu StandbyStatusUpdate) error
- func StartReplication(ctx context.Context, conn *pgconn.PgConn, slotName string, startLSN LSN, ...) error
- type BaseBackupOptions
- type BaseBackupResult
- type BaseBackupTablespace
- type BeginMessage
- type CommitMessage
- type CopyDoneResult
- type CreateReplicationSlotOptions
- type CreateReplicationSlotResult
- type DeleteMessage
- type DropReplicationSlotOptions
- type IdentifySystemResult
- type InsertMessage
- type LSN
- type Message
- type MessageDecoder
- type MessageType
- type OriginMessage
- type PrimaryKeepaliveMessage
- type RelationMessage
- type RelationMessageColumn
- type ReplicationMode
- type StandbyStatusUpdate
- type StartReplicationOptions
- type TimelineHistoryResult
- type TruncateMessage
- type TupleData
- type TupleDataColumn
- type TypeMessage
- type UpdateMessage
- type XLogData
Constants ¶
const ( TupleDataTypeNull = uint8('n') TupleDataTypeToast = uint8('u') TupleDataTypeText = uint8('t') TupleDataTypeBinary = uint8('b') )
List of types of data in a tuple.
const ( UpdateMessageTupleTypeNone = uint8(0) UpdateMessageTupleTypeKey = uint8('K') UpdateMessageTupleTypeOld = uint8('O') UpdateMessageTupleTypeNew = uint8('N') )
List of types of UpdateMessage tuples.
const ( DeleteMessageTupleTypeKey = uint8('K') DeleteMessageTupleTypeOld = uint8('O') )
List of types of DeleteMessage tuples.
const ( TruncateOptionCascade = uint8(1) << iota TruncateOptionRestartIdentity )
List of truncate options.
const ( XLogDataByteID = 'w' PrimaryKeepaliveMessageByteID = 'k' StandbyStatusUpdateByteID = 'r' )
Variables ¶
This section is empty.
Functions ¶
func DropReplicationSlot ¶
func DropReplicationSlot(ctx context.Context, conn *pgconn.PgConn, slotName string, options DropReplicationSlotOptions) error
DropReplicationSlot drops a logical replication slot.
func NextTableSpace ¶
NextTablespace consumes some msgs so we are at start of CopyData
func SendStandbyStatusUpdate ¶
func SendStandbyStatusUpdate(ctx context.Context, conn *pgconn.PgConn, ssu StandbyStatusUpdate) error
SendStandbyStatusUpdate sends a StandbyStatusUpdate to the PostgreSQL server.
The only required field in ssu is WALWritePosition. If WALFlushPosition is 0 then WALWritePosition will be assigned to it. If WALApplyPosition is 0 then WALWritePosition will be assigned to it. If ClientTime is the zero value then the current time will be assigned to it.
func StartReplication ¶
func StartReplication(ctx context.Context, conn *pgconn.PgConn, slotName string, startLSN LSN, options StartReplicationOptions) error
StartReplication begins the replication process by executing the START_REPLICATION command.
Types ¶
type BaseBackupOptions ¶
type BaseBackupOptions struct { // Request information required to generate a progress report, but might as such have a negative impact on the performance. Progress bool // Sets the label of the backup. If none is specified, a backup label of 'wal-g' will be used. Label string // Request a fast checkpoint. Fast bool // Include the necessary WAL segments in the backup. This will include all the files between start and stop backup in the pg_wal directory of the base directory tar file. WAL bool // By default, the backup will wait until the last required WAL segment has been archived, or emit a warning if log archiving is not enabled. // Specifying NOWAIT disables both the waiting and the warning, leaving the client responsible for ensuring the required log is available. NoWait bool // Limit (throttle) the maximum amount of data transferred from server to client per unit of time (kb/s). MaxRate int32 // Include information about symbolic links present in the directory pg_tblspc in a file named tablespace_map. TablespaceMap bool // Disable checksums being verified during a base backup. // Note that NoVerifyChecksums=true is only supported since PG11 NoVerifyChecksums bool }
type BaseBackupResult ¶
type BaseBackupResult struct { LSN LSN TimelineID int32 Tablespaces []BaseBackupTablespace }
BaseBackupResult will hold the return values of the BaseBackup command
func FinishBaseBackup ¶
func FinishBaseBackup(ctx context.Context, conn *pgconn.PgConn) (result BaseBackupResult, err error)
FinishBaseBackup wraps up a backup after copying all results from the BASE_BACKUP command.
func StartBaseBackup ¶
func StartBaseBackup(ctx context.Context, conn *pgconn.PgConn, options BaseBackupOptions) (result BaseBackupResult, err error)
StartBaseBackup begins the process for copying a basebackup by executing the BASE_BACKUP command.
type BaseBackupTablespace ¶
BaseBackupTablespace represents a tablespace in the backup
type BeginMessage ¶
type BeginMessage struct { //FinalLSN is the final LSN of the transaction. FinalLSN LSN // CommitTime is the commit timestamp of the transaction. CommitTime time.Time // Xid of the transaction. Xid uint32 // contains filtered or unexported fields }
BeginMessage is a begin message.
func (*BeginMessage) Decode ¶
func (m *BeginMessage) Decode(src []byte) error
Decode decodes the message from src.
func (*BeginMessage) Raw ¶
func (m *BeginMessage) Raw() []byte
func (*BeginMessage) SetType ¶
func (m *BeginMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
type CommitMessage ¶
type CommitMessage struct { // Flags currently unused (must be 0). Flags uint8 // CommitLSN is the LSN of the commit. CommitLSN LSN // TransactionEndLSN is the end LSN of the transaction. TransactionEndLSN LSN // CommitTime is the commit timestamp of the transaction CommitTime time.Time // contains filtered or unexported fields }
CommitMessage is a commit message.
func (*CommitMessage) Decode ¶
func (m *CommitMessage) Decode(src []byte) error
Decode decodes the message from src.
func (*CommitMessage) Raw ¶
func (m *CommitMessage) Raw() []byte
func (*CommitMessage) SetType ¶
func (m *CommitMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
type CopyDoneResult ¶
CopyDoneResult is the parsed result as returned by the server after the client sends a CopyDone to the server to confirm ending the copy-both mode.
func SendStandbyCopyDone ¶
SendStandbyCopyDone sends a StandbyCopyDone to the PostgreSQL server to confirm ending the copy-both mode.
type CreateReplicationSlotOptions ¶
type CreateReplicationSlotOptions struct { Temporary bool SnapshotAction string Mode ReplicationMode }
type CreateReplicationSlotResult ¶
type CreateReplicationSlotResult struct { SlotName string ConsistentPoint string SnapshotName string OutputPlugin string }
CreateReplicationSlotResult is the parsed results the CREATE_REPLICATION_SLOT command.
func CreateReplicationSlot ¶
func CreateReplicationSlot( ctx context.Context, conn *pgconn.PgConn, slotName string, outputPlugin string, options CreateReplicationSlotOptions, ) (CreateReplicationSlotResult, error)
CreateReplicationSlot creates a logical replication slot.
func ParseCreateReplicationSlot ¶
func ParseCreateReplicationSlot(mrr *pgconn.MultiResultReader) (CreateReplicationSlotResult, error)
ParseCreateReplicationSlot parses the result of the CREATE_REPLICATION_SLOT command.
type DeleteMessage ¶
type DeleteMessage struct { RelationID uint32 // OldTupleType // Byte1('K'): // Identifies the following TupleData submessage as a key. // This field is present if the table in which the delete has happened uses an index // as REPLICA IDENTITY. // // Byte1('O') // Identifies the following TupleData message as a old tuple. // This field is present if the table in which the delete has happened has // REPLICA IDENTITY set to FULL. // // The Delete message may contain either a 'K' message part or an 'O' message part, // but never both of them. OldTupleType uint8 OldTuple *TupleData // contains filtered or unexported fields }
DeleteMessage is a delete message.
func (*DeleteMessage) Decode ¶
func (m *DeleteMessage) Decode(src []byte) (err error)
Decode decodes a message from src.
func (*DeleteMessage) Raw ¶
func (m *DeleteMessage) Raw() []byte
func (*DeleteMessage) SetType ¶
func (m *DeleteMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
type DropReplicationSlotOptions ¶
type DropReplicationSlotOptions struct {
Wait bool
}
type IdentifySystemResult ¶
IdentifySystemResult is the parsed result of the IDENTIFY_SYSTEM command.
func IdentifySystem ¶
IdentifySystem executes the IDENTIFY_SYSTEM command.
func ParseIdentifySystem ¶
func ParseIdentifySystem(mrr *pgconn.MultiResultReader) (IdentifySystemResult, error)
ParseIdentifySystem parses the result of the IDENTIFY_SYSTEM command.
type InsertMessage ¶
type InsertMessage struct { // RelationID is the ID of the relation corresponding to the ID in the relation message. RelationID uint32 Tuple *TupleData // contains filtered or unexported fields }
InsertMessage is a insert message
func (*InsertMessage) Decode ¶
func (m *InsertMessage) Decode(src []byte) error
Decode decodes to message from src.
func (*InsertMessage) Raw ¶
func (m *InsertMessage) Raw() []byte
func (*InsertMessage) SetType ¶
func (m *InsertMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
type LSN ¶
type LSN uint64
LSN is a PostgreSQL Log Sequence Number. See https://www.postgresql.org/docs/current/datatype-pg-lsn.html.
type Message ¶
type Message interface { Type() MessageType Raw() []byte }
Message is a message received from server.
type MessageDecoder ¶
MessageDecoder decodes meessage into struct.
type MessageType ¶
type MessageType uint8
MessageType indicates type of a logical replication message.
const ( MessageTypeBegin MessageType = 'B' MessageTypeCommit MessageType = 'C' MessageTypeOrigin MessageType = 'O' MessageTypeRelation MessageType = 'R' MessageTypeType MessageType = 'Y' MessageTypeInsert MessageType = 'I' MessageTypeUpdate MessageType = 'U' MessageTypeDelete MessageType = 'D' MessageTypeTruncate MessageType = 'T' )
List of types of logical replication messages.
func (MessageType) String ¶
func (t MessageType) String() string
type OriginMessage ¶
type OriginMessage struct { // CommitLSN is the LSN of the commit on the origin server. CommitLSN LSN Name string // contains filtered or unexported fields }
OriginMessage is a origin message.
func (*OriginMessage) Decode ¶
func (m *OriginMessage) Decode(src []byte) error
Decode decodes to message from src.
func (*OriginMessage) Raw ¶
func (m *OriginMessage) Raw() []byte
func (*OriginMessage) SetType ¶
func (m *OriginMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
type PrimaryKeepaliveMessage ¶
func ParsePrimaryKeepaliveMessage ¶
func ParsePrimaryKeepaliveMessage(buf []byte) (PrimaryKeepaliveMessage, error)
ParsePrimaryKeepaliveMessage parses a Primary keepalive message from the server.
type RelationMessage ¶
type RelationMessage struct { RelationID uint32 Namespace string RelationName string ReplicaIdentity uint8 ColumnNum uint16 Columns []*RelationMessageColumn // contains filtered or unexported fields }
RelationMessage is a relation message.
func (*RelationMessage) Decode ¶
func (m *RelationMessage) Decode(src []byte) error
Decode decodes to message from src.
func (*RelationMessage) Raw ¶
func (m *RelationMessage) Raw() []byte
func (*RelationMessage) SetType ¶
func (m *RelationMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
func (*RelationMessage) Type ¶
func (m *RelationMessage) Type() MessageType
Type returns message type.
type RelationMessageColumn ¶
type RelationMessageColumn struct { // Flags for the column. Currently can be either 0 for no flags or 1 which marks the column as part of the key. Flags uint8 Name string // DataType is the ID of the column's data type. DataType uint32 // TypeModifier is type modifier of the column (atttypmod). TypeModifier int32 }
RelationMessageColumn is one column in a RelationMessage.
type ReplicationMode ¶
type ReplicationMode int
const ( LogicalReplication ReplicationMode = iota PhysicalReplication )
func (ReplicationMode) String ¶
func (mode ReplicationMode) String() string
String formats the mode into a postgres valid string
type StandbyStatusUpdate ¶
type StandbyStatusUpdate struct { WALWritePosition LSN // The WAL position that's been locally written WALFlushPosition LSN // The WAL position that's been locally flushed WALApplyPosition LSN // The WAL position that's been locally applied ClientTime time.Time // Client system clock time ReplyRequested bool // Request server to reply immediately. }
StandbyStatusUpdate is a message sent from the client that acknowledges receipt of WAL records.
type StartReplicationOptions ¶
type StartReplicationOptions struct { Timeline int32 // 0 means current server timeline Mode ReplicationMode PluginArgs []string }
type TimelineHistoryResult ¶
TimelineHistoryResult is the parsed result of the TIMELINE_HISTORY command.
func ParseTimelineHistory ¶
func ParseTimelineHistory(mrr *pgconn.MultiResultReader) (TimelineHistoryResult, error)
ParseTimelineHistory parses the result of the TIMELINE_HISTORY command.
func TimelineHistory ¶
func TimelineHistory(ctx context.Context, conn *pgconn.PgConn, timeline int32) (TimelineHistoryResult, error)
TimelineHistory executes the TIMELINE_HISTORY command.
type TruncateMessage ¶
type TruncateMessage struct { RelationNum uint32 Option uint8 RelationIDs []uint32 // contains filtered or unexported fields }
TruncateMessage is a truncate message.
func (*TruncateMessage) Decode ¶
func (m *TruncateMessage) Decode(src []byte) (err error)
Decode decodes to message from src.
func (*TruncateMessage) Raw ¶
func (m *TruncateMessage) Raw() []byte
func (*TruncateMessage) SetType ¶
func (m *TruncateMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
func (*TruncateMessage) Type ¶
func (m *TruncateMessage) Type() MessageType
Type returns message type.
type TupleData ¶
type TupleData struct { ColumnNum uint16 Columns []*TupleDataColumn // contains filtered or unexported fields }
TupleData contains row change information.
func (*TupleData) SetType ¶
func (m *TupleData) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
type TupleDataColumn ¶
type TupleDataColumn struct { // DataType indicates the how does the data is stored. // Byte1('n') Identifies the data as NULL value. // Or // Byte1('u') Identifies unchanged TOASTed value (the actual value is not sent). // Or // Byte1('t') Identifies the data as text formatted value. // Or // Byte1('b') Identifies the data as binary value. DataType uint8 Length uint32 // Data is th value of the column, in text format. (A future release might support additional formats.) n is the above length. Data []byte }
TupleDataColumn is a column in a TupleData.
func (*TupleDataColumn) Int64 ¶
func (c *TupleDataColumn) Int64() (int64, error)
Int64 parse column data as an int64 integer.
type TypeMessage ¶
type TypeMessage struct { DataType uint32 Namespace string Name string // contains filtered or unexported fields }
TypeMessage is a type message.
func (*TypeMessage) Decode ¶
func (m *TypeMessage) Decode(src []byte) error
Decode decodes to message from src.
func (*TypeMessage) Raw ¶
func (m *TypeMessage) Raw() []byte
func (*TypeMessage) SetType ¶
func (m *TypeMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.
type UpdateMessage ¶
type UpdateMessage struct { RelationID uint32 // OldTupleType // Byte1('K'): // Identifies the following TupleData submessage as a key. // This field is optional and is only present if the update changed data // in any of the column(s) that are part of the REPLICA IDENTITY index. // // Byte1('O'): // Identifies the following TupleData submessage as an old tuple. // This field is optional and is only present if table in which the update happened // has REPLICA IDENTITY set to FULL. // // The Update message may contain either a 'K' message part or an 'O' message part // or neither of them, but never both of them. OldTupleType uint8 OldTuple *TupleData // NewTuple is the contents of a new tuple. // Byte1('N'): Identifies the following TupleData message as a new tuple. NewTuple *TupleData // contains filtered or unexported fields }
UpdateMessage is a update message.
func (*UpdateMessage) Decode ¶
func (m *UpdateMessage) Decode(src []byte) (err error)
Decode decodes to message from src.
func (*UpdateMessage) Raw ¶
func (m *UpdateMessage) Raw() []byte
func (*UpdateMessage) SetType ¶
func (m *UpdateMessage) SetType(t MessageType)
SetType sets message type. This method is added to help writing test code in application. The message type is still defined by message data.