Documentation ¶
Index ¶
- Constants
- func NewMockPublisher() *publisherMock
- type ActionData
- type ActionKind
- type Begin
- type BinaryParser
- type Column
- type Commit
- type DataType
- type Delete
- type Event
- type Insert
- type Listener
- type NatsPublisher
- type Origin
- type Relation
- type RelationColumn
- type RelationData
- type RepositoryImpl
- type TupleData
- type Update
- type WalTransaction
Constants ¶
const ( ErrPostgresConnection = "db connection error" ErrReplicationConnection = "replication connection error" ErrPublishEvent = "publish message error" ErrUnmarshalMsg = "unmarshal wal message error" ErrAckWalMessage = "acknowledge wal message error" ErrSendStandbyStatus = "send standby status error" )
Error message text constants.
const ( StartServiceMessage = "service was started" StopServiceMessage = "service was stopped" )
Service info message.
const ( // CommitMsgType protocol commit message type. CommitMsgType byte = 'C' // BeginMsgType protocol begin message type. BeginMsgType byte = 'B' // OriginMsgType protocol origin message type. OriginMsgType byte = 'O' // RelationMsgType protocol relation message type. RelationMsgType byte = 'R' // TypeMsgType protocol message type. TypeMsgType byte = 'Y' // InsertMsgType protocol insert message type. InsertMsgType byte = 'I' // UpdateMsgType protocol update message type. UpdateMsgType byte = 'U' // DeleteMsgType protocol delete message type. DeleteMsgType byte = 'D' // NewTupleDataType protocol new tuple data type. NewTupleDataType byte = 'N' // TextDataType protocol text data type. TextDataType byte = 't' // NullDataType protocol NULL data type. NullDataType byte = 'n' // ToastDataType protocol toast data type. ToastDataType byte = 'u' )
Variables ¶
This section is empty.
Functions ¶
func NewMockPublisher ¶
func NewMockPublisher() *publisherMock
Types ¶
type ActionData ¶
type ActionData struct { Schema string Table string Kind ActionKind Columns []Column }
ActionData kind of WAL message data.
type ActionKind ¶
type ActionKind string
ActionKind kind of action on WAL message.
const ( ActionKindInsert ActionKind = "INSERT" ActionKindUpdate ActionKind = "UPDATE" ActionKindDelete ActionKind = "DELETE" )
Kinds of action carried by a WAL message.
type Begin ¶
type Begin struct { // The final LSN of the transaction. LSN int64 // Commit timestamp of the transaction. Timestamp time.Time // Xid of the transaction. XID int32 }
Begin message format.
type BinaryParser ¶
type BinaryParser struct {
// contains filtered or unexported fields
}
BinaryParser represents a binary protocol parser.
func NewBinaryParser ¶
func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser
NewBinaryParser creates an instance of the binary parser.
func (*BinaryParser) ParseWalMessage ¶
func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error
ParseWalMessage parses a Postgres WAL message.
type Column ¶
type Column struct {
// contains filtered or unexported fields
}
Column of the table with which changes occur.
func (*Column) AssertValue ¶
AssertValue converts bytes to a specific type depending on the type of this data in the database table.
type Commit ¶
type Commit struct { // Flags; currently unused (must be 0). Flags int8 // The LSN of the commit. LSN int64 // The end LSN of the transaction. TransactionLSN int64 // Commit timestamp of the transaction. Timestamp time.Time }
Commit message format.
type DataType ¶
type DataType struct { // ID of the data type. ID int32 // Namespace (empty string for pg_catalog). Namespace string // name of the data type. Name string }
DataType part of WAL message data.
type Delete ¶
type Delete struct { // ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData submessage as a key. KeyTuple bool // Identifies the following TupleData message as an old tuple. OldTuple bool // TupleData message part representing the contents of the old tuple or primary key, depending on the previous field. Row []TupleData }
Delete message format.
type Event ¶
type Event struct { ID uuid.UUID `json:"id"` Schema string `json:"schema"` Table string `json:"table"` Action string `json:"action"` Data map[string]interface{} `json:"data"` EventTime time.Time `json:"commitTime"` Topic string `json:"-"` }
Event is the event structure for publishing to the NATS server.
func (Event) GetSubjectName ¶
GetSubjectName creates subject name from the prefix, schema and table name.
func (Event) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (Event) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*Event) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Event) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
type Insert ¶
type Insert struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData message as a new tuple. NewTuple bool // TupleData message part representing the contents of new tuple. Row []TupleData }
Insert message format.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener main service struct.
func NewWalListener ¶
func NewWalListener( cfg *config.Config, repo repository, repl replication, publ publisher, parser parser, ) *Listener
NewWalListener creates and initializes a new service instance.
func (*Listener) AckWalMessage ¶
AckWalMessage acknowledges a received WAL message.
func (*Listener) SendPeriodicHeartbeats ¶
SendPeriodicHeartbeats sends periodic keep-alive heartbeats to the server.
func (*Listener) SendStandbyStatus ¶
SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.
type NatsPublisher ¶
type NatsPublisher struct {
// contains filtered or unexported fields
}
NatsPublisher represents an event publisher.
type Origin ¶
type Origin struct { // The LSN of the commit on the origin server. LSN int64 // name of the origin. Name string }
Origin message format.
type Relation ¶
type Relation struct { // ID of the relation. ID int32 // Namespace (empty string for pg_catalog). Namespace string // Relation name. Name string // Replica identity setting for the relation (same as relreplident in pg_class). Replica int8 Columns []RelationColumn }
Relation message format.
type RelationColumn ¶
type RelationColumn struct { // Flags for the column which marks the column as part of the key. Key bool // name of the column. Name string // ID of the column's data type. TypeID int32 // valueType modifier of the column (atttypmod). ModifierType int32 }
RelationColumn part of WAL message data.
type RelationData ¶
RelationData kind of WAL message data.
type RepositoryImpl ¶
type RepositoryImpl struct {
// contains filtered or unexported fields
}
RepositoryImpl service repository.
func NewRepository ¶
func NewRepository(conn *pgx.Conn) *RepositoryImpl
NewRepository returns a new instance of the repository.
func (RepositoryImpl) CreatePublication ¶
func (r RepositoryImpl) CreatePublication(name string) error
CreatePublication creates a publication for all tables.
func (RepositoryImpl) GetSlotLSN ¶
func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error)
GetSlotLSN returns the value of the last offset for a specific slot.
func (RepositoryImpl) IsAlive ¶
func (r RepositoryImpl) IsAlive() bool
IsAlive checks the database connection for problems.
type Update ¶
type Update struct { // ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData submessage as a key. KeyTuple bool // Identifies the following TupleData message as an old tuple. OldTuple bool // Identifies the following TupleData message as a new tuple. NewTuple bool // TupleData message part representing the contents of new tuple. Row []TupleData // TupleData message part representing the contents of the old tuple or primary key. // Only present if the previous 'O' or 'K' part is present. OldRow []TupleData }
Update message format.
type WalTransaction ¶
type WalTransaction struct { LSN int64 BeginTime *time.Time CommitTime *time.Time RelationStore map[int32]RelationData Actions []ActionData }
WalTransaction transaction specified WAL message.
func NewWalTransaction ¶
func NewWalTransaction() *WalTransaction
NewWalTransaction creates and initializes a new WAL transaction.
func (WalTransaction) CreateActionData ¶
func (w WalTransaction) CreateActionData( relationID int32, rows []TupleData, kind ActionKind, ) (a ActionData, err error)
CreateActionData creates an action from WAL message data.
func (*WalTransaction) CreateEventsWithFilter ¶
func (w *WalTransaction) CreateEventsWithFilter( tableMap map[string]config.Table) []Event
CreateEventsWithFilter filters WAL messages by table and action and creates an event for each value.