pglogicalstream

Version: v0.0.28
Published: May 13, 2024 License: MIT Imports: 27 Imported by: 1

README

PostgreSQL Logical Replication CDC Module for Go

This Go module builds on github.com/jackc/pglogrepl to provide a logical replication solution for PostgreSQL. It extends jackc/pglogrepl with several features that make it easier to implement Change Data Capture (CDC) in Go applications.

Features

  • Checkpoints Storing: Efficiently manage and store replication checkpoints, facilitating better tracking and management of data changes.

  • Snapshot Streaming: Seamlessly capture and replicate snapshots of your PostgreSQL database, ensuring all data is streamed through the pipeline.

  • Table Filtering: Tailor your CDC process by selectively filtering and replicating specific tables, optimizing resource usage.

Getting Started

Follow these steps to get started with our PostgreSQL Logical Replication CDC Module for Go:

Install the module with go get github.com/usedatabrew/pglogicalstream, then import it:
import (
    "github.com/usedatabrew/pglogicalstream"
)

Configure your replication stream

Create a config.yaml file:

db_host: database host
db_password: password12345
db_user: postgres
db_port: 5432
db_name: mocks
db_schema: public
db_tables:
  - rides
replication_slot_name: morning_elephant
tls_verify: require
stream_old_data: true

Basic usage example

By default, pglogicalstream creates a replication slot and a publication for the tables listed in the YAML config. It starts streaming updates immediately, and you receive them in the OnMessage callback.

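package main

import (
  "fmt"
  "os"

  // Assumption: log.WithPrefix is not part of the standard library log
  // package; charmbracelet/log provides it and a matching *log.Logger.
  "github.com/charmbracelet/log"
  "github.com/usedatabrew/pglogicalstream"
  "gopkg.in/yaml.v3"
)

func main() {
  var config pglogicalstream.Config

  // os.ReadFile replaces the deprecated ioutil.ReadFile.
  yamlFile, err := os.ReadFile("./example/simple/config.yaml")
  if err != nil {
    // Without the config there is nothing to stream, so stop here.
    log.Fatalf("read config: %v", err)
  }

  if err = yaml.Unmarshal(yamlFile, &config); err != nil {
    log.Fatalf("unmarshal config: %v", err)
  }

  pgStream, err := pglogicalstream.NewPgStream(config, log.WithPrefix("pg-cdc"))
  if err != nil {
    panic(err)
  }

  // The callback receives each batch of decoded changes.
  pgStream.OnMessage(func(message pglogicalstream.Wal2JsonChanges) {
    fmt.Println(message.Changes)
  })
}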

Example with checkpointer

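To recover after a failure, you have to store the LSN somewhere so that streaming can resume from it. You can implement the CheckPointer interface and pass an instance of it to NewPgStream, as the example below does with NewPgStreamCheckPointer, and your LSN will be stored automatically. Note that the API index for this version documents NewPgStream as taking a *log.Logger as its second argument and lists no CheckPointer type; the index does list Stream.AckLSN(lsn string).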

checkPointer, err := NewPgStreamCheckPointer("redis.com:port", "user", "password")
if err != nil {
    log.Fatalf("Checkpointer error: %v", err)
}
pgStream, err := pglogicalstream.NewPgStream(config, checkPointer)

Contributing

We welcome contributions from the Go community!

License

This project is licensed under the MIT License.

Documentation

Index

Constants

const (
	TupleDataTypeNull   = uint8('n')
	TupleDataTypeToast  = uint8('u')
	TupleDataTypeText   = uint8('t')
	TupleDataTypeBinary = uint8('b')
)

List of types of data in a tuple.

const (
	UpdateMessageTupleTypeNone = uint8(0)
	UpdateMessageTupleTypeKey  = uint8('K')
	UpdateMessageTupleTypeOld  = uint8('O')
	UpdateMessageTupleTypeNew  = uint8('N')
)

List of types of UpdateMessage tuples.

const (
	DeleteMessageTupleTypeKey = uint8('K')
	DeleteMessageTupleTypeOld = uint8('O')
)

List of types of DeleteMessage tuples.

const (
	TruncateOptionCascade = uint8(1) << iota
	TruncateOptionRestartIdentity
)

List of truncate options.
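
Because the truncate options are declared with a shift on iota, they behave as bit flags and can be combined in a single Option byte. The sketch below shows one way to test them; it uses local copies of the constants so it compiles on its own, and describeTruncate is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local copies of the documented constants so the sketch is self-contained.
const (
	TruncateOptionCascade         = uint8(1) << iota // 1
	TruncateOptionRestartIdentity                    // 2
)

// describeTruncate is a hypothetical helper that lists the flags set in option.
func describeTruncate(option uint8) []string {
	var flags []string
	if option&TruncateOptionCascade != 0 {
		flags = append(flags, "CASCADE")
	}
	if option&TruncateOptionRestartIdentity != 0 {
		flags = append(flags, "RESTART IDENTITY")
	}
	return flags
}

func main() {
	// Both flags set at once.
	fmt.Println(describeTruncate(TruncateOptionCascade | TruncateOptionRestartIdentity))
}
```

In real code the value passed in would come from the Option field of a decoded TruncateMessage.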

const (
	XLogDataByteID                = 'w'
	PrimaryKeepaliveMessageByteID = 'k'
	StandbyStatusUpdateByteID     = 'r'
)

Variables

This section is empty.

Functions

func DropReplicationSlot added in v0.0.26

func DropReplicationSlot(ctx context.Context, conn *pgconn.PgConn, slotName string, options DropReplicationSlotOptions) error

DropReplicationSlot drops a logical replication slot.

func NextTableSpace added in v0.0.26

func NextTableSpace(ctx context.Context, conn *pgconn.PgConn) (err error)

NextTableSpace consumes messages so that the connection is positioned at the start of CopyData.

func SendStandbyStatusUpdate added in v0.0.26

func SendStandbyStatusUpdate(_ context.Context, conn *pgconn.PgConn, ssu StandbyStatusUpdate) error

SendStandbyStatusUpdate sends a StandbyStatusUpdate to the PostgreSQL server.

The only required field in ssu is WALWritePosition. If WALFlushPosition is 0 then WALWritePosition will be assigned to it. If WALApplyPosition is 0 then WALWritePosition will be assigned to it. If ClientTime is the zero value then the current time will be assigned to it.
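
The defaulting rules described above can be sketched as follows. This is an illustration of the documented behavior only, not the package's implementation: LSN and StandbyStatusUpdate are local copies of the documented types, and applyDefaults is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// LSN and StandbyStatusUpdate are local copies of the documented types.
type LSN uint64

type StandbyStatusUpdate struct {
	WALWritePosition LSN
	WALFlushPosition LSN
	WALApplyPosition LSN
	ClientTime       time.Time
	ReplyRequested   bool
}

// applyDefaults is a hypothetical helper mirroring the documented rules:
// zero flush and apply positions fall back to the write position, and a
// zero ClientTime falls back to the current time.
func applyDefaults(ssu StandbyStatusUpdate) StandbyStatusUpdate {
	if ssu.WALFlushPosition == 0 {
		ssu.WALFlushPosition = ssu.WALWritePosition
	}
	if ssu.WALApplyPosition == 0 {
		ssu.WALApplyPosition = ssu.WALWritePosition
	}
	if ssu.ClientTime.IsZero() {
		ssu.ClientTime = time.Now()
	}
	return ssu
}

func main() {
	out := applyDefaults(StandbyStatusUpdate{WALWritePosition: 100})
	fmt.Println(out.WALFlushPosition, out.WALApplyPosition, out.ClientTime.IsZero())
}
```

In practice this means a caller that only tracks one position can set WALWritePosition and leave the other fields at their zero values.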

func StartReplication added in v0.0.26

func StartReplication(ctx context.Context, conn *pgconn.PgConn, slotName string, startLSN LSN, options StartReplicationOptions) error

StartReplication begins the replication process by executing the START_REPLICATION command.

Types

type BaseBackupOptions added in v0.0.26

type BaseBackupOptions struct {
	// Request the information required to generate a progress report; this may have a negative impact on performance.
	Progress bool
	// Sets the label of the backup. If none is specified, a backup label of 'wal-g' will be used.
	Label string
	// Request a fast checkpoint.
	Fast bool
	// Include the necessary WAL segments in the backup. This will include all the files between start and stop backup in the pg_wal directory of the base directory tar file.
	WAL bool
	// By default, the backup will wait until the last required WAL segment has been archived, or emit a warning if log archiving is not enabled.
	// Specifying NOWAIT disables both the waiting and the warning, leaving the client responsible for ensuring the required log is available.
	NoWait bool
	// Limit (throttle) the maximum amount of data transferred from server to client per unit of time (kb/s).
	MaxRate int32
	// Include information about symbolic links present in the directory pg_tblspc in a file named tablespace_map.
	TablespaceMap bool
	// Disable checksums being verified during a base backup.
	// Note that NoVerifyChecksums=true is only supported since PG11
	NoVerifyChecksums bool
}

type BaseBackupResult added in v0.0.26

type BaseBackupResult struct {
	LSN         LSN
	TimelineID  int32
	Tablespaces []BaseBackupTablespace
}

BaseBackupResult will hold the return values of the BaseBackup command

func FinishBaseBackup added in v0.0.26

func FinishBaseBackup(ctx context.Context, conn *pgconn.PgConn) (result BaseBackupResult, err error)

FinishBaseBackup wraps up a backup after copying all results from the BASE_BACKUP command.

func StartBaseBackup added in v0.0.26

func StartBaseBackup(ctx context.Context, conn *pgconn.PgConn, options BaseBackupOptions) (result BaseBackupResult, err error)

StartBaseBackup begins the process for copying a basebackup by executing the BASE_BACKUP command.

type BaseBackupTablespace added in v0.0.26

type BaseBackupTablespace struct {
	OID      int32
	Location string
	Size     int8
}

BaseBackupTablespace represents a tablespace in the backup

type BeginMessage added in v0.0.26

type BeginMessage struct {

	// FinalLSN is the final LSN of the transaction.
	FinalLSN LSN
	// CommitTime is the commit timestamp of the transaction.
	CommitTime time.Time
	// Xid of the transaction.
	Xid uint32
	// contains filtered or unexported fields
}

BeginMessage is a begin message.

func (*BeginMessage) Decode added in v0.0.26

func (m *BeginMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*BeginMessage) SetType added in v0.0.26

func (m *BeginMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*BeginMessage) Type added in v0.0.26

func (m *BeginMessage) Type() MessageType

Type returns message type.

type ChangeFilter added in v0.0.26

type ChangeFilter struct {
	// contains filtered or unexported fields
}

func NewChangeFilter added in v0.0.26

func NewChangeFilter(tableSchemas []schemas.DataTableSchema, schema string) ChangeFilter

func (ChangeFilter) FilterChange added in v0.0.26

func (c ChangeFilter) FilterChange(lsn string, change []byte, OnFiltered Filtered)

type CommitMessage added in v0.0.26

type CommitMessage struct {

	// Flags currently unused (must be 0).
	Flags uint8
	// CommitLSN is the LSN of the commit.
	CommitLSN LSN
	// TransactionEndLSN is the end LSN of the transaction.
	TransactionEndLSN LSN
	// CommitTime is the commit timestamp of the transaction
	CommitTime time.Time
	// contains filtered or unexported fields
}

CommitMessage is a commit message.

func (*CommitMessage) Decode added in v0.0.26

func (m *CommitMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*CommitMessage) SetType added in v0.0.26

func (m *CommitMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*CommitMessage) Type added in v0.0.26

func (m *CommitMessage) Type() MessageType

Type returns message type.

type Config

type Config struct {
	DbHost                     string           `yaml:"db_host"`
	DbPassword                 string           `yaml:"db_password"`
	DbUser                     string           `yaml:"db_user"`
	DbPort                     int              `yaml:"db_port"`
	DbName                     string           `yaml:"db_name"`
	DbSchema                   string           `yaml:"db_schema"`
	DbTablesSchema             []DbTablesSchema `yaml:"db_table_schema"`
	ReplicationSlotName        string           `yaml:"replication_slot_name"`
	TlsVerify                  TlsVerify        `yaml:"tls_verify"`
	StreamOldData              bool             `yaml:"stream_old_data"`
	SeparateChanges            bool             `yaml:"separate_changes"`
	SnapshotMemorySafetyFactor float64          `yaml:"snapshot_memory_safety_factor"`
	BatchSize                  int              `yaml:"batch_size"`
}

type CopyDoneResult added in v0.0.26

type CopyDoneResult struct {
	Timeline int32
	LSN      LSN
}

CopyDoneResult is the parsed result as returned by the server after the client sends a CopyDone to the server to confirm ending the copy-both mode.

func SendStandbyCopyDone added in v0.0.26

func SendStandbyCopyDone(_ context.Context, conn *pgconn.PgConn) (cdr *CopyDoneResult, err error)

SendStandbyCopyDone sends a StandbyCopyDone to the PostgreSQL server to confirm ending the copy-both mode.

type CreateReplicationSlotOptions added in v0.0.26

type CreateReplicationSlotOptions struct {
	Temporary      bool
	SnapshotAction string
	Mode           ReplicationMode
}

type CreateReplicationSlotResult added in v0.0.26

type CreateReplicationSlotResult struct {
	SlotName        string
	ConsistentPoint string
	SnapshotName    string
	OutputPlugin    string
}

CreateReplicationSlotResult is the parsed result of the CREATE_REPLICATION_SLOT command.

func CreateReplicationSlot added in v0.0.26

func CreateReplicationSlot(
	ctx context.Context,
	conn *pgconn.PgConn,
	slotName string,
	outputPlugin string,
	options CreateReplicationSlotOptions,
) (CreateReplicationSlotResult, error)

CreateReplicationSlot creates a logical replication slot.

func ParseCreateReplicationSlot added in v0.0.26

func ParseCreateReplicationSlot(mrr *pgconn.MultiResultReader) (CreateReplicationSlotResult, error)

ParseCreateReplicationSlot parses the result of the CREATE_REPLICATION_SLOT command.

type DbSchemaColumn added in v0.0.10

type DbSchemaColumn struct {
	Name                string `yaml:"name"`
	DatabrewType        string `yaml:"databrewType"`
	NativeConnectorType string `yaml:"nativeConnectorType"`
	Pk                  bool   `yaml:"pk"`
	Nullable            bool   `yaml:"nullable"`
}

type DbTablesSchema added in v0.0.9

type DbTablesSchema struct {
	Table   string           `yaml:"table"`
	Columns []DbSchemaColumn `yaml:"columns"`
}

type DeleteMessage added in v0.0.26

type DeleteMessage struct {
	RelationID uint32
	// OldTupleType
	//   Byte1('K'):
	//     Identifies the following TupleData submessage as a key.
	//     This field is present if the table in which the delete has happened uses an index
	//     as REPLICA IDENTITY.
	//
	//   Byte1('O')
	//     Identifies the following TupleData message as an old tuple.
	//     This field is present if the table in which the delete has happened has
	//     REPLICA IDENTITY set to FULL.
	//
	// The Delete message may contain either a 'K' message part or an 'O' message part,
	// but never both of them.
	OldTupleType uint8
	OldTuple     *TupleData
	// contains filtered or unexported fields
}

DeleteMessage is a delete message.

func (*DeleteMessage) Decode added in v0.0.26

func (m *DeleteMessage) Decode(src []byte) (err error)

Decode decodes a message from src.

func (*DeleteMessage) SetType added in v0.0.26

func (m *DeleteMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*DeleteMessage) Type added in v0.0.26

func (m *DeleteMessage) Type() MessageType

Type returns message type.

type DropReplicationSlotOptions added in v0.0.26

type DropReplicationSlotOptions struct {
	Wait bool
}

type Filtered added in v0.0.26

type Filtered func(change Wal2JsonChanges)

type IdentifySystemResult added in v0.0.26

type IdentifySystemResult struct {
	SystemID string
	Timeline int32
	XLogPos  LSN
	DBName   string
}

IdentifySystemResult is the parsed result of the IDENTIFY_SYSTEM command.

func IdentifySystem added in v0.0.26

func IdentifySystem(ctx context.Context, conn *pgconn.PgConn) (IdentifySystemResult, error)

IdentifySystem executes the IDENTIFY_SYSTEM command.

func ParseIdentifySystem added in v0.0.26

func ParseIdentifySystem(mrr *pgconn.MultiResultReader) (IdentifySystemResult, error)

ParseIdentifySystem parses the result of the IDENTIFY_SYSTEM command.

type InsertMessage added in v0.0.26

type InsertMessage struct {

	// RelationID is the ID of the relation corresponding to the ID in the relation message.
	RelationID uint32
	Tuple      *TupleData
	// contains filtered or unexported fields
}

InsertMessage is an insert message.

func (*InsertMessage) Decode added in v0.0.26

func (m *InsertMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*InsertMessage) SetType added in v0.0.26

func (m *InsertMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*InsertMessage) Type added in v0.0.26

func (m *InsertMessage) Type() MessageType

Type returns message type.

type LSN added in v0.0.26

type LSN uint64

LSN is a PostgreSQL Log Sequence Number. See https://www.postgresql.org/docs/current/datatype-pg-lsn.html.

func ParseLSN added in v0.0.26

func ParseLSN(s string) (LSN, error)

ParseLSN parses the given XXX/XXX text format LSN used by PostgreSQL.

func (*LSN) Scan added in v0.0.26

func (lsn *LSN) Scan(src interface{}) error

Scan implements the Scanner interface.

func (LSN) String added in v0.0.26

func (lsn LSN) String() string

String formats the LSN value into the XXX/XXX format which is the text format used by PostgreSQL.
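
The XXX/XXX text format is two hexadecimal numbers: the upper and lower 32 bits of the 64-bit LSN. The sketch below shows a round trip under that assumption. LSN is a local copy of the documented type, and parseLSN and formatLSN are hypothetical stand-ins for ParseLSN and LSN.String, written here only to illustrate the format.

```go
package main

import "fmt"

// LSN is a local copy of the documented type.
type LSN uint64

// parseLSN is a hypothetical stand-in for ParseLSN: it reads the two
// hexadecimal halves and combines them as (upper << 32) | lower.
func parseLSN(s string) (LSN, error) {
	var upper, lower uint32
	n, err := fmt.Sscanf(s, "%X/%X", &upper, &lower)
	if err != nil {
		return 0, fmt.Errorf("parse LSN %q: %w", s, err)
	}
	if n != 2 {
		return 0, fmt.Errorf("parse LSN %q: expected 2 fields, got %d", s, n)
	}
	return LSN(upper)<<32 | LSN(lower), nil
}

// formatLSN is a hypothetical stand-in for LSN.String.
func formatLSN(lsn LSN) string {
	return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
}

func main() {
	lsn, err := parseLSN("16/B374D848")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Print the numeric value and the text form side by side.
	fmt.Println(uint64(lsn), formatLSN(lsn))
}
```

Keeping LSNs as integers makes them directly comparable, which is useful when deciding whether a stored checkpoint is older than an incoming change.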

func (LSN) Value added in v0.0.26

func (lsn LSN) Value() (driver.Value, error)

Value implements the Valuer interface.

type LogicalDecodingMessage added in v0.0.26

type LogicalDecodingMessage struct {
	LSN           LSN
	Transactional bool
	Prefix        string
	Content       []byte
	// contains filtered or unexported fields
}

LogicalDecodingMessage is a logical decoding message.

func (*LogicalDecodingMessage) Decode added in v0.0.26

func (m *LogicalDecodingMessage) Decode(src []byte) (err error)

Decode decodes a message from src.

func (*LogicalDecodingMessage) SetType added in v0.0.26

func (m *LogicalDecodingMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*LogicalDecodingMessage) Type added in v0.0.26

func (m *LogicalDecodingMessage) Type() MessageType

Type returns message type.

type Message added in v0.0.26

type Message interface {
	Type() MessageType
}

Message is a message received from server.

func Parse added in v0.0.26

func Parse(data []byte) (m Message, err error)

Parse parses a logical replication message.

type MessageDecoder added in v0.0.26

type MessageDecoder interface {
	Decode([]byte) error
}

MessageDecoder decodes message into struct.

type MessageType added in v0.0.26

type MessageType uint8

MessageType indicates the type of logical replication message.

const (
	MessageTypeBegin        MessageType = 'B'
	MessageTypeMessage      MessageType = 'M'
	MessageTypeCommit       MessageType = 'C'
	MessageTypeOrigin       MessageType = 'O'
	MessageTypeRelation     MessageType = 'R'
	MessageTypeType         MessageType = 'Y'
	MessageTypeInsert       MessageType = 'I'
	MessageTypeUpdate       MessageType = 'U'
	MessageTypeDelete       MessageType = 'D'
	MessageTypeTruncate     MessageType = 'T'
	MessageTypeStreamStart  MessageType = 'S'
	MessageTypeStreamStop   MessageType = 'E'
	MessageTypeStreamCommit MessageType = 'c'
	MessageTypeStreamAbort  MessageType = 'A'
)

List of types of logical replication messages.
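
Each constant is the single byte that opens a binary logical replication message, so the first byte of the payload is enough to tell the message kinds apart. The sketch below illustrates that idea with a local copy of a few of the constants; kindOf is a hypothetical helper and does not replace Parse.

```go
package main

import "fmt"

// MessageType and the constants are local copies of a subset of the
// documented declarations.
type MessageType uint8

const (
	MessageTypeBegin  MessageType = 'B'
	MessageTypeCommit MessageType = 'C'
	MessageTypeInsert MessageType = 'I'
	MessageTypeUpdate MessageType = 'U'
	MessageTypeDelete MessageType = 'D'
)

// kindOf is a hypothetical helper that names a message from its first byte.
func kindOf(walData []byte) (string, error) {
	if len(walData) == 0 {
		return "", fmt.Errorf("empty logical replication message")
	}
	switch MessageType(walData[0]) {
	case MessageTypeBegin:
		return "begin", nil
	case MessageTypeCommit:
		return "commit", nil
	case MessageTypeInsert:
		return "insert", nil
	case MessageTypeUpdate:
		return "update", nil
	case MessageTypeDelete:
		return "delete", nil
	default:
		return "", fmt.Errorf("unhandled message type %q", walData[0])
	}
}

func main() {
	kind, err := kindOf([]byte{'I', 0, 0, 0, 1})
	fmt.Println(kind, err)
}
```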

func (MessageType) String added in v0.0.26

func (t MessageType) String() string

type OnMessage

type OnMessage = func(message Wal2JsonChanges)

type OriginMessage added in v0.0.26

type OriginMessage struct {

	// CommitLSN is the LSN of the commit on the origin server.
	CommitLSN LSN
	Name      string
	// contains filtered or unexported fields
}

OriginMessage is an origin message.

func (*OriginMessage) Decode added in v0.0.26

func (m *OriginMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*OriginMessage) SetType added in v0.0.26

func (m *OriginMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*OriginMessage) Type added in v0.0.26

func (m *OriginMessage) Type() MessageType

Type returns message type.

type PrimaryKeepaliveMessage added in v0.0.26

type PrimaryKeepaliveMessage struct {
	ServerWALEnd   LSN
	ServerTime     time.Time
	ReplyRequested bool
}

func ParsePrimaryKeepaliveMessage added in v0.0.26

func ParsePrimaryKeepaliveMessage(buf []byte) (PrimaryKeepaliveMessage, error)

ParsePrimaryKeepaliveMessage parses a Primary keepalive message from the server.
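
In the PostgreSQL streaming replication protocol, the body of a primary keepalive message (after the 'k' byte ID) is 17 bytes: the server's WAL end as a big-endian 64-bit integer, the server time in microseconds since 2000-01-01 UTC, and one byte that is non-zero when a reply is requested. The sketch below decodes that layout. It uses local copies of the documented types, and parseKeepalive is a hypothetical stand-in for illustration, not this package's code.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// LSN and PrimaryKeepaliveMessage are local copies of the documented types.
type LSN uint64

type PrimaryKeepaliveMessage struct {
	ServerWALEnd   LSN
	ServerTime     time.Time
	ReplyRequested bool
}

// pgEpoch is the origin of PostgreSQL protocol timestamps.
var pgEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// parseKeepalive is a hypothetical decoder for the 17-byte keepalive body.
func parseKeepalive(buf []byte) (PrimaryKeepaliveMessage, error) {
	var pkm PrimaryKeepaliveMessage
	if len(buf) != 17 {
		return pkm, fmt.Errorf("keepalive body must be 17 bytes, got %d", len(buf))
	}
	pkm.ServerWALEnd = LSN(binary.BigEndian.Uint64(buf[0:8]))
	micros := int64(binary.BigEndian.Uint64(buf[8:16]))
	pkm.ServerTime = pgEpoch.Add(time.Duration(micros) * time.Microsecond)
	pkm.ReplyRequested = buf[16] != 0
	return pkm, nil
}

func main() {
	// Build a sample body: WAL end 42, one second after the epoch, reply requested.
	body := make([]byte, 17)
	binary.BigEndian.PutUint64(body[0:8], 42)
	binary.BigEndian.PutUint64(body[8:16], 1_000_000)
	body[16] = 1

	pkm, err := parseKeepalive(body)
	fmt.Println(pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested, err)
}
```

When ReplyRequested is true, the client is expected to answer promptly with a standby status update, otherwise the server may close the connection after its timeout.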

type RelationMessage added in v0.0.26

type RelationMessage struct {
	RelationID      uint32
	Namespace       string
	RelationName    string
	ReplicaIdentity uint8
	ColumnNum       uint16
	Columns         []*RelationMessageColumn
	// contains filtered or unexported fields
}

RelationMessage is a relation message.

func (*RelationMessage) Decode added in v0.0.26

func (m *RelationMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*RelationMessage) SetType added in v0.0.26

func (m *RelationMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*RelationMessage) Type added in v0.0.26

func (m *RelationMessage) Type() MessageType

Type returns message type.

type RelationMessageColumn added in v0.0.26

type RelationMessageColumn struct {
	// Flags for the column. Currently, it can be either 0 for no flags or 1 which marks the column as part of the key.
	Flags uint8

	Name string

	// DataType is the ID of the column's data type.
	DataType uint32

	// TypeModifier is type modifier of the column (atttypmod).
	TypeModifier int32
}

RelationMessageColumn is one column in a RelationMessage.

type ReplicationMode added in v0.0.26

type ReplicationMode int

const (
	LogicalReplication ReplicationMode = iota
	PhysicalReplication
)

func (ReplicationMode) String added in v0.0.26

func (mode ReplicationMode) String() string

String formats the mode as a string that is valid in PostgreSQL replication commands.

type Snapshotter added in v0.0.26

type Snapshotter struct {
	// contains filtered or unexported fields
}

func NewSnapshotter added in v0.0.26

func NewSnapshotter(dbConf pgconn.Config, snapshotName string) (*Snapshotter, error)

func (*Snapshotter) CalculateBatchSize added in v0.0.26

func (s *Snapshotter) CalculateBatchSize(safetyFactor float64, availableMemory uint64, estimatedRowSize uint64) int

func (*Snapshotter) CloseConn added in v0.0.26

func (s *Snapshotter) CloseConn() error

func (*Snapshotter) FindAvgRowSize added in v0.0.26

func (s *Snapshotter) FindAvgRowSize(table string) sql.NullInt64

func (*Snapshotter) Prepare added in v0.0.26

func (s *Snapshotter) Prepare() error

func (*Snapshotter) QuerySnapshotData added in v0.0.26

func (s *Snapshotter) QuerySnapshotData(table string, columns []string, pk string, limit, offset int) (rows pgx.Rows, err error)

func (*Snapshotter) ReleaseSnapshot added in v0.0.26

func (s *Snapshotter) ReleaseSnapshot() error

type StandbyStatusUpdate added in v0.0.26

type StandbyStatusUpdate struct {
	WALWritePosition LSN       // The WAL position that's been locally written
	WALFlushPosition LSN       // The WAL position that's been locally flushed
	WALApplyPosition LSN       // The WAL position that's been locally applied
	ClientTime       time.Time // Client system clock time
	ReplyRequested   bool      // Request server to reply immediately.
}

StandbyStatusUpdate is a message sent from the client that acknowledges receipt of WAL records.

type StartReplicationOptions added in v0.0.26

type StartReplicationOptions struct {
	Timeline   int32 // 0 means current server timeline
	Mode       ReplicationMode
	PluginArgs []string
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewPgStream

func NewPgStream(config Config, logger *log.Logger) (*Stream, error)

func (*Stream) AckLSN added in v0.0.12

func (s *Stream) AckLSN(lsn string)

func (*Stream) LrMessageC

func (s *Stream) LrMessageC() chan Wal2JsonChanges

func (*Stream) OnMessage

func (s *Stream) OnMessage(callback OnMessage)

func (*Stream) SnapshotMessageC

func (s *Stream) SnapshotMessageC() chan Wal2JsonChanges

func (*Stream) Stop

func (s *Stream) Stop() error

type TimelineHistoryResult added in v0.0.26

type TimelineHistoryResult struct {
	FileName string
	Content  []byte
}

TimelineHistoryResult is the parsed result of the TIMELINE_HISTORY command.

func ParseTimelineHistory added in v0.0.26

func ParseTimelineHistory(mrr *pgconn.MultiResultReader) (TimelineHistoryResult, error)

ParseTimelineHistory parses the result of the TIMELINE_HISTORY command.

func TimelineHistory added in v0.0.26

func TimelineHistory(ctx context.Context, conn *pgconn.PgConn, timeline int32) (TimelineHistoryResult, error)

TimelineHistory executes the TIMELINE_HISTORY command.

type TlsVerify

type TlsVerify string

const TlsNoVerify TlsVerify = "none"
const TlsRequireVerify TlsVerify = "require"

type TruncateMessage added in v0.0.26

type TruncateMessage struct {
	RelationNum uint32
	Option      uint8
	RelationIDs []uint32
	// contains filtered or unexported fields
}

TruncateMessage is a truncate message.

func (*TruncateMessage) Decode added in v0.0.26

func (m *TruncateMessage) Decode(src []byte) (err error)

Decode decodes the message from src.

func (*TruncateMessage) SetType added in v0.0.26

func (m *TruncateMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*TruncateMessage) Type added in v0.0.26

func (m *TruncateMessage) Type() MessageType

Type returns message type.

type TupleData added in v0.0.26

type TupleData struct {
	ColumnNum uint16
	Columns   []*TupleDataColumn
	// contains filtered or unexported fields
}

TupleData contains row change information.

func (*TupleData) Decode added in v0.0.26

func (m *TupleData) Decode(src []byte) (int, error)

Decode decodes the message from src.

func (*TupleData) SetType added in v0.0.26

func (m *TupleData) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*TupleData) Type added in v0.0.26

func (m *TupleData) Type() MessageType

Type returns message type.

type TupleDataColumn added in v0.0.26

type TupleDataColumn struct {
	// DataType indicates how the data is stored.
	//	 Byte1('n') Identifies the data as NULL value.
	//	 Or
	//	 Byte1('u') Identifies unchanged TOASTed value (the actual value is not sent).
	//	 Or
	//	 Byte1('t') Identifies the data as text formatted value.
	//	 Or
	//	 Byte1('b') Identifies the data as binary value.
	DataType uint8
	Length   uint32
	// Data is the value of the column, in text format. (A future release might support additional formats.) Its size is given by Length above.
	Data []byte
}

TupleDataColumn is a column in a TupleData.

func (*TupleDataColumn) Int64 added in v0.0.26

func (c *TupleDataColumn) Int64() (int64, error)

Int64 parses the column data as an int64 integer.
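
How a column can be read depends on its DataType byte: only text-format data carries a value that can be parsed directly, while NULL and unchanged TOASTed columns carry no data. The sketch below illustrates that distinction under those assumptions. The types and constants are local copies of the documented ones, and columnInt64 is a hypothetical function whose error handling may differ from the package's Int64 method.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the documented constants and type.
const (
	TupleDataTypeNull  = uint8('n')
	TupleDataTypeToast = uint8('u')
	TupleDataTypeText  = uint8('t')
)

type TupleDataColumn struct {
	DataType uint8
	Length   uint32
	Data     []byte
}

// columnInt64 is a hypothetical helper: it parses text-format data as a
// base-10 integer and reports why other kinds of column have no value.
func columnInt64(c *TupleDataColumn) (int64, error) {
	switch c.DataType {
	case TupleDataTypeText:
		return strconv.ParseInt(string(c.Data), 10, 64)
	case TupleDataTypeNull:
		return 0, fmt.Errorf("column is NULL")
	case TupleDataTypeToast:
		return 0, fmt.Errorf("column is an unchanged TOASTed value; no data was sent")
	default:
		return 0, fmt.Errorf("unsupported column data type %q", c.DataType)
	}
}

func main() {
	col := &TupleDataColumn{DataType: TupleDataTypeText, Length: 2, Data: []byte("42")}
	v, err := columnInt64(col)
	fmt.Println(v, err)
}
```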

type TypeMessage added in v0.0.26

type TypeMessage struct {
	DataType  uint32
	Namespace string
	Name      string
	// contains filtered or unexported fields
}

TypeMessage is a type message.

func (*TypeMessage) Decode added in v0.0.26

func (m *TypeMessage) Decode(src []byte) error

Decode decodes the message from src.

func (*TypeMessage) SetType added in v0.0.26

func (m *TypeMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*TypeMessage) Type added in v0.0.26

func (m *TypeMessage) Type() MessageType

Type returns message type.

type UpdateMessage added in v0.0.26

type UpdateMessage struct {
	RelationID uint32

	// OldTupleType
	//   Byte1('K'):
	//     Identifies the following TupleData submessage as a key.
	//     This field is optional and is only present if the update changed data
	//     in any of the column(s) that are part of the REPLICA IDENTITY index.
	//
	//   Byte1('O'):
	//     Identifies the following TupleData submessage as an old tuple.
	//     This field is optional and is only present if table in which the update happened
	//     has REPLICA IDENTITY set to FULL.
	//
	//   The Update message may contain either a 'K' message part or an 'O' message part
	//   or neither of them, but never both of them.
	OldTupleType uint8
	OldTuple     *TupleData

	// NewTuple is the contents of a new tuple.
	//   Byte1('N'): Identifies the following TupleData message as a new tuple.
	NewTuple *TupleData
	// contains filtered or unexported fields
}

UpdateMessage is an update message.

func (*UpdateMessage) Decode added in v0.0.26

func (m *UpdateMessage) Decode(src []byte) (err error)

Decode decodes the message from src.

func (*UpdateMessage) SetType added in v0.0.26

func (m *UpdateMessage) SetType(t MessageType)

SetType sets the message type. It exists to help write test code in applications; the message type is still defined by the message data.

func (*UpdateMessage) Type added in v0.0.26

func (m *UpdateMessage) Type() MessageType

Type returns message type.

type Wal2JsonChange added in v0.0.24

type Wal2JsonChange struct {
	Kind   string       `json:"action"`
	Schema string       `json:"schema"`
	Table  string       `json:"table"`
	Row    arrow.Record `json:"data"`
}

type Wal2JsonChanges added in v0.0.24

type Wal2JsonChanges struct {
	Lsn     string
	Changes []Wal2JsonChange `json:"change"`
}

type WallMessage added in v0.0.26

type WallMessage struct {
	Change []struct {
		Kind         string        `json:"kind"`
		Schema       string        `json:"schema"`
		Table        string        `json:"table"`
		Columnnames  []string      `json:"columnnames"`
		Columntypes  []string      `json:"columntypes"`
		Columnvalues []interface{} `json:"columnvalues"`
		Oldkeys      struct {
			Keynames  []string      `json:"keynames"`
			Keytypes  []string      `json:"keytypes"`
			Keyvalues []interface{} `json:"keyvalues"`
		} `json:"oldkeys"`
	} `json:"change"`
}
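
The JSON tags on WallMessage match the shape of a wal2json (format version 1) payload, so such a payload can be decoded with encoding/json. The sketch below shows this with a local copy of the struct. The decodeWal2Json helper and the sample payload (its column names and values) are hypothetical, chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WallMessage is a local copy of the documented struct.
type WallMessage struct {
	Change []struct {
		Kind         string        `json:"kind"`
		Schema       string        `json:"schema"`
		Table        string        `json:"table"`
		Columnnames  []string      `json:"columnnames"`
		Columntypes  []string      `json:"columntypes"`
		Columnvalues []interface{} `json:"columnvalues"`
		Oldkeys      struct {
			Keynames  []string      `json:"keynames"`
			Keytypes  []string      `json:"keytypes"`
			Keyvalues []interface{} `json:"keyvalues"`
		} `json:"oldkeys"`
	} `json:"change"`
}

// decodeWal2Json is a hypothetical helper that unmarshals one payload.
func decodeWal2Json(payload []byte) (WallMessage, error) {
	var msg WallMessage
	err := json.Unmarshal(payload, &msg)
	return msg, err
}

func main() {
	// Hypothetical payload for an insert into public.rides.
	payload := []byte(`{"change":[{"kind":"insert","schema":"public","table":"rides",
		"columnnames":["id","name"],"columntypes":["integer","text"],"columnvalues":[1,"first"]}]}`)

	msg, err := decodeWal2Json(payload)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, c := range msg.Change {
		fmt.Println(c.Kind, c.Schema+"."+c.Table, c.Columnnames, c.Columnvalues)
	}
}
```

Because Columnvalues and Keyvalues are []interface{}, JSON numbers arrive as float64 and need a type assertion or conversion before use.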

type XLogData added in v0.0.26

type XLogData struct {
	WALStart     LSN
	ServerWALEnd LSN
	ServerTime   time.Time
	WALData      []byte
}

func ParseXLogData added in v0.0.26

func ParseXLogData(buf []byte) (XLogData, error)

ParseXLogData parses an XLogData message from the server.
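
In the PostgreSQL streaming replication protocol, an XLogData message is a CopyData payload that starts with the byte 'w' (XLogDataByteID), followed by a 24-byte header (WAL start, server WAL end, and server time in microseconds since 2000-01-01 UTC, each a big-endian 64-bit integer) and then the WAL data itself. The sketch below dispatches on the byte ID and decodes the rest. The types are local copies of the documented ones, and parseXLogData is a hypothetical stand-in written for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"time"
)

// LSN and XLogData are local copies of the documented types.
type LSN uint64

type XLogData struct {
	WALStart     LSN
	ServerWALEnd LSN
	ServerTime   time.Time
	WALData      []byte
}

// XLogDataByteID is a local copy of the documented constant.
const XLogDataByteID = 'w'

// pgEpoch is the origin of PostgreSQL protocol timestamps.
var pgEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// parseXLogData is a hypothetical decoder for the body that follows the 'w' byte.
func parseXLogData(buf []byte) (XLogData, error) {
	var xld XLogData
	if len(buf) < 24 {
		return xld, errors.New("XLogData body must be at least 24 bytes")
	}
	xld.WALStart = LSN(binary.BigEndian.Uint64(buf[0:8]))
	xld.ServerWALEnd = LSN(binary.BigEndian.Uint64(buf[8:16]))
	micros := int64(binary.BigEndian.Uint64(buf[16:24]))
	xld.ServerTime = pgEpoch.Add(time.Duration(micros) * time.Microsecond)
	xld.WALData = buf[24:]
	return xld, nil
}

func main() {
	// Sample CopyData payload: byte ID, zeroed header, then the WAL data.
	msg := append([]byte{XLogDataByteID}, make([]byte, 24)...)
	msg = append(msg, []byte(`{"change":[]}`)...)

	if msg[0] == XLogDataByteID {
		xld, err := parseXLogData(msg[1:])
		if err != nil {
			fmt.Println("parse error:", err)
			return
		}
		fmt.Printf("%d bytes of WAL data starting at %d\n", len(xld.WALData), xld.WALStart)
	}
}
```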

Directories

Path Synopsis
example
ws
internal
