listener

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2020 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Error message texts. These are untyped string constants, not error values.
const (
	ErrPostgresConnection    = "db connection error"
	ErrReplicationConnection = "replication connection error"
	ErrNatsConnection        = "nats connection error"
	ErrPublishEvent          = "publish message error"
	ErrUnmarshalMsg          = "unmarshal wal message error"
	ErrAckWalMessage         = "acknowledge wal message error"
	ErrSendStandbyStatus     = "send standby status error"
)

Constants holding error message text.

View Source
// Service info messages for service start and stop.
const (
	StartServiceMessage = "service was started"
	StopServiceMessage  = "service was stopped"
)

Service info message.

View Source
// Single-byte identifiers used in logical replication messages.
const (
	// Message type identifiers, one per logical replication message kind
	// (commit, begin, origin, relation, type, insert, update, delete).
	CommitMsgType   byte = 'C'
	BeginMsgType    byte = 'B'
	OriginMsgType   byte = 'O'
	RelationMsgType byte = 'R'
	TypeMsgType     byte = 'Y'
	InsertMsgType   byte = 'I'
	UpdateMsgType   byte = 'U'
	DeleteMsgType   byte = 'D'

	// Tuple data identifiers.
	// NOTE(review): per the PostgreSQL logical replication message formats,
	// 'N' marks a new tuple, 't' a text-formatted value, 'n' a NULL value and
	// 'u' an unchanged TOASTed value — confirm against the parser.
	NewTupleDataType byte = 'N'
	TextDataType     byte = 't'
	NullDataType     byte = 'n'
	ToastDataType    byte = 'u'
)

Variables

This section is empty.

Functions

func NewRepository

func NewRepository(conn *pgx.Conn) *repositoryImpl

Types

type ActionData added in v1.0.0

// ActionData holds one action of a transaction: its kind and the schema,
// table and columns it refers to.
type ActionData struct {
	Schema  string
	Table   string
	Kind    ActionKind
	Columns []Column
}

type ActionKind added in v1.0.0

// ActionKind is the kind of action carried by a WAL message.
type ActionKind string

// Known action kinds.
const (
	ActionKindInsert ActionKind = "INSERT"
	ActionKindUpdate ActionKind = "UPDATE"
	ActionKindDelete ActionKind = "DELETE"
)

Kind of WAL message.

type Begin added in v1.0.0

// Begin is the begin message of a transaction.
type Begin struct {
	// The final LSN of the transaction.
	LSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
	// Xid of the transaction.
	XID int32
}

Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#

type BinaryParser added in v1.0.0

// BinaryParser parses WAL messages (see ParseWalMessage).
type BinaryParser struct {
	// contains filtered or unexported fields
}

func NewBinaryParser added in v1.0.0

func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser

func (*BinaryParser) ParseWalMessage added in v1.0.0

func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error

type Column added in v1.0.0

// Column is a single column, as referenced by RelationData.Columns and
// ActionData.Columns.
type Column struct {
	// contains filtered or unexported fields
}

func (*Column) AssertValue added in v1.0.0

func (c *Column) AssertValue(src []byte)

type Commit added in v1.0.0

// Commit is the commit message of a transaction.
type Commit struct {
	// Flags; currently unused (must be 0).
	Flags int8
	// The LSN of the commit.
	LSN int64
	// The end LSN of the transaction.
	TransactionLSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
}

Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#

type DataType added in v1.0.0

// DataType describes a data type by its ID, namespace and name.
type DataType struct {
	// ID of the data type.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// Name of the data type.
	Name string
}

type Delete added in v1.0.0

// Delete is the delete message for a row of a relation.
type Delete struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as an old tuple.
	OldTuple bool
	// TupleData message part carried by the delete message.
	// NOTE(review): the previous comment said "new tuple", but this struct only
	// has key/old-tuple flags, so this is presumably the old tuple or key —
	// confirm against the parser.
	Row []TupleData
}

Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#

type Event

// Event is the event structure published to the NATS server.
type Event struct {
	ID        uuid.UUID              `json:"id"`
	Schema    string                 `json:"schema"`
	Table     string                 `json:"table"`
	Action    string                 `json:"action"`
	Data      map[string]interface{} `json:"data"`
	// EventTime is serialized under the JSON key "commitTime".
	EventTime time.Time              `json:"commitTime"`
}

Event is the event structure for publishing to the NATS server.

func (Event) GetSubjectName

func (e Event) GetSubjectName(prefix string) string

GetSubjectName creates subject name from the prefix, schema and table name.

func (Event) MarshalEasyJSON

func (v Event) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Event) MarshalJSON

func (v Event) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Event) UnmarshalEasyJSON

func (v *Event) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Event) UnmarshalJSON

func (v *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type Insert added in v1.0.0

// Insert is the insert message for a row of a relation.
type Insert struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of the new tuple.
	Row []TupleData
}

Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#

type Listener

// Listener is the main service struct.
type Listener struct {
	LSN uint64
	// contains filtered or unexported fields
}

Listener is the main service struct.

func NewWalListener

func NewWalListener(
	cfg *config.Config,
	repo repository,
	repl replication,
	publ publisher,
	parser parser,
) *Listener

func (*Listener) AckWalMessage

func (l *Listener) AckWalMessage(lsn uint64) error

AckWalMessage acknowledges a received WAL message.

func (*Listener) Process

func (l *Listener) Process() error

Process is the main service entry point.

func (*Listener) SendPeriodicHeartbeats

func (l *Listener) SendPeriodicHeartbeats(ctx context.Context)

SendPeriodicHeartbeats sends periodic keep-alive heartbeats to the server.

func (*Listener) SendStandbyStatus

func (l *Listener) SendStandbyStatus() error

SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.

func (*Listener) Stop

func (l *Listener) Stop() error

Stop is a finalizer function.

func (*Listener) Stream

func (l *Listener) Stream(ctx context.Context)

Stream receives events from PostgreSQL. It accepts each message, applies the filter, and publishes it to the NATS server.

type NatsPublisher

// NatsPublisher publishes events under a subject (see Publish); it is
// constructed from a stan.Conn by NewNatsPublisher.
type NatsPublisher struct {
	// contains filtered or unexported fields
}

func NewNatsPublisher

func NewNatsPublisher(conn stan.Conn) *NatsPublisher

func (NatsPublisher) Close

func (n NatsPublisher) Close() error

func (NatsPublisher) Publish

func (n NatsPublisher) Publish(subject string, event Event) error

type Origin added in v1.0.0

// Origin is the origin message of a transaction.
type Origin struct {
	// The LSN of the commit on the origin server.
	LSN int64
	// Name of the origin.
	Name string
}

Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#

type Relation added in v1.0.0

// Relation is the relation message describing a relation and its columns.
type Relation struct {
	// ID of the relation.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// Relation name.
	Name string
	// Replica identity setting for the relation (same as relreplident in pg_class).
	Replica int8
	// Columns of the relation.
	Columns []RelationColumn
}

Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#

type RelationColumn added in v1.0.0

// RelationColumn describes one column of a Relation.
type RelationColumn struct {
	// Flags for the column which marks the column as part of the key.
	Key bool
	// Name of the column.
	Name string
	// ID of the column's data type.
	TypeID int32
	// Type modifier of the column (atttypmod).
	ModifierType int32
}

type RelationData added in v1.0.0

// RelationData holds the schema, table name and columns of a relation.
// WalTransaction.RelationStore maps int32 keys (presumably relation IDs —
// TODO confirm) to values of this type.
type RelationData struct {
	Schema  string
	Table   string
	Columns []Column
}

type TupleData added in v1.0.0

// TupleData holds the raw bytes of one element of a tuple row; rows are
// represented as []TupleData (presumably one element per column — TODO confirm).
type TupleData struct {
	Value []byte
}

type Update added in v1.0.0

// Update is the update message for a row of a relation.
type Update struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as an old tuple.
	OldTuple bool
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of the new tuple.
	Row []TupleData
	// TupleData message part representing the contents of the old tuple or primary key.
	// Only present if the previous 'O' or 'K' part is present.
	OldRow []TupleData
}

Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#

type WalTransaction added in v1.0.0

// WalTransaction holds the data of one WAL transaction: its LSN, begin and
// commit times, the relations seen so far and the actions it contains.
type WalTransaction struct {
	LSN           int64
	BeginTime     *time.Time
	CommitTime    *time.Time
	RelationStore map[int32]RelationData
	Actions       []ActionData
}

func NewWalTransaction added in v1.0.0

func NewWalTransaction() *WalTransaction

func (*WalTransaction) Clear added in v1.0.0

func (w *WalTransaction) Clear()

func (WalTransaction) CreateActionData added in v1.0.0

func (w WalTransaction) CreateActionData(
	relationID int32,
	rows []TupleData,
	kind ActionKind,
) (a ActionData, err error)

func (*WalTransaction) CreateEventsWithFilter added in v1.0.0

func (w *WalTransaction) CreateEventsWithFilter(
	tableMap map[string][]string) []Event

CreateEventsWithFilter filters WAL messages by table and action and creates an event for each value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL