Documentation
¶
Index ¶
- Constants
- func NewRepository(conn *pgx.Conn) *repositoryImpl
- type ActionData
- type ActionKind
- type Begin
- type BinaryParser
- type Column
- type Commit
- type DataType
- type Delete
- type Event
- type Insert
- type Listener
- type NatsPublisher
- type Origin
- type Relation
- type RelationColumn
- type RelationData
- type TupleData
- type Update
- type WalTransaction
Constants ¶
const ( ErrPostgresConnection = "db connection error" ErrReplicationConnection = "replication connection error" ErrNatsConnection = "nats connection error" ErrPublishEvent = "publish message error" ErrUnmarshalMsg = "unmarshal wal message error" ErrAckWalMessage = "acknowledge wal message error" ErrSendStandbyStatus = "send standby status error" )
Error message text constants.
const ( StartServiceMessage = "service was started" StopServiceMessage = "service was stopped" )
Service info messages.
const ( CommitMsgType byte = 'C' BeginMsgType byte = 'B' OriginMsgType byte = 'O' RelationMsgType byte = 'R' TypeMsgType byte = 'Y' InsertMsgType byte = 'I' UpdateMsgType byte = 'U' DeleteMsgType byte = 'D' NewTupleDataType byte = 'N' TextDataType byte = 't' NullDataType byte = 'n' ToastDataType byte = 'u' )
Variables ¶
This section is empty.
Functions ¶
func NewRepository ¶
Types ¶
type ActionData ¶ added in v1.0.0
type ActionData struct { Schema string Table string Kind ActionKind Columns []Column }
type ActionKind ¶ added in v1.0.0
type ActionKind string
const ( ActionKindInsert ActionKind = "INSERT" ActionKindUpdate ActionKind = "UPDATE" ActionKindDelete ActionKind = "DELETE" )
Kind of WAL message.
type Begin ¶ added in v1.0.0
type Begin struct { // Identifies the message as a begin message. LSN int64 // Commit timestamp of the transaction. Timestamp time.Time // Xid of the transaction. XID int32 }
Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#
type BinaryParser ¶ added in v1.0.0
type BinaryParser struct {
// contains filtered or unexported fields
}
func NewBinaryParser ¶ added in v1.0.0
func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser
func (*BinaryParser) ParseWalMessage ¶ added in v1.0.0
func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error
type Column ¶ added in v1.0.0
type Column struct {
// contains filtered or unexported fields
}
func (*Column) AssertValue ¶ added in v1.0.0
type Commit ¶ added in v1.0.0
type Commit struct { // Flags; currently unused (must be 0). Flags int8 // The LSN of the commit. LSN int64 // The end LSN of the transaction. TransactionLSN int64 // Commit timestamp of the transaction. Timestamp time.Time }
Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#
type Delete ¶ added in v1.0.0
type Delete struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData submessage as a key. KeyTuple bool // Identifies the following TupleData message as a old tuple. OldTuple bool // TupleData message part representing the contents of new tuple. Row []TupleData }
Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#
type Event ¶
type Event struct { ID uuid.UUID `json:"id"` Schema string `json:"schema"` Table string `json:"table"` Action string `json:"action"` Data map[string]interface{} `json:"data"` EventTime time.Time `json:"commitTime"` }
Event is the event structure for publishing to the NATS server.
func (Event) GetSubjectName ¶
GetSubjectName creates the subject name from the prefix, schema and table name.
func (Event) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (Event) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*Event) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Event) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
type Insert ¶ added in v1.0.0
type Insert struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData message as a new tuple. NewTuple bool // TupleData message part representing the contents of new tuple. Row []TupleData }
Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#
type Listener ¶
type Listener struct { LSN uint64 // contains filtered or unexported fields }
Listener is the main service struct.
func NewWalListener ¶
func (*Listener) AckWalMessage ¶
AckWalMessage acknowledges a received WAL message.
func (*Listener) SendPeriodicHeartbeats ¶
SendPeriodicHeartbeats sends periodic keep-alive heartbeats to the server.
func (*Listener) SendStandbyStatus ¶
SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.
type NatsPublisher ¶
type NatsPublisher struct {
// contains filtered or unexported fields
}
func NewNatsPublisher ¶
func NewNatsPublisher(conn stan.Conn) *NatsPublisher
func (NatsPublisher) Close ¶
func (n NatsPublisher) Close() error
type Origin ¶ added in v1.0.0
type Origin struct { // The LSN of the commit on the origin server. LSN int64 // name of the origin. Name string }
Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#
type Relation ¶ added in v1.0.0
type Relation struct { // ID of the relation. ID int32 // Namespace (empty string for pg_catalog). Namespace string // Relation name. Name string // Replica identity setting for the relation (same as relreplident in pg_class). Replica int8 Columns []RelationColumn }
Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#
type RelationColumn ¶ added in v1.0.0
type RelationData ¶ added in v1.0.0
type Update ¶ added in v1.0.0
type Update struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData submessage as a key. KeyTuple bool // Identifies the following TupleData message as a old tuple. OldTuple bool // Identifies the following TupleData message as a new tuple. NewTuple bool // TupleData message part representing the contents of new tuple. Row []TupleData // TupleData message part representing the contents of the old tuple or primary key. //Only present if the previous 'O' or 'K' part is present. OldRow []TupleData }
Logical Replication Message Formats. https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats#
type WalTransaction ¶ added in v1.0.0
type WalTransaction struct { LSN int64 BeginTime *time.Time CommitTime *time.Time RelationStore map[int32]RelationData Actions []ActionData }
func NewWalTransaction ¶ added in v1.0.0
func NewWalTransaction() *WalTransaction
func (*WalTransaction) Clear ¶ added in v1.0.0
func (w *WalTransaction) Clear()
func (WalTransaction) CreateActionData ¶ added in v1.0.0
func (w WalTransaction) CreateActionData( relationID int32, rows []TupleData, kind ActionKind, ) (a ActionData, err error)
func (*WalTransaction) CreateEventsWithFilter ¶ added in v1.0.0
func (w *WalTransaction) CreateEventsWithFilter( tableMap map[string][]string) []Event
CreateEventsWithFilter filters WAL messages by table and action, and creates events for each value.