pg

package
v0.2.0-nightly.20220209 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2022 License: Apache-2.0 Imports: 1 Imported by: 0

README

Conduit PostgreSQL Connectors

Source

The Postgres Source Connector connects to a database with the provided url and then will call Ping and test the connection. If the Ping fails, the Open method will fail and the Connector will not be ready to Read.

Change Data Capture

This connector implements CDC features for PostgreSQL by reading WAL log events into a buffer that is checked on each Read request after the initial table Rows have been read.

CDC Configuration

When Open is called, the connector will attempt to start all necessary connections and will run some initial setup commands to create the logical replication slots it needs to run and start its own subscription.

The connector user specified in the connection URL must have sufficient privileges to run all of these commands or it will fail.

If the cdc field in the plugin.Configuration is set to any truthy value, it will be enabled.

Publication and slot name are user configurable, and must be correctly set. The plugin will do what it can to be smart about publication and slot management, but it can't handle everything.

If a replication_url is provided, it will be used for CDC features instead of the url. If no replication_url is provided, but cdc is enabled, then it will attempt to use that url value for CDC features and logical replication setup.

Example configuration for CDC features:

"cdc":              "true",
"publication_name": "meroxademo",
"slot_name":        "meroxademo",
"url":              url,
"replication_url":  url,
"table":            "records",
"columns":          "key,column1,column2,column3",
Internal Event Buffer

There is a private variable bufferSize that dictates the size of the channel buffer that holds WAL events. If it's full, pushing to that channel will be a blocking operation, and thus execution will stop if the handler for WAL events cannot push into that buffer. That blocking execution could have unknown negative performance consequences, so we should have this be sufficiently high and possibly configured by environment variable. WAL Events

In all cases we should aim for consistency between WAL event records and standard Row records, so any discrepancies between the formats of those should be pointed out. I've addressed some where I've found them, for example the integer handling in withValues function treats all numbers as int64 since that's how the row queries are handled as well.

Go and PostgreSQL Data Types

We handle the basic Postgres types for now, but we will need an exhaustive test suite of all the different data types that Postgres can handle.

Position Handling

The WAL uses Postgres' internal LSN (log sequence number) to track positions of WAL events, but they're not an exact science, and they are subject to wrap-around at their high end limit. This means that once the plugin is reading only WAL events, the LSN will likely be orders of magnitude higher than the row Position of the last read row. This could cause some issues, but can still be handled by the Connector setting Position back to 0 and reading the table from the start again.

However, that isn't the greatest long-term solution and we should handle this by adding a highwater mark for the last read Row in our database query.

Position

The position argument let's the connector know what the current position of the Read is at. When you call Read, it takes that current position and attempts to get the next one. Because of this, we attempt to parse the Position as an integer and then increment it.

The read query will perform a lookup for a row where key >= $1 where key is the name of the key column and $1 is the value of the incremented Position. plugins.ErrEndData is returned if no row can be found that matches this selection.

Record Keys

Position currently references the key column when querying. Thus, key must be a column of integer, serial or bigserial type, or be a string that can be parsed as an integer.

Positions are parsed to integers with ParseInt and then parsed back to strings with FormatInt. incrementPosition and withPosition respectively handle this logic.

If no key field is provided, then the connector will attempt to look up the primary key column of the table. If that can't be determined it will error.

Configuration

The config passed to Open can be contain the following fields.

name description required
table the name of the table in Postgres that the connector should read yes
url formatted connection string to the database. yes
columns comma separated string list of column names that should be built in to each Record's payload. no
key column name that records should use for their Key fields. defaults to the column's primary key if nothing is specified no
cdc enables CDC features req. for CDC mode
publication_name name of the publication to listen for WAL events req. for CDC mode
slot_name name of the slot opened for replication events req. for CDC mode
replication_url URL for the CDC connection to use. If no replication_url is provided, then the CDC connection attempts the use the url value. optional in CDC mode

Columns

If no column names are provided in the config, then the plugin will assume that all columns in the table should be queried. It will attempt to get the column names for the configured table and set them in memory.

Record Keys

Currently the Postgres source takes a key property and uses that column name to key records. The key column must be unique and must be able to be parsed as an integer.

We plan to support alphanumeric and composite record keys soon.

Roadmap

  • Change data capture handling
  • Composite key support
  • Alphanumeric position handling
  • JSONB support

Testing

Run the docker-compose from the project root:

docker-compose -f ./test/docker-compose-postgres.yml up

Run all connector tests:

go test -race ./pkg/plugins/pg/...

References

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Spec

type Spec struct{}

func (Spec) Specify

func (s Spec) Specify() (plugins.Specification, error)

Specify returns the Plugin's Specification

Directories

Path Synopsis
cmd
pg
mock
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL