kafka

package
v0.0.0-rc10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2024 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Index

Constants

View Source
// Compression codecs that can be selected for the Kafka writer. Every
// constant is an Encoding whose string value is the codec's upper-case name.
const (
	NoEncoding     Encoding = "UNCOMPRESSED"
	GzipEncoding   Encoding = "GZIP"
	SnappyEncoding Encoding = "SNAPPY"
	LZ4Encoding    Encoding = "LZ4"
	ZstdEncoding   Encoding = "ZSTD"
)
View Source
// DefaultAuth is the untyped string constant "admin".
// NOTE(review): presumably a default auth/user value — its usage is not visible here; confirm.
const DefaultAuth = "admin"
View Source
// ProviderType is the abstract.ProviderType identifier of this package: "kafka".
const ProviderType abstract.ProviderType = "kafka"

Variables

This section is empty.

Functions

func ContainerNeeded

func ContainerNeeded() bool

func GetKafkaRawMessageData

func GetKafkaRawMessageData(r *abstract.ChangeItem) []byte

func GetKafkaRawMessageKey

func GetKafkaRawMessageKey(r *abstract.ChangeItem) []byte

func InferFormatSettings

func InferFormatSettings(src model.Source, formatSettings model.SerializationFormat) model.SerializationFormat

func IsKafkaRawMessage

func IsKafkaRawMessage(items []abstract.ChangeItem) bool

func MakeKafkaRawMessage

func MakeKafkaRawMessage(table string, commitTime time.Time, topic string, shard int, offset int64, key, data []byte) abstract.ChangeItem

func New

func New(lgr log.Logger, registry metrics.Registry, cp cpclient.Coordinator, transfer *model.Transfer) providers.Provider

func NewReplicationSink

func NewReplicationSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)

func NewSinkImpl

func NewSinkImpl(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger, writerFactory writer.AbstractWriterFactory, isSnapshot bool) (abstract.Sinker, error)

func NewSnapshotSink

func NewSnapshotSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)

func ResolveBrokers

func ResolveBrokers(s *KafkaConnectionOptions) ([]string, error)

func ResolveOnPremBrokers

func ResolveOnPremBrokers(connectionOpt *KafkaConnectionOptions, kafkaAuth *KafkaAuth) ([]string, error)

func ResolvePassword

func ResolvePassword(s *KafkaConnectionOptions, kafkaAuth *KafkaAuth) (string, error)

func StartKafkaContainer

func StartKafkaContainer() error

Types

type Encoding

type Encoding string

func (Encoding) AsKafka

func (e Encoding) AsKafka() kafka.Compression

type KafkaAuth

// KafkaAuth groups the authentication settings of a Kafka connection: an
// on/off flag, a mechanism name and a user/password pair.
//
// NOTE(review): the accepted Mechanism values are not visible here; see
// GetAuthMechanism and GetFranzAuthMechanism for how the name is interpreted.
type KafkaAuth struct {
	// Enabled is the authentication on/off flag.
	// NOTE(review): presumably false means "connect without SASL" — confirm.
	Enabled bool
	// Mechanism is the mechanism name as a plain string.
	Mechanism string
	// User is the account name.
	User string
	// Password is stored as a plain string.
	// NOTE(review): ResolvePassword also takes a *KafkaAuth, so this may not be the final secret — confirm.
	Password string
}

func (*KafkaAuth) GetAuthMechanism

func (a *KafkaAuth) GetAuthMechanism() (sasl.Mechanism, error)

func (*KafkaAuth) GetFranzAuthMechanism

func (a *KafkaAuth) GetFranzAuthMechanism() franzsasl.Mechanism

type KafkaConnectionOptions

// KafkaConnectionOptions describes how to reach a Kafka cluster. It is shared
// by KafkaSource and KafkaDestination through their Connection field.
type KafkaConnectionOptions struct {
	// ClusterID identifies the cluster.
	// NOTE(review): presumably a managed-cluster ID used instead of an explicit Brokers list — confirm.
	ClusterID string
	// TLS is the TLS mode of the connection.
	TLS model.TLSMode
	// TLSFile carries the `model:"PemFileContent"` tag.
	// NOTE(review): despite the name it presumably holds PEM content rather than a file path — confirm.
	TLSFile string `model:"PemFileContent"`
	// Brokers is the list of broker addresses.
	Brokers []string
	// NOTE(review): presumably the sub-network in which the connection is established — confirm.
	SubNetworkID string
}

func (*KafkaConnectionOptions) BrokersHostnames

func (o *KafkaConnectionOptions) BrokersHostnames() ([]string, error)

BrokersHostnames returns a list of brokers' hostnames

func (*KafkaConnectionOptions) TLSConfig

func (o *KafkaConnectionOptions) TLSConfig() (*tls.Config, error)

type KafkaDestination

// KafkaDestination is the configuration of a Kafka destination endpoint:
// connection and authentication settings plus writer, topic and serialization
// options.
type KafkaDestination struct {
	Connection       *KafkaConnectionOptions
	Auth             *KafkaAuth
	SecurityGroupIDs []string

	// BatchBytes is the setting of the same name from the segmentio/kafka-go Writer.
	// It tunes the maximum length of one message (see the usages of BatchBytes in kafka-go).
	// Message size is len(key)+len(val)+14.
	// The default is 0, in which case kafka-go sets it to 1048576.
	// When setting a non-default value, remember that managed Kafka (server side)
	// has a default max.message.bytes of 1048588.
	BatchBytes          int64
	// NOTE(review): the name is misspelled ("Parralel"); the field is exported, so it is kept as is.
	// Presumably the number of writers working in parallel — confirm.
	ParralelWriterCount int

	Topic       string // full topic name (as opposed to TopicPrefix)
	TopicPrefix string

	AddSystemTables bool // private option: do not skip consumer_keeper and other system tables
	SaveTxOrder     bool

	// For now FormatSettings is a private option. Its WithDefaults() value is
	// SerializationFormatAuto: 'Mirror' for queues, 'Debezium' for the rest.
	FormatSettings model.SerializationFormat

	TopicConfigEntries []TopicConfigEntry

	// Compression selects the compression mechanism used by the writer; the default is none.
	Compression Encoding
}

func DestinationRecipe

func DestinationRecipe() (*KafkaDestination, error)

func (*KafkaDestination) BuffererConfig

func (d *KafkaDestination) BuffererConfig() bufferer.BuffererConfig

func (*KafkaDestination) CleanupMode

func (d *KafkaDestination) CleanupMode() model.CleanupType

func (*KafkaDestination) Compatible

func (d *KafkaDestination) Compatible(src model.Source, transferType abstract.TransferType) error

func (*KafkaDestination) GetProviderType

func (d *KafkaDestination) GetProviderType() abstract.ProviderType

func (*KafkaDestination) HostsNames

func (d *KafkaDestination) HostsNames() ([]string, error)

func (KafkaDestination) IsDestination

func (KafkaDestination) IsDestination()

func (*KafkaDestination) MDBClusterID

func (d *KafkaDestination) MDBClusterID() string

func (*KafkaDestination) Serializer

func (d *KafkaDestination) Serializer() (model.SerializationFormat, bool)

func (*KafkaDestination) Transformer

func (d *KafkaDestination) Transformer() map[string]string

func (*KafkaDestination) Validate

func (d *KafkaDestination) Validate() error

func (*KafkaDestination) WithDefaults

func (d *KafkaDestination) WithDefaults()

type KafkaSource

// KafkaSource is the configuration of a Kafka source endpoint: connection and
// authentication settings, the topic(s) to read, and parsing/transformation
// options.
type KafkaSource struct {
	Connection  *KafkaConnectionOptions
	Auth        *KafkaAuth
	Topic       string
	// NOTE(review): presumably a list of topics read together, as an alternative to the single Topic — confirm.
	GroupTopics []string
	Transformer *model.DataTransformOptions

	BufferSize model.BytesSize // not an actual buffer size - see the comments on the waitLimits() method in kafka-source

	SecurityGroupIDs []string

	ParserConfig        map[string]interface{}
	IsHomo              bool // enables the kafka mirror protocol, which can work only with a kafka target
	SynchronizeIsNeeded bool // true if synchronize events must be sent when partitions are released
}

func MustSourceRecipe

func MustSourceRecipe() *KafkaSource

func SourceRecipe

func SourceRecipe() (*KafkaSource, error)

func (*KafkaSource) GetProviderType

func (s *KafkaSource) GetProviderType() abstract.ProviderType

func (*KafkaSource) HostsNames

func (s *KafkaSource) HostsNames() ([]string, error)

func (*KafkaSource) IsAppendOnly

func (s *KafkaSource) IsAppendOnly() bool

func (*KafkaSource) IsDefaultMirror

func (s *KafkaSource) IsDefaultMirror() bool

func (KafkaSource) IsSource

func (KafkaSource) IsSource()

func (*KafkaSource) MDBClusterID

func (s *KafkaSource) MDBClusterID() string

func (*KafkaSource) Parser

func (s *KafkaSource) Parser() map[string]interface{}

func (*KafkaSource) Validate

func (s *KafkaSource) Validate() error

func (*KafkaSource) WithDefaults

func (s *KafkaSource) WithDefaults()

type Mirrareable

// Mirrareable is implemented by any type that has a ForceMirror method.
//
// NOTE(review): the name looks like a misspelling of "Mirrorable"; it is
// exported, so renaming it would break callers.
// NOTE(review): presumably ForceMirror switches the implementer to the
// 'Mirror' serialization format — confirm against the implementations.
type Mirrareable interface {
	ForceMirror()
}

type Provider

// Provider is the provider type of the kafka package. All of its fields are
// unexported; the methods listed for it are Sink, SnapshotSink, Sniffer,
// Source and Type.
type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Sink

func (*Provider) SnapshotSink

func (p *Provider) SnapshotSink(middlewares.Config) (abstract.Sinker, error)

func (*Provider) Sniffer

func (p *Provider) Sniffer(_ context.Context) (abstract.Fetchable, error)

func (*Provider) Source

func (p *Provider) Source() (abstract.Source, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type Source

// Source is the type returned by NewSource. All of its fields are unexported;
// it is driven through its Fetch, Run and Stop methods.
type Source struct {
	// contains filtered or unexported fields
}

func NewSource

func NewSource(transferID string, cfg *KafkaSource, logger log.Logger, registry metrics.Registry) (*Source, error)

func (*Source) Fetch

func (p *Source) Fetch() ([]abstract.ChangeItem, error)

func (*Source) Run

func (p *Source) Run(sink abstract.AsyncSink) error

func (*Source) Stop

func (p *Source) Stop()

type TopicConfigEntry

// TopicConfigEntry is one name/value pair; KafkaDestination keeps a slice of
// them in its TopicConfigEntries field.
//
// NOTE(review): presumably these are topic-level Kafka settings applied to
// the destination topic — confirm.
type TopicConfigEntry struct {
	// ConfigName is the name half of the pair.
	ConfigName string
	// ConfigValue is the value half of the pair.
	ConfigValue string
}

Directories

Path Synopsis
Code generated by MockGen.
Code generated by MockGen.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL