stream

package
v1.0.8
Warning

This package is not in the latest version of its module.

Published: Nov 27, 2023 | License: MIT | Imports: 15 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrRelationChanged    = errors.New("relation changed")
	ErrRelationNotFound   = errors.New("relation not found")
	ErrNoIdentityColumns  = errors.New("relation has no identity columns")
	ErrMessageLost        = errors.New("unexpected message order (messages are lost)")
	ErrUnknownMessageType = errors.New("unknown message type")
)
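
Sentinel errors like these are usually matched with errors.Is rather than by comparing strings. The sketch below is a minimal illustration: it redeclares two of the errors locally so that it compiles on its own, and the recovery actions returned by the hypothetical classify helper are assumptions, not behavior documented by this package. Whether the package wraps these errors before returning them is also not stated here; errors.Is works in either case.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror two of the package's sentinel errors, so the
// sketch compiles without importing the real package.
var (
	ErrRelationNotFound = errors.New("relation not found")
	ErrMessageLost      = errors.New("unexpected message order (messages are lost)")
)

// classify maps an error to a hypothetical recovery action.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrMessageLost):
		return "restart stream"
	case errors.Is(err, ErrRelationNotFound):
		return "skip message"
	default:
		return "fail"
	}
}

func main() {
	// errors.Is sees through wrapping done with %w.
	wrapped := fmt.Errorf("decoding insert: %w", ErrRelationNotFound)
	fmt.Println(classify(wrapped))            // skip message
	fmt.Println(classify(ErrMessageLost))     // restart stream
	fmt.Println(classify(errors.New("boom"))) // fail
}
```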

Functions

This section is empty.

Types

type Accumulator

type Accumulator struct {
	// contains filtered or unexported fields
}

func NewAccumulator

func NewAccumulator(typeIsArray map[uint32]bool) *Accumulator

func (*Accumulator) Add

func (wta *Accumulator) Add(msg pglogrepl.Message) (*db.WalTransaction, error)

Add adds a logical replication message to the accumulator and returns a WalTransaction when the transaction is complete.
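
The accumulate-until-commit pattern described for Add can be sketched with stand-in types. Everything below is hypothetical: message, transaction, and accumulator are simplified substitutes for pglogrepl.Message, db.WalTransaction, and Accumulator, and returning (nil, nil) while a transaction is still open is an assumption about how such an API typically behaves, not something this page confirms.

```go
package main

import (
	"errors"
	"fmt"
)

// msgKind and message are hypothetical stand-ins for pglogrepl message types.
type msgKind int

const (
	kindBegin msgKind = iota
	kindInsert
	kindCommit
)

type message struct {
	kind msgKind
	row  string
}

// transaction is a simplified stand-in for db.WalTransaction.
type transaction struct {
	rows []string
}

var errMessageLost = errors.New("unexpected message order (messages are lost)")

// accumulator buffers row changes between a begin and a commit message.
type accumulator struct {
	current *transaction
}

// add returns (nil, nil) while a transaction is still open and returns the
// finished transaction once the commit message arrives.
func (a *accumulator) add(m message) (*transaction, error) {
	switch m.kind {
	case kindBegin:
		if a.current != nil {
			return nil, errMessageLost // begin inside an open transaction
		}
		a.current = &transaction{}
		return nil, nil
	case kindInsert:
		if a.current == nil {
			return nil, errMessageLost // row change without a begin
		}
		a.current.rows = append(a.current.rows, m.row)
		return nil, nil
	case kindCommit:
		if a.current == nil {
			return nil, errMessageLost // commit without a begin
		}
		tx := a.current
		a.current = nil
		return tx, nil
	}
	return nil, errors.New("unknown message type")
}

func main() {
	acc := &accumulator{}
	msgs := []message{
		{kind: kindBegin},
		{kind: kindInsert, row: "a"},
		{kind: kindInsert, row: "b"},
		{kind: kindCommit},
	}
	for _, m := range msgs {
		tx, err := acc.add(m)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		if tx != nil {
			fmt.Println("complete transaction with rows:", tx.rows)
		}
	}
}
```

A caller of this shape only acts when the returned transaction is non-nil, which keeps partially received transactions out of downstream writes.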

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(pool *pgxpool.Pool, replConn *pgconn.PgConn, target target.TargetInterface, slotName string, batchMaxItems int, batchTimeout time.Duration, skipAcknowledge bool) *Stream

func (*Stream) Close

func (s *Stream) Close(ctx context.Context) error

Close is a finalizer function.

func (*Stream) CreateSlot

func (s *Stream) CreateSlot(ctx context.Context) (string, error)

func (*Stream) SetRestartLSN

func (s *Stream) SetRestartLSN(restartLSN pglogrepl.LSN)

func (*Stream) Stream

func (s *Stream) Stream(ctx context.Context) error

Stream receives events from PostgreSQL, batches them, and calls target.Write for each batch. After each successful call to Write it advances the LSN and sends a StandbyStatus update with the new LSN. It also sends periodic StandbyStatus messages as heartbeats.
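
The batch, write, then acknowledge loop described above can be sketched with the standard library alone. This is not the package's implementation: event, runBatches, and demo are hypothetical names, the write and ack callbacks stand in for target.Write and the StandbyStatus update, and heartbeats are left out. The maxItems and timeout parameters play the role of batchMaxItems and batchTimeout from NewStream.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// event is a hypothetical stand-in for a decoded transaction and its LSN.
type event struct {
	lsn uint64
}

// runBatches collects events until maxItems is reached or the ticker fires,
// hands each batch to write, and calls ack with the last LSN of every batch
// that was written successfully. It returns when in is closed or ctx is done.
func runBatches(ctx context.Context, in <-chan event, maxItems int, timeout time.Duration,
	write func([]event) error, ack func(uint64)) error {
	batch := make([]event, 0, maxItems)
	ticker := time.NewTicker(timeout) // bounds how long a partial batch waits
	defer ticker.Stop()

	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		if err := write(batch); err != nil {
			return err // a failed batch is never acknowledged
		}
		ack(batch[len(batch)-1].lsn)
		batch = make([]event, 0, maxItems) // fresh slice: write may keep the old one
		return nil
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-in:
			if !ok {
				return flush() // input closed: write what is left
			}
			batch = append(batch, ev)
			if len(batch) >= maxItems {
				if err := flush(); err != nil {
					return err
				}
			}
		case <-ticker.C:
			if err := flush(); err != nil {
				return err
			}
		}
	}
}

// demo feeds five events through runBatches with a batch size of two and
// records the size of each written batch and each acknowledged LSN.
func demo() (sizes []int, acks []uint64) {
	in := make(chan event, 5)
	for i := 1; i <= 5; i++ {
		in <- event{lsn: uint64(i * 100)}
	}
	close(in)
	write := func(b []event) error { sizes = append(sizes, len(b)); return nil }
	ack := func(lsn uint64) { acks = append(acks, lsn) }
	if err := runBatches(context.Background(), in, 2, time.Hour, write, ack); err != nil {
		panic(err)
	}
	return sizes, acks
}

func main() {
	sizes, acks := demo()
	fmt.Println("batch sizes:", sizes)
	fmt.Println("acknowledged LSNs:", acks)
}
```

Acknowledging only after a successful write means a crash between the two steps replays the batch instead of losing it, at the cost of possible duplicates in the target.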

func (*Stream) StreamToJSONLines

func (s *Stream) StreamToJSONLines(ctx context.Context, outFile string) error
