Documentation ¶
Index ¶
- Constants
- Variables
- func StoreGlobalServerConfig(config *ServerConfig)
- type AtomicityLevel
- type CDCV2
- type CSVConfig
- type ChangefeedSchedulerConfig
- type CloudStorageConfig
- type CodecConfig
- type ColumnSelector
- type ConsistentConfig
- type ConsistentMemoryUsage
- type DBConfig
- type DateSeparator
- type DebeziumConfig
- type DebugConfig
- type DispatchRule
- type Duration
- type EventFilterRule
- type FilterConfig
- type GlueSchemaRegistryConfig
- type KVClientConfig
- type KafkaConfig
- type LargeMessageHandleConfig
- type LogConfig
- type LogFileConfig
- type MessagesConfig
- type MetaStoreConfiguration
- type MounterConfig
- type MySQLConfig
- type OAuth2
- type OpenProtocolConfig
- type Protocol
- type PullerConfig
- type PulsarCompressionType
- type PulsarConfig
- type ReplicaConfig
- func (c *ReplicaConfig) Clone() *ReplicaConfig
- func (c *ReplicaConfig) FixMemoryQuota()
- func (c *ReplicaConfig) FixScheduler(inheritV66 bool)
- func (c *ReplicaConfig) Marshal() (string, error)
- func (c *ReplicaConfig) MaskSensitiveData()
- func (c *ReplicaConfig) Scan(value interface{}) error
- func (c *ReplicaConfig) UnmarshalJSON(data []byte) error
- func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error
- func (c ReplicaConfig) Value() (driver.Value, error)
- type SchedulerConfig
- type ServerConfig
- type SinkConfig
- func (s *SinkConfig) CheckCompatibilityWithSinkURI(oldSinkConfig *SinkConfig, sinkURIStr string) error
- func (s *SinkConfig) MaskSensitiveData()
- func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool
- func (s *SinkConfig) ShouldSendBootstrapMsg() bool
- func (s *SinkConfig) ValidateProtocol(scheme string) error
- type SorterConfig
- type SyncedStatusConfig
- type TimeMill
- type TimeSec
- type TomlDuration
Constants ¶
const ( // LargeMessageHandleOptionNone means not handling large message. LargeMessageHandleOptionNone string = "none" // LargeMessageHandleOptionClaimCheck means handling large message by sending to the claim check storage. LargeMessageHandleOptionClaimCheck string = "claim-check" // LargeMessageHandleOptionHandleKeyOnly means handling large message by sending only handle key columns. LargeMessageHandleOptionHandleKeyOnly string = "handle-key-only" )
const ( // DefaultSortDir is the default value of sort-dir, it will be a subordinate directory of data-dir. DefaultSortDir = "/tmp/sorter" // DefaultRedoDir is a subordinate directory path of data-dir. DefaultRedoDir = "/tmp/redo" // DebugConfigurationItem is the name of debug configurations DebugConfigurationItem = "debug" // DefaultChangefeedMemoryQuota is the default memory quota for each changefeed. DefaultChangefeedMemoryQuota = 1024 * 1024 * 1024 // 1GB. // DisableMemoryLimit is the default max memory percentage for TiCDC server. // 0 means no memory limit. DisableMemoryLimit = 0 // EnablePDForwarding is the value of whether to enable PD client forwarding function. // The PD client will forward the requests throughthe follower // If there is a network partition problem between TiCDC and PD leader. EnablePDForwarding = true )
const ( // DefaultMaxMessageBytes sets the default value for max-message-bytes. DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M // DefaultAdvanceTimeoutInSec sets the default value for advance-timeout-in-sec. DefaultAdvanceTimeoutInSec = uint(150) // TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI. TxnAtomicityKey = "transaction-atomicity" // Comma is a constant for ',' Comma = "," // CR is an abbreviation for carriage return CR = '\r' // LF is an abbreviation for line feed LF = '\n' // CRLF is an abbreviation for '\r\n' CRLF = "\r\n" // DoubleQuoteChar is a constant for '"' DoubleQuoteChar = '"' // Backslash is a constant for '\' Backslash = '\\' // NULL is a constant for '\N' NULL = "\\N" // MinFileIndexWidth is the minimum width of file index. MinFileIndexWidth = 6 // enough for 2^19 files // MaxFileIndexWidth is the maximum width of file index. MaxFileIndexWidth = 20 // enough for 2^64 files // DefaultFileIndexWidth is the default width of file index. DefaultFileIndexWidth = MaxFileIndexWidth // BinaryEncodingHex encodes binary data to hex string. BinaryEncodingHex = "hex" // BinaryEncodingBase64 encodes binary data to base64 string. BinaryEncodingBase64 = "base64" // DefaultPulsarProducerCacheSize is the default size of the cache for producers // 10240 producers maybe cost 1.1G memory DefaultPulsarProducerCacheSize = 10240 // DefaultEncoderGroupConcurrency is the default concurrency of encoder group. DefaultEncoderGroupConcurrency = 32 // DefaultSendBootstrapIntervalInSec is the default interval to send bootstrap message. DefaultSendBootstrapIntervalInSec = int64(120) // DefaultSendBootstrapInMsgCount is the default number of messages to send bootstrap message. DefaultSendBootstrapInMsgCount = int32(10000) // DefaultSendBootstrapToAllPartition is the default value of // whether to send bootstrap message to all partitions. DefaultSendBootstrapToAllPartition = true // DefaultSendAllBootstrapAtStart is the default value of whether // to send all tables bootstrap message at changefeed start. DefaultSendAllBootstrapAtStart = false // DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker. // The pulsar client uses an exponential backoff with jitter to reconnect to the broker. // Based on test, when the max reconnect times is 3, // the total time of reconnecting to brokers is about 30 seconds. DefaultMaxReconnectToPulsarBroker = 3 )
const (
// DefaultTiDBSourceID is the default source ID of TiDB cluster.
DefaultTiDBSourceID = 1
)
const (
// ProtocolKey specifies the key of the protocol in the SinkURI.
ProtocolKey = "protocol"
)
Variables ¶
var ( // ReservedClusterIDs contains a list of reserved cluster id, // these words are the part of old cdc etcd key prefix // like: /tidb/cdc/owner ReservedClusterIDs = []string{ "owner", "capture", "task", "changefeed", "job", "meta", } )
Functions ¶
func StoreGlobalServerConfig ¶
func StoreGlobalServerConfig(config *ServerConfig)
StoreGlobalServerConfig stores a new config to the globalServerConfig. It mostly uses in the test to avoid some data races.
Types ¶
type AtomicityLevel ¶
type AtomicityLevel string
AtomicityLevel represents the atomicity level of a changefeed.
func (AtomicityLevel) ShouldSplitTxn ¶
func (l AtomicityLevel) ShouldSplitTxn() bool
ShouldSplitTxn returns whether the sink should split txn.
type CDCV2 ¶
type CDCV2 struct { // Enable represents if the cdc v2 is enabled or not Enable bool `toml:"enable" json:"enable"` // MetaStoreConfig represents config for new meta store configurations MetaStoreConfig MetaStoreConfiguration `toml:"meta-store" json:"meta-store"` }
CDCV2 represents config for ticdc v2
func (*CDCV2) ValidateAndAdjust ¶
ValidateAndAdjust validates the meta store configurations
type CSVConfig ¶
type CSVConfig struct { // delimiter between fields, it can be 1 character or at most 2 characters // It can not be CR or LF or contains CR or LF. // It should have exclusive characters with quote. Delimiter string `toml:"delimiter" json:"delimiter"` // quoting character Quote string `toml:"quote" json:"quote"` // representation of null values NullString string `toml:"null" json:"null"` // whether to include commit ts IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"` // encoding method of binary type BinaryEncodingMethod string `toml:"binary-encoding-method" json:"binary-encoding-method"` // output old value OutputOldValue bool `toml:"output-old-value" json:"output-old-value"` // output handle key OutputHandleKey bool `toml:"output-handle-key" json:"output-handle-key"` }
CSVConfig defines a series of configuration items for csv codec.
type ChangefeedSchedulerConfig ¶
type ChangefeedSchedulerConfig struct { // EnableTableAcrossNodes set true to split one table to multiple spans and // distribute to multiple TiCDC nodes. EnableTableAcrossNodes bool `toml:"enable-table-across-nodes" json:"enable-table-across-nodes"` // RegionThreshold is the region count threshold of splitting a table. RegionThreshold int `toml:"region-threshold" json:"region-threshold"` // WriteKeyThreshold is the written keys threshold of splitting a table. WriteKeyThreshold int `toml:"write-key-threshold" json:"write-key-threshold"` // Deprecated. RegionPerSpan int `toml:"region-per-span" json:"region-per-span"` }
ChangefeedSchedulerConfig is per changefeed scheduler settings.
func (*ChangefeedSchedulerConfig) Validate ¶
func (c *ChangefeedSchedulerConfig) Validate() error
Validate validates the config.
type CloudStorageConfig ¶
type CloudStorageConfig struct { WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` FileSize *int `toml:"file-size" json:"file-size,omitempty"` OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` // OutputRawChangeEvent controls whether to split the update pk/uk events. OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` }
CloudStorageConfig represents a cloud storage sink configuration
func (*CloudStorageConfig) GetOutputRawChangeEvent ¶
func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool
GetOutputRawChangeEvent returns the value of OutputRawChangeEvent
type CodecConfig ¶
type CodecConfig struct { EnableTiDBExtension *bool `toml:"enable-tidb-extension" json:"enable-tidb-extension,omitempty"` MaxBatchSize *int `toml:"max-batch-size" json:"max-batch-size,omitempty"` AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"` AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"` AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"` EncodingFormat *string `toml:"encoding-format" json:"encoding-format,omitempty"` }
CodecConfig represents a MQ codec configuration
type ColumnSelector ¶
type ColumnSelector struct { Matcher []string `toml:"matcher" json:"matcher"` Columns []string `toml:"columns" json:"columns"` }
ColumnSelector represents a column selector for a table.
type ConsistentConfig ¶
type ConsistentConfig struct { // Level is the consistency level, it can be `none` or `eventual`. // `eventual` means enable redo log. // Default is `none`. Level string `toml:"level" json:"level"` // MaxLogSize is the max size(MiB) of a log file written by redo log. // Default is 64MiB. MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"` // FlushIntervalInMs is the flush interval(ms) of redo log to flush log to storage. // Default is 2000ms. FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"` // MetaFlushIntervalInMs is the flush interval(ms) of redo log to // flush meta(resolvedTs and checkpointTs) to storage. // Default is 200ms. MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"` // EncodingWorkerNum is the number of workers to encode `RowChangeEvent“ to redo log. // Default is 16. EncodingWorkerNum int `toml:"encoding-worker-num" json:"encoding-worker-num"` // FlushWorkerNum is the number of workers to flush redo log to storage. // Default is 8. FlushWorkerNum int `toml:"flush-worker-num" json:"flush-worker-num"` // Storage is the storage path(uri) to store redo log. Storage string `toml:"storage" json:"storage"` // UseFileBackend is a flag to enable file backend for redo log. // file backend means before flush redo log to storage, it will be written to local file. // Default is false. UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` // Compression is the compression algorithm used for redo log. // Default is "", it means no compression, equals to `none`. // Supported compression algorithms are `none` and `lz4`. Compression string `toml:"compression" json:"compression"` // FlushConcurrency is the concurrency of flushing a single log file. // Default is 1. It means a single log file will be flushed by only one worker. // The singe file concurrent flushing feature supports only `s3` storage. FlushConcurrency int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` // MemoryUsage represents the percentage of ReplicaConfig.MemoryQuota // that can be utilized by the redo log module. MemoryUsage *ConsistentMemoryUsage `toml:"memory-usage" json:"memory-usage"` }
ConsistentConfig represents replication consistency config for a changefeed. It is used by redo log functionality.
func (*ConsistentConfig) MaskSensitiveData ¶
func (c *ConsistentConfig) MaskSensitiveData()
MaskSensitiveData masks sensitive data in ConsistentConfig
func (*ConsistentConfig) ValidateAndAdjust ¶
func (c *ConsistentConfig) ValidateAndAdjust() error
ValidateAndAdjust validates the consistency config and adjusts it if necessary.
type ConsistentMemoryUsage ¶
type ConsistentMemoryUsage struct { // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 will be used for redo events. MemoryQuotaPercentage uint64 `toml:"memory-quota-percentage" json:"memory-quota-percentage"` }
ConsistentMemoryUsage represents memory usage of Consistent module.
type DBConfig ¶
type DBConfig struct { // Count is the number of db count. // // The default value is 8. Count int `toml:"count" json:"count"` // MaxOpenFiles is the maximum number of open FD by db sorter. // // The default value is 10000. MaxOpenFiles int `toml:"max-open-files" json:"max-open-files"` // BlockSize the block size of db sorter. // // The default value is 65536, 64KB. BlockSize int `toml:"block-size" json:"block-size"` // WriterBufferSize is the size of memory table of db. // // The default value is 8388608, 8MB. WriterBufferSize int `toml:"writer-buffer-size" json:"writer-buffer-size"` // Compression is the compression algorithm that is used by db. // Valid values are "none" or "snappy". // // The default value is "snappy". Compression string `toml:"compression" json:"compression"` // WriteL0PauseTrigger defines number of db sst file at level-0 that will // pause write. // // The default value is 1<<31 - 1. WriteL0PauseTrigger int `toml:"write-l0-pause-trigger" json:"write-l0-pause-trigger"` // CompactionL0Trigger defines number of db sst file at level-0 that will // trigger compaction. // // The default value is 16, which is based on a performance test on 4K tables. CompactionL0Trigger int `toml:"compaction-l0-trigger" json:"compaction-l0-trigger"` }
DBConfig represents db sorter config.
func NewDefaultDBConfig ¶
func NewDefaultDBConfig() *DBConfig
NewDefaultDBConfig return the default db configuration
func (*DBConfig) ValidateAndAdjust ¶
ValidateAndAdjust validates and adjusts the db configuration
type DateSeparator ¶
type DateSeparator int
DateSeparator specifies the date separator in storage destination path
const ( DateSeparatorNone DateSeparator = iota DateSeparatorYear DateSeparatorMonth DateSeparatorDay )
Enum types of DateSeparator
func (*DateSeparator) FromString ¶
func (d *DateSeparator) FromString(separator string) error
FromString converts the separator from string to DateSeperator enum type.
func (DateSeparator) GetPattern ¶
func (d DateSeparator) GetPattern() string
GetPattern returns the pattern of the date separator.
func (DateSeparator) String ¶
func (d DateSeparator) String() string
type DebeziumConfig ¶
type DebeziumConfig struct {
OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
}
DebeziumConfig represents the configurations for debezium protocol encoding
type DebugConfig ¶
type DebugConfig struct { DB *DBConfig `toml:"db" json:"db"` Messages *MessagesConfig `toml:"messages" json:"messages"` // Scheduler is the configuration of the two-phase scheduler. Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` // CDCV2 enables ticdc version 2 implementation with new metastore CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"` // Puller is the configuration of the puller. Puller *PullerConfig `toml:"puller" json:"puller"` }
DebugConfig represents config for ticdc unexposed feature configurations
func (*DebugConfig) ValidateAndAdjust ¶
func (c *DebugConfig) ValidateAndAdjust() error
ValidateAndAdjust validates and adjusts the debug configuration
type DispatchRule ¶
type DispatchRule struct { Matcher []string `toml:"matcher" json:"matcher"` // Deprecated, please use PartitionRule. DispatcherRule string `toml:"dispatcher" json:"dispatcher"` // PartitionRule is an alias added for DispatcherRule to mitigate confusions. // In the future release, the DispatcherRule is expected to be removed . PartitionRule string `toml:"partition" json:"partition"` // IndexName is set when using index-value dispatcher with specified index. IndexName string `toml:"index" json:"index"` // Columns are set when using columns dispatcher. Columns []string `toml:"columns" json:"columns"` TopicRule string `toml:"topic" json:"topic"` }
DispatchRule represents partition rule for a table.
type Duration ¶
Duration wrap time.Duration to override UnmarshalText func
func (*Duration) UnmarshalText ¶
UnmarshalText unmarshal byte to duration
type EventFilterRule ¶
type EventFilterRule struct { Matcher []string `toml:"matcher" json:"matcher"` IgnoreEvent []bf.EventType `toml:"ignore-event" json:"ignore-event"` // regular expression IgnoreSQL []string `toml:"ignore-sql" json:"ignore-sql"` // sql expression IgnoreInsertValueExpr string `toml:"ignore-insert-value-expr" json:"ignore-insert-value-expr"` IgnoreUpdateNewValueExpr string `toml:"ignore-update-new-value-expr" json:"ignore-update-new-value-expr"` IgnoreUpdateOldValueExpr string `toml:"ignore-update-old-value-expr" json:"ignore-update-old-value-expr"` IgnoreDeleteValueExpr string `toml:"ignore-delete-value-expr" json:"ignore-delete-value-expr"` }
EventFilterRule is used by sql event filter and expression filter
type FilterConfig ¶
type FilterConfig struct { Rules []string `toml:"rules" json:"rules"` IgnoreTxnStartTs []uint64 `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"` EventFilters []*EventFilterRule `toml:"event-filters" json:"event-filters"` }
FilterConfig represents filter config for a changefeed
type GlueSchemaRegistryConfig ¶
type GlueSchemaRegistryConfig struct { // Name of the schema registry RegistryName string `toml:"registry-name" json:"registry-name"` // Region of the schema registry Region string `toml:"region" json:"region"` // AccessKey of the schema registry AccessKey string `toml:"access-key" json:"access-key,omitempty"` // SecretAccessKey of the schema registry SecretAccessKey string `toml:"secret-access-key" json:"secret-access-key,omitempty"` Token string `toml:"token" json:"token,omitempty"` }
GlueSchemaRegistryConfig represents a Glue Schema Registry configuration
func (*GlueSchemaRegistryConfig) NoCredentials ¶
func (g *GlueSchemaRegistryConfig) NoCredentials() bool
NoCredentials returns true if no credentials are set.
func (*GlueSchemaRegistryConfig) Validate ¶
func (g *GlueSchemaRegistryConfig) Validate() error
Validate the GlueSchemaRegistryConfig.
type KVClientConfig ¶
type KVClientConfig struct { EnableMultiplexing bool `toml:"enable-multiplexing" json:"enable-multiplexing"` // how many workers will be used for a single region worker WorkerConcurrent uint `toml:"worker-concurrent" json:"worker-concurrent"` // how many grpc streams will be established to every TiKV node GrpcStreamConcurrent uint `toml:"grpc-stream-concurrent" json:"grpc-stream-concurrent"` // Advance table ResolvedTs interval. AdvanceIntervalInMs uint `toml:"advance-interval-in-ms" json:"advance-interval-in-ms"` // how many goroutines to maintain frontiers. FrontierConcurrent uint `toml:"frontier-concurrent" json:"frontier-concurrent"` // background workerpool size, the workrpool is shared by all goroutines in cdc server WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"` // region incremental scan limit for one table in a single store RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"` // the total retry duration of connecting a region RegionRetryDuration TomlDuration `toml:"region-retry-duration" json:"region-retry-duration"` }
KVClientConfig represents config for kv client
func NewDefaultKVClientConfig ¶
func NewDefaultKVClientConfig() *KVClientConfig
NewDefaultKVClientConfig return the default kv client configuration
func (*KVClientConfig) ValidateAndAdjust ¶
func (c *KVClientConfig) ValidateAndAdjust() error
ValidateAndAdjust validates and adjusts the kv client configuration
type KafkaConfig ¶
type KafkaConfig struct { PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` Compression *string `toml:"compression" json:"compression,omitempty"` KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` CA *string `toml:"ca" json:"ca,omitempty"` Cert *string `toml:"cert" json:"cert,omitempty"` Key *string `toml:"key" json:"key,omitempty"` InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` // OutputRawChangeEvent controls whether to split the update pk/uk events. OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` }
KafkaConfig represents a kafka sink configuration
func (*KafkaConfig) GetOutputRawChangeEvent ¶
func (k *KafkaConfig) GetOutputRawChangeEvent() bool
GetOutputRawChangeEvent returns the value of OutputRawChangeEvent
func (*KafkaConfig) MaskSensitiveData ¶
func (k *KafkaConfig) MaskSensitiveData()
MaskSensitiveData masks sensitive data in KafkaConfig
type LargeMessageHandleConfig ¶
type LargeMessageHandleConfig struct { LargeMessageHandleOption string `toml:"large-message-handle-option" json:"large-message-handle-option"` LargeMessageHandleCompression string `toml:"large-message-handle-compression" json:"large-message-handle-compression"` ClaimCheckStorageURI string `toml:"claim-check-storage-uri" json:"claim-check-storage-uri"` ClaimCheckRawValue bool `toml:"claim-check-raw-value" json:"claim-check-raw-value"` }
LargeMessageHandleConfig is the configuration for handling large message.
func NewDefaultLargeMessageHandleConfig ¶
func NewDefaultLargeMessageHandleConfig() *LargeMessageHandleConfig
NewDefaultLargeMessageHandleConfig return the default Config.
func (*LargeMessageHandleConfig) AdjustAndValidate ¶
func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTiDBExtension bool) error
AdjustAndValidate the Config.
func (*LargeMessageHandleConfig) Disabled ¶
func (c *LargeMessageHandleConfig) Disabled() bool
Disabled returns true if disable large message handle.
func (*LargeMessageHandleConfig) EnableClaimCheck ¶
func (c *LargeMessageHandleConfig) EnableClaimCheck() bool
EnableClaimCheck returns true if enable claim check.
func (*LargeMessageHandleConfig) HandleKeyOnly ¶
func (c *LargeMessageHandleConfig) HandleKeyOnly() bool
HandleKeyOnly returns true if handle large message by encoding handle key only.
type LogConfig ¶
type LogConfig struct { File *LogFileConfig `toml:"file" json:"file"` InternalErrOutput string `toml:"error-output" json:"error-output"` }
LogConfig represents log config for server
type LogFileConfig ¶
type LogFileConfig struct { MaxSize int `toml:"max-size" json:"max-size"` MaxDays int `toml:"max-days" json:"max-days"` MaxBackups int `toml:"max-backups" json:"max-backups"` }
LogFileConfig represents log file config for server
type MessagesConfig ¶
type MessagesConfig struct { ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"` ClientMaxBatchSize int `toml:"client-max-batch-size" json:"client-max-batch-size"` ClientMaxBatchCount int `toml:"client-max-batch-count" json:"client-max-batch-count"` ClientRetryRateLimit float64 `toml:"client-retry-rate-limit" json:"client-retry-rate-limit"` ServerMaxPendingMessageCount int `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"` ServerAckInterval TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"` ServerWorkerPoolSize int `toml:"server-worker-pool-size" json:"server-worker-pool-size"` // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"` // After a duration of this time if the server doesn't see any activity it // pings the client to see if the transport is still alive. KeepAliveTime TomlDuration `toml:"keep-alive-time" json:"keep-alive-time"` // After having pinged for keepalive check, the server waits for a duration // of Timeout and if no activity is seen even after that the connection is // closed. KeepAliveTimeout TomlDuration `toml:"keep-alive-timeout" json:"keep-alive-timeout"` }
MessagesConfig configs MessageServer and MessageClient.
func (*MessagesConfig) Clone ¶
func (c *MessagesConfig) Clone() *MessagesConfig
Clone returns a deep copy of the configuration.
func (*MessagesConfig) ToMessageClientConfig ¶
func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig
ToMessageClientConfig converts the MessagesConfig to a MessageClientConfig.
func (*MessagesConfig) ToMessageServerConfig ¶
func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig
ToMessageServerConfig returns a MessageServerConfig that can be used to create a MessageServer.
func (*MessagesConfig) ValidateAndAdjust ¶
func (c *MessagesConfig) ValidateAndAdjust() error
ValidateAndAdjust validates and adjusts the configs.
type MetaStoreConfiguration ¶
type MetaStoreConfiguration struct { // URI is the address of the meta store. // for example: "mysql://127.0.0.1:3306/test" URI string `toml:"uri" json:"uri"` // SSLCA is the path of the CA certificate file. SSLCa string `toml:"ssl-ca" json:"ssl-ca"` SSLCert string `toml:"ssl-cert" json:"ssl-cert"` SSLKey string `toml:"ssl-key" json:"ssl-key"` }
MetaStoreConfiguration represents config for new meta store configurations
type MounterConfig ¶
type MounterConfig struct {
WorkerNum int `toml:"worker-num" json:"worker-num"`
}
MounterConfig represents mounter config for a changefeed
type MySQLConfig ¶
type MySQLConfig struct { WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` MaxTxnRow *int `toml:"max-txn-row" json:"max-txn-row,omitempty"` MaxMultiUpdateRowSize *int `toml:"max-multi-update-row-size" json:"max-multi-update-row-size,omitempty"` MaxMultiUpdateRowCount *int `toml:"max-multi-update-row" json:"max-multi-update-row,omitempty"` TiDBTxnMode *string `toml:"tidb-txn-mode" json:"tidb-txn-mode,omitempty"` SSLCa *string `toml:"ssl-ca" json:"ssl-ca,omitempty"` SSLCert *string `toml:"ssl-cert" json:"ssl-cert,omitempty"` SSLKey *string `toml:"ssl-key" json:"ssl-key,omitempty"` TimeZone *string `toml:"time-zone" json:"time-zone,omitempty"` WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` Timeout *string `toml:"timeout" json:"timeout,omitempty"` EnableBatchDML *bool `toml:"enable-batch-dml" json:"enable-batch-dml,omitempty"` EnableMultiStatement *bool `toml:"enable-multi-statement" json:"enable-multi-statement,omitempty"` EnableCachePreparedStatement *bool `toml:"enable-cache-prepared-statement" json:"enable-cache-prepared-statement,omitempty"` }
MySQLConfig represents a MySQL sink configuration
type OAuth2 ¶
type OAuth2 struct { // OAuth2IssuerURL the URL of the authorization server. OAuth2IssuerURL string `toml:"oauth2-issuer-url" json:"oauth2-issuer-url,omitempty"` // OAuth2Audience the URL of the resource server. OAuth2Audience string `toml:"oauth2-audience" json:"oauth2-audience,omitempty"` // OAuth2PrivateKey the private key used to sign the server. OAuth2PrivateKey string `toml:"oauth2-private-key" json:"oauth2-private-key,omitempty"` // OAuth2ClientID the client ID of the application. OAuth2ClientID string `toml:"oauth2-client-id" json:"oauth2-client-id,omitempty"` // OAuth2Scope scope OAuth2Scope string `toml:"oauth2-scope" json:"oauth2-scope,omitempty"` }
OAuth2 is the configuration for OAuth2
type OpenProtocolConfig ¶
type OpenProtocolConfig struct {
OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
}
OpenProtocolConfig represents the configurations for open protocol encoding
type Protocol ¶
type Protocol int
Protocol is the protocol of the message.
const ( ProtocolUnknown Protocol = iota ProtocolDefault ProtocolCanal ProtocolAvro ProtocolMaxwell ProtocolCanalJSON ProtocolCraft ProtocolOpen ProtocolCsv ProtocolDebezium ProtocolSimple )
Enum types of the Protocol.
func ParseSinkProtocolFromString ¶
ParseSinkProtocolFromString converts the protocol from string to Protocol enum type.
func (Protocol) IsBatchEncode ¶
IsBatchEncode returns whether the protocol is a batch encoder.
type PullerConfig ¶
type PullerConfig struct { // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"` // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` // LogRegionDetails determines whether logs Region details or not in puller and kv-client. LogRegionDetails bool `toml:"log-region-details" json:"log-region-details"` }
PullerConfig represents config for puller
func NewDefaultPullerConfig ¶
func NewDefaultPullerConfig() *PullerConfig
NewDefaultPullerConfig return the default puller configuration
type PulsarCompressionType ¶
type PulsarCompressionType string
PulsarCompressionType is the compression type for pulsar
func (*PulsarCompressionType) Value ¶
func (p *PulsarCompressionType) Value() pulsar.CompressionType
Value returns the pulsar compression type
type PulsarConfig ¶
type PulsarConfig struct { TLSKeyFilePath *string `toml:"tls-key-file-path" json:"tls-key-file-path,omitempty"` TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-certificate-file,omitempty"` TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` // PulsarProducerCacheSize is the size of the cache of pulsar producers PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"` // PulsarVersion print the version of pulsar PulsarVersion *string `toml:"pulsar-version" json:"pulsar-version,omitempty"` // pulsar client compression CompressionType *PulsarCompressionType `toml:"compression-type" json:"compression-type,omitempty"` // AuthenticationToken the token for the Pulsar server AuthenticationToken *string `toml:"authentication-token" json:"authentication-token,omitempty"` // ConnectionTimeout Timeout for the establishment of a TCP connection (default: 5 seconds) ConnectionTimeout *TimeSec `toml:"connection-timeout" json:"connection-timeout,omitempty"` // Set the operation timeout (default: 30 seconds) // Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the // operation will be marked as failed OperationTimeout *TimeSec `toml:"operation-timeout" json:"operation-timeout,omitempty"` // BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000) BatchingMaxMessages *uint `toml:"batching-max-messages" json:"batching-max-messages,omitempty"` // BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms) // if batch messages are enabled. If set to a non zero value, messages will be queued until this time // interval or until BatchingMaxPublishDelay *TimeMill `toml:"batching-max-publish-delay" json:"batching-max-publish-delay,omitempty"` // SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent. // Send and SendAsync returns an error after timeout. // default: 30s SendTimeout *TimeSec `toml:"send-timeout" json:"send-timeout,omitempty"` // TokenFromFile Authentication from the file token, // the path name of the file (the third priority authentication method) TokenFromFile *string `toml:"token-from-file" json:"token-from-file,omitempty"` // BasicUserName Account name for pulsar basic authentication (the second priority authentication method) BasicUserName *string `toml:"basic-user-name" json:"basic-user-name,omitempty"` // BasicPassword with account BasicPassword *string `toml:"basic-password" json:"basic-password,omitempty"` // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key AuthTLSCertificatePath *string `toml:"auth-tls-certificate-path" json:"auth-tls-certificate-path,omitempty"` // AuthTLSPrivateKeyPath private key AuthTLSPrivateKeyPath *string `toml:"auth-tls-private-key-path" json:"auth-tls-private-key-path,omitempty"` // Oauth2 include oauth2-issuer-url oauth2-audience oauth2-private-key oauth2-client-id // and 'type' always use 'client_credentials' OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"` // OutputRawChangeEvent controls whether to split the update pk/uk events. OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` // BrokerURL is used to configure service brokerUrl for the Pulsar service. // This parameter is a part of the `sink-uri`. Internal use only. BrokerURL string `toml:"-" json:"-"` // SinkURI is the parsed sinkURI. Internal use only. SinkURI *url.URL `toml:"-" json:"-"` }
PulsarConfig pulsar sink configuration
func (*PulsarConfig) GetDefaultTopicName ¶
func (c *PulsarConfig) GetDefaultTopicName() string
GetDefaultTopicName get default topic name
func (*PulsarConfig) GetOutputRawChangeEvent ¶
func (c *PulsarConfig) GetOutputRawChangeEvent() bool
GetOutputRawChangeEvent returns the value of OutputRawChangeEvent
func (*PulsarConfig) MaskSensitiveData ¶
func (c *PulsarConfig) MaskSensitiveData()
MaskSensitiveData masks sensitive data in PulsarConfig
type ReplicaConfig ¶
type ReplicaConfig replicaConfig
ReplicaConfig represents some addition replication config for a changefeed
func GetDefaultReplicaConfig ¶
func GetDefaultReplicaConfig() *ReplicaConfig
GetDefaultReplicaConfig returns the default replica config.
func (*ReplicaConfig) Clone ¶
func (c *ReplicaConfig) Clone() *ReplicaConfig
Clone clones a replica config
func (*ReplicaConfig) FixMemoryQuota ¶
func (c *ReplicaConfig) FixMemoryQuota()
FixMemoryQuota adjusts memory quota to default value
func (*ReplicaConfig) FixScheduler ¶
func (c *ReplicaConfig) FixScheduler(inheritV66 bool)
FixScheduler adjusts scheduler to default value
func (*ReplicaConfig) Marshal ¶
func (c *ReplicaConfig) Marshal() (string, error)
Marshal returns the json marshal format of a ReplicationConfig
func (*ReplicaConfig) MaskSensitiveData ¶
func (c *ReplicaConfig) MaskSensitiveData()
MaskSensitiveData masks sensitive data in ReplicaConfig
func (*ReplicaConfig) Scan ¶
func (c *ReplicaConfig) Scan(value interface{}) error
Scan implements the sql.Scanner interface
func (*ReplicaConfig) UnmarshalJSON ¶
func (c *ReplicaConfig) UnmarshalJSON(data []byte) error
UnmarshalJSON unmarshals into *ReplicationConfig from json marshal byte slice
func (*ReplicaConfig) ValidateAndAdjust ¶
func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error
ValidateAndAdjust verifies and adjusts the replica configuration.
type SchedulerConfig ¶
type SchedulerConfig struct { // HeartbeatTick is the number of owner tick to initial a heartbeat to captures. HeartbeatTick int `toml:"heartbeat-tick" json:"heartbeat-tick"` // CollectStatsTick is the number of owner tick to collect stats. CollectStatsTick int `toml:"collect-stats-tick" json:"collect-stats-tick"` // MaxTaskConcurrency the maximum of concurrent running schedule tasks. MaxTaskConcurrency int `toml:"max-task-concurrency" json:"max-task-concurrency"` // CheckBalanceInterval the interval of balance tables between each capture. CheckBalanceInterval TomlDuration `toml:"check-balance-interval" json:"check-balance-interval"` // AddTableBatchSize is the batch size of adding tables on each tick, // used by the `BasicScheduler`. // When the new owner in power, other captures may not online yet, there might have hundreds of // tables need to be dispatched, add tables in a batch way to prevent suddenly resource usage // spikes, also wait for other captures join the cluster // When there are only 2 captures, and a large number of tables, this can be helpful to prevent // oom caused by all tables dispatched to only one capture. AddTableBatchSize int `toml:"add-table-batch-size" json:"add-table-batch-size"` // ChangefeedSettings is setting by changefeed. ChangefeedSettings *ChangefeedSchedulerConfig `toml:"-" json:"-"` }
SchedulerConfig configs TiCDC scheduler.
func NewDefaultSchedulerConfig ¶
func NewDefaultSchedulerConfig() *SchedulerConfig
NewDefaultSchedulerConfig return the default scheduler configuration.
func (*SchedulerConfig) ValidateAndAdjust ¶
func (c *SchedulerConfig) ValidateAndAdjust() error
ValidateAndAdjust verifies that each parameter is valid.
type ServerConfig ¶
type ServerConfig struct { Addr string `toml:"addr" json:"addr"` AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` LogFile string `toml:"log-file" json:"log-file"` LogLevel string `toml:"log-level" json:"log-level"` Log *LogConfig `toml:"log" json:"log"` DataDir string `toml:"data-dir" json:"data-dir"` GcTTL int64 `toml:"gc-ttl" json:"gc-ttl"` TZ string `toml:"tz" json:"tz"` CaptureSessionTTL int `toml:"capture-session-ttl" json:"capture-session-ttl"` OwnerFlushInterval TomlDuration `toml:"owner-flush-interval" json:"owner-flush-interval"` ProcessorFlushInterval TomlDuration `toml:"processor-flush-interval" json:"processor-flush-interval"` Sorter *SorterConfig `toml:"sorter" json:"sorter"` Security *security.Credential `toml:"security" json:"security"` KVClient *KVClientConfig `toml:"kv-client" json:"kv-client"` Debug *DebugConfig `toml:"debug" json:"debug"` ClusterID string `toml:"cluster-id" json:"cluster-id"` GcTunerMemoryThreshold uint64 `toml:"gc-tuner-memory-threshold" json:"gc-tuner-memory-threshold"` // Deprecated: we don't use this field anymore. PerTableMemoryQuota uint64 `toml:"per-table-memory-quota" json:"per-table-memory-quota"` // Deprecated: we don't use this field anymore. MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"` }
ServerConfig represents a config for server
func GetDefaultServerConfig ¶
func GetDefaultServerConfig() *ServerConfig
GetDefaultServerConfig returns the default server config
func GetGlobalServerConfig ¶
func GetGlobalServerConfig() *ServerConfig
GetGlobalServerConfig returns the global configuration for this server. It should store configuration from command line and configuration file. Other parts of the system can read the global configuration use this function.
func (*ServerConfig) Clone ¶
func (c *ServerConfig) Clone() *ServerConfig
Clone clones a replication
func (*ServerConfig) Marshal ¶
func (c *ServerConfig) Marshal() (string, error)
Marshal returns the json marshal format of a ServerConfig
func (*ServerConfig) String ¶
func (c *ServerConfig) String() string
String implements the Stringer interface
func (*ServerConfig) Unmarshal ¶
func (c *ServerConfig) Unmarshal(data []byte) error
Unmarshal unmarshals into *ServerConfig from json marshal byte slice
func (*ServerConfig) ValidateAndAdjust ¶
func (c *ServerConfig) ValidateAndAdjust() error
ValidateAndAdjust validates and adjusts the server configuration
type SinkConfig ¶
type SinkConfig struct { TxnAtomicity *AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity,omitempty"` // Protocol is NOT available when the downstream is DB. Protocol *string `toml:"protocol" json:"protocol,omitempty"` // DispatchRules is only available when the downstream is MQ. DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"` // SchemaRegistry is only available when the downstream is MQ using avro protocol. SchemaRegistry *string `toml:"schema-registry" json:"schema-registry,omitempty"` // EncoderConcurrency is only available when the downstream is MQ. EncoderConcurrency *int `toml:"encoder-concurrency" json:"encoder-concurrency,omitempty"` // Terminator is NOT available when the downstream is DB. Terminator *string `toml:"terminator" json:"terminator,omitempty"` // DateSeparator is only available when the downstream is Storage. DateSeparator *string `toml:"date-separator" json:"date-separator,omitempty"` // EnablePartitionSeparator is only available when the downstream is Storage. EnablePartitionSeparator *bool `toml:"enable-partition-separator" json:"enable-partition-separator,omitempty"` // FileIndexWidth is only available when the downstream is Storage FileIndexWidth *int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"` // EnableKafkaSinkV2 enabled then the kafka-go sink will be used. // It is only available when the downstream is MQ. EnableKafkaSinkV2 *bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2,omitempty"` // OnlyOutputUpdatedColumns is only available when the downstream is MQ. OnlyOutputUpdatedColumns *bool `toml:"only-output-updated-columns" json:"only-output-updated-columns,omitempty"` // DeleteOnlyOutputHandleKeyColumns is only available when the downstream is MQ. DeleteOnlyOutputHandleKeyColumns *bool `toml:"delete-only-output-handle-key-columns" json:"delete-only-output-handle-key-columns,omitempty"` // ContentCompatible is only available when the downstream is MQ. ContentCompatible *bool `toml:"content-compatible" json:"content-compatible,omitempty"` // TiDBSourceID is the source ID of the upstream TiDB, // which is used to set the `tidb_cdc_write_source` session variable. // Note: This field is only used internally and only used in the MySQL sink. TiDBSourceID uint64 `toml:"-" json:"-"` // SafeMode is only available when the downstream is DB. SafeMode *bool `toml:"safe-mode" json:"safe-mode,omitempty"` KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` PulsarConfig *PulsarConfig `toml:"pulsar-config" json:"pulsar-config,omitempty"` MySQLConfig *MySQLConfig `toml:"mysql-config" json:"mysql-config,omitempty"` CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"` // AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been // advanced for this given duration, the sink will be canceled and re-established. // Deprecated since v8.1.1 AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"` // Simple Protocol only config, use to control the behavior of sending bootstrap message. // Note: When one of the following conditions is set to negative value, // bootstrap sending function will be disabled. // SendBootstrapIntervalInSec is the interval in seconds to send bootstrap message. SendBootstrapIntervalInSec *int64 `toml:"send-bootstrap-interval-in-sec" json:"send-bootstrap-interval-in-sec,omitempty"` // SendBootstrapInMsgCount means bootstrap messages are being sent every SendBootstrapInMsgCount row change messages. SendBootstrapInMsgCount *int32 `toml:"send-bootstrap-in-msg-count" json:"send-bootstrap-in-msg-count,omitempty"` // SendBootstrapToAllPartition determines whether to send bootstrap message to all partitions. // If set to false, bootstrap message will only be sent to the first partition of each topic. // Default value is true. SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"` // SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start. SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"` // Debezium only. Whether schema should be excluded in the output. DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"` // CSVConfig is only available when the downstream is Storage. CSVConfig *CSVConfig `toml:"csv" json:"csv,omitempty"` // OpenProtocol related configurations OpenProtocol *OpenProtocolConfig `toml:"open" json:"open,omitempty"` // DebeziumConfig related configurations Debezium *DebeziumConfig `toml:"debezium" json:"debezium,omitempty"` }
SinkConfig represents sink config for a changefeed
func (*SinkConfig) CheckCompatibilityWithSinkURI ¶
func (s *SinkConfig) CheckCompatibilityWithSinkURI( oldSinkConfig *SinkConfig, sinkURIStr string, ) error
CheckCompatibilityWithSinkURI check whether the sinkURI is compatible with the sink config.
func (*SinkConfig) MaskSensitiveData ¶
func (s *SinkConfig) MaskSensitiveData()
MaskSensitiveData masks sensitive data in SinkConfig
func (*SinkConfig) ShouldSendAllBootstrapAtStart ¶
func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool
ShouldSendAllBootstrapAtStart returns whether the should send all bootstrap message at changefeed start.
func (*SinkConfig) ShouldSendBootstrapMsg ¶
func (s *SinkConfig) ShouldSendBootstrapMsg() bool
ShouldSendBootstrapMsg returns whether the sink should send bootstrap message. Only enable bootstrap sending function for simple protocol and when both send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count are > 0
func (*SinkConfig) ValidateProtocol ¶
func (s *SinkConfig) ValidateProtocol(scheme string) error
ValidateProtocol validates the protocol configuration.
type SorterConfig ¶
type SorterConfig struct { // the directory used to store the temporary files generated by the sorter SortDir string `toml:"sort-dir" json:"sort-dir"` // Cache size of sorter in MB. CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"` // Deprecated: we don't use this field anymore. MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"` // Deprecated: we don't use this field anymore. MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"` // Deprecated: we don't use this field anymore. NumWorkerPoolGoroutine int `toml:"num-workerpool-goroutine" json:"num-workerpool-goroutine"` // Deprecated: we don't use this field anymore . NumConcurrentWorker int `toml:"num-concurrent-worker" json:"num-concurrent-worker"` // Deprecated: we don't use this field anymore. ChunkSizeLimit uint64 `toml:"chunk-size-limit" json:"chunk-size-limit"` }
SorterConfig represents sorter config for a changefeed
func (*SorterConfig) ValidateAndAdjust ¶
func (c *SorterConfig) ValidateAndAdjust() error
ValidateAndAdjust validates and adjusts the sorter configuration
type SyncedStatusConfig ¶
type SyncedStatusConfig struct { // The minimum interval between the latest synced ts and now required to reach synced state SyncedCheckInterval int64 `toml:"synced-check-interval" json:"synced-check-interval"` // The maximum interval between latest checkpoint ts and now or // between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state CheckpointInterval int64 `toml:"checkpoint-interval" json:"checkpoint-interval"` }
SyncedStatusConfig represents synced check interval config for a changefeed
type TomlDuration ¶
TomlDuration is a duration with a custom json decoder and toml decoder
func (*TomlDuration) UnmarshalJSON ¶
func (d *TomlDuration) UnmarshalJSON(b []byte) error
UnmarshalJSON is the json decoder
func (*TomlDuration) UnmarshalText ¶
func (d *TomlDuration) UnmarshalText(text []byte) error
UnmarshalText is the toml decoder