postgres

package module
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 18 Imported by: 1

README

Conduit Connector PostgreSQL

scarf pixel

The PostgreSQL connector is a Conduit plugin. It provides both, a source and a destination PostgresSQL connectors.

Source

The Postgres Source Connector connects to a database with the provided url and starts creating records for each change detected in the provided tables.

Upon starting, the source takes a snapshot of the provided tables in the database, then switches into CDC mode. In CDC mode, the plugin reads from a buffer of CDC events.

Snapshot Capture

When the connector first starts, snapshot mode is enabled. The connector acquires a read-only lock on the tables, and then reads all rows of the tables into Conduit. Once all rows in that initial snapshot are read the connector releases its lock and switches into CDC mode.

This behavior is enabled by default, but can be turned off by adding "snapshotMode":"never" to the Source configuration.

Change Data Capture

This connector implements CDC features for PostgreSQL by creating a logical replication slot and a publication that listens to changes in the configured tables. Every detected change is converted into a record and returned in the call to Read. If there is no record available at the moment Read is called, it blocks until a record is available or the connector receives a stop signal.

Logical Replication Configuration

When the connector switches to CDC mode, it attempts to run the initial setup commands to create its logical replication slot and publication. It will connect to an existing slot if one with the configured name exists.

The Postgres user specified in the connection URL must have sufficient privileges to run all of these setup commands, or it will fail.

Example configuration for CDC features:

{
  "url": "url",
  "tables": "records",
  "cdcMode": "logrepl",
  "logrepl.publicationName": "meroxademo",
  "logrepl.slotName": "meroxademo"
}

⚠ When the connector or pipeline is deleted, the connector will automatically attempt to delete the replication slot and publication. This is the default behaviour and can be disabled by setting logrepl.autoCleanup to false.

Key Handling

The connector will automatically look up the primary key column for the specified tables. If that can't be determined, the connector will return an error.

Configuration Options

name description required default
url Connection string for the Postgres database. true
tables List of table names to read from, separated by comma. Example: "employees,offices,payments". Using * will read from all public tables. true
snapshotMode Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: initial or never). false initial
cdcMode Determines the CDC mode (allowed values: auto, logrepl). false auto
logrepl.publicationName Name of the publication to listen for WAL events. false conduitpub
logrepl.slotName Name of the slot opened for replication events. false conduitslot
logrepl.autoCleanup Whether or not to cleanup the replication slot and pub when connector is deleted false true
table List of table names to read from, separated by comma. Deprecated: use tables instead. false

Destination

The Postgres Destination takes a record.Record and parses it into a valid SQL query. The Destination is designed to handle different payloads and keys. Because of this, each record is individually parsed and upserted.

Upsert Behavior

If the target table already contains a record with the same key, the Destination will upsert with its current received values. Because Keys must be unique, this can overwrite and thus potentially lose data, so keys should be assigned correctly from the Source.

If there is no key, the record will be simply appended.

Configuration Options

name description required default
url Connection string for the Postgres database. true
table Table name. It can contain a Go template that will be executed for each record to determine the table. By default, the table is the value of the opencdc.collection metadata field. false {{ index .Metadata "opencdc.collection" }}
key Key represents the column name for the key used to identify and update existing rows. false

Testing

Run make test to run all the unit and integration tests, which require Docker to be installed and running. The command will handle starting and stopping docker containers for you.

References

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Connector = sdk.Connector{
	NewSpecification: Specification,
	NewSource:        NewSource,
	NewDestination:   NewDestination,
}

Functions

func NewDestination added in v0.6.0

func NewDestination() sdk.Destination

func NewSource added in v0.6.0

func NewSource() sdk.Source

func Specification

func Specification() sdk.Specification

Specification returns the Plugin's Specification.

Types

type Destination added in v0.6.0

type Destination struct {
	sdk.UnimplementedDestination
	// contains filtered or unexported fields
}

func (*Destination) Configure added in v0.6.0

func (d *Destination) Configure(ctx context.Context, cfg config.Config) error

func (*Destination) Open added in v0.6.0

func (d *Destination) Open(ctx context.Context) error

func (*Destination) Parameters added in v0.6.0

func (d *Destination) Parameters() config.Parameters

func (*Destination) Teardown added in v0.6.0

func (d *Destination) Teardown(ctx context.Context) error

func (*Destination) Write added in v0.6.0

func (d *Destination) Write(ctx context.Context, recs []opencdc.Record) (int, error)

Write routes incoming records to their appropriate handler based on the operation.

type Source added in v0.6.0

type Source struct {
	sdk.UnimplementedSource
	// contains filtered or unexported fields
}

Source is a Postgres source plugin.

func (*Source) Ack added in v0.6.0

func (s *Source) Ack(ctx context.Context, pos opencdc.Position) error

func (*Source) Configure added in v0.6.0

func (s *Source) Configure(ctx context.Context, cfg config.Config) error

func (*Source) LifecycleOnDeleted added in v0.7.1

func (s *Source) LifecycleOnDeleted(ctx context.Context, cfg config.Config) error

func (*Source) Open added in v0.6.0

func (s *Source) Open(ctx context.Context, pos opencdc.Position) error

func (*Source) Parameters added in v0.6.0

func (s *Source) Parameters() config.Parameters

func (*Source) Read added in v0.6.0

func (s *Source) Read(ctx context.Context) (opencdc.Record, error)

func (*Source) Teardown added in v0.6.0

func (s *Source) Teardown(ctx context.Context) error

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL