schemaregistry

package module
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 13 Imported by: 4

README

Conduit Schema Registry

License Test Go Report Card

Conduit Schema Registry is a service that stores and manages schemas. It is used internally in Conduit to store the schemas of records that are produced and consumed by connectors.

The schema registry can either be embedded in a Go application or run as a standalone service exposing a REST API.

[!NOTE] This is a work in progress and is not ready to be used as a standalone service in production.

REST API

The schema registry exposes a REST API compatible with the Confluent Schema Registry API. The API is documented in the Confluent Schema Registry documentation.

List of supported endpoints:

  • Schemas

    • GET /schemas/ids/{id}
    • GET /schemas/ids/{id}/schema
    • GET /schemas/types
    • GET /schemas/ids/{id}/versions
  • Subjects

    • GET /subjects
    • GET /subjects/{subject}/versions
    • DELETE /subjects/{subject}
    • GET /subjects/{subject}/versions/{version}
    • GET /subjects/{subject}/versions/{version}/schema
    • POST /subjects/{subject}/versions
    • POST /subjects/{subject}
    • DELETE /subjects/{subject}/versions/{version}
    • GET /subjects/{subject}/versions/{version}/referencedby
  • Mode

    • GET /mode
    • PUT /mode
    • GET /mode/{subject}
    • PUT /mode/{subject}
    • DELETE /mode/{subject}
  • Compatibility

    • POST /compatibility/subjects/{subject}/versions/{version}
    • POST /compatibility/subjects/{subject}/versions
  • Config

    • PUT /config
    • GET /config
    • PUT /config/{subject}
    • GET /config/{subject}
    • DELETE /config/{subject}
  • Exporters

    • GET /exporters
    • GET /contexts
    • POST /exporters
    • PUT /exporters/{name}
    • PUT /exporters/{name}/config
    • GET /exporters/{name}
    • GET /exporters/{name}/status
    • GET /exporters/{name}/config
    • PUT /exporters/{name}/pause
    • PUT /exporters/{name}/reset
    • PUT /exporters/{name}/resume
    • DELETE /exporters/{name}

Documentation

Index

Constants

View Source
const (
	ErrorCodeSubjectNotFound                            = 40401
	ErrorCodeVersionNotFound                            = 40402
	ErrorCodeSchemaNotFound                             = 40403
	ErrorCodeExporterNotFound                           = 40450
	ErrorCodeIncompatibleSchema                         = 40901
	ErrorCodeMissingOrInvalidExporterName               = 40950
	ErrorCodeMissingOrInvalidExporterConfig             = 40951
	ErrorCodeInvalidExporterSubjects                    = 40952
	ErrorCodeExporterAlreadyExists                      = 40960
	ErrorCodeExporterAlreadyRunning                     = 40961
	ErrorCodeExporterAlreadyStarting                    = 40962
	ErrorCodeExporterNotPaused                          = 40963
	ErrorCodeTooManyExporters                           = 40964
	ErrorCodeInvalidSchema                              = 42201
	ErrorCodeInvalidVersion                             = 42202
	ErrorCodeInvalidCompatibilityLevel                  = 42203
	ErrorCodeInvalidMode                                = 42204
	ErrorCodeErrorInTheBackendDataStore                 = 50001
	ErrorCodeOperationTimedOut                          = 50002
	ErrorCodeErrorWhileForwardingTheRequestToThePrimary = 50003
)

Variables

View Source
var (
	ErrSubjectNotFound                            = &sr.ResponseError{ErrorCode: ErrorCodeSubjectNotFound, Message: "Subject not found"}
	ErrVersionNotFound                            = &sr.ResponseError{ErrorCode: ErrorCodeVersionNotFound, Message: "Version not found"}
	ErrSchemaNotFound                             = &sr.ResponseError{ErrorCode: ErrorCodeSchemaNotFound, Message: "Schema not found"}
	ErrExporterNotFound                           = &sr.ResponseError{ErrorCode: ErrorCodeExporterNotFound, Message: "Exporter not found"}
	ErrIncompatibleSchema                         = &sr.ResponseError{ErrorCode: ErrorCodeIncompatibleSchema, Message: "Incompatible schema"}
	ErrMissingOrInvalidExporterName               = &sr.ResponseError{ErrorCode: ErrorCodeMissingOrInvalidExporterName, Message: "Missing or invalid exporter name"}
	ErrMissingOrInvalidExporterConfig             = &sr.ResponseError{ErrorCode: ErrorCodeMissingOrInvalidExporterConfig, Message: "Missing or invalid exporter config"}
	ErrInvalidExporterSubjects                    = &sr.ResponseError{ErrorCode: ErrorCodeInvalidExporterSubjects, Message: "Invalid exporter subjects"}
	ErrExporterAlreadyExists                      = &sr.ResponseError{ErrorCode: ErrorCodeExporterAlreadyExists, Message: "Exporter already exists"}
	ErrExporterAlreadyRunning                     = &sr.ResponseError{ErrorCode: ErrorCodeExporterAlreadyRunning, Message: "Exporter already running"}
	ErrExporterAlreadyStarting                    = &sr.ResponseError{ErrorCode: ErrorCodeExporterAlreadyStarting, Message: "Exporter already starting"}
	ErrExporterNotPaused                          = &sr.ResponseError{ErrorCode: ErrorCodeExporterNotPaused, Message: "Exporter not paused"}
	ErrTooManyExporters                           = &sr.ResponseError{ErrorCode: ErrorCodeTooManyExporters, Message: "Too many exporters"}
	ErrInvalidSchema                              = &sr.ResponseError{ErrorCode: ErrorCodeInvalidSchema, Message: "Invalid schema"}
	ErrInvalidVersion                             = &sr.ResponseError{ErrorCode: ErrorCodeInvalidVersion, Message: "Invalid version"}
	ErrInvalidCompatibilityLevel                  = &sr.ResponseError{ErrorCode: ErrorCodeInvalidCompatibilityLevel, Message: "Invalid compatibility level"}
	ErrInvalidMode                                = &sr.ResponseError{ErrorCode: ErrorCodeInvalidMode, Message: "Invalid mode"}
	ErrErrorInTheBackendDataStore                 = &sr.ResponseError{ErrorCode: ErrorCodeErrorInTheBackendDataStore, Message: "Error in the backend data store"}
	ErrOperationTimedOut                          = &sr.ResponseError{ErrorCode: ErrorCodeOperationTimedOut, Message: "Operation timed out"}
	ErrErrorWhileForwardingTheRequestToThePrimary = &sr.ResponseError{ErrorCode: ErrorCodeErrorWhileForwardingTheRequestToThePrimary, Message: "Error while forwarding the request to the primary"}
)

Functions

func StatusCode

func StatusCode(errCode int) int

StatusCode returns the HTTP status code for the given error.

Types

type SchemaRegistry

type SchemaRegistry struct {
	// contains filtered or unexported fields
}

SchemaRegistry is a schema registry that stores schemas in memory.

func NewSchemaRegistry

func NewSchemaRegistry(db database.DB) (*SchemaRegistry, error)

func (*SchemaRegistry) CreateSchema

func (r *SchemaRegistry) CreateSchema(ctx context.Context, subject string, schema sr.Schema) (_ sr.SubjectSchema, err error)

func (*SchemaRegistry) SchemaByID

func (r *SchemaRegistry) SchemaByID(ctx context.Context, id int) (sr.Schema, error)

func (*SchemaRegistry) SchemaBySubjectVersion

func (r *SchemaRegistry) SchemaBySubjectVersion(ctx context.Context, subject string, version int) (sr.SubjectSchema, error)

func (*SchemaRegistry) SchemaVersionsBySubject

func (r *SchemaRegistry) SchemaVersionsBySubject(ctx context.Context, subject string) ([]int, error)

func (*SchemaRegistry) SubjectVersionsByID

func (r *SchemaRegistry) SubjectVersionsByID(ctx context.Context, id int) ([]sr.SubjectSchema, error)

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server wraps a registry and provides an HTTP API for it.

func NewServer

func NewServer(logger *slog.Logger, registry *SchemaRegistry) *Server

NewServer creates a new server with the given logger and registry.

func (*Server) RegisterHandlers

func (srv *Server) RegisterHandlers(mux *http.ServeMux)

RegisterHandlers registers the server's handlers with the given ServeMux.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL