sr

v1.2.0
Published: Oct 15, 2024 License: BSD-3-Clause Imports: 17 Imported by: 26

Documentation

Overview

Package sr provides a schema registry client and a helper type to encode values and decode data according to the schema registry wire format.

As mentioned on the Serde type, this package does not provide schema auto-discovery or type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register schema IDs and values together with the functions used to encode or decode them.

The client does not automatically cache schemas. Instead, the Serde type performs the actual caching, mapping each ID to how values for that ID are encoded and decoded. The Client type itself simply speaks HTTP to your schema registry and returns the results.

To read more about the schema registry, see the following:

https://docs.confluent.io/platform/current/schema-registry/develop/api.html

Constants

const (
	// SoftDelete performs a soft deletion.
	SoftDelete = false
	// HardDelete performs a hard deletion. Values must be soft deleted
	// before they can be hard deleted.
	HardDelete = true
)
const GlobalSubject = ""

GlobalSubject is a constant to make API usage of requesting global subjects clearer.

Variables

var (
	// Normalize is a Param that configures whether or not to normalize
	// schemas in certain create- or get-schema operations.
	Normalize = Param{/* contains filtered or unexported fields */}

	// Verbose is a Param that configures whether or not to return verbose
	// error messages when checking compatibility.
	Verbose = Param{/* contains filtered or unexported fields */}

	// FetchMaxID is a Param that configures whether or not to fetch the
	// max schema ID in certain get-schema operations.
	FetchMaxID = Param{/* contains filtered or unexported fields */}

	// DefaultToGlobal is a Param that changes get-compatibility or
	// get-mode to return the global compatibility or mode if the requested
	// subject does not exist.
	DefaultToGlobal = Param{/* contains filtered or unexported fields */}

	// Force is a Param that forces updating the mode when you are setting
	// the mode to IMPORT and schemas currently exist.
	Force = Param{/* contains filtered or unexported fields */}

	// LatestOnly is a Param that configures whether or not to return only
	// the latest schema in certain get-schema operations.
	LatestOnly = Param{/* contains filtered or unexported fields */}

	// ShowDeleted is a Param that configures whether or not to return
	// deleted schemas or subjects in certain get operations.
	ShowDeleted = Param{/* contains filtered or unexported fields */}

	// DeletedOnly is a Param that configures whether to return only
	// deleted schemas or subjects in certain get operations.
	DeletedOnly = Param{/* contains filtered or unexported fields */}
)
var (
	// ErrNotRegistered is returned from Serde when attempting to encode a
	// value or decode an ID that has not been registered, or when using
	// Decode with a missing new value function.
	ErrNotRegistered = errors.New("registration is missing for encode/decode")

	// ErrBadHeader is returned from Decode when the input slice is shorter
	// than five bytes, or if the first byte is not the magic 0 byte.
	ErrBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte")
)

Functions

func AppendEncode

func AppendEncode(b []byte, v any, h SerdeHeader, id int, index []int, enc func([]byte, any) ([]byte, error)) ([]byte, error)

AppendEncode encodes a value, prepends the header, appends the result to b, and returns the updated slice. If the encoding function fails, this returns an error.

func Encode

func Encode(v any, h SerdeHeader, id int, index []int, enc func(any) ([]byte, error)) ([]byte, error)

Encode encodes a value and prepends the header. If the encoding function fails, this returns an error.

func WithParams

func WithParams(ctx context.Context, p ...Param) context.Context

WithParams adds query parameters to the given context. This is a merge operation: any non-zero parameter is kept. The variadic nature of this allows for a nicer API:

sr.WithParams(ctx, sr.Format("default"), sr.FetchMaxID)

Types

type CheckCompatibilityResult

type CheckCompatibilityResult struct {
	Is       bool     `json:"is_compatible"` // Is is true if the schema is compatible.
	Messages []string `json:"messages"`      // Messages contains reasons a schema is not compatible.
}

CheckCompatibilityResult is the response from the check compatibility endpoint.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client talks to a schema registry and contains helper functions to serialize and deserialize objects according to schemas.

func NewClient

func NewClient(opts ...ClientOpt) (*Client, error)

NewClient returns a new schema registry client.

func (*Client) AllSchemas

func (cl *Client) AllSchemas(ctx context.Context) ([]SubjectSchema, error)

AllSchemas returns all schemas for all subjects.

This supports params SubjectPrefix, ShowDeleted, and LatestOnly.

func (*Client) CheckCompatibility

func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (CheckCompatibilityResult, error)

CheckCompatibility checks if a schema is compatible with the given version that exists. You can use -1 to check compatibility with the latest version, and -2 to check compatibility against all versions.

This supports params Normalize and Verbose.

func (*Client) Compatibility

func (cl *Client) Compatibility(ctx context.Context, subjects ...string) []CompatibilityResult

Compatibility returns the subject compatibility and global compatibility of each requested subject. The global compatibility can be requested by using either an empty subject or by specifying no subjects.

This supports param DefaultToGlobal.

This can return 200 or 500 per result.

func (*Client) CreateSchema

func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)

CreateSchema attempts to create a schema in the given subject.

This supports param Normalize.

func (*Client) DeleteSchema

func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, how DeleteHow) error

DeleteSchema deletes the schema at the given version. You must soft delete a schema before it can be hard deleted. You can use -1 to delete the latest version.

func (*Client) DeleteSubject

func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteHow) ([]int, error)

DeleteSubject deletes the subject. You must soft delete a subject before it can be hard deleted. This returns all versions that were deleted.

func (*Client) LookupSchema

func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)

LookupSchema checks to see if a schema is already registered and if so, returns its ID and version in the SubjectSchema.

This supports params Normalize and ShowDeleted.

func (*Client) Mode

func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult

Mode returns the subject and global mode of each requested subject. The global mode can be requested by using either an empty subject or by specifying no subjects.

This supports param DefaultToGlobal.

func (*Client) Opts added in v1.2.0

func (cl *Client) Opts() []ClientOpt

Opts returns the options that were used to create this client. This can be used as a base to generate a new client, where you can add override options to the end of the original input list.

func (*Client) ResetCompatibility

func (cl *Client) ResetCompatibility(ctx context.Context, subjects ...string) []CompatibilityResult

ResetCompatibility deletes any subject-level compatibility and reverts to the global default. The global compatibility can be reset by either using an empty subject or by specifying no subjects.

This can return 200 or 500.

func (*Client) ResetMode

func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResult

ResetMode deletes any subject modes and reverts to the global default.

func (*Client) SchemaByID

func (cl *Client) SchemaByID(ctx context.Context, id int) (Schema, error)

SchemaByID returns the schema for a given schema ID.

This supports params Subject, Format, and FetchMaxID.

func (*Client) SchemaByVersion

func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int) (SubjectSchema, error)

SchemaByVersion returns the schema for a given subject and version. You can use -1 as the version to return the latest schema.

This supports param ShowDeleted.

func (*Client) SchemaReferences

func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int) ([]SubjectSchema, error)

SchemaReferences returns all schemas that reference the input subject-version. You can use -1 to check the latest version.

This supports param ShowDeleted.

func (*Client) SchemaTextByID

func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error)

SchemaTextByID returns the actual text of a schema.

For example, if the schema for an ID is

"{\"type\":\"boolean\"}"

this will return

{"type":"boolean"}

This supports params Subject and Format.
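The difference between the escaped form and the actual text can be reproduced with plain JSON decoding. The following is a minimal sketch that uses only the standard library; unescapeSchema is a hypothetical helper for illustration and is not part of this package, nor a claim about how the client does this internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unescapeSchema is a hypothetical helper (not part of package sr). It
// decodes a JSON string literal, turning the escaped form of a schema into
// its actual text.
func unescapeSchema(escaped string) (string, error) {
	var text string
	if err := json.Unmarshal([]byte(escaped), &text); err != nil {
		return "", err
	}
	return text, nil
}

func main() {
	escaped := `"{\"type\":\"boolean\"}"`
	text, err := unescapeSchema(escaped)
	if err != nil {
		panic(err)
	}
	fmt.Println(text) // {"type":"boolean"}
}
```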

func (*Client) SchemaTextByVersion

func (cl *Client) SchemaTextByVersion(ctx context.Context, subject string, version int) (string, error)

SchemaTextByVersion returns the actual text of a schema, by subject and version. You can use -1 as the version to return the latest schema.

For example, if the schema for an ID is

"{\"type\":\"boolean\"}"

this will return

{"type":"boolean"}

This supports param ShowDeleted.

func (*Client) SchemaUsagesByID

func (cl *Client) SchemaUsagesByID(ctx context.Context, id int) ([]SubjectSchema, error)

SchemaUsagesByID returns all usages of a given schema ID. A single schema can be reused in many subject-versions; this function can be used to map a schema to all subject-versions that use it.

This supports param ShowDeleted.

func (*Client) SchemaVersionsByID

func (cl *Client) SchemaVersionsByID(ctx context.Context, id int) ([]SubjectVersion, error)

SchemaVersionsByID returns all subject versions associated with a schema ID.

This supports params Subject and ShowDeleted.

func (*Client) Schemas

func (cl *Client) Schemas(ctx context.Context, subject string) ([]SubjectSchema, error)

Schemas returns all schemas for the given subject.

This supports param ShowDeleted.

func (*Client) SetCompatibility

func (cl *Client) SetCompatibility(ctx context.Context, compat SetCompatibility, subjects ...string) []CompatibilityResult

SetCompatibility sets the compatibility for each requested subject. The global compatibility can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.

func (*Client) SetMode

func (cl *Client) SetMode(ctx context.Context, mode Mode, subjects ...string) []ModeResult

SetMode sets the mode for each requested subject. The global mode can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.

This supports param Force.

func (*Client) SubjectVersions

func (cl *Client) SubjectVersions(ctx context.Context, subject string) ([]int, error)

SubjectVersions returns all versions for a given subject.

This supports params ShowDeleted and DeletedOnly.

func (*Client) Subjects

func (cl *Client) Subjects(ctx context.Context) ([]string, error)

Subjects returns subjects available in the registry.

This supports params SubjectPrefix, ShowDeleted, and DeletedOnly.

func (*Client) SubjectsByID

func (cl *Client) SubjectsByID(ctx context.Context, id int) ([]string, error)

SubjectsByID returns the subjects associated with a schema ID.

This supports params Subject and ShowDeleted.

func (*Client) SupportedTypes

func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error)

SupportedTypes returns the schema types that are supported in the schema registry.

type ClientOpt

type ClientOpt interface {
	// contains filtered or unexported methods
}

ClientOpt is an option to configure a client.

func BasicAuth

func BasicAuth(user, pass string) ClientOpt

BasicAuth sets basic authorization to use for every request.

func BearerToken added in v1.1.0

func BearerToken(token string) ClientOpt

BearerToken sets an Authorization header to use for every request. The format will be: "Authorization: Bearer $token".
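To make the header format concrete, here is a minimal sketch that builds a request carrying the same header with net/http. It uses only the standard library; newAuthorizedRequest, the URL, and the token value are hypothetical and chosen for illustration.

```go
package main

import (
	"fmt"
	"net/http"
)

// newAuthorizedRequest is a hypothetical helper (not part of package sr)
// that builds a GET request carrying "Authorization: Bearer $token".
func newAuthorizedRequest(url, token string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	return req, nil
}

func main() {
	// The URL and token are placeholders for this example.
	req, err := newAuthorizedRequest("http://localhost:8081/subjects", "my-token")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("Authorization")) // Bearer my-token
}
```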

func DefaultParams

func DefaultParams(ps ...Param) ClientOpt

DefaultParams sets default parameters to apply to every request.

func DialTLSConfig

func DialTLSConfig(c *tls.Config) ClientOpt

DialTLSConfig sets a tls.Config to use in the default http client.

func HTTPClient

func HTTPClient(httpcl *http.Client) ClientOpt

HTTPClient sets the HTTP client that the schema registry client uses, overriding the default client that speaks plaintext with a timeout of 5s.

func URLs

func URLs(urls ...string) ClientOpt

URLs sets the URLs that the client speaks to, overriding the default http://localhost:8081. This option automatically prefixes any URL that is missing an http:// or https:// prefix with http://.
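The prefixing rule can be sketched as follows. This is an illustration of the documented behavior using only the standard library; withDefaultScheme is a hypothetical name and the package's actual implementation may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// withDefaultScheme is a hypothetical helper (not part of package sr). It
// prefixes u with "http://" unless u already starts with "http://" or
// "https://".
func withDefaultScheme(u string) string {
	if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
		return u
	}
	return "http://" + u
}

func main() {
	// Both hosts are placeholders for this example.
	for _, u := range []string{"localhost:8081", "https://registry.example.com"} {
		fmt.Println(withDefaultScheme(u))
	}
}
```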

func UserAgent

func UserAgent(ua string) ClientOpt

UserAgent sets the User-Agent to use in requests, overriding the default "franz-go".

type CompatibilityLevel

type CompatibilityLevel int

CompatibilityLevel is an enum representing config compatibility levels.

const (
	CompatNone CompatibilityLevel = 1 + iota
	CompatBackward
	CompatBackwardTransitive
	CompatForward
	CompatForwardTransitive
	CompatFull
	CompatFullTransitive
)

func (CompatibilityLevel) MarshalText

func (l CompatibilityLevel) MarshalText() ([]byte, error)

func (CompatibilityLevel) String

func (l CompatibilityLevel) String() string

func (*CompatibilityLevel) UnmarshalText

func (l *CompatibilityLevel) UnmarshalText(text []byte) error

type CompatibilityResult

type CompatibilityResult struct {
	Subject string `json:"-"` // The subject this compatibility result is for, or empty for the global compatibility.

	Level            CompatibilityLevel `json:"compatibilityLevel"` // The subject (or global) compatibility level.
	Alias            string             `json:"alias"`              // The subject alias, if any.
	Normalize        bool               `json:"normalize"`          // Whether or not schemas are normalized by default.
	Group            string             `json:"compatibilityGroup"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility.
	DefaultMetadata  *SchemaMetadata    `json:"defaultMetadata"`    // Default metadata used for schema registration.
	OverrideMetadata *SchemaMetadata    `json:"overrideMetadata"`   // Override metadata used for schema registration.
	DefaultRuleSet   *SchemaRuleSet     `json:"defaultRuleSet"`     // Default rule set used for schema registration.
	OverrideRuleSet  *SchemaRuleSet     `json:"overrideRuleSet"`    // Override rule set used for schema registration.

	Err error `json:"-"` // The error received for getting this compatibility.
}

CompatibilityResult is the compatibility level for a subject.

type ConfluentHeader

type ConfluentHeader struct{}

ConfluentHeader is a SerdeHeader that produces the Confluent wire format. The format starts with a magic 0 byte, followed by the schema ID as a big-endian uint32, then the message index (protobuf only), then the encoded message.

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
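As an illustration of the layout described above, the following sketch frames a payload with the five-byte header using only encoding/binary. appendHeader is a hypothetical helper, not the package's implementation, and the protobuf message index is left out to keep the example short.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendHeader is a hypothetical helper (not the package's implementation).
// It appends the magic byte 0 followed by the schema ID as a big-endian
// uint32. The protobuf message index is omitted in this sketch.
func appendHeader(b []byte, id uint32) []byte {
	var hdr [5]byte // hdr[0] stays 0: the magic byte
	binary.BigEndian.PutUint32(hdr[1:], id)
	return append(b, hdr[:]...)
}

func main() {
	payload := []byte("encoded message") // stand-in for an encoded value
	framed := append(appendHeader(nil, 258), payload...)
	fmt.Printf("% x\n", framed[:5]) // 00 00 00 01 02
}
```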

func (*ConfluentHeader) AppendEncode

func (*ConfluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error)

AppendEncode appends an encoded header to b according to the Confluent wire format and returns it. Error is always nil.

func (*ConfluentHeader) DecodeID

func (*ConfluentHeader) DecodeID(b []byte) (int, []byte, error)

DecodeID strips and decodes the schema ID from b. It returns the ID alongside the unread bytes. If the header does not contain the magic byte, or if b contains fewer than 5 bytes, it returns ErrBadHeader.
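The validation and stripping steps can be sketched with the standard library alone. decodeID and errBadHeader below are local stand-ins written for this example; they mirror the documented behavior but are not the package's code.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errBadHeader stands in for sr.ErrBadHeader in this stdlib-only sketch.
var errBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte")

// decodeID is a hypothetical helper (not the package's implementation). It
// checks the length and the magic byte, then returns the big-endian schema
// ID and the bytes that follow the header.
func decodeID(b []byte) (int, []byte, error) {
	if len(b) < 5 || b[0] != 0 {
		return 0, nil, errBadHeader
	}
	id := binary.BigEndian.Uint32(b[1:5])
	return int(id), b[5:], nil
}

func main() {
	id, rest, err := decodeID([]byte{0, 0, 0, 0, 7, 'h', 'i'})
	fmt.Println(id, string(rest), err) // 7 hi <nil>

	_, _, err = decodeID([]byte{1, 0, 0, 0, 7}) // wrong magic byte
	fmt.Println(err == errBadHeader)            // true
}
```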

func (*ConfluentHeader) DecodeIndex

func (*ConfluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error)

DecodeIndex strips and decodes indices from b. It returns the index slice alongside the unread bytes. It expects b to be the output of DecodeID (the schema ID should already be stripped away). If maxLength is greater than 0 and the encoded data contains more indices than maxLength, the function returns ErrNotRegistered.

type DeleteHow

type DeleteHow bool

DeleteHow is a typed bool indicating how subjects or schemas should be deleted.

type EncodingOpt

type EncodingOpt interface {
	// contains filtered or unexported methods
}

EncodingOpt is an option to configure the behavior of Serde.Encode and Serde.Decode.

func AppendEncodeFn

func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) EncodingOpt

AppendEncodeFn allows Serde to encode a value to an existing slice. This can be more efficient than EncodeFn; this function is used if it exists.

func DecodeFn

func DecodeFn(fn func([]byte, any) error) EncodingOpt

DecodeFn allows Serde to decode into a value.

func EncodeFn

func EncodeFn(fn func(any) ([]byte, error)) EncodingOpt

EncodeFn allows Serde to encode a value.

func GenerateFn

func GenerateFn(fn func() any) EncodingOpt

GenerateFn returns a new(Value) that can be decoded into. This function can be used to control the instantiation of a new type for DecodeNew.

func Index

func Index(index ...int) EncodingOpt

Index attaches a message index to a value. A single schema ID can be registered multiple times with different indices.

This option supports schemas that encode many different values from the same schema (namely, protobuf). The index into the schema to encode a particular message is specified with `index`.

NOTE: this option must be used for protobuf schemas.

For more information, see where `message-indexes` are described in:

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
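As a rough illustration of the message-indexes encoding described in the Confluent documentation linked above, the sketch below writes the array length and each index as zigzag varints, with the single-byte shortcut for the index [0]. appendIndex is a hypothetical helper that relies only on encoding/binary; treat it as a sketch of the wire layout rather than the package's implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendIndex is a hypothetical helper (not the package's implementation).
// It appends the index length and then each index as zigzag varints. The
// index [0], meaning the first message in the schema, is written as a
// single 0 byte.
func appendIndex(b []byte, index []int) []byte {
	if len(index) == 1 && index[0] == 0 {
		return append(b, 0)
	}
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutVarint(buf[:], int64(len(index)))
	b = append(b, buf[:n]...)
	for _, idx := range index {
		n = binary.PutVarint(buf[:], int64(idx))
		b = append(b, buf[:n]...)
	}
	return b
}

func main() {
	fmt.Printf("% x\n", appendIndex(nil, []int{0}))    // 00
	fmt.Printf("% x\n", appendIndex(nil, []int{1, 0})) // 04 02 00
}
```

In the second call, the length 2 becomes 0x04 and the index 1 becomes 0x02 because zigzag encoding doubles non-negative values.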

type Mode

type Mode int

Mode is an enum representing the "mode" of the registry or a subject.

const (
	ModeImport Mode = iota
	ModeReadOnly
	ModeReadWrite
)

func (Mode) MarshalText

func (m Mode) MarshalText() ([]byte, error)

func (Mode) String

func (m Mode) String() string

func (*Mode) UnmarshalText

func (m *Mode) UnmarshalText(text []byte) error

type ModeResult

type ModeResult struct {
	Subject string // The subject this mode result is for, or empty for the global mode.
	Mode    Mode   // The subject (or global) mode.
	Err     error  // The error received for getting this mode.
}

ModeResult is the mode for a subject.

type Param

type Param struct {
	// contains filtered or unexported fields
}

Param is a parameter that can be passed to various APIs. Each API documents the parameters it accepts.

func Format

func Format(f string) Param

Format returns a Param that configures how schemas are returned in certain get-schema operations.

For Avro schemas, the Format param supports "default" or "resolved". For Protobuf schemas, the Format param supports "default", "ignore_extensions", or "serialized".

func Subject

func Subject(s string) Param

Subject returns a Param limiting which subject is returned in certain list-schema or list-subject operations.

func SubjectPrefix

func SubjectPrefix(pfx string) Param

SubjectPrefix returns a Param that filters subjects by prefix when listing schemas.

type ResponseError

type ResponseError struct {
	// Method is the requested http method.
	Method string `json:"-"`
	// URL is the full path that was requested that resulted in this error.
	URL string `json:"-"`
	// StatusCode is the status code that was returned for this error.
	StatusCode int `json:"-"`
	// Raw contains the raw response body.
	Raw []byte `json:"-"`

	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

ResponseError is the type returned from the schema registry for errors.
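To show how the JSON-tagged fields map onto an error body, here is a minimal sketch using encoding/json. registryError is a local stand-in that copies only the two tagged fields, and the body shown is an illustrative value made up for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// registryError mirrors the two JSON-tagged fields of ResponseError. It is
// a local stand-in so that this sketch needs only the standard library.
type registryError struct {
	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

// parseRegistryError is a hypothetical helper that decodes an error body.
func parseRegistryError(body []byte) (registryError, error) {
	var e registryError
	err := json.Unmarshal(body, &e)
	return e, err
}

func main() {
	// An illustrative error body; the values are assumptions for this example.
	body := []byte(`{"error_code":40401,"message":"Subject not found"}`)
	e, err := parseRegistryError(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(e.ErrorCode, e.Message) // 40401 Subject not found
}
```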

func (*ResponseError) Error

func (e *ResponseError) Error() string

type Schema

type Schema struct {
	// Schema is the actual unescaped text of a schema.
	Schema string `json:"schema"`

	// Type is the type of a schema. The default type is avro.
	Type SchemaType `json:"schemaType,omitempty"`

	// References declares other schemas this schema references. See the
	// docs on SchemaReference for more details.
	References []SchemaReference `json:"references,omitempty"`

	// SchemaMetadata is arbitrary information about the schema.
	SchemaMetadata *SchemaMetadata `json:"metadata,omitempty"`

	// SchemaRuleSet is a set of rules that govern the schema.
	SchemaRuleSet *SchemaRuleSet `json:"ruleSet,omitempty"`
}

Schema is the object form of a schema for the HTTP API.

type SchemaMetadata

type SchemaMetadata struct {
	Tags       map[string][]string `json:"tags,omitempty"`
	Properties map[string]string   `json:"properties,omitempty"`
	Sensitive  []string            `json:"sensitive,omitempty"`
}

SchemaMetadata is arbitrary information about the schema or its constituent parts, such as whether a field contains sensitive information or who created a data contract.

type SchemaReference

type SchemaReference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

SchemaReference is a way for one schema to reference another. The details for how referencing is done are type specific; for example, JSON objects that use the key "$ref" can refer to another schema via URL. For more details on references, see the following links:

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references
https://docs.confluent.io/platform/current/schema-registry/develop/api.html

type SchemaRule

type SchemaRule struct {
	Name      string            `json:"name"`                // Name is a user-defined name to reference the rule.
	Doc       string            `json:"doc,omitempty"`       // Doc is an optional description of the rule.
	Kind      SchemaRuleKind    `json:"kind"`                // Kind is the type of rule.
	Mode      SchemaRuleMode    `json:"mode"`                // Mode is the mode of the rule.
	Type      string            `json:"type"`                // Type is the type of rule, which invokes a specific rule executor, such as Google Common Expression Language (CEL) or JSONata.
	Tags      []string          `json:"tags"`                // Tags to which this rule applies.
	Params    map[string]string `json:"params,omitempty"`    // Optional params for the rule.
	Expr      string            `json:"expr"`                // Expr is the rule expression.
	OnSuccess string            `json:"onSuccess,omitempty"` // OnSuccess is an optional action to execute if the rule succeeds; otherwise the built-in action type NONE is used. For UPDOWN and WRITEREAD rules, one can specify two actions separated by commas, such as "NONE,ERROR" for a WRITEREAD rule. In this case NONE applies to WRITE and ERROR applies to READ.
	OnFailure string            `json:"onFailure,omitempty"` // OnFailure is an optional action to execute if the rule fails, otherwise the built-in action type NONE is used. See OnSuccess for more details.
	Disabled  bool              `json:"disabled,omitempty"`  // Disabled specifies whether the rule is disabled.
}

SchemaRule specifies integrity constraints or data policies in a data contract. These data rules or policies can enforce that a field that contains sensitive information must be encrypted, or that a message containing an invalid age must be sent to a dead letter queue.

https://docs.confluent.io/platform/current/schema-registry/fundamentals/data-contracts.html#rules

type SchemaRuleKind

type SchemaRuleKind int

SchemaRuleKind is an enum representing the kind of schema rule.

const (
	SchemaRuleKindTransform SchemaRuleKind = iota
	SchemaRuleKindCondition
)

func (SchemaRuleKind) MarshalText

func (k SchemaRuleKind) MarshalText() ([]byte, error)

func (SchemaRuleKind) String

func (k SchemaRuleKind) String() string

func (*SchemaRuleKind) UnmarshalText

func (k *SchemaRuleKind) UnmarshalText(text []byte) error

type SchemaRuleMode

type SchemaRuleMode int

SchemaRuleMode specifies a schema rule's mode.

Migration rules can be specified for an UPGRADE, DOWNGRADE, or both (UPDOWN). Migration rules are used during complex schema evolution.

Domain rules can be specified during serialization (WRITE), deserialization (READ) or both (WRITEREAD).

Domain rules can be used to transform the domain values in a message payload.

const (
	SchemaRuleModeUpgrade SchemaRuleMode = iota
	SchemaRuleModeDowngrade
	SchemaRuleModeUpdown
	SchemaRuleModeWrite
	SchemaRuleModeRead
	SchemaRuleModeWriteRead
)

func (SchemaRuleMode) MarshalText

func (m SchemaRuleMode) MarshalText() ([]byte, error)

func (SchemaRuleMode) String

func (m SchemaRuleMode) String() string

func (*SchemaRuleMode) UnmarshalText

func (m *SchemaRuleMode) UnmarshalText(text []byte) error

type SchemaRuleSet

type SchemaRuleSet struct {
	MigrationRules []SchemaRule `json:"migrationRules,omitempty"`
	DomainRules    []SchemaRule `json:"domainRules,omitempty"`
}

SchemaRuleSet groups migration rules and domain validation rules.

type SchemaType

type SchemaType int

SchemaType is an enum representing schema types. The default schema type is avro.

const (
	TypeAvro SchemaType = iota
	TypeProtobuf
	TypeJSON
)

func (SchemaType) MarshalText

func (t SchemaType) MarshalText() ([]byte, error)

func (SchemaType) String

func (t SchemaType) String() string

func (*SchemaType) UnmarshalText

func (t *SchemaType) UnmarshalText(text []byte) error

type Serde

type Serde struct {
	// contains filtered or unexported fields
}

Serde encodes and decodes values according to the schema registry wire format. A Serde itself does not perform schema auto-discovery or type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register schema IDs and values together with the functions used to encode or decode them.

To use a Serde for encoding, you must pre-register the schema IDs and values you will encode, and then you can use the encode functions.

To use a Serde for decoding, you can either pre-register the schema IDs and values you will consume, or discover the schema each time you receive an ErrNotRegistered error from decode.
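The registration model can be illustrated with a deliberately small stand-in. The sketch below is not the Serde implementation: miniSerde, its methods, the event type, and the choice of JSON as the encoding are all assumptions made for this example. It only shows the idea of looking up a registered schema ID by value type when encoding, and rejecting unregistered IDs when decoding.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

// Local stand-ins for sr.ErrNotRegistered and sr.ErrBadHeader.
var (
	errNotRegistered = errors.New("registration is missing for encode/decode")
	errBadHeader     = errors.New("5 byte header for value is missing or does not have 0 magic byte")
)

// miniSerde is a hypothetical, heavily simplified stand-in for Serde.
type miniSerde struct {
	idByType  map[reflect.Type]uint32 // value type -> schema ID, for encoding
	decodable map[uint32]bool         // schema IDs that may be decoded
}

func newMiniSerde() *miniSerde {
	return &miniSerde{
		idByType:  make(map[reflect.Type]uint32),
		decodable: make(map[uint32]bool),
	}
}

// register associates a schema ID with the type of v.
func (s *miniSerde) register(id uint32, v any) {
	s.idByType[reflect.TypeOf(v)] = id
	s.decodable[id] = true
}

// encode prepends the magic byte and big-endian ID to the JSON form of v.
func (s *miniSerde) encode(v any) ([]byte, error) {
	id, ok := s.idByType[reflect.TypeOf(v)]
	if !ok {
		return nil, errNotRegistered
	}
	body, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	hdr := make([]byte, 5)
	binary.BigEndian.PutUint32(hdr[1:], id)
	return append(hdr, body...), nil
}

// decode checks the header, requires a registered ID, and decodes into v.
func (s *miniSerde) decode(b []byte, v any) error {
	if len(b) < 5 || b[0] != 0 {
		return errBadHeader
	}
	if !s.decodable[binary.BigEndian.Uint32(b[1:5])] {
		return errNotRegistered
	}
	return json.Unmarshal(b[5:], v)
}

// event is a hypothetical value type used for the example.
type event struct {
	Name string `json:"name"`
}

func main() {
	s := newMiniSerde()
	s.register(3, event{})

	b, err := s.encode(event{Name: "created"})
	if err != nil {
		panic(err)
	}
	var out event
	if err := s.decode(b, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.Name) // created

	_, err = s.encode("never registered")
	fmt.Println(err == errNotRegistered) // true
}
```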

func NewSerde

func NewSerde(opts ...SerdeOrEncodingOpt) *Serde

NewSerde returns a new Serde using the supplied default options, which are applied to every registered type. These options are always applied first, so you can override them as necessary when registering.

This can be useful if you always want to use the same encoding or decoding functions.

func (*Serde) AppendEncode

func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error)

AppendEncode encodes a value, prepends the header according to the configured SerdeHeader, appends the result to b, and returns the updated slice. If EncodeFn was not registered, this returns ErrNotRegistered.

func (*Serde) Decode

func (s *Serde) Decode(b []byte, v any) error

Decode decodes b into v. If DecodeFn option was not used, this returns ErrNotRegistered.

Serde does not handle references in schemas; it is up to you to register the full decode function for any top-level ID, regardless of how many other schemas are referenced by that top-level ID.

func (*Serde) DecodeID

func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error)

DecodeID decodes an ID from b, returning the ID and the remaining bytes, or an error.

func (*Serde) DecodeIndex

func (s *Serde) DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error)

DecodeIndex decodes at most maxLength of a schema index from in, returning the index and remaining bytes, or an error. It expects in to be the output of DecodeID (the schema ID should already be stripped away).

func (*Serde) DecodeNew

func (s *Serde) DecodeNew(b []byte) (any, error)

DecodeNew is the same as Decode, but decodes into a new value rather than the input value. If DecodeFn was not used, this returns ErrNotRegistered. GenerateFn can be used to control the instantiation of a new value; otherwise this uses reflect.New(reflect.TypeOf(v)).Interface().
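The reflect expression quoted above yields a pointer to a fresh zero value of the registered value's type. A minimal sketch, where user and newLike are hypothetical names used only for this example:

```go
package main

import (
	"fmt"
	"reflect"
)

// user is a hypothetical registered value type.
type user struct{ Name string }

// newLike returns a pointer to a fresh zero value of v's type, using the
// expression quoted in the DecodeNew documentation.
func newLike(v any) any {
	return reflect.New(reflect.TypeOf(v)).Interface()
}

func main() {
	p := newLike(user{Name: "registered"})
	fmt.Printf("%T %+v\n", p, p) // *main.user &{Name:}
}
```

Note that the fields of the value passed in are not copied; only its type is used.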

func (*Serde) Encode

func (s *Serde) Encode(v any) ([]byte, error)

Encode encodes a value and prepends the header according to the configured SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered.

func (*Serde) MustAppendEncode

func (s *Serde) MustAppendEncode(b []byte, v any) []byte

MustAppendEncode returns the value of AppendEncode, panicking on error. This is a shortcut for when your encode function cannot return an error.

func (*Serde) MustEncode

func (s *Serde) MustEncode(v any) []byte

MustEncode returns the value of Encode, panicking on error. This is a shortcut for when your encode function cannot return an error.

func (*Serde) Register

func (s *Serde) Register(id int, v any, opts ...EncodingOpt)

Register registers a schema ID and the value it corresponds to, as well as the encoding or decoding functions. You need to register functions depending on whether you are only encoding, only decoding, or both.

type SerdeHeader

type SerdeHeader interface {
	// AppendEncode encodes a schema ID and optional index to b, returning the
	// updated slice or an error.
	AppendEncode(b []byte, id int, index []int) ([]byte, error)
	// DecodeID decodes an ID from in, returning the ID and the remaining bytes,
	// or an error.
	DecodeID(in []byte) (id int, out []byte, err error)
	// DecodeIndex decodes at most maxLength of a schema index from in,
	// returning the index and remaining bytes, or an error.
	DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error)
}

SerdeHeader encodes and decodes a message header.

type SerdeOpt

type SerdeOpt interface {
	// contains filtered or unexported methods
}

SerdeOpt is an option to configure Serde.

func Header

func Header(header SerdeHeader) SerdeOpt

Header defines the SerdeHeader used to encode and decode the message header.

type SerdeOrEncodingOpt

type SerdeOrEncodingOpt interface {
	// contains filtered or unexported methods
}

SerdeOrEncodingOpt is either a SerdeOpt or EncodingOpt.

type SetCompatibility

type SetCompatibility struct {
	Level            CompatibilityLevel `json:"compatibility"`                // The subject (or global) compatibility level.
	Alias            string             `json:"alias,omitempty"`              // The subject alias, if any.
	Normalize        bool               `json:"normalize,omitempty"`          // Whether or not schemas are normalized by default.
	Group            string             `json:"compatibilityGroup,omitempty"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility.
	DefaultMetadata  *SchemaMetadata    `json:"defaultMetadata,omitempty"`    // Default metadata used for schema registration.
	OverrideMetadata *SchemaMetadata    `json:"overrideMetadata,omitempty"`   // Override metadata used for schema registration.
	DefaultRuleSet   *SchemaRuleSet     `json:"defaultRuleSet,omitempty"`     // Default rule set used for schema registration.
	OverrideRuleSet  *SchemaRuleSet     `json:"overrideRuleSet,omitempty"`    // Override rule set used for schema registration.
}

SetCompatibility contains information used for setting global or per-subject compatibility configuration.

The main difference between this and the CompatibilityResult is that this struct marshals the compatibility level as "compatibility".

type SubjectSchema

type SubjectSchema struct {
	// Subject is the subject for this schema. This usually corresponds to
	// a Kafka topic, and whether this is for a key or value. For example,
	// "foo-key" would be the subject for the foo topic for serializing the
	// key field of a record.
	Subject string `json:"subject"`

	// Version is the version of this subject.
	Version int `json:"version"`

	// ID is the globally unique ID of the schema.
	ID int `json:"id"`

	Schema
}

SubjectSchema pairs the subject, global identifier, and version of a schema with the schema itself.
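Because Schema is embedded, its fields are read from the same JSON object as the subject, version, and ID. The sketch below demonstrates this with trimmed-down local copies of the two types (only the schema text field is kept); the JSON body is an illustrative value, not captured registry output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Schema and SubjectSchema are trimmed-down local copies of the types
// documented above, kept small so this sketch needs only the standard library.
type Schema struct {
	Schema string `json:"schema"`
}

type SubjectSchema struct {
	Subject string `json:"subject"`
	Version int    `json:"version"`
	ID      int    `json:"id"`

	Schema // embedded: its fields come from the same JSON object
}

// parseSubjectSchema is a hypothetical helper that decodes one JSON object.
func parseSubjectSchema(body []byte) (SubjectSchema, error) {
	var ss SubjectSchema
	err := json.Unmarshal(body, &ss)
	return ss, err
}

func main() {
	body := []byte(`{"subject":"foo-key","version":1,"id":7,"schema":"{\"type\":\"boolean\"}"}`)
	ss, err := parseSubjectSchema(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(ss.Subject, ss.Version, ss.ID, ss.Schema.Schema) // foo-key 1 7 {"type":"boolean"}
}
```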

func CommSubjectSchemas

func CommSubjectSchemas(l, r []SubjectSchema) (luniq, runiq, common []SubjectSchema)

CommSubjectSchemas splits l and r into three sets: what is unique in l, what is unique in r, and what is common in both. Duplicates in either slice are eliminated.
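The three-way split can be sketched over a simpler comparable key. In the example below, subjectVersion and comm are hypothetical; which fields the real function compares and the order of its results are not stated here, so both are assumptions of this sketch.

```go
package main

import "fmt"

// subjectVersion is a hypothetical comparable key used for this sketch.
type subjectVersion struct {
	Subject string
	Version int
}

// comm splits l and r into what is only in l, only in r, and in both,
// dropping duplicates. Results follow first appearance in the inputs.
func comm(l, r []subjectVersion) (luniq, runiq, common []subjectVersion) {
	inR := make(map[subjectVersion]bool, len(r))
	for _, v := range r {
		inR[v] = true
	}
	seen := make(map[subjectVersion]bool, len(l)+len(r))
	for _, v := range l {
		if seen[v] {
			continue // duplicate within l
		}
		seen[v] = true
		if inR[v] {
			common = append(common, v)
		} else {
			luniq = append(luniq, v)
		}
	}
	for _, v := range r {
		if seen[v] {
			continue // already emitted as common, or a duplicate within r
		}
		seen[v] = true
		runiq = append(runiq, v)
	}
	return luniq, runiq, common
}

func main() {
	l := []subjectVersion{{"a", 1}, {"b", 1}, {"a", 1}}
	r := []subjectVersion{{"b", 1}, {"c", 1}}
	luniq, runiq, common := comm(l, r)
	fmt.Println(luniq, runiq, common) // [{a 1}] [{c 1}] [{b 1}]
}
```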

type SubjectVersion

type SubjectVersion struct {
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

SubjectVersion is a subject-version pair.
