Documentation ¶
Overview ¶
Defines a changelog Entry, which is a sum type of Schema or Modification. A stream of entries is provided as input to sinks, which export the data to the destination target.
Use pkg/subscription to generate a changelog from a logical replication connection, or pkg/imports to load changes from existing Postgres data.
Index ¶
Constants ¶
const (
	ModificationOperationImport = "IMPORT"
	ModificationOperationInsert = "INSERT"
	ModificationOperationUpdate = "UPDATE"
	ModificationOperationDelete = "DELETE"
)
Variables ¶
var ModificationBuilder = modificationBuilderFunc(func(opts ...func(*Modification)) *Modification {
	m := &Modification{}
	for _, opt := range opts {
		opt(m)
	}

	if m.Name == "" {
		panic("missing modification.name")
	}
	if m.Namespace == "" {
		panic("missing modification.namespace")
	}

	return m
})
ModificationBuilder provides a fluent interface for constructing Modifications. Tests use it to create fixtures easily. It panics if the resulting Modification has an empty Name or Namespace.
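The builder follows the functional options pattern: each option is a function that mutates the Modification, and the required fields are validated once all options have run. The following is a minimal, self-contained sketch of that pattern rather than the package's own API. The `build` function and the `withName` and `withNamespace` helpers are hypothetical names introduced for illustration, and the local `Modification` type carries only the two validated fields.

```go
package main

import "fmt"

// Modification mirrors only the two fields that the builder validates.
type Modification struct {
	Namespace string
	Name      string
}

// build applies each option in order, then enforces the same required
// fields as the ModificationBuilder shown above.
func build(opts ...func(*Modification)) *Modification {
	m := &Modification{}
	for _, opt := range opts {
		opt(m)
	}
	if m.Name == "" {
		panic("missing modification.name")
	}
	if m.Namespace == "" {
		panic("missing modification.namespace")
	}
	return m
}

// withName is a hypothetical option helper, not part of the package.
func withName(name string) func(*Modification) {
	return func(m *Modification) { m.Name = name }
}

// withNamespace is a hypothetical option helper, not part of the package.
func withNamespace(namespace string) func(*Modification) {
	return func(m *Modification) { m.Namespace = namespace }
}

func main() {
	m := build(withNamespace("public"), withName("users"))
	fmt.Println(m.Namespace, m.Name)
}
```

Because validation panics instead of returning an error, this style suits test fixtures, where a malformed fixture should fail loudly and immediately.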
Functions ¶
This section is empty.
Types ¶
type Changelog ¶
type Changelog chan Entry
Changelog is the goal of pgsink, a channel of schema and modification messages. Any source of these changes, be it a logical replication subscription or an import job, must produce this channel.
type Entry ¶
type Entry struct {
	*Modification
	*Schema
}
Entry is a poor man's sum type for generic changelog entries.
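Since Entry embeds two pointers, a consumer can tell the variants apart by checking which pointer is non-nil. The sketch below is illustrative only: it redeclares trimmed-down copies of `Modification`, `Schema`, `Entry` and `Changelog` so that it compiles on its own, and `describe` is a hypothetical helper standing in for whatever a real sink would do with each entry.

```go
package main

import "fmt"

// Trimmed-down local copies of the package types, for illustration only.
type Modification struct{ Namespace, Name string }
type Schema struct{ Namespace, Name string }

// Entry embeds both pointers; this sketch assumes at most one is set.
type Entry struct {
	*Modification
	*Schema
}

type Changelog chan Entry

// describe is a hypothetical helper that dispatches on the populated variant.
func describe(e Entry) string {
	switch {
	case e.Modification != nil:
		return "modification " + e.Modification.Namespace + "." + e.Modification.Name
	case e.Schema != nil:
		return "schema " + e.Schema.Namespace + "." + e.Schema.Name
	default:
		return "empty entry"
	}
}

func main() {
	changelog := make(Changelog, 2)

	// A source would normally publish from its own goroutine; a buffered
	// channel keeps this sketch single-threaded.
	changelog <- Entry{Schema: &Schema{Namespace: "public", Name: "users"}}
	changelog <- Entry{Modification: &Modification{Namespace: "public", Name: "users"}}
	close(changelog)

	for entry := range changelog {
		fmt.Println(describe(entry))
	}
}
```

Both embedded types have `Namespace` and `Name` fields, so `e.Name` would be an ambiguous selector. The sketch therefore always goes through `e.Modification` or `e.Schema` explicitly.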
type Modification ¶
type Modification struct {
	Timestamp time.Time   `json:"timestamp"` // commit timestamp, or time of import
	Namespace string      `json:"namespace"` // Postgres schema
	Name      string      `json:"name"`      // Postgres table name
	LSN       *uint64     `json:"lsn"`       // log sequence number, where appropriate
	Before    interface{} `json:"before"`    // row before modification, if relevant
	After     interface{} `json:"after"`     // row after modification
}
Modification represents a row that was changed in the source database. The Before and After fields contain native Go types that represent the row before and after this modification, respectively.
func (Modification) AfterOrBefore ¶
func (m Modification) AfterOrBefore() map[string]interface{}
AfterOrBefore provides the last existing row contents in the database. For a deletion, this is the row as it was before the deletion; otherwise it is the row after the modification.
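As a rough sketch of this fallback, the standalone function below prefers After and falls back to Before. It assumes that rows are stored as `map[string]interface{}` values inside the `interface{}` fields, which matches the documented return type but is not confirmed by the source. `afterOrBefore` is a hypothetical free function, not the package's method.

```go
package main

import "fmt"

// Modification carries only the two row fields relevant to this sketch.
type Modification struct {
	Before interface{}
	After  interface{}
}

// afterOrBefore returns the After row when present, else the Before row.
// Assumption: rows are map[string]interface{} values.
func afterOrBefore(m Modification) map[string]interface{} {
	if row, ok := m.After.(map[string]interface{}); ok && row != nil {
		return row
	}
	// The assertion yields a nil map if Before is absent or of another type.
	row, _ := m.Before.(map[string]interface{})
	return row
}

func main() {
	deleted := Modification{Before: map[string]interface{}{"id": 1, "email": "a@example.com"}}
	updated := Modification{
		Before: map[string]interface{}{"id": 2, "email": "old@example.com"},
		After:  map[string]interface{}{"id": 2, "email": "new@example.com"},
	}

	fmt.Println(afterOrBefore(deleted)["email"])
	fmt.Println(afterOrBefore(updated)["email"])
}
```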
func (Modification) Operation ¶
func (m Modification) Operation() string
Operation infers the type of operation that generated this entry. It may become expensive to compute these on the fly each time, at which point we should make it a struct field on Modification.
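The source does not spell out the inference rules, so the sketch below shows one plausible scheme, not the package's actual logic. It assumes that a missing LSN marks an import, a missing Before row marks an insert, and a missing After row marks a deletion, with everything else treated as an update. The constants are redeclared locally with the same values as the package constants, and `operation` is a hypothetical free function.

```go
package main

import "fmt"

// Local copies of the package constants, so the sketch compiles on its own.
const (
	ModificationOperationImport = "IMPORT"
	ModificationOperationInsert = "INSERT"
	ModificationOperationUpdate = "UPDATE"
	ModificationOperationDelete = "DELETE"
)

// Modification carries only the fields this inference looks at.
type Modification struct {
	LSN    *uint64
	Before interface{}
	After  interface{}
}

// operation infers the operation from which fields are populated.
// Every rule here is an assumption made for illustration.
func operation(m Modification) string {
	switch {
	case m.LSN == nil:
		return ModificationOperationImport // assumed: imports have no LSN
	case m.Before == nil:
		return ModificationOperationInsert // assumed: no prior row
	case m.After == nil:
		return ModificationOperationDelete // assumed: no resulting row
	default:
		return ModificationOperationUpdate
	}
}

func main() {
	lsn := uint64(42)
	row := map[string]interface{}{"id": 1}

	fmt.Println(operation(Modification{After: row}))
	fmt.Println(operation(Modification{LSN: &lsn, After: row}))
	fmt.Println(operation(Modification{LSN: &lsn, Before: row, After: row}))
	fmt.Println(operation(Modification{LSN: &lsn, Before: row}))
}
```

The checks are a handful of nil comparisons, so computing them on each call is cheap in this sketch. Caching the result in a struct field would only matter if the real inference does more work.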
func (Modification) TableReference ¶
func (m Modification) TableReference() string
type Schema ¶
type Schema struct {
	Timestamp time.Time           `json:"timestamp"` // commit timestamp
	Namespace string              `json:"namespace"` // Postgres schema
	Name      string              `json:"name"`      // Postgres table name
	LSN       *uint64             `json:"lsn"`       // log sequence number, where appropriate
	Spec      SchemaSpecification `json:"spec"`      // schema definition
}
Schema defines the structure of data pulled from Postgres. It can be generated from an existing logical.Relation, when combined with a decoder that translates the Postgres types to Go types.
In future, we'll want to be able to translate this schema type into official formats, like Avro.
func SchemaFromRelation ¶
SchemaFromRelation uses a logical.Relation and decoder to generate an intermediate schema
func (Schema) GetFingerprint ¶
GetFingerprint returns a unique identifier for the schema.
The only important thing is that any given schema returns the same fingerprint for the duration of the Go process. Beyond that, you can use any value here.