schemaregistry

package module
v2.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2024 License: MIT Imports: 13 Imported by: 0

README

Schema Registry Client

GoDoc

This repository contains wrapper function to communicate Confluent's Kafka schema registry via REST API and schema dynamic sync directly from kafka topic.

Client

Download library using go get -u github.com/tryfix/schema-registry/v2

Following code slice create a schema registry client

import schemaregistry "github.com/tryfix/schemaregistry/v2"

registry, _ := NewRegistry(
		`http://localhost:8081/`,
		WithBackgroundSync(5*time.Second),
	)

Register an event com.example.events.test with version 1

import schemaregistry "github.com/tryfix/schemaregistry/v2"

if err := registry.Register(`com.example.events.test`, 1, func(unmarshaler Unmarshaler) (v interface{}, err error) {
		record := SampleRecord{}
		if err := unmarshaler.Unmarshal(&record); err != nil {
			return nil, err
		}

		return record, nil
	}); err != nil {
		log.Fatal(err)
	}

Message encoding/decoding using above registered schema

// avro message structure
type SampleRecord struct {
Field1 int     `avro:"field1"`
Field2 float64 `avro:"field2"`
Field3 string  `avro:"field3"`
}

// Get encoder  
encoder := registry.WithSchema(`com.example.events.test`, 1)

// Sample message
record := SampleRecord{
Field1: 1,
Field2: 2.0,
Field3: "text",
}

// Message encode to byte array
bytePayload, err := encoder.Encode(record)
if err!=nil {
    panic(err)
}

// Decode message
ev, err := encoder.Decode(bytePayload)
if err != nil {
panic(err)
}
fmt.Printf("%+v", ev)

Message can be decoded through generic encoder as below

// Decode message as generic encoder
ev, err := registry.GenericEncoder().Decode(bytePayload) // Returns SampleRecord
if err != nil {
panic(err)
}

Message Structure

Encoded messages are published with magic byte and a schema ID attached to it. Following structure shows the message format used in the library to encode the message.

+====================+=== =================+======================+
| Magic byte(1 byte) | Schema ID(4 bytes) | Payload              |
+====================+====================+======================+

ToDo

  • write benchmarks

Documentation

Overview

Example (Avro)
// Init a new schema registry instance and connect
url := `http://localhost:8081/`
registry, err := NewRegistry(
	url,
	WithLogger(log.NewLog().Log(log.WithLevel(log.TRACE))),
	WithBackgroundSync(5*time.Second),
	// MockClient for examples only
	WithMockClient(srclient.CreateMockSchemaRegistryClient(url)),
)
if err != nil {
	log.Fatal(err)
}

// Start Background Sync to detect new Versions
if err := registry.Sync(); err != nil {
	log.Fatal(err)
}

type SampleRecord struct {
	Field1 int     `avro:"field1"`
	Field2 float64 `avro:"field2"`
	Field3 string  `avro:"field3"`
}

subject := `test-subject-avro`
if err := registry.Register(subject, 1, func(unmarshaler Unmarshaler) (v interface{}, err error) {
	record := SampleRecord{}
	if err := unmarshaler.Unmarshal(&record); err != nil {
		return nil, err
	}

	return record, nil
}); err != nil {
	log.Fatal(err)
}

// Encode the message
record := SampleRecord{
	Field1: 1,
	Field2: 2.0,
	Field3: "text",
}

bytePayload, err := registry.WithSchema(subject, 1).Encode(record)
if err != nil {
	panic(err)
}

// Decode the message
ev, err := registry.GenericEncoder().Decode(bytePayload) // Returns SampleRecord
if err != nil {
	panic(err)
}

fmt.Printf("%+v", ev)
Output:

Example (Protobuf)
// Init a new schema registry instance and connect
url := `http://localhost:8081/`
registry, err := NewRegistry(
	url,
	WithBackgroundSync(5*time.Second),
	WithLogger(log.NewLog().Log(log.WithLevel(log.TRACE))),
	// MockClient for examples only
	WithMockClient(srclient.CreateMockSchemaRegistryClient(url)),
)
if err != nil {
	log.Fatal(err)
}

// Start Background Sync to detect new Versions
if err := registry.Sync(); err != nil {
	log.Fatal(err)
}

subject := `test-subject-protobuf`
if err := registry.Register(subject, 1, func(unmarshaler Unmarshaler) (v interface{}, err error) {
	record := &com_mycorp_mynamespace.SampleRecord{}
	if err := unmarshaler.Unmarshal(&record); err != nil {
		return nil, err
	}

	return record, nil
}); err != nil {
	log.Fatal(err)
}

// Encode the message
record := &com_mycorp_mynamespace.SampleRecord{
	Field1: 1,
	Field2: 2.0,
	Field3: "text",
}

bytePayload, err := registry.WithSchema(subject, 1).Encode(record)
if err != nil {
	panic(err)
}

// Decode the message
ev, err := registry.GenericEncoder().Decode(bytePayload) // Returns *com_mycorp_mynamespace.SampleRecord
if err != nil {
	panic(err)
}

fmt.Printf("%+v", ev)
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Sync

func Sync(syncInterval time.Duration, logger log.Logger, registry *Registry) error

Types

type AvroMarshaller

type AvroMarshaller struct {
	// contains filtered or unexported fields
}

func NewAvroMarshaller

func NewAvroMarshaller(schema string) *AvroMarshaller

func (*AvroMarshaller) Init

func (s *AvroMarshaller) Init() error

func (*AvroMarshaller) Marshall

func (s *AvroMarshaller) Marshall(data interface{}) ([]byte, error)

func (*AvroMarshaller) NewUnmarshaler

func (s *AvroMarshaller) NewUnmarshaler(data []byte) Unmarshaler

type AvroUnmarshaler

type AvroUnmarshaler struct {
	// contains filtered or unexported fields
}

func (*AvroUnmarshaler) Unmarshal

func (s *AvroUnmarshaler) Unmarshal(in interface{}) error

type Encoder

type Encoder interface {
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte) (interface{}, error)
}

func NewRegistryEncoder

func NewRegistryEncoder(registry *Registry, subject *Subject) Encoder

NewRegistryEncoder NewEncoder return the Encoder for given Subject from the Registry

type GenericEncoder

type GenericEncoder struct {
	Encoder
}

GenericEncoder holds the reference to Registry and Subject which can be used to decode messages

if err := registry.Register(`test-subject-avro`, schemaregistry.VersionAll,

	func(unmarshaler schemaregistry.Unmarshaler) (v interface{}, err error) {
    re := SampleRecord{}
    if err := unmarshaler.Unmarshal(&re); err != nil {
        return nil, err
    }

    return re, nil
}); err != nil {

    log.Fatal(err)
}

// Encode logic...

ev, err := registry.GenericEncoder().Decode(payload) // Returns SampleRecord
if err != nil {
    panic(err)
}

func (*GenericEncoder) Encode

func (s *GenericEncoder) Encode(_ interface{}) ([]byte, error)

type Marshaller

type Marshaller interface {
	Init() error
	Marshall(v interface{}) ([]byte, error)
	NewUnmarshaler(data []byte) Unmarshaler
}

func NewProtoMarshaller

func NewProtoMarshaller() Marshaller

type Option

type Option func(*Options)

Option is a type to host NewRegistry configurations

func WithBackgroundSync

func WithBackgroundSync(syncInterval time.Duration) Option

WithBackgroundSync Checks for new schemas by regularly querying schema registry

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger returns a Configurations to create a NewRegistry with given PrefixedLogger

func WithMockClient

func WithMockClient(client *registry.MockSchemaRegistryClient) Option

WithMockClient create a mock version of the registry(Testing purposes only)

type Options

type Options struct {
	// contains filtered or unexported fields
}

type ProtoMarshaller

type ProtoMarshaller struct{}

func (*ProtoMarshaller) Init

func (s *ProtoMarshaller) Init() error

func (*ProtoMarshaller) Marshall

func (s *ProtoMarshaller) Marshall(v interface{}) ([]byte, error)

func (*ProtoMarshaller) NewUnmarshaler

func (s *ProtoMarshaller) NewUnmarshaler(data []byte) Unmarshaler

type ProtoUnmarshaler

type ProtoUnmarshaler struct {
	// contains filtered or unexported fields
}

func (*ProtoUnmarshaler) Unmarshal

func (s *ProtoUnmarshaler) Unmarshal(in interface{}) error

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry type holds schema registry details

func NewRegistry

func NewRegistry(url string, opts ...Option) (*Registry, error)

NewRegistry returns a Registry instance

func (*Registry) GenericEncoder

func (r *Registry) GenericEncoder() Encoder

GenericEncoder returns a placeholder encoder for decoders. It can be used to decode any schema version for registered subjects without explicitly mentioning the Subject:Version combination

func (*Registry) Print

func (r *Registry) Print(subject *Subject)

func (*Registry) Register

func (r *Registry) Register(subjectName string, version Version, unmarshalerFunc UnmarshalerFunc) error

Register registers the given subject, version and UnmarshalerFunc in the Registry

func (*Registry) Sync

func (r *Registry) Sync() error

Sync function starts the background process looking for news schema versions for already registered subjects

Newly Created Schemas will register in background and the client does not need any restarts

func (*Registry) WithLatestSchema

func (r *Registry) WithLatestSchema(subject string) Encoder

WithLatestSchema returns the latest event version encoder registered under given subject

func (*Registry) WithSchema

func (r *Registry) WithSchema(subject string, version Version) Encoder

WithSchema return the specific encoder which registered at the initialization under the subject and version

type RegistryEncoder

type RegistryEncoder struct {
	// contains filtered or unexported fields
}

RegistryEncoder holds the reference to Registry and Subject which can be used to encode and decode messages

func (*RegistryEncoder) Decode

func (s *RegistryEncoder) Decode(data []byte) (interface{}, error)

Decode returns the decoded go interface of avro encoded message and error if its unable to decode

func (*RegistryEncoder) Encode

func (s *RegistryEncoder) Encode(data interface{}) ([]byte, error)

Encode return a byte slice with a avro encoded message. magic byte and schema id will be appended to its beginning

╔════════════════════╤════════════════════╤════════════════════════╗
║ magic byte(1 byte) │ schema id(4 bytes) │ Encoded Message		   ║
╚════════════════════╧════════════════════╧════════════════════════╝

type Subject

type Subject struct {
	Schema          string  // The actual AVRO subject
	Subject         string  // Subject where the subject is registered for
	Version         Version // Version within this subject
	Id              int     // Registry's unique id
	UnmarshalerFunc UnmarshalerFunc
	// contains filtered or unexported fields
}

Subject holds the Schema information of the registered subject

func (Subject) String

func (s Subject) String() string

type Unmarshaler

type Unmarshaler interface {
	Unmarshal(in interface{}) error
}

type UnmarshalerFunc

type UnmarshalerFunc func(unmarshaler Unmarshaler) (interface{}, error)

type Version

type Version int

Version is the type to hold default register vrsion options

const (
	//VersionLatest constant hold the flag to register the latest version of the subject
	VersionLatest Version = -1
	//VersionAll constant hold the flag to register all the versions of the subject
	VersionAll Version = -2
)

func (Version) String

func (v Version) String() string

String returns the version type

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL