changelog

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2021 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Defines a changelog Entry, which is a sum type of Schema or Modification. A stream of entries is provided as input to sinks, which export the data to the destination target.

Use pkg/subsciption to generate a changelog from a logical replication connection, or pkg/imports to load changes from existing Postgres data.

Index

Constants

View Source
const (
	ModificationOperationImport = "IMPORT"
	ModificationOperationInsert = "INSERT"
	ModificationOperationUpdate = "UPDATE"
	ModificationOperationDelete = "DELETE"
)

Variables

View Source
var ModificationBuilder = modificationBuilderFunc(func(opts ...func(*Modification)) *Modification {
	m := &Modification{}
	for _, opt := range opts {
		opt(m)
	}

	if m.Name == "" {
		panic("missing modification.name")
	}
	if m.Namespace == "" {
		panic("missing modification.namespace")
	}

	return m
})

ModificationBuilder provides a fluent interface around constructing Modifications. This is used by tests to easily create fixtures.

Functions

This section is empty.

Types

type Changelog

type Changelog chan Entry

Changelog is the goal of pgsink, a channel of schema and modification messages. Any source of these changes, be it a logical replication subscription or an import job, must produce this channel.

type Entry

type Entry struct {
	*Modification
	*Schema
}

Entry is a poor-mans sum type for generic changelog entries

func (Entry) Unwrap

func (e Entry) Unwrap() interface{}

type Modification

type Modification struct {
	Timestamp time.Time   `json:"timestamp"` // commit timestamp, or time of import
	Namespace string      `json:"namespace"` // Postgres schema
	Name      string      `json:"name"`      // Postgres table name
	LSN       *uint64     `json:"lsn"`       // log sequence number, where appropriate
	Before    interface{} `json:"before"`    // row before modification, if relevant
	After     interface{} `json:"after"`     // row after modification
}

Modification represents a row that was changed in the source database. The Before and After fields contain Golang native types that represent the row, both before and after this modification.

func (Modification) AfterOrBefore

func (m Modification) AfterOrBefore() map[string]interface{}

AfterOrBefore provides the last existing row contents in the database. If this is a deletion, we'll get the contents of the row before the deletion, otherwise the after.

func (Modification) Operation

func (m Modification) Operation() string

Operation infers the type of operation that generated this entry. It may become expensive to compute these on the fly each time, at which point we should make it a struct field on Modification.

func (Modification) TableReference

func (m Modification) TableReference() string

type Schema

type Schema struct {
	Timestamp time.Time           `json:"timestamp"` // commit timestamp
	Namespace string              `json:"namespace"` // Postgres schema
	Name      string              `json:"name"`      // Postgres table name
	LSN       *uint64             `json:"lsn"`       // log sequence number, where appropriate
	Spec      SchemaSpecification `json:"spec"`      // schema definition
}

Schema defines the structure of data pulled from Postgres. It can be generated from an existing logical.Relation, when combined with a decoder that translates the Postgres types to Golang.

In future, we'll want to be able to translate this schema type into official formats, like Avro.

func SchemaFromRelation

func SchemaFromRelation(timestamp time.Time, lsn *uint64, relation *logical.Relation) Schema

SchemaFromRelation uses a logical.Relation and decoder to generate an intermediate schema

func (Schema) GetFingerprint

func (s Schema) GetFingerprint() string

GetFingerprint returns a unique idenfier for the schema.

The only important thing is that any given schema returns the same fingerprint for the duration of the Go process. Beyond that, you can use any value here.

func (Schema) TableReference

func (s Schema) TableReference() string

type SchemaSpecification

type SchemaSpecification struct {
	Columns []logical.Column `json:"columns"` // Postgres columns
}

type Table

type Table struct {
	Schema    string `json:"schema"`
	TableName string `json:"table_name"`
}

Table uniquely identifies a Postgres table, by both schema and table name.

func (Table) String

func (t Table) String() string

type Tables

type Tables []Table

func (Tables) Diff

func (s1 Tables) Diff(s2 Tables) Tables

func (Tables) Includes

func (ss Tables) Includes(s Table) bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL