Documentation ¶
Index ¶
- Constants
- func ContainerNeeded() bool
- func GetKafkaRawMessageData(r *abstract.ChangeItem) []byte
- func GetKafkaRawMessageKey(r *abstract.ChangeItem) []byte
- func InferFormatSettings(src model.Source, formatSettings model.SerializationFormat) model.SerializationFormat
- func IsKafkaRawMessage(items []abstract.ChangeItem) bool
- func MakeKafkaRawMessage(table string, commitTime time.Time, topic string, shard int, offset int64, ...) abstract.ChangeItem
- func New(lgr log.Logger, registry metrics.Registry, cp cpclient.Coordinator, ...) providers.Provider
- func NewReplicationSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)
- func NewSinkImpl(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger, ...) (abstract.Sinker, error)
- func NewSnapshotSink(cfg *KafkaDestination, registry metrics.Registry, lgr log.Logger) (abstract.Sinker, error)
- func ResolveBrokers(s *KafkaConnectionOptions) ([]string, error)
- func ResolveOnPremBrokers(connectionOpt *KafkaConnectionOptions, kafkaAuth *KafkaAuth) ([]string, error)
- func ResolvePassword(s *KafkaConnectionOptions, kafkaAuth *KafkaAuth) (string, error)
- func StartKafkaContainer() error
- type Encoding
- type KafkaAuth
- type KafkaConnectionOptions
- type KafkaDestination
- func (d *KafkaDestination) BuffererConfig() bufferer.BuffererConfig
- func (d *KafkaDestination) CleanupMode() model.CleanupType
- func (d *KafkaDestination) Compatible(src model.Source, transferType abstract.TransferType) error
- func (d *KafkaDestination) GetProviderType() abstract.ProviderType
- func (d *KafkaDestination) HostsNames() ([]string, error)
- func (KafkaDestination) IsDestination()
- func (d *KafkaDestination) MDBClusterID() string
- func (d *KafkaDestination) Serializer() (model.SerializationFormat, bool)
- func (d *KafkaDestination) Transformer() map[string]string
- func (d *KafkaDestination) Validate() error
- func (d *KafkaDestination) WithDefaults()
- type KafkaSource
- func (s *KafkaSource) GetProviderType() abstract.ProviderType
- func (s *KafkaSource) HostsNames() ([]string, error)
- func (s *KafkaSource) IsAppendOnly() bool
- func (s *KafkaSource) IsDefaultMirror() bool
- func (KafkaSource) IsSource()
- func (s *KafkaSource) MDBClusterID() string
- func (s *KafkaSource) Parser() map[string]interface{}
- func (s *KafkaSource) Validate() error
- func (s *KafkaSource) WithDefaults()
- type Mirrareable
- type OffsetPolicy
- type Provider
- func (p *Provider) Activate(_ context.Context, _ *model.TransferOperation, _ abstract.TableMap, ...) error
- func (p *Provider) Sink(middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) SnapshotSink(middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) Sniffer(_ context.Context) (abstract.Fetchable, error)
- func (p *Provider) Source() (abstract.Source, error)
- func (p *Provider) Type() abstract.ProviderType
- type Source
- type TopicConfigEntry
Constants ¶
View Source
const ( NoEncoding = Encoding("UNCOMPRESSED") GzipEncoding = Encoding("GZIP") SnappyEncoding = Encoding("SNAPPY") LZ4Encoding = Encoding("LZ4") ZstdEncoding = Encoding("ZSTD") )
View Source
const ( NoOffsetPolicy = OffsetPolicy("") // Not specified AtStartOffsetPolicy = OffsetPolicy("at_start") AtEndOffsetPolicy = OffsetPolicy("at_end") )
View Source
const DefaultAuth = "admin"
View Source
const ProviderType = abstract.ProviderType("kafka")
Variables ¶
This section is empty.
Functions ¶
func ContainerNeeded ¶
func ContainerNeeded() bool
func GetKafkaRawMessageData ¶
func GetKafkaRawMessageData(r *abstract.ChangeItem) []byte
func GetKafkaRawMessageKey ¶
func GetKafkaRawMessageKey(r *abstract.ChangeItem) []byte
func InferFormatSettings ¶
func InferFormatSettings(src model.Source, formatSettings model.SerializationFormat) model.SerializationFormat
func IsKafkaRawMessage ¶
func IsKafkaRawMessage(items []abstract.ChangeItem) bool
func MakeKafkaRawMessage ¶
func NewReplicationSink ¶
func NewSinkImpl ¶
func NewSnapshotSink ¶
func ResolveBrokers ¶
func ResolveBrokers(s *KafkaConnectionOptions) ([]string, error)
func ResolveOnPremBrokers ¶
func ResolveOnPremBrokers(connectionOpt *KafkaConnectionOptions, kafkaAuth *KafkaAuth) ([]string, error)
func ResolvePassword ¶
func ResolvePassword(s *KafkaConnectionOptions, kafkaAuth *KafkaAuth) (string, error)
func StartKafkaContainer ¶
func StartKafkaContainer() error
Types ¶
type KafkaAuth ¶
func (*KafkaAuth) GetFranzAuthMechanism ¶
type KafkaConnectionOptions ¶
type KafkaConnectionOptions struct { ClusterID string TLS model.TLSMode TLSFile string `model:"PemFileContent"` Brokers []string SubNetworkID string }
func (*KafkaConnectionOptions) BrokersHostnames ¶
func (o *KafkaConnectionOptions) BrokersHostnames() ([]string, error)
BrokersHostnames returns a list of brokers' hostnames
type KafkaDestination ¶
type KafkaDestination struct { Connection *KafkaConnectionOptions Auth *KafkaAuth SecurityGroupIDs []string // The setting from the segmentio/kafka-go Writer. // Tunes the max length of one message (see usages of BatchBytes in kafka-go). // Msg size: len(key)+len(val)+14 // By default it is 0, in which case kafka-go sets it to 1048576. // When setting it to a non-default value, remember that managed Kafka (server-side) has a default max.message.bytes == 1048588 BatchBytes int64 ParralelWriterCount int Topic string // full-name version TopicPrefix string AddSystemTables bool // private option - to not skip consumer_keeper & other system tables SaveTxOrder bool // for now, 'FormatSettings' is a private option - its WithDefaults(): SerializationFormatAuto - 'Mirror' for queues, 'Debezium' for the rest FormatSettings model.SerializationFormat TopicConfigEntries []TopicConfigEntry // Compression specifies which compression mechanism the writer uses; default - None Compression Encoding }
func DestinationRecipe ¶
func DestinationRecipe() (*KafkaDestination, error)
func (*KafkaDestination) BuffererConfig ¶
func (d *KafkaDestination) BuffererConfig() bufferer.BuffererConfig
func (*KafkaDestination) CleanupMode ¶
func (d *KafkaDestination) CleanupMode() model.CleanupType
func (*KafkaDestination) Compatible ¶
func (d *KafkaDestination) Compatible(src model.Source, transferType abstract.TransferType) error
func (*KafkaDestination) GetProviderType ¶
func (d *KafkaDestination) GetProviderType() abstract.ProviderType
func (*KafkaDestination) HostsNames ¶
func (d *KafkaDestination) HostsNames() ([]string, error)
func (KafkaDestination) IsDestination ¶
func (KafkaDestination) IsDestination()
func (*KafkaDestination) MDBClusterID ¶
func (d *KafkaDestination) MDBClusterID() string
func (*KafkaDestination) Serializer ¶
func (d *KafkaDestination) Serializer() (model.SerializationFormat, bool)
func (*KafkaDestination) Transformer ¶
func (d *KafkaDestination) Transformer() map[string]string
func (*KafkaDestination) Validate ¶
func (d *KafkaDestination) Validate() error
func (*KafkaDestination) WithDefaults ¶
func (d *KafkaDestination) WithDefaults()
type KafkaSource ¶
type KafkaSource struct { Connection *KafkaConnectionOptions Auth *KafkaAuth Topic string GroupTopics []string Transformer *model.DataTransformOptions BufferSize model.BytesSize // not a real buffer size - see comments on the waitLimits() method in kafka-source SecurityGroupIDs []string ParserConfig map[string]interface{} IsHomo bool // enables the Kafka mirror protocol, which can work only with a Kafka target SynchronizeIsNeeded bool // true if we need to send synchronize events when releasing partitions OffsetPolicy OffsetPolicy // specifies from which part of the topic to start message consumption }
func MustSourceRecipe ¶
func MustSourceRecipe() *KafkaSource
func SourceRecipe ¶
func SourceRecipe() (*KafkaSource, error)
func (*KafkaSource) GetProviderType ¶
func (s *KafkaSource) GetProviderType() abstract.ProviderType
func (*KafkaSource) HostsNames ¶
func (s *KafkaSource) HostsNames() ([]string, error)
func (*KafkaSource) IsAppendOnly ¶
func (s *KafkaSource) IsAppendOnly() bool
func (*KafkaSource) IsDefaultMirror ¶
func (s *KafkaSource) IsDefaultMirror() bool
func (KafkaSource) IsSource ¶
func (KafkaSource) IsSource()
func (*KafkaSource) MDBClusterID ¶
func (s *KafkaSource) MDBClusterID() string
func (*KafkaSource) Parser ¶
func (s *KafkaSource) Parser() map[string]interface{}
func (*KafkaSource) Validate ¶
func (s *KafkaSource) Validate() error
func (*KafkaSource) WithDefaults ¶
func (s *KafkaSource) WithDefaults()
type Mirrareable ¶
type Mirrareable interface {
ForceMirror()
}
type OffsetPolicy ¶
type OffsetPolicy string
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(_ context.Context, _ *model.TransferOperation, _ abstract.TableMap, _ providers.ActivateCallbacks) error
func (*Provider) SnapshotSink ¶
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type TopicConfigEntry ¶
type TopicConfigEntry struct {
ConfigName, ConfigValue string
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.