Documentation ¶
Overview ¶
Example (Avro) ¶
// Init a new schema registry instance and connect url := `http://localhost:8081/` registry, err := NewRegistry( url, WithLogger(log.NewLog().Log(log.WithLevel(log.TRACE))), WithBackgroundSync(5*time.Second), // MockClient for examples only WithMockClient(srclient.CreateMockSchemaRegistryClient(url)), ) if err != nil { log.Fatal(err) } // Start Background Sync to detect new Versions if err := registry.Sync(); err != nil { log.Fatal(err) } type SampleRecord struct { Field1 int `avro:"field1"` Field2 float64 `avro:"field2"` Field3 string `avro:"field3"` } subject := `test-subject-avro` if err := registry.Register(subject, 1, func(unmarshaler Unmarshaler) (v interface{}, err error) { record := SampleRecord{} if err := unmarshaler.Unmarshal(&record); err != nil { return nil, err } return record, nil }); err != nil { log.Fatal(err) } // Encode the message record := SampleRecord{ Field1: 1, Field2: 2.0, Field3: "text", } bytePayload, err := registry.WithSchema(subject, 1).Encode(record) if err != nil { panic(err) } // Decode the message ev, err := registry.GenericEncoder().Decode(bytePayload) // Returns SampleRecord if err != nil { panic(err) } fmt.Printf("%+v", ev)
Output:
Example (Protobuf) ¶
// Init a new schema registry instance and connect url := `http://localhost:8081/` registry, err := NewRegistry( url, WithBackgroundSync(5*time.Second), WithLogger(log.NewLog().Log(log.WithLevel(log.TRACE))), // MockClient for examples only WithMockClient(srclient.CreateMockSchemaRegistryClient(url)), ) if err != nil { log.Fatal(err) } // Start Background Sync to detect new Versions if err := registry.Sync(); err != nil { log.Fatal(err) } subject := `test-subject-protobuf` if err := registry.Register(subject, 1, func(unmarshaler Unmarshaler) (v interface{}, err error) { record := &com_mycorp_mynamespace.SampleRecord{} if err := unmarshaler.Unmarshal(&record); err != nil { return nil, err } return record, nil }); err != nil { log.Fatal(err) } // Encode the message record := &com_mycorp_mynamespace.SampleRecord{ Field1: 1, Field2: 2.0, Field3: "text", } bytePayload, err := registry.WithSchema(subject, 1).Encode(record) if err != nil { panic(err) } // Decode the message ev, err := registry.GenericEncoder().Decode(bytePayload) // Returns *com_mycorp_mynamespace.SampleRecord if err != nil { panic(err) } fmt.Printf("%+v", ev)
Output:
Index ¶
- func Sync(syncInterval time.Duration, logger log.Logger, registry *Registry) error
- type AvroMarshaller
- type AvroUnmarshaler
- type Encoder
- type GenericEncoder
- type Marshaller
- type Option
- type Options
- type ProtoMarshaller
- type ProtoUnmarshaler
- type Registry
- func (r *Registry) GenericEncoder() Encoder
- func (r *Registry) Print(subject *Subject)
- func (r *Registry) Register(subjectName string, version Version, unmarshalerFunc UnmarshalerFunc) error
- func (r *Registry) Sync() error
- func (r *Registry) WithLatestSchema(subject string) Encoder
- func (r *Registry) WithSchema(subject string, version Version) Encoder
- type RegistryEncoder
- type Subject
- type Unmarshaler
- type UnmarshalerFunc
- type Version
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AvroMarshaller ¶
type AvroMarshaller struct {
// contains filtered or unexported fields
}
func NewAvroMarshaller ¶
func NewAvroMarshaller(schema string) *AvroMarshaller
func (*AvroMarshaller) Init ¶
func (s *AvroMarshaller) Init() error
func (*AvroMarshaller) Marshall ¶
func (s *AvroMarshaller) Marshall(data interface{}) ([]byte, error)
func (*AvroMarshaller) NewUnmarshaler ¶
func (s *AvroMarshaller) NewUnmarshaler(data []byte) Unmarshaler
type AvroUnmarshaler ¶
type AvroUnmarshaler struct {
// contains filtered or unexported fields
}
func (*AvroUnmarshaler) Unmarshal ¶
func (s *AvroUnmarshaler) Unmarshal(in interface{}) error
type Encoder ¶
type Encoder interface { Encode(v interface{}) ([]byte, error) Decode(data []byte) (interface{}, error) }
func NewRegistryEncoder ¶
NewRegistryEncoder NewEncoder return the Encoder for given Subject from the Registry
type GenericEncoder ¶
type GenericEncoder struct {
Encoder
}
GenericEncoder holds the reference to Registry and Subject which can be used to decode messages
if err := registry.Register(`test-subject-avro`, schemaregistry.VersionAll,
func(unmarshaler schemaregistry.Unmarshaler) (v interface{}, err error) { re := SampleRecord{} if err := unmarshaler.Unmarshal(&re); err != nil { return nil, err } return re, nil }); err != nil { log.Fatal(err) } // Encode logic... ev, err := registry.GenericEncoder().Decode(payload) // Returns SampleRecord if err != nil { panic(err) }
func (*GenericEncoder) Encode ¶
func (s *GenericEncoder) Encode(_ interface{}) ([]byte, error)
type Marshaller ¶
type Marshaller interface { Init() error Marshall(v interface{}) ([]byte, error) NewUnmarshaler(data []byte) Unmarshaler }
func NewProtoMarshaller ¶
func NewProtoMarshaller() Marshaller
type Option ¶
type Option func(*Options)
Option is a type to host NewRegistry configurations
func WithBackgroundSync ¶
WithBackgroundSync Checks for new schemas by regularly querying schema registry
func WithLogger ¶
WithLogger returns a Configurations to create a NewRegistry with given PrefixedLogger
func WithMockClient ¶
func WithMockClient(client *registry.MockSchemaRegistryClient) Option
WithMockClient create a mock version of the registry(Testing purposes only)
type ProtoMarshaller ¶
type ProtoMarshaller struct{}
func (*ProtoMarshaller) Init ¶
func (s *ProtoMarshaller) Init() error
func (*ProtoMarshaller) Marshall ¶
func (s *ProtoMarshaller) Marshall(v interface{}) ([]byte, error)
func (*ProtoMarshaller) NewUnmarshaler ¶
func (s *ProtoMarshaller) NewUnmarshaler(data []byte) Unmarshaler
type ProtoUnmarshaler ¶
type ProtoUnmarshaler struct {
// contains filtered or unexported fields
}
func (*ProtoUnmarshaler) Unmarshal ¶
func (s *ProtoUnmarshaler) Unmarshal(in interface{}) error
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry type holds schema registry details
func NewRegistry ¶
NewRegistry returns a Registry instance
func (*Registry) GenericEncoder ¶
GenericEncoder returns a placeholder encoder for decoders. It can be used to decode any schema version for registered subjects without explicitly mentioning the Subject:Version combination
func (*Registry) Register ¶
func (r *Registry) Register(subjectName string, version Version, unmarshalerFunc UnmarshalerFunc) error
Register registers the given subject, version and UnmarshalerFunc in the Registry
func (*Registry) Sync ¶
Sync function starts the background process looking for news schema versions for already registered subjects
Newly Created Schemas will register in background and the client does not need any restarts
func (*Registry) WithLatestSchema ¶
WithLatestSchema returns the latest event version encoder registered under given subject
type RegistryEncoder ¶
type RegistryEncoder struct {
// contains filtered or unexported fields
}
RegistryEncoder holds the reference to Registry and Subject which can be used to encode and decode messages
func (*RegistryEncoder) Decode ¶
func (s *RegistryEncoder) Decode(data []byte) (interface{}, error)
Decode returns the decoded go interface of avro encoded message and error if its unable to decode
func (*RegistryEncoder) Encode ¶
func (s *RegistryEncoder) Encode(data interface{}) ([]byte, error)
Encode return a byte slice with a avro encoded message. magic byte and schema id will be appended to its beginning
╔════════════════════╤════════════════════╤════════════════════════╗ ║ magic byte(1 byte) │ schema id(4 bytes) │ Encoded Message ║ ╚════════════════════╧════════════════════╧════════════════════════╝
type Subject ¶
type Subject struct { Schema string // The actual AVRO subject Subject string // Subject where the subject is registered for Version Version // Version within this subject Id int // Registry's unique id UnmarshalerFunc UnmarshalerFunc // contains filtered or unexported fields }
Subject holds the Schema information of the registered subject
type Unmarshaler ¶
type Unmarshaler interface {
Unmarshal(in interface{}) error
}
type UnmarshalerFunc ¶
type UnmarshalerFunc func(unmarshaler Unmarshaler) (interface{}, error)