sinks

package
v0.0.12
Published: Nov 19, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

README

Supported Sinks in pg_flo

pg_flo supports various sink types (destinations) for streaming data changes. This document provides an overview of the supported sinks and how to use them via the command-line interface.

Available Sinks

  1. STDOUT
  2. File
  3. PostgreSQL
  4. Webhook

Common Flags

These flags are common to all sink types:

  • --host: PostgreSQL source host
  • --port: PostgreSQL source port
  • --dbname: PostgreSQL source database name
  • --user: PostgreSQL source user
  • --password: PostgreSQL source password
  • --group: Group name for replication
  • --tables: Tables to replicate (comma-separated)
  • --status-dir: Directory to store status files

STDOUT Sink

The STDOUT sink writes changes directly to the console output.

Usage
pg_flo stream stdout [common flags]
Example
pg_flo stream stdout \
  --host localhost \
  --port 5432 \
  --dbname your_database \
  --user your_user \
  --password your_password \
  --group your_group \
  --tables table1,table2 \
  --status-dir /tmp/pg_flo-status

File Sink

The File sink writes changes to files in the specified output directory.

Usage
pg_flo stream file [common flags] --output-dir <output_directory>
Additional Flags
  • --output-dir: Output directory for file sink
Example
pg_flo stream file \
  --host localhost \
  --port 5432 \
  --dbname your_database \
  --user your_user \
  --password your_password \
  --group your_group \
  --tables table1,table2 \
  --status-dir /tmp/pg_flo-status \
  --output-dir /tmp/pg_flo-output
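
The following is a minimal Go sketch of how a file-backed sink of this kind might work; it is not pg_flo's FileSink implementation. The `change` type, the `jsonlFileSink` name, the `changes.jsonl` file name, and the one-JSON-object-per-line format are all assumptions made for illustration.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// change is a hypothetical stand-in for a CDC message.
type change struct {
	Type  string `json:"type"`
	Table string `json:"table"`
}

// fileName is an assumption for this sketch, not pg_flo's naming scheme.
const fileName = "changes.jsonl"

// jsonlFileSink appends one JSON object per line to a file in the output directory.
type jsonlFileSink struct {
	file *os.File
}

func newJSONLFileSink(outputDir string) (*jsonlFileSink, error) {
	if err := os.MkdirAll(outputDir, 0o755); err != nil {
		return nil, err
	}
	f, err := os.OpenFile(filepath.Join(outputDir, fileName), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return nil, err
	}
	return &jsonlFileSink{file: f}, nil
}

// WriteBatch buffers the whole batch and flushes it once at the end.
func (s *jsonlFileSink) WriteBatch(batch []*change) error {
	w := bufio.NewWriter(s.file)
	enc := json.NewEncoder(w) // Encode appends a newline after each value
	for _, c := range batch {
		if err := enc.Encode(c); err != nil {
			return err
		}
	}
	return w.Flush()
}

func (s *jsonlFileSink) Close() error { return s.file.Close() }

// writeAndCount writes a batch into a temporary directory and returns
// the number of lines that ended up in the output file.
func writeAndCount(batch []*change) (int, error) {
	dir, err := os.MkdirTemp("", "pg_flo-output-")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	sink, err := newJSONLFileSink(dir)
	if err != nil {
		return 0, err
	}
	if err := sink.WriteBatch(batch); err != nil {
		sink.Close()
		return 0, err
	}
	if err := sink.Close(); err != nil {
		return 0, err
	}
	data, err := os.ReadFile(filepath.Join(dir, fileName))
	if err != nil {
		return 0, err
	}
	return bytes.Count(data, []byte("\n")), nil
}

func main() {
	n, err := writeAndCount([]*change{
		{Type: "INSERT", Table: "table1"},
		{Type: "UPDATE", Table: "table2"},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("lines written:", n)
}
```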

PostgreSQL Sink

The PostgreSQL sink replicates changes to another PostgreSQL database. To ensure accurate replication of updates and deletes, all tables must have a primary key defined.

Usage
pg_flo stream postgres [common flags] [postgres sink flags]
Additional Flags
  • --target-host: Target PostgreSQL host
  • --target-port: Target PostgreSQL port
  • --target-dbname: Target PostgreSQL database name
  • --target-user: Target PostgreSQL user
  • --target-password: Target PostgreSQL password
  • --sync-schema: Sync schema from source to target via pg_dump (boolean flag)
Example
pg_flo stream postgres \
  --host localhost \
  --port 5432 \
  --dbname source_db \
  --user source_user \
  --password source_password \
  --group replication_group \
  --tables table1,table2 \
  --schema public \
  --status-dir /tmp/pg_flo-status \
  --target-host target.host.com \
  --target-port 5433 \
  --target-dbname target_db \
  --target-user target_user \
  --target-password target_password \
  --sync-schema
Additional Behavior
  • Supports schema synchronization between source and target databases using pg_dump when the --sync-schema flag is set.
  • Creates an internal_pg_flo schema and lsn_status table to keep track of the last processed LSN.
  • Handles INSERT, UPDATE, DELETE, and DDL operations.
  • Uses UPSERT (INSERT ... ON CONFLICT DO UPDATE) for handling both INSERT and UPDATE operations efficiently.
  • Executes operations within a transaction for each batch of changes.
  • Rolls back the transaction and logs an error if any operation in the batch fails.
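
The UPSERT approach also shows why a primary key matters: ON CONFLICT needs a unique constraint to target. As a rough illustration, the statement for one table could be built as in the Go sketch below. `buildUpsert` and its parameters are hypothetical names chosen for this example; the SQL that pg_flo actually generates may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// buildUpsert returns an INSERT ... ON CONFLICT DO UPDATE statement that uses
// positional placeholders ($1, $2, ...). It is a hypothetical helper for
// illustration only. Identifiers are double-quoted, with embedded quotes doubled.
func buildUpsert(table string, columns, primaryKey []string) string {
	quote := func(id string) string {
		return `"` + strings.ReplaceAll(id, `"`, `""`) + `"`
	}

	isKey := make(map[string]bool, len(primaryKey))
	keyCols := make([]string, len(primaryKey))
	for i, k := range primaryKey {
		isKey[k] = true
		keyCols[i] = quote(k)
	}

	cols := make([]string, len(columns))
	params := make([]string, len(columns))
	var updates []string
	for i, c := range columns {
		cols[i] = quote(c)
		params[i] = fmt.Sprintf("$%d", i+1)
		// Only non-key columns are overwritten when the row already exists.
		if !isKey[c] {
			updates = append(updates, fmt.Sprintf("%s = EXCLUDED.%s", quote(c), quote(c)))
		}
	}

	// With nothing but key columns there is nothing to update.
	action := "DO NOTHING"
	if len(updates) > 0 {
		action = "DO UPDATE SET " + strings.Join(updates, ", ")
	}

	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) %s",
		quote(table),
		strings.Join(cols, ", "),
		strings.Join(params, ", "),
		strings.Join(keyCols, ", "),
		action)
}

func main() {
	fmt.Println(buildUpsert("users", []string{"id", "name", "email"}, []string{"id"}))
}
```

Because the same statement covers both a new row and a changed row, the sink does not have to check whether the row already exists on the target before applying a change.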

Webhook Sink

The Webhook sink sends changes as HTTP POST requests to a specified URL.

Usage
pg_flo stream webhook [common flags] --webhook-url <webhook_url>
Additional Flags
  • --webhook-url: URL to send webhook POST requests
Example
pg_flo stream webhook \
  --host localhost \
  --port 5432 \
  --dbname your_database \
  --user your_user \
  --password your_password \
  --group your_group \
  --tables table1,table2 \
  --schema public \
  --status-dir /tmp/pg_flo-status \
  --webhook-url https://your-webhook-endpoint.com/receive
Additional Behavior
  • Sends each change as a separate HTTP POST request to the specified webhook URL.
  • Implements a retry mechanism with up to 3 attempts for failed requests.
  • Considers both network errors and non-2xx status codes as failures that trigger retries.
  • Maintains a status file to keep track of the last processed LSN.
  • The status file is stored in the specified status directory with the name pg_flo_webhook_last_lsn.json.
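
The retry behavior described above can be sketched in Go roughly as follows. This is an illustration, not pg_flo's code: `postWithRetry`, `runDemo`, the JSON payload, and the fixed pause between attempts are assumptions. The demo uses a local test server from net/http/httptest instead of a real endpoint.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

const maxAttempts = 3

// postWithRetry POSTs the payload and retries on network errors and on
// non-2xx responses. It returns the number of attempts that were made.
func postWithRetry(client *http.Client, url string, payload []byte, pause time.Duration) (int, error) {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		resp, err := client.Post(url, "application/json", bytes.NewReader(payload))
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode >= 200 && resp.StatusCode < 300 {
				return attempt, nil
			}
			err = fmt.Errorf("unexpected status %d", resp.StatusCode)
		}
		lastErr = err
		if attempt < maxAttempts {
			time.Sleep(pause) // fixed pause; an assumption for this sketch
		}
	}
	return maxAttempts, fmt.Errorf("webhook failed after %d attempts: %w", maxAttempts, lastErr)
}

// runDemo starts a local test server that answers the first `failures`
// requests with 500 and every later request with 200, then posts to it.
func runDemo(failures int32) (int, error) {
	var calls int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if atomic.AddInt32(&calls, 1) <= failures {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	return postWithRetry(srv.Client(), srv.URL, []byte(`{"type":"INSERT"}`), time.Millisecond)
}

func main() {
	attempts, err := runDemo(2)
	fmt.Println("attempts:", attempts, "error:", err)
}
```

A receiving endpoint should be prepared to see the same change more than once, since a request that succeeded on the server but whose response was lost counts as a failure on the sending side.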

Sink Interface

pg_flo uses a common interface for all sink types, which makes it straightforward to implement new sinks. The Sink interface defines a single method:

  • WriteBatch(data []*utils.CDCMessage) error: Writes a batch of changes to the sink.

FileSink, PostgresSink, and WebhookSink additionally provide Close() error, which closes the sink and releases any resources or connections. StdoutSink has no Close method.
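
A new sink could look something like the Go sketch below. To stay self-contained it declares a stand-in `CDCMessage` type with made-up fields in place of utils.CDCMessage, and it follows the WriteBatch signature shown in the Types listing of this package. `MemorySink` is a hypothetical example type, not part of pg_flo.

```go
package main

import (
	"errors"
	"fmt"
)

// CDCMessage is a stand-in for utils.CDCMessage; its fields are hypothetical.
type CDCMessage struct {
	Type  string
	Table string
}

// Sink mirrors the one-method interface of this package, using the stand-in type.
type Sink interface {
	WriteBatch(data []*CDCMessage) error
}

// MemorySink is a hypothetical sink that collects messages in memory.
type MemorySink struct {
	messages []*CDCMessage
	closed   bool
}

// Compile-time check that MemorySink satisfies Sink.
var _ Sink = (*MemorySink)(nil)

func NewMemorySink() *MemorySink { return &MemorySink{} }

// WriteBatch stores the batch, or fails if the sink was already closed.
func (s *MemorySink) WriteBatch(data []*CDCMessage) error {
	if s.closed {
		return errors.New("memory sink is closed")
	}
	s.messages = append(s.messages, data...)
	return nil
}

// Close marks the sink as closed; there are no external resources to release.
func (s *MemorySink) Close() error {
	s.closed = true
	return nil
}

func main() {
	sink := NewMemorySink()
	batch := []*CDCMessage{
		{Type: "INSERT", Table: "users"},
		{Type: "DELETE", Table: "users"},
	}
	if err := sink.WriteBatch(batch); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("stored", len(sink.messages), "messages")
	_ = sink.Close()
}
```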

Where appropriate, a sink saves the last processed LSN (log sequence number) at the destination. If a pg_flo process shuts down (for example, during a deployment) and starts again, it then knows where to resume from.
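
For a file-based status, saving and restoring the position might look like the Go sketch below. It reuses the `last_lsn` JSON key from the Status type in this package but stores the LSN as a plain string rather than pglogrepl.LSN, which is an assumption made to keep the example free of external imports. `saveStatus`, `loadStatus`, and `roundTrip` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// status uses the same JSON key as the package's Status type; the string
// field type is an assumption for this sketch.
type status struct {
	LastLSN string `json:"last_lsn"`
}

// saveStatus writes to a temporary file first and renames it into place,
// so a crash mid-write does not leave a truncated status file behind.
func saveStatus(dir, name, lsn string) error {
	data, err := json.Marshal(status{LastLSN: lsn})
	if err != nil {
		return err
	}
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	path := filepath.Join(dir, name)
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// loadStatus returns the saved LSN, or an empty string when no status file
// exists yet (for example, on the first run).
func loadStatus(dir, name string) (string, error) {
	data, err := os.ReadFile(filepath.Join(dir, name))
	if errors.Is(err, os.ErrNotExist) {
		return "", nil
	}
	if err != nil {
		return "", err
	}
	var st status
	if err := json.Unmarshal(data, &st); err != nil {
		return "", err
	}
	return st.LastLSN, nil
}

// roundTrip saves an LSN into a temporary directory and reads it back.
func roundTrip(lsn string) (string, error) {
	dir, err := os.MkdirTemp("", "pg_flo-status-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	const name = "pg_flo_webhook_last_lsn.json"
	if err := saveStatus(dir, name, lsn); err != nil {
		return "", err
	}
	return loadStatus(dir, name)
}

func main() {
	lsn, err := roundTrip("0/16B3748")
	fmt.Println("resume from:", lsn, "error:", err)
}
```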

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FileSink

type FileSink struct {
	// contains filtered or unexported fields
}

FileSink represents a sink that writes data to files

func NewFileSink

func NewFileSink(outputDir string) (*FileSink, error)

NewFileSink creates a new FileSink instance

func (*FileSink) Close

func (s *FileSink) Close() error

Close closes the current log file and performs any necessary cleanup

func (*FileSink) WriteBatch

func (s *FileSink) WriteBatch(messages []*utils.CDCMessage) error

WriteBatch writes a batch of data to the current log file

type PostgresSink

type PostgresSink struct {
	// contains filtered or unexported fields
}

PostgresSink represents a sink for PostgreSQL database

func NewPostgresSink

func NewPostgresSink(targetHost string, targetPort int, targetDBName, targetUser, targetPassword string, syncSchema bool, sourceHost string, sourcePort int, sourceDBName, sourceUser, sourcePassword string, disableForeignKeyChecks bool) (*PostgresSink, error)

NewPostgresSink creates a new PostgresSink instance

func (*PostgresSink) Close

func (s *PostgresSink) Close() error

Close closes the database connection

func (*PostgresSink) WriteBatch

func (s *PostgresSink) WriteBatch(messages []*utils.CDCMessage) error

WriteBatch writes a batch of CDC messages to the target database

type Sink

type Sink interface {
	WriteBatch(data []*utils.CDCMessage) error
}

type Status

type Status struct {
	LastLSN pglogrepl.LSN `json:"last_lsn"`
}

type StdoutSink

type StdoutSink struct{}

StdoutSink represents a sink that writes data to standard output

func NewStdoutSink

func NewStdoutSink() (*StdoutSink, error)

NewStdoutSink creates a new StdoutSink instance

func (*StdoutSink) WriteBatch

func (s *StdoutSink) WriteBatch(messages []*utils.CDCMessage) error

WriteBatch writes a batch of data to standard output

type WebhookSink

type WebhookSink struct {
	// contains filtered or unexported fields
}

WebhookSink represents a sink that sends data to a webhook endpoint

func NewWebhookSink

func NewWebhookSink(webhookURL string) (*WebhookSink, error)

NewWebhookSink creates a new WebhookSink instance

func (*WebhookSink) Close

func (s *WebhookSink) Close() error

Close performs any necessary cleanup (no-op for WebhookSink)

func (*WebhookSink) WriteBatch

func (s *WebhookSink) WriteBatch(messages []*utils.CDCMessage) error

WriteBatch sends a batch of data to the webhook endpoint
