Documentation ¶
Overview ¶
Package kafka is a generated GoMock package.
Index ¶
- Constants
- func ContainerNeeded() bool
- func CreateSourceTopicIfNotExist(src *KafkaSource, topic string, lgr log.Logger) error
- func GetKafkaRawMessageData(r *abstract.ChangeItem) []byte
- func GetKafkaRawMessageKey(r *abstract.ChangeItem) []byte
- func InferFormatSettings(src server.Source, formatSettings server.SerializationFormat) server.SerializationFormat
- func IsKafkaRawMessage(items []abstract.ChangeItem) bool
- func MakeKafkaRawMessage(table string, commitTime time.Time, topic string, shard int, offset int64, ...) abstract.ChangeItem
- func New(lgr log.Logger, registry metrics.Registry, cp cpclient.Coordinator, ...) providers.Provider
- func NewReplicationSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)
- func NewSinkImpl(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger, ...) (abstract.Sinker, error)
- func NewSnapshotSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)
- func NewSource(transferID string, cfg *KafkaSource, logger log.Logger, ...) (*publisher, error)
- func ResolveBrokers(s *KafkaConnectionOptions) ([]string, error)
- func ResolveOnPremBrokers(connectionOpt *KafkaConnectionOptions, kafkaAuth *KafkaAuth) ([]string, error)
- func ResolvePassword(s *KafkaConnectionOptions, kafkaAuth *KafkaAuth) (string, error)
- func StartKafkaContainer() error
- type Encoding
- type KafkaAuth
- type KafkaConnectionOptions
- type KafkaDestination
- func (d *KafkaDestination) BuffererConfig() bufferer.BuffererConfig
- func (d *KafkaDestination) CleanupMode() server.CleanupType
- func (d *KafkaDestination) Compatible(src server.Source, transferType abstract.TransferType) error
- func (d *KafkaDestination) GetProviderType() abstract.ProviderType
- func (d *KafkaDestination) HostsNames() ([]string, error)
- func (KafkaDestination) IsDestination()
- func (d *KafkaDestination) MDBClusterID() string
- func (d *KafkaDestination) Serializer() (server.SerializationFormat, bool)
- func (d *KafkaDestination) Transformer() map[string]string
- func (d *KafkaDestination) Validate() error
- func (d *KafkaDestination) WithDefaults()
- type KafkaSource
- func (s *KafkaSource) GetProviderType() abstract.ProviderType
- func (s *KafkaSource) HostsNames() ([]string, error)
- func (s *KafkaSource) IsAppendOnly() bool
- func (s *KafkaSource) IsDefaultMirror() bool
- func (KafkaSource) IsSource()
- func (s *KafkaSource) MDBClusterID() string
- func (s *KafkaSource) Parser() map[string]interface{}
- func (s *KafkaSource) Validate() error
- func (s *KafkaSource) WithDefaults()
- type Mirrareable
- type Mockclient
- func (m *Mockclient) BuildWriter(broker []string, compression kafka.Compression, saslMechanism sasl.Mechanism, ...) writer
- func (m *Mockclient) CreateTopicIfNotExist(broker []string, topic string, mechanism sasl.Mechanism, tlsConfig *tls.Config, ...) error
- func (m *Mockclient) EXPECT() *MockclientMockRecorder
- type MockclientMockRecorder
- type Mockwriter
- type MockwriterMockRecorder
- type Provider
- func (p *Provider) Activate(ctx context.Context, task *server.TransferOperation, table abstract.TableMap, ...) error
- func (p *Provider) Sink(middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) SnapshotSink(middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) Sniffer(ctx context.Context) (abstract.Fetchable, error)
- func (p *Provider) Source() (abstract.Source, error)
- func (p *Provider) Type() abstract.ProviderType
- type TopicConfigEntry
Constants ¶
const (
	NoEncoding     = Encoding("UNCOMPRESSED")
	GzipEncoding   = Encoding("GZIP")
	SnappyEncoding = Encoding("SNAPPY")
	LZ4Encoding    = Encoding("LZ4")
	ZstdEncoding   = Encoding("ZSTD")
)
const DefaultAuth = "admin"
const ProviderType = abstract.ProviderType("kafka")
Variables ¶
This section is empty.
Functions ¶
func ContainerNeeded ¶
func ContainerNeeded() bool
func CreateSourceTopicIfNotExist ¶
func CreateSourceTopicIfNotExist(src *KafkaSource, topic string, lgr log.Logger) error
func GetKafkaRawMessageData ¶
func GetKafkaRawMessageData(r *abstract.ChangeItem) []byte
func GetKafkaRawMessageKey ¶
func GetKafkaRawMessageKey(r *abstract.ChangeItem) []byte
func InferFormatSettings ¶
func InferFormatSettings(src server.Source, formatSettings server.SerializationFormat) server.SerializationFormat
func IsKafkaRawMessage ¶
func IsKafkaRawMessage(items []abstract.ChangeItem) bool
func MakeKafkaRawMessage ¶
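func NewReplicationSink ¶
func NewReplicationSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)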
func NewSinkImpl ¶
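func NewSnapshotSink ¶
func NewSnapshotSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)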
func ResolveBrokers ¶
func ResolveBrokers(s *KafkaConnectionOptions) ([]string, error)
func ResolveOnPremBrokers ¶
func ResolveOnPremBrokers(connectionOpt *KafkaConnectionOptions, kafkaAuth *KafkaAuth) ([]string, error)
func ResolvePassword ¶
func ResolvePassword(s *KafkaConnectionOptions, kafkaAuth *KafkaAuth) (string, error)
func StartKafkaContainer ¶
func StartKafkaContainer() error
Types ¶
type KafkaAuth ¶
func (*KafkaAuth) GetFranzAuthMechanism ¶
type KafkaConnectionOptions ¶
type KafkaConnectionOptions struct {
	ClusterID    string
	TLS          model.TLSMode
	TLSFile      string `model:"PemFileContent"`
	Brokers      []string
	SubNetworkID string
}
func (*KafkaConnectionOptions) BrokersHostnames ¶
func (o *KafkaConnectionOptions) BrokersHostnames() ([]string, error)
BrokersHostnames returns a list of brokers' hostnames.
type KafkaDestination ¶
type KafkaDestination struct {
	Connection       *KafkaConnectionOptions
	Auth             *KafkaAuth
	SecurityGroupIDs []string
	// The setting from segmentio/kafka-go Writer.
	// Tunes the max length of one message (see usages of BatchBytes in kafka-go).
	// Msg size: len(key)+len(val)+14
	// Defaults to 0, in which case kafka-go sets it to 1048576.
	// When setting a non-default value, remember that managed kafka (server-side) has default max.message.bytes == 1048588
	BatchBytes          int64
	ParralelWriterCount int
	Topic               string // full-name version
	TopicPrefix         string
	AddSystemTables     bool // private option - to not skip consumer_keeper & other system tables
	SaveTxOrder         bool
	// for now, 'FormatSettings' is a private option - by default (see WithDefaults()) it is SerializationFormatAuto: 'Mirror' for queues, 'Debezium' for the rest
	FormatSettings     server.SerializationFormat
	TopicConfigEntries []TopicConfigEntry
	// Compression is the compression mechanism used by the writer; default is None
	Compression Encoding
}
func DestinationRecipe ¶
func DestinationRecipe() (*KafkaDestination, error)
func (*KafkaDestination) BuffererConfig ¶
func (d *KafkaDestination) BuffererConfig() bufferer.BuffererConfig
func (*KafkaDestination) CleanupMode ¶
func (d *KafkaDestination) CleanupMode() server.CleanupType
func (*KafkaDestination) Compatible ¶
func (d *KafkaDestination) Compatible(src server.Source, transferType abstract.TransferType) error
func (*KafkaDestination) GetProviderType ¶
func (d *KafkaDestination) GetProviderType() abstract.ProviderType
func (*KafkaDestination) HostsNames ¶
func (d *KafkaDestination) HostsNames() ([]string, error)
func (KafkaDestination) IsDestination ¶
func (KafkaDestination) IsDestination()
func (*KafkaDestination) MDBClusterID ¶
func (d *KafkaDestination) MDBClusterID() string
func (*KafkaDestination) Serializer ¶
func (d *KafkaDestination) Serializer() (server.SerializationFormat, bool)
func (*KafkaDestination) Transformer ¶
func (d *KafkaDestination) Transformer() map[string]string
func (*KafkaDestination) Validate ¶
func (d *KafkaDestination) Validate() error
func (*KafkaDestination) WithDefaults ¶
func (d *KafkaDestination) WithDefaults()
type KafkaSource ¶
type KafkaSource struct {
	Connection          *KafkaConnectionOptions
	Auth                *KafkaAuth
	Topic               string
	GroupTopics         []string
	Transformer         *server.DataTransformOptions
	BufferSize          server.BytesSize // this is not an actual buffer size - see the comments on the waitLimits() method in kafka-source
	SecurityGroupIDs    []string
	ParserConfig        map[string]interface{}
	IsHomo              bool // enables the kafka mirror protocol, which can work only with a kafka target
	SynchronizeIsNeeded bool // true if we need to send synchronize events on releasing partitions
}
func MustSourceRecipe ¶
func MustSourceRecipe() *KafkaSource
func SourceRecipe ¶
func SourceRecipe() (*KafkaSource, error)
func (*KafkaSource) GetProviderType ¶
func (s *KafkaSource) GetProviderType() abstract.ProviderType
func (*KafkaSource) HostsNames ¶
func (s *KafkaSource) HostsNames() ([]string, error)
func (*KafkaSource) IsAppendOnly ¶
func (s *KafkaSource) IsAppendOnly() bool
func (*KafkaSource) IsDefaultMirror ¶
func (s *KafkaSource) IsDefaultMirror() bool
func (KafkaSource) IsSource ¶
func (KafkaSource) IsSource()
func (*KafkaSource) MDBClusterID ¶
func (s *KafkaSource) MDBClusterID() string
func (*KafkaSource) Parser ¶
func (s *KafkaSource) Parser() map[string]interface{}
func (*KafkaSource) Validate ¶
func (s *KafkaSource) Validate() error
func (*KafkaSource) WithDefaults ¶
func (s *KafkaSource) WithDefaults()
type Mirrareable ¶
type Mirrareable interface {
ForceMirror()
}
type Mockclient ¶
type Mockclient struct {
// contains filtered or unexported fields
}
Mockclient is a mock of client interface.
func NewMockclient ¶
func NewMockclient(ctrl *gomock.Controller) *Mockclient
NewMockclient creates a new mock instance.
func (*Mockclient) BuildWriter ¶
func (m *Mockclient) BuildWriter(broker []string, compression kafka.Compression, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) writer
BuildWriter mocks base method.
func (*Mockclient) CreateTopicIfNotExist ¶
func (m *Mockclient) CreateTopicIfNotExist(broker []string, topic string, mechanism sasl.Mechanism, tlsConfig *tls.Config, entries []TopicConfigEntry) error
CreateTopicIfNotExist mocks base method.
func (*Mockclient) EXPECT ¶
func (m *Mockclient) EXPECT() *MockclientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockclientMockRecorder ¶
type MockclientMockRecorder struct {
// contains filtered or unexported fields
}
MockclientMockRecorder is the mock recorder for Mockclient.
func (*MockclientMockRecorder) BuildWriter ¶
func (mr *MockclientMockRecorder) BuildWriter(broker, compression, saslMechanism, tlsConfig any) *gomock.Call
BuildWriter indicates an expected call of BuildWriter.
func (*MockclientMockRecorder) CreateTopicIfNotExist ¶
func (mr *MockclientMockRecorder) CreateTopicIfNotExist(broker, topic, mechanism, tlsConfig, entries any) *gomock.Call
CreateTopicIfNotExist indicates an expected call of CreateTopicIfNotExist.
type Mockwriter ¶
type Mockwriter struct {
// contains filtered or unexported fields
}
Mockwriter is a mock of writer interface.
func NewMockwriter ¶
func NewMockwriter(ctrl *gomock.Controller) *Mockwriter
NewMockwriter creates a new mock instance.
func (*Mockwriter) EXPECT ¶
func (m *Mockwriter) EXPECT() *MockwriterMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*Mockwriter) WriteMessages ¶
WriteMessages mocks base method.
type MockwriterMockRecorder ¶
type MockwriterMockRecorder struct {
// contains filtered or unexported fields
}
MockwriterMockRecorder is the mock recorder for Mockwriter.
func (*MockwriterMockRecorder) Close ¶
func (mr *MockwriterMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close.
func (*MockwriterMockRecorder) WriteMessages ¶
func (mr *MockwriterMockRecorder) WriteMessages(ctx any, msgs ...any) *gomock.Call
WriteMessages indicates an expected call of WriteMessages.
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *server.TransferOperation, table abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) SnapshotSink ¶
func (p *Provider) SnapshotSink(middlewares.Config) (abstract.Sinker, error)
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type TopicConfigEntry ¶
type TopicConfigEntry struct {
ConfigName, ConfigValue string
}