Documentation ¶
Overview ¶
Package sr provides a schema registry client and a helper type to encode values and decode data according to the schema registry wire format.
As mentioned on the Serde type, this package does not provide schema auto-discovery and type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register IDs and values along with how to encode or decode them.
The client does not automatically cache schemas; instead, the Serde type is used to cache the mapping from IDs to how to encode/decode them. The Client type itself simply speaks HTTP to your schema registry and returns the results.
To read more about the schema registry, see the following:
https://docs.confluent.io/platform/current/schema-registry/develop/api.html
Index ¶
- Constants
- Variables
- func AppendEncode(b []byte, v any, h SerdeHeader, id int, index []int, ...) ([]byte, error)
- func Encode(v any, h SerdeHeader, id int, index []int, enc func(any) ([]byte, error)) ([]byte, error)
- func WithParams(ctx context.Context, p ...Param) context.Context
- type CheckCompatibilityResult
- type Client
- func (cl *Client) AllSchemas(ctx context.Context) ([]SubjectSchema, error)
- func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (CheckCompatibilityResult, error)
- func (cl *Client) Compatibility(ctx context.Context, subjects ...string) []CompatibilityResult
- func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
- func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, how DeleteHow) error
- func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteHow) ([]int, error)
- func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
- func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult
- func (cl *Client) Opts() []ClientOpt
- func (cl *Client) ResetCompatibility(ctx context.Context, subjects ...string) []CompatibilityResult
- func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResult
- func (cl *Client) SchemaByID(ctx context.Context, id int) (Schema, error)
- func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int) (SubjectSchema, error)
- func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int) ([]SubjectSchema, error)
- func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error)
- func (cl *Client) SchemaTextByVersion(ctx context.Context, subject string, version int) (string, error)
- func (cl *Client) SchemaUsagesByID(ctx context.Context, id int) ([]SubjectSchema, error)
- func (cl *Client) SchemaVersionsByID(ctx context.Context, id int) ([]SubjectVersion, error)
- func (cl *Client) Schemas(ctx context.Context, subject string) ([]SubjectSchema, error)
- func (cl *Client) SetCompatibility(ctx context.Context, compat SetCompatibility, subjects ...string) []CompatibilityResult
- func (cl *Client) SetMode(ctx context.Context, mode Mode, subjects ...string) []ModeResult
- func (cl *Client) SubjectVersions(ctx context.Context, subject string) ([]int, error)
- func (cl *Client) Subjects(ctx context.Context) ([]string, error)
- func (cl *Client) SubjectsByID(ctx context.Context, id int) ([]string, error)
- func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error)
- type ClientOpt
- type CompatibilityLevel
- type CompatibilityResult
- type ConfluentHeader
- type DeleteHow
- type EncodingOpt
- type Mode
- type ModeResult
- type Param
- type ResponseError
- type Schema
- type SchemaMetadata
- type SchemaReference
- type SchemaRule
- type SchemaRuleKind
- type SchemaRuleMode
- type SchemaRuleSet
- type SchemaType
- type Serde
- func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error)
- func (s *Serde) Decode(b []byte, v any) error
- func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error)
- func (s *Serde) DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error)
- func (s *Serde) DecodeNew(b []byte) (any, error)
- func (s *Serde) Encode(v any) ([]byte, error)
- func (s *Serde) MustAppendEncode(b []byte, v any) []byte
- func (s *Serde) MustEncode(v any) []byte
- func (s *Serde) Register(id int, v any, opts ...EncodingOpt)
- type SerdeHeader
- type SerdeOpt
- type SerdeOrEncodingOpt
- type SetCompatibility
- type SubjectSchema
- type SubjectVersion
Constants ¶
const ( // SoftDelete performs a soft deletion. SoftDelete = false // HardDelete performs a hard deletion. Values must be soft deleted // before they can be hard deleted. HardDelete = true )
const GlobalSubject = ""
GlobalSubject is a constant to make API usage of requesting global subjects clearer.
Variables ¶
var ( // Normalize is a Param that configures whether or not to normalize // schemas in certain create- or get-schema operations. Normalize = Param{/* contains filtered or unexported fields */} // Verbose is a Param that configures whether or not to return verbose // error messages when checking compatibility. Verbose = Param{/* contains filtered or unexported fields */} // FetchMaxID is a Param that configures whether or not to fetch the // max schema ID in certain get-schema operations. FetchMaxID = Param{/* contains filtered or unexported fields */} // DefaultToGlobal is a Param that changes get-compatibility or // get-mode to return the global compatibility or mode if the requested // subject does not exist. DefaultToGlobal = Param{/* contains filtered or unexported fields */} // Force is a Param that forces updating the mode if you are setting the mode // to IMPORT and schemas currently exist. Force = Param{/* contains filtered or unexported fields */} // LatestOnly is a Param that configures whether or not to return only // the latest schema in certain get-schema operations. LatestOnly = Param{/* contains filtered or unexported fields */} // ShowDeleted is a Param that configures whether or not to return // deleted schemas or subjects in certain get operations. ShowDeleted = Param{/* contains filtered or unexported fields */} // DeletedOnly is a Param that configures whether to return only // deleted schemas or subjects in certain get operations. DeletedOnly = Param{/* contains filtered or unexported fields */} )
var ( // ErrNotRegistered is returned from Serde when attempting to encode a // value or decode an ID that has not been registered, or when using // Decode with a missing new value function. ErrNotRegistered = errors.New("registration is missing for encode/decode") // ErrBadHeader is returned from Decode when the input slice is shorter // than five bytes, or if the first byte is not the magic 0 byte. ErrBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte") )
Functions ¶
func AppendEncode ¶
func AppendEncode(b []byte, v any, h SerdeHeader, id int, index []int, enc func([]byte, any) ([]byte, error)) ([]byte, error)
AppendEncode encodes a value and prepends the header, appends it to b and returns b. If the encoding function fails, this returns an error.
func Encode ¶
func Encode(v any, h SerdeHeader, id int, index []int, enc func(any) ([]byte, error)) ([]byte, error)
Encode encodes a value and prepends the header. If the encoding function fails, this returns an error.
func WithParams ¶
WithParams adds query parameters to the given context. This is a merge operation: any non-zero parameter is kept. The variadic nature of this allows for a nicer api:
sr.WithParams(ctx, sr.Format("default"), sr.FetchMaxID)
Types ¶
type CheckCompatibilityResult ¶
type CheckCompatibilityResult struct { Is bool `json:"is_compatible"` // Is is true if the schema is compatible. Messages []string `json:"messages"` // Messages contains reasons a schema is not compatible. }
CheckCompatibilityResult is the response from the check compatibility endpoint.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client talks to a schema registry and contains helper functions to serialize and deserialize objects according to schemas.
func (*Client) AllSchemas ¶
func (cl *Client) AllSchemas(ctx context.Context) ([]SubjectSchema, error)
AllSchemas returns all schemas for all subjects.
This supports params SubjectPrefix, ShowDeleted, and LatestOnly.
func (*Client) CheckCompatibility ¶
func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (CheckCompatibilityResult, error)
CheckCompatibility checks if a schema is compatible with the given version that exists. You can use -1 to check compatibility with the latest version, and -2 to check compatibility against all versions.
func (*Client) Compatibility ¶
func (cl *Client) Compatibility(ctx context.Context, subjects ...string) []CompatibilityResult
Compatibility returns the subject compatibility and global compatibility of each requested subject. The global compatibility can be requested by using either an empty subject or by specifying no subjects.
This supports param DefaultToGlobal.
This can return 200 or 500 per result.
func (*Client) CreateSchema ¶
func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
CreateSchema attempts to create a schema in the given subject.
This supports param Normalize.
func (*Client) DeleteSchema ¶
func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, how DeleteHow) error
DeleteSchema deletes the schema at the given version. You must soft delete a schema before it can be hard deleted. You can use -1 to delete the latest version.
func (*Client) DeleteSubject ¶
DeleteSubject deletes the subject. You must soft delete a subject before it can be hard deleted. This returns all versions that were deleted.
func (*Client) LookupSchema ¶
func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
LookupSchema checks to see if a schema is already registered and if so, returns its ID and version in the SubjectSchema.
This supports params Normalize and ShowDeleted.
func (*Client) Mode ¶
func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult
Mode returns the subject and global mode of each requested subject. The global mode can be requested by using either an empty subject or by specifying no subjects.
This supports param DefaultToGlobal.
func (*Client) Opts ¶ added in v1.2.0
Opts returns the options that were used to create this client. This can be used as a base to generate a new client, where you can add override options to the end of the original input list.
func (*Client) ResetCompatibility ¶
func (cl *Client) ResetCompatibility(ctx context.Context, subjects ...string) []CompatibilityResult
ResetCompatibility deletes any subject-level compatibility and reverts to the global default. The global compatibility can be reset by either using an empty subject or by specifying no subjects.
This can return 200 or 500.
func (*Client) ResetMode ¶
func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResult
ResetMode deletes any subject modes and reverts to the global default.
func (*Client) SchemaByID ¶
SchemaByID returns the schema for a given schema ID.
This supports params Subject, Format, and FetchMaxID.
func (*Client) SchemaByVersion ¶
func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int) (SubjectSchema, error)
SchemaByVersion returns the schema for a given subject and version. You can use -1 as the version to return the latest schema.
This supports param ShowDeleted.
func (*Client) SchemaReferences ¶
func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int) ([]SubjectSchema, error)
SchemaReferences returns all schemas that reference the input subject-version. You can use -1 to check the latest version.
This supports param ShowDeleted.
func (*Client) SchemaTextByID ¶
SchemaTextByID returns the actual text of a schema.
For example, if the schema for an ID is
"{\"type\":\"boolean\"}"
this will return
{"type":"boolean"}
func (*Client) SchemaTextByVersion ¶
func (cl *Client) SchemaTextByVersion(ctx context.Context, subject string, version int) (string, error)
SchemaTextByVersion returns the actual text of a schema, by subject and version. You can use -1 as the version to return the latest schema.
For example, if the schema for an ID is
"{\"type\":\"boolean\"}"
this will return
{"type":"boolean"}
This supports param ShowDeleted.
func (*Client) SchemaUsagesByID ¶
SchemaUsagesByID returns all usages of a given schema ID. A single schema can be reused in many subject-versions; this function can be used to map a schema to all subject-versions that use it.
This supports param ShowDeleted.
func (*Client) SchemaVersionsByID ¶
SchemaVersionsByID returns all subject versions associated with a schema ID.
This supports params Subject and ShowDeleted.
func (*Client) Schemas ¶
Schemas returns all schemas for the given subject.
This supports param ShowDeleted.
func (*Client) SetCompatibility ¶
func (cl *Client) SetCompatibility(ctx context.Context, compat SetCompatibility, subjects ...string) []CompatibilityResult
SetCompatibility sets the compatibility for each requested subject. The global compatibility can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.
func (*Client) SetMode ¶
SetMode sets the mode for each requested subject. The global mode can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.
This supports param Force.
func (*Client) SubjectVersions ¶
SubjectVersions returns all versions for a given subject.
This supports params ShowDeleted and DeletedOnly.
func (*Client) Subjects ¶
Subjects returns subjects available in the registry.
This supports params SubjectPrefix, ShowDeleted, and DeletedOnly.
func (*Client) SubjectsByID ¶
SubjectsByID returns the subjects associated with a schema ID.
This supports params Subject and ShowDeleted.
func (*Client) SupportedTypes ¶
func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error)
SupportedTypes returns the schema types that are supported in the schema registry.
type ClientOpt ¶
type ClientOpt interface {
// contains filtered or unexported methods
}
ClientOpt is an option to configure a client.
func BearerToken ¶ added in v1.1.0
BearerToken sets an Authorization header to use for every request. The format will be: "Authorization: Bearer $token".
func DefaultParams ¶
DefaultParams sets default parameters to apply to every request.
func DialTLSConfig ¶
DialTLSConfig sets a tls.Config to use in the default http client.
func HTTPClient ¶
HTTPClient sets the http client that the schema registry client uses, overriding the default client that speaks plaintext with a timeout of 5s.
func URLs ¶
URLs sets the URLs that the client speaks to, overriding the default http://localhost:8081. This option automatically prefixes any URL that is missing an http:// or https:// prefix with http://.
type CompatibilityLevel ¶
type CompatibilityLevel int
CompatibilityLevel is an enum representing config compatibility levels.
const ( CompatNone CompatibilityLevel = 1 + iota CompatBackward CompatBackwardTransitive CompatForward CompatForwardTransitive CompatFull CompatFullTransitive )
func (CompatibilityLevel) MarshalText ¶
func (l CompatibilityLevel) MarshalText() ([]byte, error)
func (CompatibilityLevel) String ¶
func (l CompatibilityLevel) String() string
func (*CompatibilityLevel) UnmarshalText ¶
func (l *CompatibilityLevel) UnmarshalText(text []byte) error
type CompatibilityResult ¶
type CompatibilityResult struct { Subject string `json:"-"` // The subject this compatibility result is for, or empty for the global compatibility. Level CompatibilityLevel `json:"compatibilityLevel"` // The subject (or global) compatibility level. Alias string `json:"alias"` // The subject alias, if any. Normalize bool `json:"normalize"` // Whether or not schemas are normalized by default. Group string `json:"compatibilityGroup"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility. DefaultMetadata *SchemaMetadata `json:"defaultMetadata"` // Default metadata used for schema registration. OverrideMetadata *SchemaMetadata `json:"overrideMetadata"` // Override metadata used for schema registration. DefaultRuleSet *SchemaRuleSet `json:"defaultRuleSet"` // Default rule set used for schema registration. OverrideRuleSet *SchemaRuleSet `json:"overrideRuleSet"` // Override rule set used for schema registration. Err error `json:"-"` // The error received for getting this compatibility. }
CompatibilityResult is the compatibility level for a subject.
type ConfluentHeader ¶
type ConfluentHeader struct{}
ConfluentHeader is a SerdeHeader that produces the Confluent wire format. It starts with 0, then big endian uint32 of the ID, then index (only protobuf), then the encoded message.
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
func (*ConfluentHeader) AppendEncode ¶
AppendEncode appends an encoded header to b according to the Confluent wire format and returns it. Error is always nil.
func (*ConfluentHeader) DecodeID ¶
func (*ConfluentHeader) DecodeID(b []byte) (int, []byte, error)
DecodeID strips and decodes the schema ID from b. It returns the ID alongside the unread bytes. If the header does not contain the magic byte or b contains less than 5 bytes it returns ErrBadHeader.
func (*ConfluentHeader) DecodeIndex ¶
DecodeIndex strips and decodes indices from b. It returns the index slice alongside the unread bytes. It expects b to be the output of DecodeID (schema ID should already be stripped away). If maxLength is greater than 0 and the encoded data contains more indices than maxLength the function returns ErrNotRegistered.
type DeleteHow ¶
type DeleteHow bool
DeleteHow is a typed bool indicating how subjects or schemas should be deleted.
type EncodingOpt ¶
type EncodingOpt interface {
// contains filtered or unexported methods
}
EncodingOpt is an option to configure the behavior of Serde.Encode and Serde.Decode.
func AppendEncodeFn ¶
func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) EncodingOpt
AppendEncodeFn allows Serde to encode a value to an existing slice. This can be more efficient than EncodeFn; this function is used if it exists.
func DecodeFn ¶
func DecodeFn(fn func([]byte, any) error) EncodingOpt
DecodeFn allows Serde to decode into a value.
func EncodeFn ¶
func EncodeFn(fn func(any) ([]byte, error)) EncodingOpt
EncodeFn allows Serde to encode a value.
func GenerateFn ¶
func GenerateFn(fn func() any) EncodingOpt
GenerateFn returns a new(Value) that can be decoded into. This function can be used to control the instantiation of a new type for DecodeNew.
func Index ¶
func Index(index ...int) EncodingOpt
Index attaches a message index to a value. A single schema ID can be registered multiple times with different indices.
This option supports schemas that encode many different values from the same schema (namely, protobuf). The index into the schema to encode a particular message is specified with `index`.
NOTE: this option must be used for protobuf schemas.
For more information, see where `message-indexes` are described in:
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
type Mode ¶
type Mode int
Mode is an enum representing the "mode" of the registry or a subject.
func (Mode) MarshalText ¶
func (*Mode) UnmarshalText ¶
type ModeResult ¶
type ModeResult struct { Subject string // The subject this mode result is for, or empty for the global mode. Mode Mode // The subject (or global) mode. Err error // The error received for getting this mode. }
ModeResult is the mode for a subject.
type Param ¶
type Param struct {
// contains filtered or unexported fields
}
Param is a parameter that can be passed to various APIs. Each API documents the parameters they accept.
func Format ¶
Format returns a Param that configures how schemas are returned in certain get-schema operations.
For Avro schemas, the Format param supports "default" or "resolved". For Protobuf schemas, the Format param supports "default", "ignore_extensions", or "serialized".
func Subject ¶
Subject returns a Param limiting which subject is returned in certain list-schema or list-subject operations.
func SubjectPrefix ¶
SubjectPrefix returns a Param that filters subjects by prefix when listing schemas.
type ResponseError ¶
type ResponseError struct { // Method is the requested http method. Method string `json:"-"` // URL is the full path that was requested that resulted in this error. URL string `json:"-"` // StatusCode is the status code that was returned for this error. StatusCode int `json:"-"` // Raw contains the raw response body. Raw []byte `json:"-"` ErrorCode int `json:"error_code"` Message string `json:"message"` }
ResponseError is the type returned from the schema registry for errors.
func (*ResponseError) Error ¶
func (e *ResponseError) Error() string
type Schema ¶
type Schema struct { // Schema is the actual unescaped text of a schema. Schema string `json:"schema"` // Type is the type of a schema. The default type is avro. Type SchemaType `json:"schemaType,omitempty"` // References declares other schemas this schema references. See the // docs on SchemaReference for more details. References []SchemaReference `json:"references,omitempty"` // SchemaMetadata is arbitrary information about the schema. SchemaMetadata *SchemaMetadata `json:"metadata,omitempty"` // SchemaRuleSet is a set of rules that govern the schema. SchemaRuleSet *SchemaRuleSet `json:"ruleSet,omitempty"` }
Schema is the object form of a schema for the HTTP API.
type SchemaMetadata ¶
type SchemaMetadata struct { Tags map[string][]string `json:"tags,omitempty"` Properties map[string]string `json:"properties,omitempty"` Sensitive []string `json:"sensitive,omitempty"` }
SchemaMetadata is arbitrary information about the schema or its constituent parts, such as whether a field contains sensitive information or who created a data contract.
type SchemaReference ¶
type SchemaReference struct { Name string `json:"name"` Subject string `json:"subject"` Version int `json:"version"` }
SchemaReference is a way for one schema to reference another. The details for how referencing is done are type specific; for example, JSON objects that use the key "$ref" can refer to another schema via URL. For more details on references, see the following links:
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references https://docs.confluent.io/platform/current/schema-registry/develop/api.html
type SchemaRule ¶
type SchemaRule struct { Name string `json:"name"` // Name is a user-defined name to reference the rule. Doc string `json:"doc,omitempty"` // Doc is an optional description of the rule. Kind SchemaRuleKind `json:"kind"` // Kind is the type of rule. Mode SchemaRuleMode `json:"mode"` // Mode is the mode of the rule. Type string `json:"type"` // Type is the type of rule, which invokes a specific rule executor, such as Google Common Expression Language (CEL) or JSONata. Tags []string `json:"tags"` // Tags to which this rule applies. Params map[string]string `json:"params,omitempty"` // Optional params for the rule. Expr string `json:"expr"` // Expr is the rule expression. OnSuccess string `json:"onSuccess,omitempty"` // OnSuccess is an optional action to execute if the rule succeeds, otherwise the built-in action type NONE is used. For UPDOWN and WRITEREAD rules, one can specify two actions separated by commas, such as "NONE,ERROR" for a WRITEREAD rule. In this case NONE applies to WRITE and ERROR applies to READ OnFailure string `json:"onFailure,omitempty"` // OnFailure is an optional action to execute if the rule fails, otherwise the built-in action type NONE is used. See OnSuccess for more details. Disabled bool `json:"disabled,omitempty"` // Disabled specifies whether the rule is disabled. }
SchemaRule specifies integrity constraints or data policies in a data contract. These data rules or policies can enforce that a field that contains sensitive information must be encrypted, or that a message containing an invalid age must be sent to a dead letter queue.
https://docs.confluent.io/platform/current/schema-registry/fundamentals/data-contracts.html#rules
type SchemaRuleKind ¶
type SchemaRuleKind int
SchemaRuleKind is an enum representing the kind of schema rule.
const ( SchemaRuleKindTransform SchemaRuleKind = iota SchemaRuleKindCondition )
func (SchemaRuleKind) MarshalText ¶
func (k SchemaRuleKind) MarshalText() ([]byte, error)
func (SchemaRuleKind) String ¶
func (k SchemaRuleKind) String() string
func (*SchemaRuleKind) UnmarshalText ¶
func (k *SchemaRuleKind) UnmarshalText(text []byte) error
type SchemaRuleMode ¶
type SchemaRuleMode int
SchemaRuleMode specifies a schema rule's mode.
Migration rules can be specified for an UPGRADE, DOWNGRADE, or both (UPDOWN). Migration rules are used during complex schema evolution.
Domain rules can be specified during serialization (WRITE), deserialization (READ) or both (WRITEREAD).
Domain rules can be used to transform the domain values in a message payload.
const ( SchemaRuleModeUpgrade SchemaRuleMode = iota SchemaRuleModeDowngrade SchemaRuleModeUpdown SchemaRuleModeWrite SchemaRuleModeRead SchemaRuleModeWriteRead )
func (SchemaRuleMode) MarshalText ¶
func (m SchemaRuleMode) MarshalText() ([]byte, error)
func (SchemaRuleMode) String ¶
func (m SchemaRuleMode) String() string
func (*SchemaRuleMode) UnmarshalText ¶
func (m *SchemaRuleMode) UnmarshalText(text []byte) error
type SchemaRuleSet ¶
type SchemaRuleSet struct { MigrationRules []SchemaRule `json:"migrationRules,omitempty"` DomainRules []SchemaRule `json:"domainRules,omitempty"` }
SchemaRuleSet groups migration rules and domain validation rules.
type SchemaType ¶
type SchemaType int
SchemaType is an enum representing schema types. The default schema type is avro.
const ( TypeAvro SchemaType = iota TypeProtobuf TypeJSON )
func (SchemaType) MarshalText ¶
func (t SchemaType) MarshalText() ([]byte, error)
func (SchemaType) String ¶
func (t SchemaType) String() string
func (*SchemaType) UnmarshalText ¶
func (t *SchemaType) UnmarshalText(text []byte) error
type Serde ¶
type Serde struct {
// contains filtered or unexported fields
}
Serde encodes and decodes values according to the schema registry wire format. A Serde itself does not perform schema auto-discovery and type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register IDs and values along with how to encode or decode them.
To use a Serde for encoding, you must pre-register schema ids and values you will encode, and then you can use the encode functions.
To use a Serde for decoding, you can either pre-register schema ids and values you will consume, or you can discover the schema every time you receive an ErrNotRegistered error from decode.
func NewSerde ¶
func NewSerde(opts ...SerdeOrEncodingOpt) *Serde
NewSerde returns a new Serde using the supplied default options, which are applied to every registered type. These options are always applied first, so you can override them as necessary when registering.
This can be useful if you always want to use the same encoding or decoding functions.
func (*Serde) AppendEncode ¶
AppendEncode encodes a value and prepends the header according to the configured SerdeHeader, appends it to b and returns b. If EncodeFn was not registered, this returns ErrNotRegistered.
func (*Serde) Decode ¶
Decode decodes b into v. If DecodeFn option was not used, this returns ErrNotRegistered.
Serde does not handle references in schemas; it is up to you to register the full decode function for any top-level ID, regardless of how many other schemas are referenced by the top-level ID.
func (*Serde) DecodeID ¶
DecodeID decodes an ID from b, returning the ID and the remaining bytes, or an error.
func (*Serde) DecodeIndex ¶
DecodeIndex decodes at most maxLength of a schema index from in, returning the index and remaining bytes, or an error. It expects in to be the output of DecodeID (schema ID should already be stripped away).
func (*Serde) DecodeNew ¶
DecodeNew is the same as Decode, but decodes into a new value rather than the input value. If DecodeFn was not used, this returns ErrNotRegistered. GenerateFn can be used to control the instantiation of a new value, otherwise this uses reflect.New(reflect.TypeOf(v)).Interface().
func (*Serde) Encode ¶
Encode encodes a value and prepends the header according to the configured SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered.
func (*Serde) MustAppendEncode ¶
MustAppendEncode returns the value of AppendEncode, panicking on error. This is a shortcut for if your encode function cannot error.
func (*Serde) MustEncode ¶
MustEncode returns the value of Encode, panicking on error. This is a shortcut for if your encode function cannot error.
type SerdeHeader ¶
type SerdeHeader interface { // AppendEncode encodes a schema ID and optional index to b, returning the // updated slice or an error. AppendEncode(b []byte, id int, index []int) ([]byte, error) // DecodeID decodes an ID from in, returning the ID and the remaining bytes, // or an error. DecodeID(in []byte) (id int, out []byte, err error) // DecodeIndex decodes at most maxLength of a schema index from in, // returning the index and remaining bytes, or an error. DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error) }
SerdeHeader encodes and decodes a message header.
type SerdeOpt ¶
type SerdeOpt interface {
// contains filtered or unexported methods
}
SerdeOpt is an option to configure Serde.
func Header ¶
func Header(header SerdeHeader) SerdeOpt
Header defines the SerdeHeader used to encode and decode the message header.
type SerdeOrEncodingOpt ¶
type SerdeOrEncodingOpt interface {
// contains filtered or unexported methods
}
SerdeOrEncodingOpt is either a SerdeOpt or EncodingOpt.
type SetCompatibility ¶
type SetCompatibility struct { Level CompatibilityLevel `json:"compatibility"` // The subject (or global) compatibility level. Alias string `json:"alias,omitempty"` // The subject alias, if any. Normalize bool `json:"normalize,omitempty"` // Whether or not schemas are normalized by default. Group string `json:"compatibilityGroup,omitempty"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility. DefaultMetadata *SchemaMetadata `json:"defaultMetadata,omitempty"` // Default metadata used for schema registration. OverrideMetadata *SchemaMetadata `json:"overrideMetadata,omitempty"` // Override metadata used for schema registration. DefaultRuleSet *SchemaRuleSet `json:"defaultRuleSet,omitempty"` // Default rule set used for schema registration. OverrideRuleSet *SchemaRuleSet `json:"overrideRuleSet,omitempty"` // Override rule set used for schema registration. }
SetCompatibility contains information used for setting global or per-subject compatibility configuration.
The main difference between this and the CompatibilityResult is that this struct marshals the compatibility level as "compatibility".
type SubjectSchema ¶
type SubjectSchema struct { // Subject is the subject for this schema. This usually corresponds to // a Kafka topic, and whether this is for a key or value. For example, // "foo-key" would be the subject for the foo topic for serializing the // key field of a record. Subject string `json:"subject"` // Version is the version of this subject. Version int `json:"version"` // ID is the globally unique ID of the schema. ID int `json:"id"` Schema }
SubjectSchema pairs the subject, global identifier, and version of a schema with the schema itself.
func CommSubjectSchemas ¶
func CommSubjectSchemas(l, r []SubjectSchema) (luniq, runiq, common []SubjectSchema)
CommSubjectSchemas splits l and r into three sets: what is unique in l, what is unique in r, and what is common in both. Duplicates in either slice are eliminated.
type SubjectVersion ¶
SubjectVersion is a subject version pair.