Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type Fetch
- type Flush
- type GenericPluginConfig
- type GtmConfig
- type InputConfig
- type InputMode
- type KafkaCommonConfig
- type KafkaConsumerConfig
- type KafkaGlobalConfig
- type KafkaNetConfig
- type KafkaProducerConfig
- type MongoConfigs
- type MongoConnConfig
- type MongoPosition
- type MongoSource
- type MySQLConfig
- type Offsets
- type PipelineConfig
- type PipelineConfigV2
- type PipelineConfigV3
- type SASL
- type SourceKafkaConfig
- type SourceProbeCfg
- type SourceTiDBConfig
- type TableConfig
- type TargetMySQLWorkerConfig
Constants ¶
View Source
const PipelineConfigV3Version = "1.0"
Variables ¶
View Source
var ( TxnBufferLimit = 1024 MaxNrGravity = 16 )
View Source
var DefaultBinlogSyncerTimeout = "10s"
Functions ¶
This section is empty.
Types ¶
type Config ¶ added in v0.9.10
type Config struct { *flag.FlagSet `json:"-"` EtcdEndpoints string `toml:"etcd-endpoints" json:"etcd-endpoints"` PipelineConfig PipelineConfigV3 `toml:"pipeline" json:"pipeline"` // Log related configuration. Log logutil.LogConfig `toml:"log" json:"log"` HttpAddr string `toml:"http-addr" json:"http-addr"` PositionFile string `toml:"position-file" json:"position-file"` ConfigFile string `toml:"-" json:"-"` ClearPosition bool `toml:"-" json:"-"` Version bool }
Config is the configuration.
func LoadConfigFromFile ¶ added in v0.9.10
func NewConfigFromString ¶
func (*Config) ConfigFromFile ¶ added in v0.9.10
ConfigFromFile loads config from file.
type Flush ¶
type Flush struct { Bytes int `mapstructure:"bytes" toml:"bytes" json:"bytes"` Messages int `mapstructure:"messages" toml:"messages" json:"messages"` Frequency string `mapstructure:"frequency" toml:"frequency" json:"frequency"` MaxMessages int `mapstructure:"max-messages" toml:"max-messages" json:"max-messages"` }
type GenericPluginConfig ¶ added in v0.9.35
type GtmConfig ¶ added in v0.9.10
type GtmConfig struct { UseBufferDuration bool `mapstructure:"use-buffer-duration" toml:"use-buffer-duration" json:"use-buffer-duration"` BufferSize int `mapstructure:"buffer-size" toml:"buffer-size" json:"buffer-size"` ChannelSize int `mapstructure:"channel-size" toml:"channel-size" json:"channel-size"` BufferDurationMs int `mapstructure:"buffer-duration-ms" toml:"buffer-duration-ms" json:"buffer-duration-ms"` }
type InputConfig ¶ added in v0.9.10
type KafkaCommonConfig ¶
type KafkaConsumerConfig ¶
type KafkaConsumerConfig struct { MaxWaitTime string `mapstructure:"max-wait-time" toml:"max-wait-time" json:"max-wait-time"` // Fetch is the namespace for controlling how many bytes are retrieved by any // given request. Fetch Fetch `mapstructure:"fetch" toml:"fetch" json:"fetch"` Offsets Offsets `mapstructure:"offsets" toml:"offsets" json:"offsets"` }
type KafkaGlobalConfig ¶
type KafkaGlobalConfig struct { BrokerAddrs []string `mapstructure:"broker-addrs" toml:"broker-addrs" json:"broker-addrs"` CertFile string `mapstructure:"cert-file" toml:"cert-file" json:"cert-file"` KeyFile string `mapstructure:"key-file" toml:"key-file" json:"key-file"` CaFile string `mapstructure:"ca-file" toml:"ca-file" json:"ca-file"` VerifySSL bool `mapstructure:"verify-ssl" toml:"verify-ssl" json:"verify-ssl"` Mode string `mapstructure:"mode" toml:"mode" json:"mode"` Producer *KafkaProducerConfig `mapstructure:"producer" toml:"producer" json:"producer"` Net *KafkaNetConfig `mapstructure:"net" toml:"net" json:"net"` }
type KafkaNetConfig ¶
type KafkaNetConfig struct { // SASL based authentication with broker. While there are multiple SASL authentication methods // the current implementation is limited to plaintext (SASL/PLAIN) authentication SASL SASL `mapstructure:"sasl" toml:"sasl" json:"sasl"` // KeepAlive specifies the keep-alive period for an active network connection. // If zero, keep-alives are disabled. (default is 0: disabled). KeepAlive time.Duration }
type KafkaProducerConfig ¶
type KafkaProducerConfig struct {
Flush Flush `mapstructure:"flush" toml:"flush" json:"flush"`
}
type MongoConfigs ¶ added in v0.9.10
type MongoConfigs struct { MongoSources []MongoSource `toml:"mongo-sources" json:"mongo-sources"` PositionSource *MongoConnConfig `toml:"position-conn" json:"position-conn"` GtmConfig *GtmConfig `toml:"gtm-config" json:"gtm-config"` }
type MongoConnConfig ¶ added in v0.9.10
type MongoConnConfig struct { Host string `mapstructure:"host" toml:"host" json:"host"` Port int `mapstructure:"port" toml:"port" json:"port"` Username string `mapstructure:"username" toml:"username" json:"username"` Password string `mapstructure:"password" toml:"password" json:"password"` Database string `mapstructure:"database" toml:"database" json:"database"` Direct bool `mapstructure:"direct" toml:"direct" json:"direct"` }
type MongoPosition ¶ added in v0.9.10
type MongoPosition bson.MongoTimestamp
type MongoSource ¶ added in v0.9.10
type MongoSource struct { MongoConnConfig *MongoConnConfig `mapstructure:"source" toml:"source" json:"source"` StartPosition *MongoPosition `mapstructure:"start-position" toml:"start-position" json:"start-position"` }
type MySQLConfig ¶ added in v0.9.10
type MySQLConfig struct { IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"` Source *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"` SourceSlave *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"` StartPosition *utils.MySQLBinlogPosition `mapstructure:"start-position" toml:"start-position" json:"start-position"` }
type Offsets ¶
type Offsets struct {
CommitInterval string `toml:"commit-interval" json:"commit-interval"`
}
type PipelineConfig ¶ added in v0.9.10
type PipelineConfig struct { PipelineName string `toml:"name" json:"name"` // Deprecated! // DetectTxn txn is used in: bi-directional transfer, dynamic route DetectTxn bool `toml:"detect-txn" json:"detect-txn"` // UniqueSourceName name of the server UniqueSourceName string `toml:"unique-source-name" json:"unique-source-name"` Input string `toml:"input" json:"input"` Output string `toml:"output" json:"output"` OutputFormat string `toml:"output-format" json:"output-format"` MongoConfig *MongoConfigs `toml:"mongo" json:"mongo"` MySQLConfig *MySQLConfig `toml:"mysql" json:"mysql"` SourceTiDBConfig *SourceTiDBConfig `toml:"source-tidb" json:"source-tidb"` SourceProbeCfg *SourceProbeCfg `toml:"source-probe-config" json:"source-probe-config"` KafkaGlobalConfig *KafkaGlobalConfig `toml:"kafka-global" json:"kafka-global"` // // RouteMode, DynamicKafkaRouteConfig, StaticKafkaRouteConfig, and DBRoutes // are route related configuration RouteMode string `toml:"route-mode" json:"route-mode"` TableConfig []*TableConfig `toml:"table-config" json:"table-config"` TargetMySQL *utils.DBConfig `toml:"target-mysql" json:"target-mysql"` TargetMySQLWorkerCfg *TargetMySQLWorkerConfig `toml:"target-mysql-worker" json:"target-mysql-worker"` // // internal configurations that is not exposed to users // DisableBinlogChecker bool `toml:"-" json:"-"` DebugBinlog bool `toml:"-" json:"-"` BinlogSyncerTimeout string `toml:"-" json:"-"` }
type PipelineConfigV2 ¶ added in v0.9.10
type PipelineConfigV2 struct { PipelineName string `mapstructure:"name" toml:"name" json:"name"` InputPlugins map[string]interface{} `toml:"input" json:"input" mapstructure:"input"` FilterPlugins []interface{} `mapstructure:"filters" toml:"filters" json:"filters,omitempty"` OutputPlugins map[string]interface{} `mapstructure:"output" toml:"output" json:"output"` SchedulerPlugins map[string]interface{} `mapstructure:"scheduler" toml:"scheduler" json:"scheduler,omitempty"` }
func DecodeTomlString ¶ added in v0.9.10
func DecodeTomlString(s string) (*PipelineConfigV2, error)
func (*PipelineConfigV2) IsV3 ¶ added in v0.9.10
func (c *PipelineConfigV2) IsV3() bool
func (*PipelineConfigV2) ToV3 ¶ added in v0.9.10
func (c *PipelineConfigV2) ToV3() PipelineConfigV3
type PipelineConfigV3 ¶ added in v0.9.10
type PipelineConfigV3 struct { PipelineName string `yaml:"name" toml:"name" json:"name"` Version string `yaml:"version" toml:"version" json:"version"` InputPlugin InputConfig `yaml:"input" toml:"input" json:"input"` FilterPlugins []GenericPluginConfig `yaml:"filters" toml:"filters" json:"filters,omitempty"` OutputPlugin GenericPluginConfig `yaml:"output" toml:"output" json:"output"` SchedulerPlugin *GenericPluginConfig `yaml:"scheduler" toml:"scheduler" json:"scheduler,omitempty"` }
func (*PipelineConfigV3) DeepCopy ¶ added in v0.9.10
func (c *PipelineConfigV3) DeepCopy() PipelineConfigV3
func (*PipelineConfigV3) SetDefault ¶ added in v0.9.10
func (c *PipelineConfigV3) SetDefault()
type SourceKafkaConfig ¶ added in v0.9.10
type SourceKafkaConfig struct { BrokerConfig KafkaGlobalConfig `mapstructure:"brokers" toml:"brokers" json:"brokers"` GroupID string `mapstructure:"group-id" toml:"group-id" json:"group-id"` Topics []string `mapstructure:"topic" toml:"topics" json:"topics"` ConsumeFrom string `mapstructure:"consume-from" toml:"consume-from" json:"consume-from"` Common KafkaCommonConfig `mapstructure:"common" toml:"common" json:"common"` Consumer *KafkaConsumerConfig `mapstructure:"consumer" toml:"consumer" json:"consumer"` }
type SourceProbeCfg ¶ added in v0.9.10
type SourceTiDBConfig ¶ added in v0.9.10
type SourceTiDBConfig struct { SourceDB *utils.DBConfig `mapstructure:"source-db" toml:"source-db" json:"source-db"` SourceKafka *SourceKafkaConfig `mapstructure:"source-kafka" toml:"source-kafka" json:"source-kafka"` // OffsetStoreConfig *SourceProbeCfg `mapstructure:"offset-store" toml:"offset-store" json:"offset-store"` PositionRepo *GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"` IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"` }
type TableConfig ¶
type TableConfig struct { Schema string `toml:"schema" json:"schema"` Table string `toml:"table" json:"table"` RenameColumns map[string]string `toml:"rename-columns" json:"rename-columns"` IgnoreColumns []string `toml:"ignore-columns" json:"ignore-columns"` PkOverride []string `toml:"pk-override" json:"pk-override"` ScanColumn string `toml:"scan-column" json:"scan-column"` ScanType string `toml:"scan-type" json:"scan-type"` }
func GetTableConfig ¶
func GetTableConfig(tableConfig []TableConfig, schema string, table string) *TableConfig
type TargetMySQLWorkerConfig ¶
type TargetMySQLWorkerConfig struct { EnableDDL bool `toml:"enable-ddl" json:"enable-ddl"` UseBidirection bool `toml:"use-bidirection" json:"use-bidirection"` UseShadingProxy bool `toml:"use-shading-proxy" json:"use-shading-proxy"` SQLExecutionEngine string `toml:"sql-execution-engine" json:"sql-execution-engine"` Plugins []string `toml:"plugins" json:"plugins"` }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.