schemalog

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
// Fixed identifiers used by the schemalog package.
// NOTE(review): presumably the Postgres schema and table in which pgstream
// keeps its schema log — confirm against the store implementation.
const (
	// SchemaName is the literal "pgstream".
	SchemaName = "pgstream"
	// TableName is the literal "schema_log".
	TableName  = "schema_log"
)

Variables

View Source
// ErrNoRows is a sentinel error carrying the message "no rows".
// NOTE(review): presumably returned by Store implementations when no schema
// log entry is found — confirm; compare against it with errors.Is.
var ErrNoRows = errors.New("no rows")

Functions

This section is empty.

Types

type Column

// Column describes a single table column as recorded in a schema log entry.
// The struct tags define the JSON keys used when a column is (de)serialized.
type Column struct {
	// Name is the column name (JSON key "name").
	Name         string  `json:"name"`
	// DataType is the column type (JSON key "type").
	DataType     string  `json:"type"`
	// DefaultValue is optional; a nil pointer is left out of the JSON output
	// (JSON key "default").
	DefaultValue *string `json:"default,omitempty"`
	// Nullable and Unique are the flags consulted when looking for a
	// unique, not-null column (see Table.GetFirstUniqueNotNullColumn).
	Nullable     bool    `json:"nullable"`
	Unique       bool    `json:"unique"`
	// Metadata is deliberately left untyped (an optional raw string) because
	// the content sent by the publisher is not fully under our control.
	Metadata   *string `json:"metadata"`
	// PgstreamID identifies the column; unlike the name, it does not change.
	PgstreamID string  `json:"pgstream_id"`
}

func (*Column) IsEqual

func (c *Column) IsEqual(other *Column) bool

type LogEntry

// LogEntry contains the information relating to a schema log.
type LogEntry struct {
	// ID is the xid identifier of this log entry.
	ID         xid.ID                   `json:"id"`
	// Version is the version number of the entry.
	// NOTE(review): presumably increases with every schema change — confirm.
	Version    int64                    `json:"version"`
	// SchemaName is the name of the schema this entry refers to.
	SchemaName string                   `json:"schema_name"`
	// CreatedAt is the creation timestamp, wrapped so that it can be parsed
	// to and from the PG timestamp format.
	CreatedAt  SchemaCreatedAtTimestamp `json:"created_at"`
	// Schema is the schema definition (its tables) captured by this entry.
	Schema     Schema                   `json:"schema"`
	// Acked indicates if the schema has been processed and acknowledged by
	// pgstream after being updated in the source database
	Acked bool `json:"acked"`
}

LogEntry contains the information relating to a schema log.

func (*LogEntry) After

func (m *LogEntry) After(other *LogEntry) bool

func (*LogEntry) Diff

func (m *LogEntry) Diff(previous *LogEntry) *SchemaDiff

func (*LogEntry) GetTableByName

func (m *LogEntry) GetTableByName(tableName string) *Table

func (*LogEntry) IsEmpty

func (m *LogEntry) IsEmpty() bool

func (*LogEntry) IsEqual

func (m *LogEntry) IsEqual(other *LogEntry) bool

func (*LogEntry) UnmarshalJSON

func (m *LogEntry) UnmarshalJSON(b []byte) error

type Schema

// Schema is the schema definition stored in a LogEntry: the list of tables
// plus a flag marking the schema as deleted.
type Schema struct {
	// Tables lists the tables that belong to the schema.
	Tables []Table `json:"tables"`
	// Dropped will be true if the schema has been deleted
	Dropped bool `json:"dropped,omitempty"`
}

func (*Schema) Diff

func (s *Schema) Diff(previous *Schema) *SchemaDiff

func (*Schema) IsEqual

func (s *Schema) IsEqual(other *Schema) bool

func (*Schema) MarshalJSON

func (s *Schema) MarshalJSON() ([]byte, error)

type SchemaCreatedAtTimestamp

// SchemaCreatedAtTimestamp is a wrapper around time.Time that allows us to
// parse to and from the PG timestamp format.
type SchemaCreatedAtTimestamp struct {
	// Time is embedded, so the time.Time methods are promoted to the wrapper.
	time.Time
}

SchemaCreatedAtTimestamp is a wrapper around time.Time that allows us to parse to and from the PG timestamp format.

func NewSchemaCreatedAtTimestamp

func NewSchemaCreatedAtTimestamp(t time.Time) SchemaCreatedAtTimestamp

func (SchemaCreatedAtTimestamp) MarshalJSON

func (s SchemaCreatedAtTimestamp) MarshalJSON() ([]byte, error)

func (*SchemaCreatedAtTimestamp) Scan

func (s *SchemaCreatedAtTimestamp) Scan(src interface{}) error

func (SchemaCreatedAtTimestamp) TimestampValue

func (s SchemaCreatedAtTimestamp) TimestampValue() (pgtype.Timestamp, error)

func (*SchemaCreatedAtTimestamp) UnmarshalJSON

func (s *SchemaCreatedAtTimestamp) UnmarshalJSON(b []byte) error

type SchemaDiff

// SchemaDiff is the result type of Schema.Diff and LogEntry.Diff, which
// compare a schema against a previous one.
type SchemaDiff struct {
	// TablesToRemove holds tables.
	// NOTE(review): presumably those present in the previous schema but not in
	// the current one — confirm.
	TablesToRemove      []Table
	// ColumnsToAdd holds columns.
	// NOTE(review): presumably those new in the current schema — confirm.
	ColumnsToAdd        []Column
	// PrimaryKeyChange and UniqueNotNullChange are string lists.
	// NOTE(review): what the strings identify (table names? IDs?) is not
	// visible here — confirm.
	PrimaryKeyChange    []string
	UniqueNotNullChange []string
}

func (*SchemaDiff) Empty

func (d *SchemaDiff) Empty() bool

type Store

// Store is the interface for accessing the schema log. StoreCache wraps a
// Store and exposes methods with the same signatures.
type Store interface {
	// Fetch returns a log entry for the given schema name.
	// NOTE(review): presumably the latest entry, restricted to acknowledged
	// ones when ackedOnly is true — confirm against an implementation.
	Fetch(ctx context.Context, schemaName string, ackedOnly bool) (*LogEntry, error)
	// Ack acknowledges the given log entry (see LogEntry.Acked).
	Ack(ctx context.Context, le *LogEntry) error
	// Close closes the store.
	// NOTE(review): presumably releases the underlying resources — confirm.
	Close() error
}

type StoreCache

// StoreCache is a wrapper around a schemalog Store that provides an in-memory
// caching mechanism to reduce the number of calls to the database. It is not
// safe for concurrent use.
type StoreCache struct {
	// contains filtered or unexported fields
}

StoreCache is a wrapper around a schemalog Store that provides an in-memory caching mechanism to reduce the number of calls to the database. It is not safe for concurrent use.

func NewStoreCache

func NewStoreCache(store Store) *StoreCache

func (*StoreCache) Ack

func (s *StoreCache) Ack(ctx context.Context, entry *LogEntry) error

func (*StoreCache) Close

func (s *StoreCache) Close() error

func (*StoreCache) Fetch

func (s *StoreCache) Fetch(ctx context.Context, schemaName string, ackedOnly bool) (*LogEntry, error)

type Table

// Table describes a single table within a Schema. The struct tags define the
// JSON keys used when a table is (de)serialized.
type Table struct {
	// Oid is the table OID, carried as a string.
	// NOTE(review): presumably the Postgres object identifier — confirm.
	Oid               string   `json:"oid"`
	// Name is the table name.
	Name              string   `json:"name"`
	// Columns lists the columns of the table.
	Columns           []Column `json:"columns"`
	// PrimaryKeyColumns is a list of strings identifying the primary key columns.
	// NOTE(review): presumably column names — confirm.
	PrimaryKeyColumns []string `json:"primary_key_columns"`
	// PgstreamID is a unique identifier of the table generated by pgstream
	PgstreamID string `json:"pgstream_id"`
}

func (*Table) GetColumnByName

func (t *Table) GetColumnByName(name string) *Column

func (*Table) GetFirstUniqueNotNullColumn

func (t *Table) GetFirstUniqueNotNullColumn() *Column

GetFirstUniqueNotNullColumn returns the first unique, not-null column in the table. It sorts the columns by pgstream ID and returns the first one that satisfies both the unique and not-null constraints. The pgstream ID is used instead of the column name because the ID doesn't change.

func (*Table) IsEqual

func (t *Table) IsEqual(other *Table) bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL