Documentation
¶
Index ¶
- Constants
- func Expect(actual, expected interface{}) error
- func RegisterRuleAction(ruleAction RuleAction)
- func RegisterRuleExecutor(ruleExecutor RuleExecutor)
- func RegisterRuleOverride(ruleOverride *RuleOverride)
- func ResolveReferences(c schemaregistry.Client, schema schemaregistry.SchemaInfo, ...) error
- func TopicNameStrategy(topic string, serdeType Type, schema schemaregistry.SchemaInfo) (string, error)
- type AbstractFieldRuleExecutor
- type BaseDeserializer
- func (s *BaseDeserializer) ConfigureDeserializer(client schemaregistry.Client, serdeType Type, conf *DeserializerConfig) error
- func (s *BaseDeserializer) GetReaderSchema(subject string) (*schemaregistry.SchemaMetadata, error)
- func (s *BaseDeserializer) GetSchema(topic string, payload []byte) (schemaregistry.SchemaInfo, error)
- type BaseSerializer
- func (s *BaseSerializer) ConfigureSerializer(client schemaregistry.Client, serdeType Type, conf *SerializerConfig) error
- func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregistry.SchemaInfo) (int, error)
- func (s *BaseSerializer) WriteBytes(id int, msgBytes []byte) ([]byte, error)
- type Deserializer
- type DeserializerConfig
- type ErrorAction
- type FailFunc
- type FieldContext
- type FieldRuleExecutor
- type FieldTransform
- type FieldTransformer
- type FieldType
- type MessageFactory
- type Migration
- type NoneAction
- type RuleAction
- type RuleBase
- type RuleConditionErr
- type RuleContext
- func (r *RuleContext) CurrentField() *FieldContext
- func (r *RuleContext) EnterField(containingMessage interface{}, fullName string, name string, ...) (FieldContext, bool)
- func (r *RuleContext) GetParameter(name string) *string
- func (r *RuleContext) GetTags(fullName string) map[string]bool
- func (r *RuleContext) LeaveField()
- type RuleExecutor
- type RuleOverride
- type RuleRegistry
- func (r *RuleRegistry) Clear()
- func (r *RuleRegistry) GetAction(name string) RuleAction
- func (r *RuleRegistry) GetActions() []RuleAction
- func (r *RuleRegistry) GetExecutor(name string) RuleExecutor
- func (r *RuleRegistry) GetExecutors() []RuleExecutor
- func (r *RuleRegistry) GetOverride(name string) *RuleOverride
- func (r *RuleRegistry) GetOverrides() []*RuleOverride
- func (r *RuleRegistry) RegisterAction(ruleAction RuleAction)
- func (r *RuleRegistry) RegisterExecutor(ruleExecutor RuleExecutor)
- func (r *RuleRegistry) RegisterOverride(ruleOverride *RuleOverride)
- type Serde
- func (s *Serde) Close() error
- func (s *Serde) ExecuteMigrations(migrations []Migration, subject string, topic string, msg interface{}) (interface{}, error)
- func (s *Serde) ExecuteRules(subject string, topic string, ruleMode schemaregistry.RuleMode, ...) (interface{}, error)
- func (s *Serde) GetMigrations(subject string, topic string, sourceInfo *schemaregistry.SchemaInfo, ...) ([]Migration, error)
- func (s *Serde) SetRuleRegistry(registry *RuleRegistry, ruleConfig map[string]string) error
- type Serializer
- type SerializerConfig
- type SubjectNameStrategyFunc
- type Type
Constants ¶
const ( // KeySerde denotes a key Serde KeySerde = 1 // ValueSerde denotes a value Serde ValueSerde = 2 )
const ( // EnableValidation enables validation EnableValidation = true // DisableValidation disables validation DisableValidation = false )
const ( // TypeRecord represents a record TypeRecord = 1 // TypeEnum represents an enum TypeEnum = 2 // TypeArray represents an array TypeArray = 3 // TypeMap represents a map TypeMap = 4 // TypeCombined represents a combined TypeCombined = 5 // TypeFixed represents a fixed TypeFixed = 6 // TypeString represents a string TypeString = 7 // TypeBytes represents bytes TypeBytes = 8 // TypeInt represents an int TypeInt = 9 // TypeLong represents a long TypeLong = 10 // TypeFloat represents a float TypeFloat = 11 // TypeDouble represents a double TypeDouble = 12 // TypeBoolean represents a Boolean TypeBoolean = 13 // TypeNull represents a null TypeNull = 14 )
const MagicByte byte = 0x0
MagicByte is prepended to the serialized payload
Variables ¶
This section is empty.
Functions ¶
func Expect ¶
func Expect(actual, expected interface{}) error
Expect compares the actual and expected values
func RegisterRuleAction ¶ added in v2.5.0
func RegisterRuleAction(ruleAction RuleAction)
RegisterRuleAction is used to register a new global rule action.
func RegisterRuleExecutor ¶ added in v2.5.0
func RegisterRuleExecutor(ruleExecutor RuleExecutor)
RegisterRuleExecutor is used to register a new global rule executor.
func RegisterRuleOverride ¶ added in v2.8.0
func RegisterRuleOverride(ruleOverride *RuleOverride)
RegisterRuleOverride is used to register a new global rule override.
func ResolveReferences ¶
func ResolveReferences(c schemaregistry.Client, schema schemaregistry.SchemaInfo, deps map[string]string) error
ResolveReferences resolves schema references
func TopicNameStrategy ¶
func TopicNameStrategy(topic string, serdeType Type, schema schemaregistry.SchemaInfo) (string, error)
TopicNameStrategy creates a subject name by appending -[key|value] to the topic name.
Types ¶
type AbstractFieldRuleExecutor ¶ added in v2.5.0
type AbstractFieldRuleExecutor struct {
FieldRuleExecutor
}
AbstractFieldRuleExecutor represents an abstract field rule executor
func (*AbstractFieldRuleExecutor) Transform ¶ added in v2.5.0
func (a *AbstractFieldRuleExecutor) Transform(ctx RuleContext, msg interface{}) (interface{}, error)
Transform transforms the message using the rule
type BaseDeserializer ¶
type BaseDeserializer struct { Serde Conf *DeserializerConfig }
BaseDeserializer represents basic deserializer info
func (*BaseDeserializer) ConfigureDeserializer ¶
func (s *BaseDeserializer) ConfigureDeserializer(client schemaregistry.Client, serdeType Type, conf *DeserializerConfig) error
ConfigureDeserializer configures the Deserializer
func (*BaseDeserializer) GetReaderSchema ¶ added in v2.5.0
func (s *BaseDeserializer) GetReaderSchema(subject string) (*schemaregistry.SchemaMetadata, error)
GetReaderSchema returns a schema for reading
func (*BaseDeserializer) GetSchema ¶
func (s *BaseDeserializer) GetSchema(topic string, payload []byte) (schemaregistry.SchemaInfo, error)
GetSchema returns a schema for a payload
type BaseSerializer ¶
type BaseSerializer struct { Serde Conf *SerializerConfig }
BaseSerializer represents basic serializer info
func (*BaseSerializer) ConfigureSerializer ¶
func (s *BaseSerializer) ConfigureSerializer(client schemaregistry.Client, serdeType Type, conf *SerializerConfig) error
ConfigureSerializer configures the Serializer
func (*BaseSerializer) GetID ¶
func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregistry.SchemaInfo) (int, error)
GetID returns a schema ID for the given schema
func (*BaseSerializer) WriteBytes ¶
func (s *BaseSerializer) WriteBytes(id int, msgBytes []byte) ([]byte, error)
WriteBytes writes the serialized payload prepended by the MagicByte
type Deserializer ¶
type Deserializer interface { ConfigureDeserializer(client schemaregistry.Client, serdeType Type, conf *DeserializerConfig) error // Deserialize will call the MessageFactory to create an object // into which we will unmarshal data. Deserialize(topic string, payload []byte) (interface{}, error) // DeserializeInto will unmarshal data into the given object. DeserializeInto(topic string, payload []byte, msg interface{}) error Close() error }
Deserializer represents a deserializer
type DeserializerConfig ¶
type DeserializerConfig struct { // UseLatestVersion specifies whether to use the latest schema version during deserialization UseLatestVersion bool // UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization UseLatestWithMetadata map[string]string // RuleConfig specifies configuration options to the rules RuleConfig map[string]string }
DeserializerConfig is used to pass multiple configuration options to the deserializers.
func NewDeserializerConfig ¶
func NewDeserializerConfig() *DeserializerConfig
NewDeserializerConfig returns a new configuration instance with sane defaults.
type ErrorAction ¶ added in v2.5.0
type ErrorAction struct { }
ErrorAction represents an error action
func (ErrorAction) Close ¶ added in v2.5.0
func (a ErrorAction) Close() error
Close closes the action
func (ErrorAction) Configure ¶ added in v2.5.0
func (a ErrorAction) Configure(clientConfig *schemaregistry.Config, config map[string]string) error
Configure configures the action
func (ErrorAction) Run ¶ added in v2.5.0
func (a ErrorAction) Run(ctx RuleContext, msg interface{}, err error) error
Run runs the action
type FailFunc ¶
FailFunc is a function to call in case of failure
var MaybeFail FailFunc
MaybeFail represents a fail function
func InitFailFunc ¶
InitFailFunc returns an initial fail function
func InitFailFuncBenchmark ¶ added in v2.4.0
InitFailFuncBenchmark returns an initial fail function
type FieldContext ¶ added in v2.5.0
type FieldContext struct { ContainingMessage interface{} FullName string Name string Type FieldType Tags map[string]bool }
FieldContext represents a field context
func (*FieldContext) IsPrimitive ¶ added in v2.5.0
func (f *FieldContext) IsPrimitive() bool
IsPrimitive returns true if the field is a primitive
func (*FieldContext) TypeName ¶ added in v2.5.0
func (f *FieldContext) TypeName() string
TypeName returns the type name
type FieldRuleExecutor ¶ added in v2.5.0
type FieldRuleExecutor interface { RuleExecutor NewTransform(ctx RuleContext) (FieldTransform, error) }
FieldRuleExecutor represents a field rule executor
type FieldTransform ¶ added in v2.5.0
type FieldTransform interface {
Transform(ctx RuleContext, fieldCtx FieldContext, fieldValue interface{}) (interface{}, error)
}
FieldTransform represents a field transform
type FieldTransformer ¶ added in v2.5.0
type FieldTransformer func(ctx RuleContext, fieldTransform FieldTransform, msg interface{}) (interface{}, error)
FieldTransformer represents a field transformer
type MessageFactory ¶
MessageFactory is a factory function, which should return a pointer to an instance into which we will unmarshal wire data. For Avro, the name will be the name of the Avro type if it has one. For JSON Schema, the name will be empty/F. For Protobuf, the name will be the name of the message type.
type Migration ¶ added in v2.5.0
type Migration struct { RuleMode schemaregistry.RuleMode Source *schemaregistry.SchemaMetadata Target *schemaregistry.SchemaMetadata }
Migration represents a migration
type NoneAction ¶ added in v2.5.0
type NoneAction struct { }
NoneAction represents a no-op action
func (NoneAction) Configure ¶ added in v2.5.0
func (a NoneAction) Configure(clientConfig *schemaregistry.Config, config map[string]string) error
Configure configures the action
func (NoneAction) Run ¶ added in v2.5.0
func (a NoneAction) Run(ctx RuleContext, msg interface{}, err error) error
Run runs the action
type RuleAction ¶ added in v2.5.0
type RuleAction interface { RuleBase Run(ctx RuleContext, msg interface{}, err error) error }
RuleAction represents a rule action
type RuleBase ¶ added in v2.5.0
type RuleBase interface { Configure(clientConfig *schemaregistry.Config, config map[string]string) error Type() string Close() error }
RuleBase represents a rule base
type RuleConditionErr ¶ added in v2.5.0
type RuleConditionErr struct { Rule *schemaregistry.Rule Err error }
RuleConditionErr represents a rule condition error
func (RuleConditionErr) Error ¶ added in v2.5.0
func (re RuleConditionErr) Error() string
Error returns the error message
type RuleContext ¶ added in v2.5.0
type RuleContext struct { Source *schemaregistry.SchemaInfo Target *schemaregistry.SchemaInfo Subject string Topic string IsKey bool RuleMode schemaregistry.RuleMode Rule *schemaregistry.Rule Index int Rules []schemaregistry.Rule FieldTransformer FieldTransformer // contains filtered or unexported fields }
RuleContext represents a rule context
func (*RuleContext) CurrentField ¶ added in v2.5.0
func (r *RuleContext) CurrentField() *FieldContext
CurrentField returns the current field context
func (*RuleContext) EnterField ¶ added in v2.5.0
func (r *RuleContext) EnterField(containingMessage interface{}, fullName string, name string, fieldType FieldType, tags []string) (FieldContext, bool)
EnterField enters a field context
func (*RuleContext) GetParameter ¶ added in v2.5.0
func (r *RuleContext) GetParameter(name string) *string
GetParameter returns a parameter by name
func (*RuleContext) GetTags ¶ added in v2.5.0
func (r *RuleContext) GetTags(fullName string) map[string]bool
GetTags returns tags for a full name
func (*RuleContext) LeaveField ¶ added in v2.5.0
func (r *RuleContext) LeaveField()
LeaveField leaves a field context
type RuleExecutor ¶ added in v2.5.0
type RuleExecutor interface { RuleBase Transform(ctx RuleContext, msg interface{}) (interface{}, error) }
RuleExecutor represents a rule executor
type RuleOverride ¶ added in v2.8.0
type RuleOverride struct { // Rule type Type string // Rule action on success OnSuccess *string // Rule action on failure OnFailure *string // Whether the rule is disabled Disabled *bool }
RuleOverride represents a rule override
type RuleRegistry ¶ added in v2.5.4
type RuleRegistry struct {
// contains filtered or unexported fields
}
RuleRegistry is used to store all registered rule executors and actions.
func GlobalRuleRegistry ¶ added in v2.5.4
func GlobalRuleRegistry() *RuleRegistry
GlobalRuleRegistry returns the global rule registry.
func NewRuleRegistry ¶ added in v2.8.0
func NewRuleRegistry() RuleRegistry
NewRuleRegistry creates a Rule Registry instance.
func (*RuleRegistry) Clear ¶ added in v2.5.4
func (r *RuleRegistry) Clear()
Clear clears all registered rules
func (*RuleRegistry) GetAction ¶ added in v2.5.4
func (r *RuleRegistry) GetAction(name string) RuleAction
GetAction fetches a rule action by a given name.
func (*RuleRegistry) GetActions ¶ added in v2.5.4
func (r *RuleRegistry) GetActions() []RuleAction
GetActions fetches all rule actions
func (*RuleRegistry) GetExecutor ¶ added in v2.5.4
func (r *RuleRegistry) GetExecutor(name string) RuleExecutor
GetExecutor fetches a rule executor by a given name.
func (*RuleRegistry) GetExecutors ¶ added in v2.5.4
func (r *RuleRegistry) GetExecutors() []RuleExecutor
GetExecutors fetches all rule executors
func (*RuleRegistry) GetOverride ¶ added in v2.8.0
func (r *RuleRegistry) GetOverride(name string) *RuleOverride
GetOverride fetches a rule override by a given name.
func (*RuleRegistry) GetOverrides ¶ added in v2.8.0
func (r *RuleRegistry) GetOverrides() []*RuleOverride
GetOverrides fetches all rule overrides
func (*RuleRegistry) RegisterAction ¶ added in v2.5.4
func (r *RuleRegistry) RegisterAction(ruleAction RuleAction)
RegisterAction is used to register a new global rule action.
func (*RuleRegistry) RegisterExecutor ¶ added in v2.5.4
func (r *RuleRegistry) RegisterExecutor(ruleExecutor RuleExecutor)
RegisterExecutor is used to register a new rule executor.
func (*RuleRegistry) RegisterOverride ¶ added in v2.8.0
func (r *RuleRegistry) RegisterOverride(ruleOverride *RuleOverride)
RegisterOverride is used to register a new global rule override.
type Serde ¶
type Serde struct { Client schemaregistry.Client SerdeType Type SubjectNameStrategy SubjectNameStrategyFunc MessageFactory MessageFactory FieldTransformer FieldTransformer RuleRegistry *RuleRegistry }
Serde is a common instance for both the serializers and deserializers
func (*Serde) ExecuteMigrations ¶ added in v2.5.0
func (s *Serde) ExecuteMigrations(migrations []Migration, subject string, topic string, msg interface{}) (interface{}, error)
ExecuteMigrations executes the given migrations
func (*Serde) ExecuteRules ¶ added in v2.5.0
func (s *Serde) ExecuteRules(subject string, topic string, ruleMode schemaregistry.RuleMode, source *schemaregistry.SchemaInfo, target *schemaregistry.SchemaInfo, msg interface{}) (interface{}, error)
ExecuteRules executes the given rules
func (*Serde) GetMigrations ¶ added in v2.5.0
func (s *Serde) GetMigrations(subject string, topic string, sourceInfo *schemaregistry.SchemaInfo, target *schemaregistry.SchemaMetadata, msg interface{}) ([]Migration, error)
GetMigrations returns the migration rules for the given subject
func (*Serde) SetRuleRegistry ¶ added in v2.5.4
func (s *Serde) SetRuleRegistry(registry *RuleRegistry, ruleConfig map[string]string) error
SetRuleRegistry sets the rule registry
type Serializer ¶
type Serializer interface { ConfigureSerializer(client schemaregistry.Client, serdeType Type, conf *SerializerConfig) error // Serialize will serialize the given message, which should be a pointer. // For example, in Protobuf, messages are always a pointer to a struct and never just a struct. Serialize(topic string, msg interface{}) ([]byte, error) Close() error }
Serializer represents a serializer
type SerializerConfig ¶
type SerializerConfig struct { // AutoRegisterSchemas determines whether to automatically register schemas during serialization AutoRegisterSchemas bool // UseSchemaID specifies a schema ID to use during serialization UseSchemaID int // UseLatestVersion specifies whether to use the latest schema version during serialization UseLatestVersion bool // UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization UseLatestWithMetadata map[string]string // NormalizeSchemas determines whether to normalize schemas during serialization NormalizeSchemas bool // RuleConfig specifies configuration options to the rules RuleConfig map[string]string }
SerializerConfig is used to pass multiple configuration options to the serializers.
func NewSerializerConfig ¶
func NewSerializerConfig() *SerializerConfig
NewSerializerConfig returns a new configuration instance with sane defaults.
type SubjectNameStrategyFunc ¶
type SubjectNameStrategyFunc func(topic string, serdeType Type, schema schemaregistry.SchemaInfo) (string, error)
SubjectNameStrategyFunc determines the subject for the given parameters