packer

package
v0.0.0-rc11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

README

Packers

Packer - convert kafka schema & payload into debezium messages

Packers should be thread-safe.

We have 3 functional packers:

  • PackerIncludeSchema - packer who includes schema - default debezium behaviour
  • PackerSkipSchema - packer who skips schema - behaviour like debezium 'schema.enable:false'
  • PackerSchemaRegistry - packer who uses confluent schema registry (json) - confluent SR, converting kafka schema into confluent schema & resolving schema into schemaID & writes in confluent wire format

And two auxiliary packers

  • PackerCacheFinalSchema - packer who caches final schema (it's actually pattern 'decorator') - keeping tableSchemaAsString->finalSchemaBytes cache, and not to calls 'BuildFinalSchema' 2nd time.
  • lightning_cache/PackerLightningCache - packer who exists only in one batch serialization (it's actually pattern 'decorator'), cacher either schemaID or finalSchema - in dependency of base packer.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BuilderFunc

type BuilderFunc = func(changeItem *abstract.ChangeItem) ([]byte, error)

type DefaultSessionPackers

type DefaultSessionPackers struct {
	// contains filtered or unexported fields
}

func NewDefaultSessionPackers

func NewDefaultSessionPackers(k, v Packer) *DefaultSessionPackers

func (*DefaultSessionPackers) Packer

func (p *DefaultSessionPackers) Packer(isKey bool) Packer

type Packer

type Packer interface {
	// Pack  - builds payload & schema into message
	Pack(
		changeItem *abstract.ChangeItem,
		payloadBuilder BuilderFunc,
		kafkaSchemaBuilder BuilderFunc,
		maybeCachedRawSchema []byte,
	) ([]byte, error)

	// BuildFinalSchema
	// This interface is used for caching
	// If your packer is wrapped into 'PackerCacheFinalSchema' - it will call BuildFinalSchema only when schema is not cached
	BuildFinalSchema(changeItem *abstract.ChangeItem, kafkaSchemaBuilder BuilderFunc) ([]byte, error)
	IsDropSchema() bool

	GetSchemaIDResolver() SchemaIDResolver
}

func NewKeyPackerFromDebeziumParameters

func NewKeyPackerFromDebeziumParameters(connectorParameters map[string]string, logger log.Logger) (Packer, error)

func NewValuePackerFromDebeziumParameters

func NewValuePackerFromDebeziumParameters(connectorParameters map[string]string, logger log.Logger) (Packer, error)

type PackerCacheFinalSchema

type PackerCacheFinalSchema struct {
	// contains filtered or unexported fields
}

func NewPackerCacheFinalSchema

func NewPackerCacheFinalSchema(in Packer) *PackerCacheFinalSchema

func (*PackerCacheFinalSchema) BuildFinalSchema

func (c *PackerCacheFinalSchema) BuildFinalSchema(changeItem *abstract.ChangeItem, schemaBuilder BuilderFunc) ([]byte, error)

func (*PackerCacheFinalSchema) GetSchemaIDResolver

func (c *PackerCacheFinalSchema) GetSchemaIDResolver() SchemaIDResolver

func (*PackerCacheFinalSchema) IsDropSchema

func (c *PackerCacheFinalSchema) IsDropSchema() bool

func (*PackerCacheFinalSchema) Pack

func (c *PackerCacheFinalSchema) Pack(
	changeItem *abstract.ChangeItem,
	payloadBuilder BuilderFunc,
	kafkaSchemaBuilder BuilderFunc,
	_ []byte,
) ([]byte, error)

type PackerIncludeSchema

type PackerIncludeSchema struct{}

func NewPackerIncludeSchema

func NewPackerIncludeSchema() *PackerIncludeSchema

func (*PackerIncludeSchema) BuildFinalSchema

func (s *PackerIncludeSchema) BuildFinalSchema(changeItem *abstract.ChangeItem, schemaBuilder BuilderFunc) ([]byte, error)

func (*PackerIncludeSchema) GetSchemaIDResolver

func (*PackerIncludeSchema) GetSchemaIDResolver() SchemaIDResolver

func (*PackerIncludeSchema) IsDropSchema

func (s *PackerIncludeSchema) IsDropSchema() bool

func (*PackerIncludeSchema) Pack

func (s *PackerIncludeSchema) Pack(
	changeItem *abstract.ChangeItem,
	payloadBuilder BuilderFunc,
	kafkaSchemaBuilder BuilderFunc,
	maybeCachedRawSchema []byte,
) ([]byte, error)

type PackerSchemaRegistry

type PackerSchemaRegistry struct {
	// contains filtered or unexported fields
}

func NewPackerSchemaRegistry

func NewPackerSchemaRegistry(
	srClient *confluent.SchemaRegistryClient,
	subjectNameStrategy string,
	isKeyProcessor bool,
	writeIntoOneTopic bool,
	topic string,
) *PackerSchemaRegistry

func (*PackerSchemaRegistry) BuildFinalSchema

func (s *PackerSchemaRegistry) BuildFinalSchema(changeItem *abstract.ChangeItem, kafkaSchemaBuilder BuilderFunc) ([]byte, error)

func (*PackerSchemaRegistry) GetSchemaIDResolver

func (s *PackerSchemaRegistry) GetSchemaIDResolver() SchemaIDResolver

func (*PackerSchemaRegistry) IsDropSchema

func (s *PackerSchemaRegistry) IsDropSchema() bool

func (*PackerSchemaRegistry) Pack

func (s *PackerSchemaRegistry) Pack(
	changeItem *abstract.ChangeItem,
	payloadBuilder BuilderFunc,
	kafkaSchemaBuilder BuilderFunc,
	maybeCachedRawSchema []byte,
) ([]byte, error)

func (*PackerSchemaRegistry) PackWithSchemaID

func (s *PackerSchemaRegistry) PackWithSchemaID(schemaID uint32, payload []byte) ([]byte, error)

func (*PackerSchemaRegistry) ResolveSchemaID

func (s *PackerSchemaRegistry) ResolveSchemaID(schema []byte, table abstract.TableID) (uint32, error)

type PackerSkipSchema

type PackerSkipSchema struct {
}

func NewPackerSkipSchema

func NewPackerSkipSchema() *PackerSkipSchema

func (*PackerSkipSchema) BuildFinalSchema

func (s *PackerSkipSchema) BuildFinalSchema(_ *abstract.ChangeItem, _ BuilderFunc) ([]byte, error)

func (*PackerSkipSchema) GetSchemaIDResolver

func (*PackerSkipSchema) GetSchemaIDResolver() SchemaIDResolver

func (*PackerSkipSchema) IsDropSchema

func (s *PackerSkipSchema) IsDropSchema() bool

func (*PackerSkipSchema) Pack

func (s *PackerSkipSchema) Pack(
	changeItem *abstract.ChangeItem,
	payloadBuilder BuilderFunc,
	_ BuilderFunc,
	_ []byte,
) ([]byte, error)

type SchemaIDResolver

type SchemaIDResolver interface {
	ResolveSchemaID(schema []byte, table abstract.TableID) (uint32, error)
	PackWithSchemaID(schemaID uint32, payload []byte) ([]byte, error)
}

SchemaIDResolver If there are packer with schemaRegistry and wrapped into schemaID resolver It would be called ResolveSchemaID+PackWithSchemaID except 'Pack'

type SessionPackers

type SessionPackers interface {
	Packer(isKey bool) Packer
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL