pgtypeavro

package
v0.0.0-...-c45e23a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2023 License: MIT Imports: 10 Imported by: 0

README

pgtypeavro

A simple "lib" that allows for generating Avro schemas from a PostgreSQL database. Specifically, it makes it possible to convert a pglogrepl.RelationMessage into an Avro schema.

It has support for all primitive avro types, complex types, as well as the following logical types:

  • decimal
  • uuid
  • date
  • time-millis
  • time-micros
  • timestamp-micros
  • local-timestamp-millis
  • local-timestamp-micros
  • duration

It has support for the following limited set of PostgreSQL types, as well as the array variants of these types:

  • bool
  • char
  • varchar
  • bpchar
  • date
  • float4
  • float8
  • int2
  • int4
  • int8
  • json
  • jsonb
  • text
  • time
  • timestamp
  • timestamptz
  • uuid
  • numeric

Example usage

package main

import (
	pgtypeavro "creek/pgtype-avro"
	"encoding/json"
	"log"

	"github.com/jackc/pglogrepl"
)

// main shows the intended usage: convert a pglogrepl.RelationMessage into an
// Avro record schema and log that schema as JSON.
func main() {
	var relMsg pglogrepl.RelationMessage // A relation message from pglogrepl

	// NOTE(review): New is not listed in the documented index of this package
	// version — confirm the constructor name against the version you import.
	schema, err := pgtypeavro.New(relMsg).RelationMessageToAvro()
	if err != nil {
		log.Fatal(err)
	}

	// Marshal can fail as well; check its error instead of logging an empty payload.
	bin, err := json.Marshal(schema)
	if err != nil {
		log.Fatal(err)
	}

	log.Println(string(bin))
}

Documentation

Index

Constants

View Source
// Infinity is the string constant "infinity".
// NOTE(review): presumably the symbol used to represent PostgreSQL's positive
// infinity value, as the counterpart of NegativeInfinity — confirm against usage.
const Infinity = "infinity"
View Source
// NegativeInfinity is a stand-in for "-infinity": Avro names can't start with
// "-", so a "magic" value is used instead. When decoding, this value should be
// converted back to -infinity.
const NegativeInfinity = "negative_infinity_ca5991f51367e3e4"

NegativeInfinity is a placeholder for "-infinity": since Avro names can't start with "-", a "magic" replacement value is used instead. When decoding, this value should be converted back to -infinity.

Variables

View Source
// Predefined SQLType values. Each entry pairs a PostgreSQL type name with the
// matching OID constant from the pgtype package.
var (

	/// Base types
	// Base types leave Variant unset (nil).
	Bool        = SQLType{Name: "bool", OID: pgtype.BoolOID}
	Char        = SQLType{Name: "char", OID: pgtype.QCharOID}
	VarChar     = SQLType{Name: "varchar", OID: pgtype.VarcharOID}
	BPChar      = SQLType{Name: "bpchar", OID: pgtype.BPCharOID}
	Date        = SQLType{Name: "date", OID: pgtype.DateOID}
	Float4      = SQLType{Name: "float4", OID: pgtype.Float4OID}
	Float8      = SQLType{Name: "float8", OID: pgtype.Float8OID}
	Int2        = SQLType{Name: "int2", OID: pgtype.Int2OID}
	Int4        = SQLType{Name: "int4", OID: pgtype.Int4OID}
	Int8        = SQLType{Name: "int8", OID: pgtype.Int8OID}
	JSON        = SQLType{Name: "json", OID: pgtype.JSONOID}
	JSONB       = SQLType{Name: "jsonb", OID: pgtype.JSONBOID}
	Text        = SQLType{Name: "text", OID: pgtype.TextOID}
	Time        = SQLType{Name: "time", OID: pgtype.TimeOID}
	Timestamp   = SQLType{Name: "timestamp", OID: pgtype.TimestampOID}
	Timestamptz = SQLType{Name: "timestamptz", OID: pgtype.TimestamptzOID}
	UUID        = SQLType{Name: "uuid", OID: pgtype.UUIDOID}
	Numeric     = SQLType{Name: "numeric", OID: pgtype.NumericOID}

	/// Array types
	// Each array type is named after its element type with a leading underscore
	// and carries a Variant of kind SQLTypeArray whose ItemOID is the OID of the
	// corresponding base type above.
	BoolArray        = SQLType{Name: "_bool", OID: pgtype.BoolArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.BoolOID}}
	CharArray        = SQLType{Name: "_char", OID: pgtype.QCharArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.QCharOID}}
	VarCharArray     = SQLType{Name: "_varchar", OID: pgtype.VarcharArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.VarcharOID}}
	BPCharArray      = SQLType{Name: "_bpchar", OID: pgtype.BPCharArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.BPCharOID}}
	DateArray        = SQLType{Name: "_date", OID: pgtype.DateArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.DateOID}}
	Float4Array      = SQLType{Name: "_float4", OID: pgtype.Float4ArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.Float4OID}}
	Float8Array      = SQLType{Name: "_float8", OID: pgtype.Float8ArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.Float8OID}}
	Int2Array        = SQLType{Name: "_int2", OID: pgtype.Int2ArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.Int2OID}}
	Int4Array        = SQLType{Name: "_int4", OID: pgtype.Int4ArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.Int4OID}}
	Int8Array        = SQLType{Name: "_int8", OID: pgtype.Int8ArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.Int8OID}}
	JSONArray        = SQLType{Name: "_json", OID: pgtype.JSONArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.JSONOID}}
	JSONBArray       = SQLType{Name: "_jsonb", OID: pgtype.JSONBArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.JSONBOID}}
	TextArray        = SQLType{Name: "_text", OID: pgtype.TextArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.TextOID}}
	TimeArray        = SQLType{Name: "_time", OID: pgtype.TimeArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.TimeOID}}
	TimestampArray   = SQLType{Name: "_timestamp", OID: pgtype.TimestampArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.TimestampOID}}
	TimestamptzArray = SQLType{Name: "_timestamptz", OID: pgtype.TimestamptzArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.TimestamptzOID}}
	UUIDArray        = SQLType{Name: "_uuid", OID: pgtype.UUIDArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.UUIDOID}}
	NumericArray     = SQLType{Name: "_numeric", OID: pgtype.NumericArrayOID, Variant: &SQLTypeVariant{Kind: SQLTypeArray, ItemOID: pgtype.NumericOID}}
)

Functions

func FoldWhile

func FoldWhile[I any, A any](slice []I, combined func(accumulator A, val I) (A, bool), init A) A

func MapToNativeTypes

func MapToNativeTypes(m map[string]any) (map[string]any, error)

Types

type Array

// Array is the JSON-serializable form of an array schema: a type tag, the
// schema of its items, and optional metadata.
type Array struct {
	// Type is serialized under the "type" key.
	Type    Type   `json:"type"`
	// Items is the schema of the array's elements.
	Items   Schema `json:"items"`
	// Default is left out of the JSON output when empty.
	Default []any  `json:"default,omitempty"`
	// PgType is left out of the JSON output when empty.
	// NOTE(review): presumably the name of the originating PostgreSQL type — confirm.
	PgType  string `json:"pgType,omitempty"`
}

func (*Array) TypeName

func (a *Array) TypeName() Type

type AvroConverter

type AvroConverter struct {
	// contains filtered or unexported fields
}

func (*AvroConverter) RelationMessageKeysToAvro

func (a *AvroConverter) RelationMessageKeysToAvro() (*Record, error)

func (*AvroConverter) RelationMessageToAvro

func (a *AvroConverter) RelationMessageToAvro() (*Record, error)

type DerivedType

// DerivedType is a schema that annotates an underlying Type with a logical
// type, serialized under the "type" and "logicalType" keys.
type DerivedType struct {
	Type        Type        `json:"type"`
	LogicalType LogicalType `json:"logicalType"`
	// Precision and Scale are left out of the JSON output when zero.
	// NOTE(review): presumably only meaningful for the decimal logical type — confirm.
	Precision   int         `json:"precision,omitempty"`
	Scale       int         `json:"scale,omitempty"`
}

func (*DerivedType) TypeName

func (t *DerivedType) TypeName() Type

type Enum

// Enum is the JSON-serializable form of an enum schema. Type, Name and Symbols
// are always emitted; the remaining fields are left out when empty.
type Enum struct {
	Type      Type     `json:"type"`
	Name      string   `json:"name"`
	Namespace string   `json:"namespace,omitempty"`
	Doc       string   `json:"doc,omitempty"`
	// Symbols has no omitempty, so a nil slice is serialized as null.
	Symbols   []string `json:"symbols"`
	Default   string   `json:"default,omitempty"`
	Aliases   []string `json:"aliases,omitempty"`
}

func (*Enum) TypeName

func (e *Enum) TypeName() Type

type LogicalType

// LogicalType names a logical type; it shares its underlying representation
// with Type.
type LogicalType Type
// The logical types this package supports.
const (
	LogicalTypeDecimal              LogicalType = "decimal"
	LogicalTypeUUID                 LogicalType = "uuid"
	LogicalTypeDate                 LogicalType = "date"
	LogicalTypeTimeMillis           LogicalType = "time-millis"
	LogicalTypeTimeMicros           LogicalType = "time-micros"
	LogicalTypeTimestampMicros      LogicalType = "timestamp-micros"
	LogicalTypeLocalTimestampMillis LogicalType = "local-timestamp-millis"
	LogicalTypeLocalTimestampMicros LogicalType = "local-timestamp-micros"
	LogicalTypeDuration             LogicalType = "duration"
)

type Order

// Order is the value stored in a RecordField's "order" attribute.
type Order string
// The recognized Order values.
const (
	OrderAscending  Order = "ascending"
	OrderDescending Order = "descending"
	OrderIgnore     Order = "ignore"
)

type PgToSchema

// PgToSchema is implemented by values that can produce a Schema.
type PgToSchema interface {
	// ToSchema returns the Schema for the receiver.
	ToSchema() Schema
}

type PgType

// PgType bundles a type's name and OID with the mapper that yields its Schema.
type PgType struct {
	// Mapper produces the Schema for this type.
	Mapper PgToSchema
	// NOTE(review): Name and OID presumably identify the PostgreSQL type — confirm.
	Name   string
	OID    uint32
}

type Record

// Record is the JSON-serializable form of a record schema. It is the result
// type of RelationMessageToAvro and RelationMessageKeysToAvro.
type Record struct {
	Type      Type           `json:"type"`
	Name      string         `json:"name"`
	// Namespace, Doc and Aliases are left out of the JSON output when empty.
	Namespace string         `json:"namespace,omitempty"`
	Doc       string         `json:"doc,omitempty"`
	Aliases   []string       `json:"aliases,omitempty"`
	// Fields has no omitempty, so a nil slice is serialized as null.
	Fields    []*RecordField `json:"fields"`
}

func (*Record) TypeName

func (r *Record) TypeName() Type

type RecordField

// RecordField is one entry in a Record's Fields list. Name and Type are always
// emitted; every other attribute is left out of the JSON output when empty.
type RecordField struct {
	Name    string   `json:"name"`
	Doc     string   `json:"doc,omitempty"`
	// Type is the schema of the field's value.
	Type    Schema   `json:"type"`
	Default any      `json:"default,omitempty"`
	Order   Order    `json:"order,omitempty"`
	Aliases []string `json:"aliases,omitempty"`
	// NOTE(review): PgType presumably names the source PostgreSQL column type — confirm.
	PgType  string   `json:"pgType,omitempty"`
	// PgKey is a pointer so that omitempty drops it only when nil; an explicit
	// false is still emitted.
	// NOTE(review): presumably marks whether the column is part of the key — confirm.
	PgKey   *bool    `json:"pgKey,omitempty"`
}

type SQLType

// SQLType describes a SQL type by its name and OID.
type SQLType struct {
	Name    string
	OID     uint32
	// Variant is nil for the predefined base types and set for the predefined
	// array types.
	Variant *SQLTypeVariant
}

type SQLTypeKind

// SQLTypeKind identifies which kind of variant an SQLTypeVariant describes.
// Only array types are supported at the moment.
type SQLTypeKind string
// The declared SQLTypeKind values.
const (
	SQLTypeArray      SQLTypeKind = "array"
	SQLTypeRange      SQLTypeKind = "range"
	SQLTypeMultirange SQLTypeKind = "multirange"
)

Only array types are supported at the moment

type SQLTypeMap

type SQLTypeMap struct {
	// contains filtered or unexported fields
}

func NewTypeMap

func NewTypeMap() *SQLTypeMap

func (*SQLTypeMap) RegisterType

func (tm *SQLTypeMap) RegisterType(t *SQLType)

func (*SQLTypeMap) TypeForOID

func (tm *SQLTypeMap) TypeForOID(oid uint32) (*SQLType, bool)

type SQLTypeVariant

// SQLTypeVariant describes how an SQLType is derived from another type.
type SQLTypeVariant struct {
	// Kind is the variant's kind, e.g. SQLTypeArray.
	Kind    SQLTypeKind
	// ItemOID is the OID of the item type; for the predefined array types it is
	// the OID of the element's base type.
	ItemOID uint32
}

type Schema

// Schema is the common interface of all schema nodes. The documented types
// with a TypeName method are *Array, *DerivedType, *Enum, *Record, Type and
// Union.
type Schema interface {
	// TypeName returns the Type of the schema node.
	TypeName() Type
}

type Type

// Type is the name of a schema type. It has its own TypeName method, so a bare
// Type value can be used wherever a Schema is expected.
type Type string
// The primitive and complex type names.
const (
	// Primitive types
	TypeNull   Type = "null"
	TypeBool   Type = "boolean"
	TypeInt    Type = "int"
	TypeLong   Type = "long"
	TypeFloat  Type = "float"
	TypeDouble Type = "double"

	TypeBytes Type = "bytes"
	TypeStr   Type = "string"

	// Complex types
	TypeRecord Type = "record"
	TypeEnum   Type = "enum"
	TypeArray  Type = "array"
	TypeMap    Type = "map"
	TypeUnion  Type = "union"
	TypeFixed  Type = "fixed"
)

func (Type) TypeName

func (t Type) TypeName() Type

type Union

// Union is a list of member schemas. Being a slice type, it is serialized by
// encoding/json as a JSON array unless a custom marshaler is defined elsewhere.
type Union []Schema

func NewUnion

func NewUnion(schema Schema, schemas ...Schema) Union

func (Union) Members

func (u Union) Members() []Schema

func (Union) TypeName

func (u Union) TypeName() Type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL