schemaregistry

package
v0.11.0-nightly.20240629

Warning: This package is not in the latest version of its module.
Published: Jun 28, 2024 | License: Apache-2.0 | Imports: 16 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var DefaultSchemaFactories = map[sr.SchemaType]SchemaFactory{
	avro.Type: {
		Parse:         func(s string) (Schema, error) { return avro.Parse(s) },
		SchemaForType: func(v any) (Schema, error) { return avro.SchemaForType(v) },
	},
}
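
The map above pairs each schema type with a factory that can either parse a schema from text or derive one from a Go value. The following is a minimal, self-contained sketch of that lookup pattern. It does not use the real sr.SchemaType or avro package: schemaType, textSchema, factories, and parseSchema are hypothetical stand-ins introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Schema is a trimmed-down stand-in for the package's Schema interface.
type Schema interface {
	String() string
}

// SchemaFactory mirrors the shape of the documented struct.
type SchemaFactory struct {
	Parse         func(string) (Schema, error)
	SchemaForType func(v any) (Schema, error)
}

// schemaType is a hypothetical stand-in for sr.SchemaType.
type schemaType int

const typeText schemaType = iota

// textSchema is a hypothetical schema that only keeps its textual form.
type textSchema string

func (s textSchema) String() string { return string(s) }

// factories plays the role of DefaultSchemaFactories in this sketch.
var factories = map[schemaType]SchemaFactory{
	typeText: {
		Parse: func(s string) (Schema, error) {
			if s == "" {
				return nil, errors.New("empty schema")
			}
			return textSchema(s), nil
		},
		SchemaForType: func(v any) (Schema, error) {
			// Use the Go type name as a placeholder schema.
			return textSchema(fmt.Sprintf("%T", v)), nil
		},
	},
}

// parseSchema looks up the factory registered for t and parses text with it.
func parseSchema(t schemaType, text string) (Schema, error) {
	f, ok := factories[t]
	if !ok {
		return nil, fmt.Errorf("unknown schema type %d", t)
	}
	return f.Parse(text)
}

func main() {
	s, err := parseSchema(typeText, `"string"`)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.String())
}
```

Keeping the factories in a map means support for another schema type can be added by registering one more entry, without touching the lookup code.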

Functions

func ExampleSchemaRegistryURL

func ExampleSchemaRegistryURL(exampleName string, port int) (string, func())

ExampleSchemaRegistryURL creates a fake in-memory schema registry server and returns its address and a cleanup function which should be executed in a deferred call.

This function is only used if examples are run without --tags=integration. It is a utility that allows faster iteration during development. Run the integration tests to ensure the code works with a real schema registry.
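
The "address plus cleanup function" shape described above can be sketched with the standard library's httptest package. This is not the package's implementation: fakeRegistryURL and fetchSubjects are hypothetical helpers, and the single /subjects handler is a placeholder that returns an empty list rather than a working schema registry.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// fakeRegistryURL starts an in-memory HTTP server and returns its URL along
// with a cleanup function that shuts the server down.
// Hypothetical helper: the handler is a placeholder, not a schema registry.
func fakeRegistryURL() (string, func()) {
	mux := http.NewServeMux()
	mux.HandleFunc("/subjects", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_, _ = io.WriteString(w, "[]")
	})
	srv := httptest.NewServer(mux)
	return srv.URL, srv.Close
}

// fetchSubjects requests the placeholder endpoint and returns the raw body.
func fetchSubjects(baseURL string) (string, error) {
	resp, err := http.Get(baseURL + "/subjects")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	url, cleanup := fakeRegistryURL()
	defer cleanup() // release the listener when the example finishes

	body, err := fetchSubjects(url)
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
}
```

Returning the cleanup function lets the caller decide when to stop the server, which is why the documentation recommends running it in a deferred call.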

func TestSchemaRegistryURL

func TestSchemaRegistryURL(t testing.TB) string

TestSchemaRegistryURL creates a fake in-memory schema registry server and returns its address.

This function is only used if the tests are run without --tags=integration. It is a utility that allows faster iteration during development. Run the integration tests to ensure the code works with a real schema registry.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a schema registry client that caches schemas. It is safe for concurrent use.

func NewClient

func NewClient(logger log.CtxLogger, opts ...sr.Opt) (*Client, error)

NewClient creates a new client using the provided logger and schema registry client options.

func (*Client) CreateSchema

func (c *Client) CreateSchema(ctx context.Context, subject string, schema sr.Schema) (sr.SubjectSchema, error)

CreateSchema checks whether the schema is already in the cache and, if so, returns the associated sr.SubjectSchema. Otherwise, it sends the schema to the schema registry and, if the registration succeeds, stores the result in the cache.

func (*Client) SchemaByID

func (c *Client) SchemaByID(ctx context.Context, id int) (sr.Schema, error)

SchemaByID checks if the schema is already registered in the cache and returns the associated sr.Schema if it is found. Otherwise, the schema is retrieved from the schema registry and stored in the cache. Note that the returned schema does not contain a subject and version, so the cache will not have an effect on methods that return a sr.SubjectSchema.

func (*Client) SchemaBySubjectVersion

func (c *Client) SchemaBySubjectVersion(ctx context.Context, subject string, version int) (sr.SubjectSchema, error)

SchemaBySubjectVersion checks if the schema is already registered in the cache and returns the associated sr.SubjectSchema if it is found. Otherwise, the schema is retrieved from the schema registry and stored in the cache.
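
The check-the-cache-then-fetch behavior described for SchemaByID and SchemaBySubjectVersion can be sketched as follows. This is an illustration under stated assumptions, not the package's code: cachingClient, subjectVersion, and the two fetch callbacks are hypothetical, schemas are reduced to plain strings, and a single mutex is held during the fetch for simplicity. The two separate maps show why a schema cached by ID does not help a later lookup by subject and version.

```go
package main

import (
	"fmt"
	"sync"
)

// subjectVersion is the cache key for lookups by subject and version.
type subjectVersion struct {
	subject string
	version int
}

// cachingClient is a hypothetical stand-in for Client. The two fetch
// functions represent round trips to a schema registry.
type cachingClient struct {
	mu        sync.Mutex
	byID      map[int]string
	bySV      map[subjectVersion]string
	fetchByID func(id int) (string, error)
	fetchBySV func(subject string, version int) (string, error)
}

func newCachingClient(
	fetchByID func(int) (string, error),
	fetchBySV func(string, int) (string, error),
) *cachingClient {
	return &cachingClient{
		byID:      make(map[int]string),
		bySV:      make(map[subjectVersion]string),
		fetchByID: fetchByID,
		fetchBySV: fetchBySV,
	}
}

// SchemaByID returns the cached schema for id, fetching it on a miss.
func (c *cachingClient) SchemaByID(id int) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.byID[id]; ok {
		return s, nil
	}
	s, err := c.fetchByID(id)
	if err != nil {
		return "", err // failed lookups are not cached
	}
	c.byID[id] = s
	return s, nil
}

// SchemaBySubjectVersion uses its own cache, keyed by subject and version.
func (c *cachingClient) SchemaBySubjectVersion(subject string, version int) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	key := subjectVersion{subject, version}
	if s, ok := c.bySV[key]; ok {
		return s, nil
	}
	s, err := c.fetchBySV(subject, version)
	if err != nil {
		return "", err
	}
	c.bySV[key] = s
	return s, nil
}

func main() {
	idCalls, svCalls := 0, 0
	c := newCachingClient(
		func(int) (string, error) { idCalls++; return `"string"`, nil },
		func(string, int) (string, error) { svCalls++; return `"string"`, nil },
	)
	_, _ = c.SchemaByID(1)
	_, _ = c.SchemaByID(1) // served from the cache
	_, _ = c.SchemaBySubjectVersion("orders", 1)
	fmt.Println(idCalls, svCalls) // prints "1 1"
}
```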

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

func NewDecoder

func NewDecoder(client *Client, logger log.CtxLogger, serde *sr.Serde) *Decoder

func (*Decoder) Decode

type DownloadSchemaStrategy

type DownloadSchemaStrategy struct {
	Subject string
	// TODO add support for specifying "latest" - https://github.com/ConduitIO/conduit/issues/1095
	Version int
}

func (DownloadSchemaStrategy) GetSchema

type Encoder

type Encoder struct {
	SchemaStrategy
	// contains filtered or unexported fields
}

func NewEncoder

func NewEncoder(client *Client, logger log.CtxLogger, serde *sr.Serde, strategy SchemaStrategy) *Encoder

func (*Encoder) Encode

type ExtractAndUploadSchemaStrategy

type ExtractAndUploadSchemaStrategy struct {
	Type    sr.SchemaType
	Subject string
}

func (ExtractAndUploadSchemaStrategy) GetSchema

type Schema

type Schema interface {
	// Marshal returns the encoded representation of v.
	Marshal(v any) ([]byte, error)
	// Unmarshal parses encoded data and stores the result in the value pointed
	// to by v. If v is nil or not a pointer, Unmarshal returns an error.
	Unmarshal(b []byte, v any) error
	// String returns the textual representation of the schema.
	String() string
}
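
To make the contract of the interface concrete, here is a minimal sketch of a type that satisfies it. The jsonSchema type is hypothetical and is not part of this package: it delegates to encoding/json and keeps the schema text only for String, so it performs no schema validation. The interface is redeclared locally so the example is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Schema is redeclared here with the same methods as the documented interface.
type Schema interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(b []byte, v any) error
	String() string
}

// jsonSchema is a hypothetical implementation backed by encoding/json.
type jsonSchema struct {
	text string // textual representation returned by String
}

// Marshal returns the JSON encoding of v.
func (s jsonSchema) Marshal(v any) ([]byte, error) { return json.Marshal(v) }

// Unmarshal decodes b into v. json.Unmarshal reports an error when v is nil
// or not a pointer, which matches the documented contract.
func (s jsonSchema) Unmarshal(b []byte, v any) error { return json.Unmarshal(b, v) }

// String returns the textual representation of the schema.
func (s jsonSchema) String() string { return s.text }

func main() {
	var s Schema = jsonSchema{text: `{"type":"object"}`}

	b, err := s.Marshal(map[string]int{"a": 1})
	if err != nil {
		panic(err)
	}

	var out map[string]int
	if err := s.Unmarshal(b, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(b), out["a"], s.String())
}
```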

type SchemaFactory

type SchemaFactory struct {
	// Parse takes the textual representation of the schema and parses it into
	// a Schema.
	Parse func(string) (Schema, error)
	// SchemaForType returns a Schema that matches the structure of v.
	SchemaForType func(v any) (Schema, error)
}

type SchemaStrategy

type SchemaStrategy interface {
	GetSchema(context.Context, *Client, log.CtxLogger, opencdc.StructuredData) (Schema, sr.SubjectSchema, error)
}
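
The page does not describe what the two strategy types do, so the sketch below is an assumption based only on their names and fields: one strategy downloads a fixed subject and version, the other derives a schema from the data and uploads it. Everything in it is hypothetical (registry, schemaStrategy, downloadStrategy, extractAndUploadStrategy), and schemas are reduced to strings so that the example runs without the real client.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// registry is a hypothetical in-memory stand-in for a schema registry.
// For each subject it stores schema texts; version N lives at index N-1.
type registry struct {
	schemas map[string][]string
}

// schemaStrategy is a simplified stand-in for SchemaStrategy: it returns the
// schema text and version to use for the given data.
type schemaStrategy interface {
	getSchema(r *registry, data map[string]any) (string, int, error)
}

// downloadStrategy fetches a fixed subject and version, ignoring the data.
type downloadStrategy struct {
	subject string
	version int
}

func (s downloadStrategy) getSchema(r *registry, _ map[string]any) (string, int, error) {
	versions := r.schemas[s.subject]
	if s.version < 1 || s.version > len(versions) {
		return "", 0, errors.New("schema not found")
	}
	return versions[s.version-1], s.version, nil
}

// extractAndUploadStrategy derives a schema from the data and registers it.
type extractAndUploadStrategy struct {
	subject string
}

func (s extractAndUploadStrategy) getSchema(r *registry, data map[string]any) (string, int, error) {
	// Build a placeholder schema from field names and Go types.
	fields := make([]string, 0, len(data))
	for name, v := range data {
		fields = append(fields, fmt.Sprintf("%s:%T", name, v))
	}
	sort.Strings(fields) // map iteration order is random
	schema := strings.Join(fields, ",")

	// Reuse an existing version if the same schema is already registered.
	for i, existing := range r.schemas[s.subject] {
		if existing == schema {
			return schema, i + 1, nil
		}
	}
	r.schemas[s.subject] = append(r.schemas[s.subject], schema)
	return schema, len(r.schemas[s.subject]), nil
}

func main() {
	r := &registry{schemas: map[string][]string{}}
	data := map[string]any{"id": 1, "name": "a"}

	var strategy schemaStrategy = extractAndUploadStrategy{subject: "orders"}
	schema, version, err := strategy.getSchema(r, data)
	if err != nil {
		panic(err)
	}
	fmt.Println(schema, version)

	strategy = downloadStrategy{subject: "orders", version: version}
	schema, version, err = strategy.getSchema(r, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(schema, version)
}
```

Because both types satisfy the same interface, an encoder can hold either one and stay unaware of where the schema comes from.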
