kafka

package
v0.0.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2024 License: Apache-2.0 Imports: 53 Imported by: 0

Documentation

Overview

Package kafka is a generated GoMock package.

Index

Constants

View Source
// Compression encodings selectable for the Kafka writer. Each constant is an
// Encoding whose underlying string is the upper-case codec name shown below.
const (
	NoEncoding     = Encoding("UNCOMPRESSED")
	GzipEncoding   = Encoding("GZIP")
	SnappyEncoding = Encoding("SNAPPY")
	LZ4Encoding    = Encoding("LZ4")
	ZstdEncoding   = Encoding("ZSTD")
)
View Source
// DefaultAuth is the untyped string constant "admin".
// NOTE(review): presumably the default user name for Kafka auth — confirm against usages.
const DefaultAuth = "admin"
View Source
// ProviderType is the abstract.ProviderType value "kafka" declared by this package.
const ProviderType = abstract.ProviderType("kafka")

Variables

This section is empty.

Functions

func ContainerNeeded

func ContainerNeeded() bool

func CreateSourceTopicIfNotExist

func CreateSourceTopicIfNotExist(src *KafkaSource, topic string, lgr log.Logger) error

func GetKafkaRawMessageData

func GetKafkaRawMessageData(r *abstract.ChangeItem) []byte

func GetKafkaRawMessageKey

func GetKafkaRawMessageKey(r *abstract.ChangeItem) []byte

func InferFormatSettings

func InferFormatSettings(src server.Source, formatSettings server.SerializationFormat) server.SerializationFormat

func IsKafkaRawMessage

func IsKafkaRawMessage(items []abstract.ChangeItem) bool

func MakeKafkaRawMessage

func MakeKafkaRawMessage(table string, commitTime time.Time, topic string, shard int, offset int64, key, data []byte) abstract.ChangeItem

func New

func New(lgr log.Logger, registry metrics.Registry, cp cpclient.Coordinator, transfer *server.Transfer) providers.Provider

func NewReplicationSink

func NewReplicationSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)

func NewSinkImpl

func NewSinkImpl(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger, client client, isSnapshot bool) (abstract.Sinker, error)

func NewSnapshotSink

func NewSnapshotSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)

func NewSource

func NewSource(transferID string, cfg *KafkaSource, logger log.Logger, registry metrics.Registry) (*publisher, error)

func ResolveBrokers

func ResolveBrokers(s *KafkaConnectionOptions) ([]string, error)

func ResolveOnPremBrokers

func ResolveOnPremBrokers(connectionOpt *KafkaConnectionOptions, kafkaAuth *KafkaAuth) ([]string, error)

func ResolvePassword

func ResolvePassword(s *KafkaConnectionOptions, kafkaAuth *KafkaAuth) (string, error)

func StartKafkaContainer

func StartKafkaContainer() error

Types

type Encoding

type Encoding string

func (Encoding) AsKafka

func (e Encoding) AsKafka() kafka.Compression

type KafkaAuth

// KafkaAuth groups the authentication settings referenced by KafkaSource and
// KafkaDestination through their Auth fields.
//
// NOTE(review): field semantics are not visible on this page; presumably
// Enabled toggles authentication, Mechanism names the SASL mechanism, and
// User/Password are the credentials — confirm against the implementation.
type KafkaAuth struct {
	Enabled   bool
	Mechanism string
	User      string
	Password  string
}

func (*KafkaAuth) GetAuthMechanism

func (a *KafkaAuth) GetAuthMechanism() (sasl.Mechanism, error)

func (*KafkaAuth) GetFranzAuthMechanism

func (a *KafkaAuth) GetFranzAuthMechanism() franzsasl.Mechanism

type KafkaConnectionOptions

// KafkaConnectionOptions groups the connection settings referenced by
// KafkaSource and KafkaDestination through their Connection fields.
//
// NOTE(review): field semantics are not visible on this page; presumably
// ClusterID identifies a managed cluster while Brokers lists explicit broker
// addresses — confirm against ResolveBrokers.
type KafkaConnectionOptions struct {
	ClusterID    string
	TLS          model.TLSMode
	// TLSFile carries the `model:"PemFileContent"` tag.
	// NOTE(review): the tag suggests this holds PEM content rather than a file path — confirm.
	TLSFile      string `model:"PemFileContent"`
	Brokers      []string
	SubNetworkID string
}

func (*KafkaConnectionOptions) BrokersHostnames

func (o *KafkaConnectionOptions) BrokersHostnames() ([]string, error)

BrokersHostnames returns a list of brokers' hostnames

func (*KafkaConnectionOptions) TLSConfig

func (o *KafkaConnectionOptions) TLSConfig() (*tls.Config, error)

type KafkaDestination

// KafkaDestination is the configuration of a Kafka transfer destination:
// connection and auth settings, writer tuning, target topic naming,
// serialization format, topic config entries and compression.
type KafkaDestination struct {
	Connection       *KafkaConnectionOptions
	Auth             *KafkaAuth
	SecurityGroupIDs []string

	// BatchBytes is the setting of the same name from the segmentio/kafka-go Writer.
	// It tunes the maximum length of one message (see usages of BatchBytes in kafka-go).
	// Message size: len(key)+len(val)+14.
	// The default is 0, in which case kafka-go sets it to 1048576.
	// When setting a non-default value, remember that managed kafka (server-side)
	// has a default max.message.bytes == 1048588.
	BatchBytes          int64
	// NOTE(review): "Parralel" is a misspelling of "Parallel"; the name is part
	// of the exported API, so it is kept as is.
	ParralelWriterCount int

	Topic       string // full-name version
	TopicPrefix string

	AddSystemTables bool // private option - do not skip consumer_keeper & other system tables
	SaveTxOrder     bool

	// For now, FormatSettings is a private option. In WithDefaults() it is
	// SerializationFormatAuto: 'Mirror' for queues, 'Debezium' for the rest.
	FormatSettings server.SerializationFormat

	TopicConfigEntries []TopicConfigEntry

	// Compression selects which compression mechanism the writer uses; the default is None.
	Compression Encoding
}

func DestinationRecipe

func DestinationRecipe() (*KafkaDestination, error)

func (*KafkaDestination) BuffererConfig

func (d *KafkaDestination) BuffererConfig() bufferer.BuffererConfig

func (*KafkaDestination) CleanupMode

func (d *KafkaDestination) CleanupMode() server.CleanupType

func (*KafkaDestination) Compatible

func (d *KafkaDestination) Compatible(src server.Source, transferType abstract.TransferType) error

func (*KafkaDestination) GetProviderType

func (d *KafkaDestination) GetProviderType() abstract.ProviderType

func (*KafkaDestination) HostsNames

func (d *KafkaDestination) HostsNames() ([]string, error)

func (KafkaDestination) IsDestination

func (KafkaDestination) IsDestination()

func (*KafkaDestination) MDBClusterID

func (d *KafkaDestination) MDBClusterID() string

func (*KafkaDestination) Serializer

func (d *KafkaDestination) Serializer() (server.SerializationFormat, bool)

func (*KafkaDestination) Transformer

func (d *KafkaDestination) Transformer() map[string]string

func (*KafkaDestination) Validate

func (d *KafkaDestination) Validate() error

func (*KafkaDestination) WithDefaults

func (d *KafkaDestination) WithDefaults()

type KafkaSource

// KafkaSource is the configuration of a Kafka transfer source: connection and
// auth settings, the topic(s) to read, transformation and parser options, and
// mirror/synchronization flags.
type KafkaSource struct {
	Connection  *KafkaConnectionOptions
	Auth        *KafkaAuth
	Topic       string
	GroupTopics []string
	Transformer *server.DataTransformOptions

	BufferSize server.BytesSize // not a real buffer size - see the comments on the waitLimits() method in kafka-source

	SecurityGroupIDs []string

	ParserConfig        map[string]interface{}
	IsHomo              bool // enables the kafka mirror protocol, which can work only with a kafka target
	SynchronizeIsNeeded bool // true if we need to send synchronize events on releasing partitions
}

func MustSourceRecipe

func MustSourceRecipe() *KafkaSource

func SourceRecipe

func SourceRecipe() (*KafkaSource, error)

func (*KafkaSource) GetProviderType

func (s *KafkaSource) GetProviderType() abstract.ProviderType

func (*KafkaSource) HostsNames

func (s *KafkaSource) HostsNames() ([]string, error)

func (*KafkaSource) IsAppendOnly

func (s *KafkaSource) IsAppendOnly() bool

func (*KafkaSource) IsDefaultMirror

func (s *KafkaSource) IsDefaultMirror() bool

func (KafkaSource) IsSource

func (KafkaSource) IsSource()

func (*KafkaSource) MDBClusterID

func (s *KafkaSource) MDBClusterID() string

func (*KafkaSource) Parser

func (s *KafkaSource) Parser() map[string]interface{}

func (*KafkaSource) Validate

func (s *KafkaSource) Validate() error

func (*KafkaSource) WithDefaults

func (s *KafkaSource) WithDefaults()

type Mirrareable

// Mirrareable is the interface of types exposing a ForceMirror method.
//
// NOTE(review): the name looks like a misspelling of "Mirrorable"; it is
// exported, so it is kept as is. What ForceMirror changes is not visible
// here — presumably it forces the mirror serialization mode; confirm against
// implementers.
type Mirrareable interface {
	ForceMirror()
}

type Mockclient

type Mockclient struct {
	// contains filtered or unexported fields
}

Mockclient is a mock of client interface.

func NewMockclient

func NewMockclient(ctrl *gomock.Controller) *Mockclient

NewMockclient creates a new mock instance.

func (*Mockclient) BuildWriter

func (m *Mockclient) BuildWriter(broker []string, compression kafka.Compression, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) writer

BuildWriter mocks base method.

func (*Mockclient) CreateTopicIfNotExist

func (m *Mockclient) CreateTopicIfNotExist(broker []string, topic string, mechanism sasl.Mechanism, tlsConfig *tls.Config, entries []TopicConfigEntry) error

CreateTopicIfNotExist mocks base method.

func (*Mockclient) EXPECT

func (m *Mockclient) EXPECT() *MockclientMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

type MockclientMockRecorder

type MockclientMockRecorder struct {
	// contains filtered or unexported fields
}

MockclientMockRecorder is the mock recorder for Mockclient.

func (*MockclientMockRecorder) BuildWriter

func (mr *MockclientMockRecorder) BuildWriter(broker, compression, saslMechanism, tlsConfig any) *gomock.Call

BuildWriter indicates an expected call of BuildWriter.

func (*MockclientMockRecorder) CreateTopicIfNotExist

func (mr *MockclientMockRecorder) CreateTopicIfNotExist(broker, topic, mechanism, tlsConfig, entries any) *gomock.Call

CreateTopicIfNotExist indicates an expected call of CreateTopicIfNotExist.

type Mockwriter

type Mockwriter struct {
	// contains filtered or unexported fields
}

Mockwriter is a mock of writer interface.

func NewMockwriter

func NewMockwriter(ctrl *gomock.Controller) *Mockwriter

NewMockwriter creates a new mock instance.

func (*Mockwriter) Close

func (m *Mockwriter) Close() error

Close mocks base method.

func (*Mockwriter) EXPECT

func (m *Mockwriter) EXPECT() *MockwriterMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*Mockwriter) WriteMessages

func (m *Mockwriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error

WriteMessages mocks base method.

type MockwriterMockRecorder

type MockwriterMockRecorder struct {
	// contains filtered or unexported fields
}

MockwriterMockRecorder is the mock recorder for Mockwriter.

func (*MockwriterMockRecorder) Close

func (mr *MockwriterMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close.

func (*MockwriterMockRecorder) WriteMessages

func (mr *MockwriterMockRecorder) WriteMessages(ctx any, msgs ...any) *gomock.Call

WriteMessages indicates an expected call of WriteMessages.

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (*Provider) Sink

func (*Provider) SnapshotSink

func (p *Provider) SnapshotSink(middlewares.Config) (abstract.Sinker, error)

func (*Provider) Sniffer

func (p *Provider) Sniffer(ctx context.Context) (abstract.Fetchable, error)

func (*Provider) Source

func (p *Provider) Source() (abstract.Source, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type TopicConfigEntry

// TopicConfigEntry is a name/value pair of strings. KafkaDestination holds a
// slice of them in TopicConfigEntries, and CreateTopicIfNotExist on the client
// mock accepts such a slice as its entries argument.
// NOTE(review): presumably each entry is a Kafka topic-level config applied on
// topic creation — confirm against the client implementation.
type TopicConfigEntry struct {
	ConfigName, ConfigValue string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL