pglogicalstream

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2024 License: MIT Imports: 18 Imported by: 0

README

PostgreSQL Logical Replication CDC Module for Go Go

This Go module builds upon github.com/jackc/pglogrepl to provide an advanced logical replication solution for PostgreSQL. It extends the capabilities of jackc/pglogrep for logical replication by introducing several key features, making it easier to implement Change Data Capture (CDC) in your Go-based applications.

Features

  • Checkpoints Storing: Efficiently manage and store replication checkpoints, facilitating better tracking and management of data changes.

  • Snapshot Streaming: Seamlessly capture and replicate snapshots of your PostgreSQL database, ensuring all data is streamed through the pipeline.

  • Table Filtering: Tailor your CDC process by selectively filtering and replicating specific tables, optimizing resource usage.

Getting Started

Follow these steps to get started with our PostgreSQL Logical Replication CDC Module for Go:

Install the module
import (
    "github.com/usedatabrew/pglogicalstream"
)

Configure your replication stream

Create config.yaml file

db_host: database host
db_password: password12345
db_user: postgres
db_port: 5432
db_name: mocks
db_schema: public
db_tables:
  - rides
replication_slot_name: morning_elephant
tls_verify: require
stream_old_data: true
Basic usage example

By defaultpglogicalstreamwill create replication slot and publication for the tables you provide in Yaml config It immediately starts streaming updates and you can receive them in the OnMessage function

package main

import (
  "fmt"
  "github.com/usedatabrew/pglogicalstream"
  "gopkg.in/yaml.v3"
  "io/ioutil"
  "log"
)

func main() {
  var config pglogicalstream.Config
  yamlFile, err := ioutil.ReadFile("./example/simple/config.yaml")
  if err != nil {
    log.Printf("yamlFile.Get err   #%v ", err)
  }

  err = yaml.Unmarshal(yamlFile, &config)
  if err != nil {
    log.Fatalf("Unmarshal: %v", err)
  }

  pgStream, err := pglogicalstream.NewPgStream(config, nil)
  if err != nil {
    panic(err)
  }

  // Listen to messages here
  pgStream.OnMessage(func(message []byte) {
    fmt.Println(string(message))
  })
}
Example with checkpointer

In order to recover after the failure, etc you have to store LSN somewhere to continue streaming the data You can implement CheckPointer interface and pass it's instance to NewPgStreamCheckPointer and your LSN will be stored automatically

checkPointer, err := NewPgStreamCheckPointer("redis.com:port", "user", "password")
if err != nil {
    log.Fatalf("Checkpointer error")
}
pgStream, err := pglogicalstream.NewPgStream(config, checkPointer)
Enable checkpoints by implementing CheckPointer inteface
type CheckPointer interface {
    SetCheckPoint(lsn, slot string) error
    GetCheckPoint(slot string) string
}
Checkpointer implementation example
package main

import (
  "fmt"
  "github.com/go-redis/redis/v7"
)

type PgStreamCheckPointer struct {
  redisConn *redis.Client
}

func NewPgStreamCheckPointer(addr, user, password string) (*PgStreamCheckPointer, error) {
  client := redis.NewClient(&redis.Options{
    Addr:     addr,
    Username: user,
    Password: password,
  })
  conn := client.Conn()
  result := conn.Ping()
  if result.Err() != nil {
    return nil, result.Err()
  }

  return &PgStreamCheckPointer{
    redisConn: client,
  }, nil
}

func (p *PgStreamCheckPointer) SaveCheckPoint(lnsCheckPoint, replicationSlot string) error {
  return p.redisConn.Set(fmt.Sprintf("databrew_checkpoint_%s", replicationSlot), lnsCheckPoint, 0).Err()
}

func (p *PgStreamCheckPointer) GetCheckPoint(replicationSlot string) string {
  result, _ := p.redisConn.Get(fmt.Sprintf("databrew_checkpoint_%s", replicationSlot)).Result()
  return result
}

func (p *PgStreamCheckPointer) Close() error {
  if p.redisConn != nil {
    return p.redisConn.Close()
  }

  return nil
}

Contributing

We welcome contributions from the Go community!

License

This project is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	DbHost                     string           `yaml:"db_host"`
	DbPassword                 string           `yaml:"db_password"`
	DbUser                     string           `yaml:"db_user"`
	DbPort                     int              `yaml:"db_port"`
	DbName                     string           `yaml:"db_name"`
	DbSchema                   string           `yaml:"db_schema"`
	DbTablesSchema             []DbTablesSchema `yaml:"db_table_schema"`
	ReplicationSlotName        string           `yaml:"replication_slot_name"`
	TlsVerify                  TlsVerify        `yaml:"tls_verify"`
	StreamOldData              bool             `yaml:"stream_old_data"`
	SeparateChanges            bool             `yaml:"separate_changes"`
	SnapshotMemorySafetyFactor float64          `yaml:"snapshot_memory_safety_factor"`
	BatchSize                  int              `yaml:"batch_size"`
}

type DbSchemaColumn

type DbSchemaColumn struct {
	Name                string `yaml:"name"`
	DatabrewType        string `yaml:"databrewType"`
	NativeConnectorType string `yaml:"nativeConnectorType"`
	Pk                  bool   `yaml:"pk"`
	Nullable            bool   `yaml:"nullable"`
}

type DbTablesSchema

type DbTablesSchema struct {
	Table   string           `yaml:"table"`
	Columns []DbSchemaColumn `yaml:"columns"`
}

type OnMessage

type OnMessage = func(message replication.Wal2JsonChanges)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewPgStream

func NewPgStream(config Config, logger *log.Logger) (*Stream, error)

func (*Stream) AckLSN

func (s *Stream) AckLSN(lsn string)

func (*Stream) LrMessageC

func (s *Stream) LrMessageC() chan replication.Wal2JsonChanges

func (*Stream) OnMessage

func (s *Stream) OnMessage(callback OnMessage)

func (*Stream) SnapshotMessageC

func (s *Stream) SnapshotMessageC() chan replication.Wal2JsonChanges

func (*Stream) Stop

func (s *Stream) Stop() error

type TlsVerify

type TlsVerify string
const TlsNoVerify TlsVerify = "none"
const TlsRequireVerify TlsVerify = "require"

Directories

Path Synopsis
example
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL