Documentation ¶
Index ¶
- Variables
- type Column
- type Database
- func (db *Database) Close(ctx context.Context) error
- func (db *Database) Commit(ctx context.Context) error
- func (db *Database) Connect(ctx context.Context) error
- func (db *Database) CreateReplicationSlot(ctx context.Context)
- func (db *Database) Discover(ctx context.Context) error
- func (db *Database) DropReplicationSlot(ctx context.Context)
- func (db *Database) HandleLogical(ctx context.Context, lsn pglogrepl.LSN, msg pglogrepl.Message) error
- func (db *Database) PrintSatus()
- func (db *Database) RegisterSlotLagMetric(ctx context.Context)
- func (db *Database) Reindex(ctx context.Context) error
- func (db *Database) StartReplication(ctx context.Context, at pglogrepl.LSN) error
- func (db *Database) Tx(ctx context.Context) error
- type DecoderValue
- type Doc
- type Document
- type ESAction
- type Inline
- type Position
- type Schema
- type StreamPipe
- type Table
Constants ¶
This section is empty.
Variables ¶
var ( // ErrZeroType Postgres type OID can not be zero ErrZeroType = errors.New("postgres type OID can't be zero") // ErrTypeNotFound can not find type OID among discovered types ErrTypeNotFound = errors.New("can not find discovered type") )
var ( ErrTimeout = errors.New("timeout") ErrChClosed = errors.New("document channel is closed") )
var ( // ErrColumnOutOfRange means that received result tuple is smaller than expected column position ErrColumnOutOfRange = errors.New("column out of range") )
var ( // ErrUnknownType type discovery failed ErrUnknownType = errors.New("unknown type") )
Functions ¶
This section is empty.
Types ¶
type Column ¶
type Column struct {
// contains filtered or unexported fields
}
func (*Column) MarshalJSON ¶
type Database ¶
type Database struct { SlotName string Publication string StandbyTimeout time.Duration // contains filtered or unexported fields }
func (*Database) CreateReplicationSlot ¶
CreateReplicationSlot creates a replication slot at current position and uses newly created snapshot in current transaction. For the sake of consistency it's important to use this method and initial data copying within transaction db.Tx(ctx) db.CreateReplicationSlot(ctx) ... copy data db.Commit(ctx)
func (*Database) Discover ¶
Discover uses Postgres publication and table or column comments to generate replication config Only tables explosed via Publication will be considered for exporting to ES.
func (*Database) DropReplicationSlot ¶
func (*Database) HandleLogical ¶
func (*Database) PrintSatus ¶
func (db *Database) PrintSatus()
PrintStatus prints some debug information TODO: remove
func (*Database) RegisterSlotLagMetric ¶
func (*Database) StartReplication ¶
StartReplication switches replConn into `CopyBoth` mode and starts streaming. decoded logical messages are passed for further processing, while status updates happens here. during the streaming, replConn is locked and can not be used for anything else.
XXX: PrimaryKeepaliveMessage.ServerWALEnd is actually the location up to which the WAL is sent See: https://stackoverflow.com/questions/71016200/proper-standby-status-update-in-streaming-replication-protocol
type DecoderValue ¶
type DecoderValue interface { pgtype.TextDecoder pgtype.BinaryDecoder // Not all types support BinaryDecoder. pgtype.Value }
DecoderValue can decode value and return pgtype object which implies Value interface.
type Document ¶
type Document struct { Position Meta []byte // Op type, index and document id Data []byte // document content or script }
Document represents one single operation in bulk request.
type Inline ¶
type Inline struct {
// contains filtered or unexported fields
}
Inline defines table like abstraction for inligning into ES doc
type Schema ¶
type Schema struct {
// contains filtered or unexported fields
}
Schema describes Postgres schema/namespace
type StreamPipe ¶
type StreamPipe struct {
// contains filtered or unexported fields
}
StreamPipe connects decoded postgres stream with elasticsearch client. There should not be any other interaction.
func NewStreamPipe ¶
func NewStreamPipe(ctx context.Context) *StreamPipe
func (*StreamPipe) CommitPosition ¶
func (p *StreamPipe) CommitPosition(pos pglogrepl.LSN)
func (*StreamPipe) Position ¶
func (p *StreamPipe) Position() pglogrepl.LSN
type Table ¶
type Table struct {
// contains filtered or unexported fields
}
func (*Table) CopyAll ¶
Selects all rown from table, and populates results into Database.results chanel. Copy existing data snapshoted by slot creation, using simple protocol.
func (*Table) EncodeUpdateRowJSON ¶
EncodeUpdateRowJSON wraps row into `{"doc": ... }` object, required by ElasticSearch bulk request syntax for update queries
func (*Table) MarshalJSON ¶
func (*Table) SetRelationID ¶
TODO: take RelID set cache out of this tree config