postgres

package
v0.3.0
Warning

This package is not in the latest version of its module.

Published: Dec 12, 2024 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	PostgresURL string
	// Name of the replication slot to listen on. If not provided, it defaults
	// to "pgstream_<dbname>_slot".
	ReplicationSlotName string
}
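
As an illustration of the documented default slot name, the following sketch derives "pgstream_<dbname>_slot" from a connection URL. The helper defaultSlotName and the sample URL are hypothetical; how the package itself extracts the database name is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// defaultSlotName is a hypothetical helper that mirrors the documented
// default "pgstream_<dbname>_slot". It assumes the database name is the
// path component of the connection URL.
func defaultSlotName(postgresURL string) (string, error) {
	u, err := url.Parse(postgresURL)
	if err != nil {
		return "", fmt.Errorf("parsing postgres URL: %w", err)
	}
	dbName := strings.TrimPrefix(u.Path, "/")
	if dbName == "" {
		return "", errors.New("postgres URL has no database name")
	}
	return fmt.Sprintf("pgstream_%s_slot", dbName), nil
}

func main() {
	// Sample URL for illustration only.
	name, err := defaultSlotName("postgres://user:secret@localhost:5432/orders?sslmode=disable")
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```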

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler handles the Postgres replication slot operations.

func NewHandler

func NewHandler(ctx context.Context, cfg Config, opts ...Option) (*Handler, error)

NewHandler returns a new Postgres replication handler for the database specified in cfg.

func (*Handler) Close

func (h *Handler) Close() error

Close closes the database connections.

func (*Handler) GetCurrentLSN added in v0.3.0

func (h *Handler) GetCurrentLSN(ctx context.Context) (replication.LSN, error)

func (*Handler) GetLSNParser

func (h *Handler) GetLSNParser() replication.LSNParser

GetLSNParser returns a postgres implementation of the LSN parser.

func (*Handler) GetReplicationLag

func (h *Handler) GetReplicationLag(ctx context.Context) (int64, error)

GetReplicationLag returns the consumer's current replication lag. This value differs from the Postgres replication lag, which takes into account all consumers and ongoing transactions.
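
One plausible way to read a per-consumer lag is the distance between the server's current WAL position and the position this consumer has confirmed. The sketch below assumes the lag is measured in bytes and that LSNs are 64-bit offsets; neither the unit of the returned int64 nor the actual computation is stated in this documentation, and consumerLagBytes is a hypothetical name.

```go
package main

import "fmt"

// LSN is a local stand-in for replication.LSN, assumed here to be a
// 64-bit byte offset into the WAL.
type LSN uint64

// consumerLagBytes is a hypothetical helper: it returns how far the
// consumer's confirmed position trails the current WAL position.
// A consumer that is caught up (or ahead) reports zero.
func consumerLagBytes(currentWAL, confirmed LSN) int64 {
	if confirmed >= currentWAL {
		return 0
	}
	return int64(currentWAL - confirmed)
}

func main() {
	fmt.Println(consumerLagBytes(0x16B374D848, 0x16B374D000))
}
```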

func (*Handler) ReceiveMessage

func (h *Handler) ReceiveMessage(ctx context.Context) (*replication.Message, error)

ReceiveMessage listens for messages from the WAL. It returns an error if an unexpected message is received.

func (*Handler) StartReplication

func (h *Handler) StartReplication(ctx context.Context) error

StartReplication starts the replication process on the configured replication slot. It checks for the last synced LSN (confirmed_flush_lsn) and, if there isn't one, starts replication from the restart_lsn position.
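
The fallback described above can be sketched as a small selection function. The types slotState and LSN and the function startPosition are hypothetical stand-ins, and treating a zero value as "not set" is an assumption, not the package's confirmed behavior.

```go
package main

import "fmt"

// LSN is a local stand-in for replication.LSN.
type LSN uint64

// slotState is a hypothetical view of the two positions Postgres tracks
// for a replication slot. A zero value is assumed to mean "not set".
type slotState struct {
	ConfirmedFlushLSN LSN
	RestartLSN        LSN
}

// startPosition prefers the last synced position and falls back to the
// restart position when nothing has been confirmed yet.
func startPosition(s slotState) LSN {
	if s.ConfirmedFlushLSN != 0 {
		return s.ConfirmedFlushLSN
	}
	return s.RestartLSN
}

func main() {
	fmt.Println(startPosition(slotState{ConfirmedFlushLSN: 100, RestartLSN: 50}))
	fmt.Println(startPosition(slotState{RestartLSN: 50}))
}
```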

func (*Handler) StartReplicationFromLSN added in v0.3.0

func (h *Handler) StartReplicationFromLSN(ctx context.Context, lsn replication.LSN) error

StartReplicationFromLSN starts the replication process on the configured replication slot from the given LSN.

func (*Handler) SyncLSN

func (h *Handler) SyncLSN(ctx context.Context, lsn replication.LSN) error

SyncLSN notifies Postgres how far we have processed in the WAL.

type LSNParser

type LSNParser struct{}

LSNParser is the Postgres implementation of the replication.LSNParser.

func NewLSNParser

func NewLSNParser() *LSNParser

func (*LSNParser) FromString

func (p *LSNParser) FromString(lsnStr string) (replication.LSN, error)

func (*LSNParser) ToString

func (p *LSNParser) ToString(lsn replication.LSN) string
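
FromString and ToString are undocumented here. For orientation, Postgres prints an LSN as two hexadecimal numbers separated by a slash, for example 16/B374D848. The sketch below converts between that text form and a 64-bit value; parseLSN, formatLSN and the local LSN type are hypothetical and are not claimed to match this package's implementation.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// LSN is a local stand-in for replication.LSN.
type LSN uint64

// parseLSN converts the Postgres text form "XXXXXXXX/YYYYYYYY" (two hex
// numbers of up to 32 bits each) into a single 64-bit value.
func parseLSN(s string) (LSN, error) {
	hiStr, loStr, ok := strings.Cut(s, "/")
	if !ok {
		return 0, fmt.Errorf("invalid LSN %q: missing '/'", s)
	}
	hi, err := strconv.ParseUint(hiStr, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	lo, err := strconv.ParseUint(loStr, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	return LSN(hi<<32 | lo), nil
}

// formatLSN renders the value back into the Postgres text form.
func formatLSN(lsn LSN) string {
	return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
}

func main() {
	lsn, err := parseLSN("16/B374D848")
	if err != nil {
		panic(err)
	}
	fmt.Println(uint64(lsn), formatLSN(lsn))
}
```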

type Option

type Option func(h *Handler)

func WithLogger

func WithLogger(l loglib.Logger) Option
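
Option has the shape of the functional options pattern: each option is a function that mutates the handler during construction. The sketch below shows the pattern with lowercase stand-ins (handler, option, withLogger, newHandler, logger); the logger interface and the no-op default are assumptions and do not reflect loglib.Logger or the real NewHandler.

```go
package main

import "fmt"

// logger is a hypothetical, minimal logging interface for this sketch.
type logger interface {
	Info(msg string)
}

// noopLogger discards everything; assumed here as the default.
type noopLogger struct{}

func (noopLogger) Info(string) {}

// stdoutLogger prints messages with a prefix.
type stdoutLogger struct{ prefix string }

func (l stdoutLogger) Info(msg string) { fmt.Println(l.prefix + msg) }

// handler and option mirror the documented shape `type Option func(h *Handler)`.
type handler struct {
	log logger
}

type option func(h *handler)

// withLogger returns an option that replaces the handler's logger.
func withLogger(l logger) option {
	return func(h *handler) {
		h.log = l
	}
}

// newHandler applies defaults first, then each option in order.
func newHandler(opts ...option) *handler {
	h := &handler{log: noopLogger{}}
	for _, opt := range opts {
		opt(h)
	}
	return h
}

func main() {
	h := newHandler(withLogger(stdoutLogger{prefix: "[replication] "}))
	h.log.Info("handler created")
}
```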
