config

package

Documentation


Constants

const (
	// LargeMessageHandleOptionNone means not handling large messages.
	LargeMessageHandleOptionNone string = "none"
	// LargeMessageHandleOptionClaimCheck means handling large messages by sending them to the claim check storage.
	LargeMessageHandleOptionClaimCheck string = "claim-check"
	// LargeMessageHandleOptionHandleKeyOnly means handling large messages by sending only the handle key columns.
	LargeMessageHandleOptionHandleKeyOnly string = "handle-key-only"
)
const (

	// DefaultSortDir is the default value of sort-dir; it is a subdirectory of data-dir.
	DefaultSortDir = "/tmp/sorter"

	// DefaultRedoDir is the default redo directory, a subdirectory of data-dir.
	DefaultRedoDir = "/tmp/redo"

	// DebugConfigurationItem is the name of debug configurations
	DebugConfigurationItem = "debug"

	// DefaultChangefeedMemoryQuota is the default memory quota for each changefeed.
	DefaultChangefeedMemoryQuota = 1024 * 1024 * 1024 // 1GB.

	// DisableMemoryLimit is the default max memory percentage for TiCDC server.
	// 0 means no memory limit.
	DisableMemoryLimit = 0

	// EnablePDForwarding controls whether the PD client forwarding function is enabled.
	// The PD client will forward requests through a follower
	// if there is a network partition between TiCDC and the PD leader.
	EnablePDForwarding = true
)
const (
	// DefaultMaxMessageBytes sets the default value for max-message-bytes.
	DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M
	// DefaultAdvanceTimeoutInSec sets the default value for advance-timeout-in-sec.
	DefaultAdvanceTimeoutInSec = uint(150)

	// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
	TxnAtomicityKey = "transaction-atomicity"

	// Comma is a constant for ','
	Comma = ","
	// CR is an abbreviation for carriage return
	CR = '\r'
	// LF is an abbreviation for line feed
	LF = '\n'
	// CRLF is an abbreviation for '\r\n'
	CRLF = "\r\n"
	// DoubleQuoteChar is a constant for '"'
	DoubleQuoteChar = '"'
	// Backslash is a constant for '\'
	Backslash = '\\'
	// NULL is a constant for '\N'
	NULL = "\\N"

	// MinFileIndexWidth is the minimum width of file index.
	MinFileIndexWidth = 6 // enough for 2^19 files
	// MaxFileIndexWidth is the maximum width of file index.
	MaxFileIndexWidth = 20 // enough for 2^64 files
	// DefaultFileIndexWidth is the default width of file index.
	DefaultFileIndexWidth = MaxFileIndexWidth

	// BinaryEncodingHex encodes binary data to hex string.
	BinaryEncodingHex = "hex"
	// BinaryEncodingBase64 encodes binary data to base64 string.
	BinaryEncodingBase64 = "base64"

	// DefaultPulsarProducerCacheSize is the default size of the cache for producers
	// 10240 producers may cost about 1.1 GB of memory.
	DefaultPulsarProducerCacheSize = 10240

	// DefaultEncoderGroupConcurrency is the default concurrency of encoder group.
	DefaultEncoderGroupConcurrency = 32

	// DefaultSendBootstrapIntervalInSec is the default interval to send bootstrap message.
	DefaultSendBootstrapIntervalInSec = int64(120)
	// DefaultSendBootstrapInMsgCount is the default number of row change messages between two bootstrap messages.
	DefaultSendBootstrapInMsgCount = int32(10000)
	// DefaultSendBootstrapToAllPartition is the default value of
	// whether to send bootstrap message to all partitions.
	DefaultSendBootstrapToAllPartition = true
	// DefaultSendAllBootstrapAtStart is the default value of whether
	// to send all tables bootstrap message at changefeed start.
	DefaultSendAllBootstrapAtStart = false

	// DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker.
	// The pulsar client uses an exponential backoff with jitter to reconnect to the broker.
	// Based on test, when the max reconnect times is 3,
	// the total time of reconnecting to brokers is about 30 seconds.
	DefaultMaxReconnectToPulsarBroker = 3
)
const (

	// DefaultTiDBSourceID is the default source ID of TiDB cluster.
	DefaultTiDBSourceID = 1
)
const (
	// ProtocolKey specifies the key of the protocol in the SinkURI.
	ProtocolKey = "protocol"
)

Variables

var (

	// ReservedClusterIDs contains a list of reserved cluster id,
	// these words are the part of old cdc etcd key prefix
	// like: /tidb/cdc/owner
	ReservedClusterIDs = []string{
		"owner", "capture", "task",
		"changefeed", "job", "meta",
	}
)

Functions

func StoreGlobalServerConfig

func StoreGlobalServerConfig(config *ServerConfig)

StoreGlobalServerConfig stores a new config to the globalServerConfig. It is mostly used in tests to avoid data races.
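
A minimal sketch of the intended test-only usage, assuming this package is imported as config (import path and test harness omitted); the data-dir value is illustrative:

func TestWithCustomServerConfig(t *testing.T) {
	// Start from the defaults, tweak what the test needs,
	// then install the config globally for the code under test to read.
	cfg := config.GetDefaultServerConfig()
	cfg.DataDir = "/tmp/cdc-test" // illustrative value
	config.StoreGlobalServerConfig(cfg)

	if config.GetGlobalServerConfig().DataDir != "/tmp/cdc-test" {
		t.Fatal("global server config was not stored")
	}
}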

Types

type AtomicityLevel

type AtomicityLevel string

AtomicityLevel represents the atomicity level of a changefeed.

func (AtomicityLevel) ShouldSplitTxn

func (l AtomicityLevel) ShouldSplitTxn() bool

ShouldSplitTxn returns whether the sink should split txn.

type CDCV2

type CDCV2 struct {
	// Enable represents if the cdc v2 is enabled or not
	Enable bool `toml:"enable" json:"enable"`
	// MetaStoreConfig represents the configuration of the new meta store.
	MetaStoreConfig MetaStoreConfiguration `toml:"meta-store" json:"meta-store"`
}

CDCV2 represents config for ticdc v2

func (*CDCV2) ValidateAndAdjust

func (c *CDCV2) ValidateAndAdjust() error

ValidateAndAdjust validates the meta store configurations

type CSVConfig

type CSVConfig struct {
	// Delimiter is the delimiter between fields; it can be 1 character or at most 2 characters.
	// It cannot be, or contain, CR or LF.
	// It must not share any character with the quote.
	Delimiter string `toml:"delimiter" json:"delimiter"`
	// quoting character
	Quote string `toml:"quote" json:"quote"`
	// representation of null values
	NullString string `toml:"null" json:"null"`
	// whether to include commit ts
	IncludeCommitTs bool `toml:"include-commit-ts" json:"include-commit-ts"`
	// encoding method of binary type
	BinaryEncodingMethod string `toml:"binary-encoding-method" json:"binary-encoding-method"`
	// output old value
	OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
	// output handle key
	OutputHandleKey bool `toml:"output-handle-key" json:"output-handle-key"`
}

CSVConfig defines a series of configuration items for csv codec.
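
For illustration, a CSV configuration assembled from the constants defined above (a sketch, not recommended settings), assuming this package is imported as config:

csvCfg := &config.CSVConfig{
	Delimiter:            config.Comma,                   // ','
	Quote:                string(config.DoubleQuoteChar), // `"`
	NullString:           config.NULL,                    // "\N"
	IncludeCommitTs:      true,
	BinaryEncodingMethod: config.BinaryEncodingHex,
	OutputOldValue:       false,
	OutputHandleKey:      false,
}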

type ChangefeedSchedulerConfig

type ChangefeedSchedulerConfig struct {
	// EnableTableAcrossNodes, when set to true, splits one table into multiple spans
	// and distributes them across multiple TiCDC nodes.
	EnableTableAcrossNodes bool `toml:"enable-table-across-nodes" json:"enable-table-across-nodes"`
	// RegionThreshold is the region count threshold of splitting a table.
	RegionThreshold int `toml:"region-threshold" json:"region-threshold"`
	// WriteKeyThreshold is the written keys threshold of splitting a table.
	WriteKeyThreshold int `toml:"write-key-threshold" json:"write-key-threshold"`
	// Deprecated.
	RegionPerSpan int `toml:"region-per-span" json:"region-per-span"`
}

ChangefeedSchedulerConfig is per changefeed scheduler settings.

func (*ChangefeedSchedulerConfig) Validate

func (c *ChangefeedSchedulerConfig) Validate() error

Validate validates the config.
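
A sketch of a per-changefeed scheduler configuration, assuming this package is imported as config (imports and error handling are illustrative); the threshold values are placeholders:

sched := &config.ChangefeedSchedulerConfig{
	EnableTableAcrossNodes: true,
	RegionThreshold:        100000, // illustrative region-count threshold
	WriteKeyThreshold:      0,      // assumption: 0 leaves write-key based splitting off
}
if err := sched.Validate(); err != nil {
	log.Fatal(err)
}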

type CloudStorageConfig

type CloudStorageConfig struct {
	WorkerCount   *int    `toml:"worker-count" json:"worker-count,omitempty"`
	FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"`
	FileSize      *int    `toml:"file-size" json:"file-size,omitempty"`

	OutputColumnID      *bool   `toml:"output-column-id" json:"output-column-id,omitempty"`
	FileExpirationDays  *int    `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
	FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
	FlushConcurrency    *int    `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`

	// OutputRawChangeEvent controls whether to split the update pk/uk events.
	OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`
}

CloudStorageConfig represents a cloud storage sink configuration

func (*CloudStorageConfig) GetOutputRawChangeEvent

func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool

GetOutputRawChangeEvent returns the value of OutputRawChangeEvent

type CodecConfig

type CodecConfig struct {
	EnableTiDBExtension            *bool   `toml:"enable-tidb-extension" json:"enable-tidb-extension,omitempty"`
	MaxBatchSize                   *int    `toml:"max-batch-size" json:"max-batch-size,omitempty"`
	AvroEnableWatermark            *bool   `toml:"avro-enable-watermark" json:"avro-enable-watermark"`
	AvroDecimalHandlingMode        *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"`
	AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"`
	EncodingFormat                 *string `toml:"encoding-format" json:"encoding-format,omitempty"`
}

CodecConfig represents a MQ codec configuration

type ColumnSelector

type ColumnSelector struct {
	Matcher []string `toml:"matcher" json:"matcher"`
	Columns []string `toml:"columns" json:"columns"`
}

ColumnSelector represents a column selector for a table.

type ConsistentConfig

type ConsistentConfig struct {
	// Level is the consistency level, it can be `none` or `eventual`.
	// `eventual` means enable redo log.
	// Default is `none`.
	Level string `toml:"level" json:"level"`
	// MaxLogSize is the max size(MiB) of a log file written by redo log.
	// Default is 64MiB.
	MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
	// FlushIntervalInMs is the flush interval(ms) of redo log to flush log to storage.
	// Default is 2000ms.
	FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
	// MetaFlushIntervalInMs is the flush interval(ms) of redo log to
	// flush meta(resolvedTs and checkpointTs) to storage.
	// Default is 200ms.
	MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"`
	// EncodingWorkerNum is the number of workers to encode `RowChangeEvent` to redo log.
	// Default is 16.
	EncodingWorkerNum int `toml:"encoding-worker-num" json:"encoding-worker-num"`
	// FlushWorkerNum is the number of workers to flush redo log to storage.
	// Default is 8.
	FlushWorkerNum int `toml:"flush-worker-num" json:"flush-worker-num"`
	// Storage is the storage path(uri) to store redo log.
	Storage string `toml:"storage" json:"storage"`
	// UseFileBackend is a flag to enable file backend for redo log.
	// With the file backend, redo log is written to a local file before being flushed to storage.
	// Default is false.
	UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
	// Compression is the compression algorithm used for redo log.
	// Default is "", it means no compression, equals to `none`.
	// Supported compression algorithms are `none` and `lz4`.
	Compression string `toml:"compression" json:"compression"`
	// FlushConcurrency is the concurrency of flushing a single log file.
	// Default is 1. It means a single log file will be flushed by only one worker.
	// Concurrent flushing of a single file is supported only for `s3` storage.
	FlushConcurrency int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
	// MemoryUsage represents the percentage of ReplicaConfig.MemoryQuota
	// that can be utilized by the redo log module.
	MemoryUsage *ConsistentMemoryUsage `toml:"memory-usage" json:"memory-usage"`
}

ConsistentConfig represents replication consistency config for a changefeed. It is used by redo log functionality.

func (*ConsistentConfig) MaskSensitiveData

func (c *ConsistentConfig) MaskSensitiveData()

MaskSensitiveData masks sensitive data in ConsistentConfig

func (*ConsistentConfig) ValidateAndAdjust

func (c *ConsistentConfig) ValidateAndAdjust() error

ValidateAndAdjust validates the consistency config and adjusts it if necessary.
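
A sketch of a redo log (eventual consistency) configuration built from the documented fields and defaults, assuming this package is imported as config; the storage URI is illustrative and imports are omitted:

redo := &config.ConsistentConfig{
	Level:                 "eventual", // enable redo log
	MaxLogSize:            64,         // MiB, the documented default
	FlushIntervalInMs:     2000,
	MetaFlushIntervalInMs: 200,
	EncodingWorkerNum:     16,
	FlushWorkerNum:        8,
	Storage:               "s3://bucket/redo-dir", // illustrative URI
	UseFileBackend:        false,
	Compression:           "lz4",
}
if err := redo.ValidateAndAdjust(); err != nil {
	log.Fatal(err)
}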

type ConsistentMemoryUsage

type ConsistentMemoryUsage struct {
	// ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 will be used for redo events.
	MemoryQuotaPercentage uint64 `toml:"memory-quota-percentage" json:"memory-quota-percentage"`
}

ConsistentMemoryUsage represents memory usage of Consistent module.

type DBConfig

type DBConfig struct {
	// Count is the number of db instances used by the sorter.
	//
	// The default value is 8.
	Count int `toml:"count" json:"count"`
	// MaxOpenFiles is the maximum number of file descriptors the db sorter may keep open.
	//
	// The default value is 10000.
	MaxOpenFiles int `toml:"max-open-files" json:"max-open-files"`
	// BlockSize is the block size of the db sorter.
	//
	// The default value is 65536, 64KB.
	BlockSize int `toml:"block-size" json:"block-size"`
	// WriterBufferSize is the size of memory table of db.
	//
	// The default value is 8388608, 8MB.
	WriterBufferSize int `toml:"writer-buffer-size" json:"writer-buffer-size"`
	// Compression is the compression algorithm that is used by db.
	// Valid values are "none" or "snappy".
	//
	// The default value is "snappy".
	Compression string `toml:"compression" json:"compression"`
	// WriteL0PauseTrigger defines the number of db sst files at level-0 that will
	// pause writes.
	//
	// The default value is 1<<31 - 1.
	WriteL0PauseTrigger int `toml:"write-l0-pause-trigger" json:"write-l0-pause-trigger"`

	// CompactionL0Trigger defines the number of db sst files at level-0 that will
	// trigger compaction.
	//
	// The default value is 16, which is based on a performance test on 4K tables.
	CompactionL0Trigger int `toml:"compaction-l0-trigger" json:"compaction-l0-trigger"`
}

DBConfig represents db sorter config.

func NewDefaultDBConfig

func NewDefaultDBConfig() *DBConfig

NewDefaultDBConfig returns the default db configuration

func (*DBConfig) ValidateAndAdjust

func (c *DBConfig) ValidateAndAdjust() error

ValidateAndAdjust validates and adjusts the db configuration
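
A sketch of adjusting the default db sorter configuration, assuming this package is imported as config; the overrides are illustrative:

dbCfg := config.NewDefaultDBConfig()
dbCfg.Count = 16           // illustrative override of the documented default (8)
dbCfg.Compression = "none" // valid values per the field doc: "none" or "snappy"
if err := dbCfg.ValidateAndAdjust(); err != nil {
	log.Fatal(err)
}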

type DateSeparator

type DateSeparator int

DateSeparator specifies the date separator in storage destination path

const (
	DateSeparatorNone DateSeparator = iota
	DateSeparatorYear
	DateSeparatorMonth
	DateSeparatorDay
)

Enum types of DateSeparator

func (*DateSeparator) FromString

func (d *DateSeparator) FromString(separator string) error

FromString converts the separator from a string to the DateSeparator enum type.
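
A sketch of parsing a separator from its string form; the accepted strings are assumed to mirror the enum names ("none", "year", "month", "day"), and imports are omitted:

var sep config.DateSeparator
if err := sep.FromString("day"); err != nil { // "day" assumed to map to DateSeparatorDay
	log.Fatal(err)
}
fmt.Println(sep.String(), sep.GetPattern())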

func (DateSeparator) GetPattern

func (d DateSeparator) GetPattern() string

GetPattern returns the pattern of the date separator.

func (DateSeparator) String

func (d DateSeparator) String() string

type DebeziumConfig

type DebeziumConfig struct {
	OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
}

DebeziumConfig represents the configurations for debezium protocol encoding

type DebugConfig

type DebugConfig struct {
	DB *DBConfig `toml:"db" json:"db"`

	Messages *MessagesConfig `toml:"messages" json:"messages"`

	// Scheduler is the configuration of the two-phase scheduler.
	Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`

	// CDCV2 enables ticdc version 2 implementation with new metastore
	CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"`

	// Puller is the configuration of the puller.
	Puller *PullerConfig `toml:"puller" json:"puller"`
}

DebugConfig represents config for ticdc unexposed feature configurations

func (*DebugConfig) ValidateAndAdjust

func (c *DebugConfig) ValidateAndAdjust() error

ValidateAndAdjust validates and adjusts the debug configuration

type DispatchRule

type DispatchRule struct {
	Matcher []string `toml:"matcher" json:"matcher"`
	// Deprecated, please use PartitionRule.
	DispatcherRule string `toml:"dispatcher" json:"dispatcher"`
	// PartitionRule is an alias added for DispatcherRule to mitigate confusion.
	// In a future release, DispatcherRule is expected to be removed.
	PartitionRule string `toml:"partition" json:"partition"`

	// IndexName is set when using index-value dispatcher with specified index.
	IndexName string `toml:"index" json:"index"`

	// Columns are set when using columns dispatcher.
	Columns []string `toml:"columns" json:"columns"`

	TopicRule string `toml:"topic" json:"topic"`
}

DispatchRule represents partition rule for a table.
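
An illustrative set of dispatch rules, assuming this package is imported as config; the matcher, index name, and topic expression are placeholders:

rules := []*config.DispatchRule{
	{
		Matcher:       []string{"test.*"},
		PartitionRule: "index-value",          // the index-value dispatcher referenced by the IndexName doc
		IndexName:     "idx_uk",               // illustrative index name
		TopicRule:     "cdc_{schema}_{table}", // illustrative topic expression
	},
}
_ = rules // e.g. assign to SinkConfig.DispatchRules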

type Duration

type Duration struct {
	time.Duration
}

Duration wraps time.Duration to override the UnmarshalText func

func (*Duration) UnmarshalText

func (d *Duration) UnmarshalText(text []byte) error

UnmarshalText unmarshals bytes into a duration

type EventFilterRule

type EventFilterRule struct {
	Matcher     []string       `toml:"matcher" json:"matcher"`
	IgnoreEvent []bf.EventType `toml:"ignore-event" json:"ignore-event"`
	// regular expression
	IgnoreSQL []string `toml:"ignore-sql" json:"ignore-sql"`
	// sql expression
	IgnoreInsertValueExpr    string `toml:"ignore-insert-value-expr" json:"ignore-insert-value-expr"`
	IgnoreUpdateNewValueExpr string `toml:"ignore-update-new-value-expr" json:"ignore-update-new-value-expr"`
	IgnoreUpdateOldValueExpr string `toml:"ignore-update-old-value-expr" json:"ignore-update-old-value-expr"`
	IgnoreDeleteValueExpr    string `toml:"ignore-delete-value-expr" json:"ignore-delete-value-expr"`
}

EventFilterRule is used by sql event filter and expression filter

type FilterConfig

type FilterConfig struct {
	Rules            []string           `toml:"rules" json:"rules"`
	IgnoreTxnStartTs []uint64           `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"`
	EventFilters     []*EventFilterRule `toml:"event-filters" json:"event-filters"`
}

FilterConfig represents filter config for a changefeed

type GlueSchemaRegistryConfig

type GlueSchemaRegistryConfig struct {
	// Name of the schema registry
	RegistryName string `toml:"registry-name" json:"registry-name"`
	// Region of the schema registry
	Region string `toml:"region" json:"region"`
	// AccessKey of the schema registry
	AccessKey string `toml:"access-key" json:"access-key,omitempty"`
	// SecretAccessKey of the schema registry
	SecretAccessKey string `toml:"secret-access-key" json:"secret-access-key,omitempty"`
	Token           string `toml:"token" json:"token,omitempty"`
}

GlueSchemaRegistryConfig represents a Glue Schema Registry configuration

func (*GlueSchemaRegistryConfig) NoCredentials

func (g *GlueSchemaRegistryConfig) NoCredentials() bool

NoCredentials returns true if no credentials are set.

func (*GlueSchemaRegistryConfig) Validate

func (g *GlueSchemaRegistryConfig) Validate() error

Validate the GlueSchemaRegistryConfig.

type KVClientConfig

type KVClientConfig struct {
	EnableMultiplexing bool `toml:"enable-multiplexing" json:"enable-multiplexing"`
	// how many workers will be used for a single region worker
	WorkerConcurrent uint `toml:"worker-concurrent" json:"worker-concurrent"`
	// how many grpc streams will be established to every TiKV node
	GrpcStreamConcurrent uint `toml:"grpc-stream-concurrent" json:"grpc-stream-concurrent"`
	// Advance table ResolvedTs interval.
	AdvanceIntervalInMs uint `toml:"advance-interval-in-ms" json:"advance-interval-in-ms"`
	// how many goroutines to maintain frontiers.
	FrontierConcurrent uint `toml:"frontier-concurrent" json:"frontier-concurrent"`
	// background workerpool size; the workerpool is shared by all goroutines in the cdc server
	WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
	// region incremental scan limit for one table in a single store
	RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
	// the total retry duration of connecting a region
	RegionRetryDuration TomlDuration `toml:"region-retry-duration" json:"region-retry-duration"`
}

KVClientConfig represents config for kv client

func NewDefaultKVClientConfig

func NewDefaultKVClientConfig() *KVClientConfig

NewDefaultKVClientConfig returns the default kv client configuration

func (*KVClientConfig) ValidateAndAdjust

func (c *KVClientConfig) ValidateAndAdjust() error

ValidateAndAdjust validates and adjusts the kv client configuration

type KafkaConfig

type KafkaConfig struct {
	PartitionNum                 *int32                    `toml:"partition-num" json:"partition-num,omitempty"`
	ReplicationFactor            *int16                    `toml:"replication-factor" json:"replication-factor,omitempty"`
	KafkaVersion                 *string                   `toml:"kafka-version" json:"kafka-version,omitempty"`
	MaxMessageBytes              *int                      `toml:"max-message-bytes" json:"max-message-bytes,omitempty"`
	Compression                  *string                   `toml:"compression" json:"compression,omitempty"`
	KafkaClientID                *string                   `toml:"kafka-client-id" json:"kafka-client-id,omitempty"`
	AutoCreateTopic              *bool                     `toml:"auto-create-topic" json:"auto-create-topic,omitempty"`
	DialTimeout                  *string                   `toml:"dial-timeout" json:"dial-timeout,omitempty"`
	WriteTimeout                 *string                   `toml:"write-timeout" json:"write-timeout,omitempty"`
	ReadTimeout                  *string                   `toml:"read-timeout" json:"read-timeout,omitempty"`
	RequiredAcks                 *int                      `toml:"required-acks" json:"required-acks,omitempty"`
	SASLUser                     *string                   `toml:"sasl-user" json:"sasl-user,omitempty"`
	SASLPassword                 *string                   `toml:"sasl-password" json:"sasl-password,omitempty"`
	SASLMechanism                *string                   `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"`
	SASLGssAPIAuthType           *string                   `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"`
	SASLGssAPIKeytabPath         *string                   `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"`
	SASLGssAPIKerberosConfigPath *string                   `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"`
	SASLGssAPIServiceName        *string                   `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"`
	SASLGssAPIUser               *string                   `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"`
	SASLGssAPIPassword           *string                   `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"`
	SASLGssAPIRealm              *string                   `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"`
	SASLGssAPIDisablePafxfast    *bool                     `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"`
	SASLOAuthClientID            *string                   `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"`
	SASLOAuthClientSecret        *string                   `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"`
	SASLOAuthTokenURL            *string                   `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"`
	SASLOAuthScopes              []string                  `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"`
	SASLOAuthGrantType           *string                   `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"`
	SASLOAuthAudience            *string                   `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"`
	EnableTLS                    *bool                     `toml:"enable-tls" json:"enable-tls,omitempty"`
	CA                           *string                   `toml:"ca" json:"ca,omitempty"`
	Cert                         *string                   `toml:"cert" json:"cert,omitempty"`
	Key                          *string                   `toml:"key" json:"key,omitempty"`
	InsecureSkipVerify           *bool                     `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"`
	CodecConfig                  *CodecConfig              `toml:"codec-config" json:"codec-config,omitempty"`
	LargeMessageHandle           *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"`
	GlueSchemaRegistryConfig     *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"`

	// OutputRawChangeEvent controls whether to split the update pk/uk events.
	OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`
}

KafkaConfig represents a kafka sink configuration

func (*KafkaConfig) GetOutputRawChangeEvent

func (k *KafkaConfig) GetOutputRawChangeEvent() bool

GetOutputRawChangeEvent returns the value of OutputRawChangeEvent

func (*KafkaConfig) MaskSensitiveData

func (k *KafkaConfig) MaskSensitiveData()

MaskSensitiveData masks sensitive data in KafkaConfig

type LargeMessageHandleConfig

type LargeMessageHandleConfig struct {
	LargeMessageHandleOption      string `toml:"large-message-handle-option" json:"large-message-handle-option"`
	LargeMessageHandleCompression string `toml:"large-message-handle-compression" json:"large-message-handle-compression"`
	ClaimCheckStorageURI          string `toml:"claim-check-storage-uri" json:"claim-check-storage-uri"`
	ClaimCheckRawValue            bool   `toml:"claim-check-raw-value" json:"claim-check-raw-value"`
}

LargeMessageHandleConfig is the configuration for handling large messages.
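
A sketch of enabling the claim-check strategy using the constants defined at the top of this page, assuming this package is imported as config; the storage URI and the protocol/extension arguments are illustrative:

large := config.NewDefaultLargeMessageHandleConfig()
large.LargeMessageHandleOption = config.LargeMessageHandleOptionClaimCheck
large.ClaimCheckStorageURI = "s3://bucket/claim-check" // illustrative URI
if err := large.AdjustAndValidate(config.ProtocolCanalJSON, true); err != nil {
	log.Fatal(err)
}
fmt.Println(large.EnableClaimCheck()) // expected to be true once claim-check is selected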

func NewDefaultLargeMessageHandleConfig

func NewDefaultLargeMessageHandleConfig() *LargeMessageHandleConfig

NewDefaultLargeMessageHandleConfig returns the default config.

func (*LargeMessageHandleConfig) AdjustAndValidate

func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTiDBExtension bool) error

AdjustAndValidate adjusts and validates the config.

func (*LargeMessageHandleConfig) Disabled

func (c *LargeMessageHandleConfig) Disabled() bool

Disabled returns true if large message handling is disabled.

func (*LargeMessageHandleConfig) EnableClaimCheck

func (c *LargeMessageHandleConfig) EnableClaimCheck() bool

EnableClaimCheck returns true if claim check is enabled.

func (*LargeMessageHandleConfig) HandleKeyOnly

func (c *LargeMessageHandleConfig) HandleKeyOnly() bool

HandleKeyOnly returns true if large messages are handled by encoding only the handle key columns.

type LogConfig

type LogConfig struct {
	File              *LogFileConfig `toml:"file" json:"file"`
	InternalErrOutput string         `toml:"error-output" json:"error-output"`
}

LogConfig represents log config for server

type LogFileConfig

type LogFileConfig struct {
	MaxSize    int `toml:"max-size" json:"max-size"`
	MaxDays    int `toml:"max-days" json:"max-days"`
	MaxBackups int `toml:"max-backups" json:"max-backups"`
}

LogFileConfig represents log file config for server

type MessagesConfig

type MessagesConfig struct {
	ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"`
	ClientMaxBatchSize     int          `toml:"client-max-batch-size" json:"client-max-batch-size"`
	ClientMaxBatchCount    int          `toml:"client-max-batch-count" json:"client-max-batch-count"`
	ClientRetryRateLimit   float64      `toml:"client-retry-rate-limit" json:"client-retry-rate-limit"`

	ServerMaxPendingMessageCount int          `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"`
	ServerAckInterval            TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"`
	ServerWorkerPoolSize         int          `toml:"server-worker-pool-size" json:"server-worker-pool-size"`

	// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
	MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"`

	// After a duration of this time if the server doesn't see any activity it
	// pings the client to see if the transport is still alive.
	KeepAliveTime TomlDuration `toml:"keep-alive-time" json:"keep-alive-time"`
	// After having pinged for keepalive check, the server waits for a duration
	// of Timeout and if no activity is seen even after that the connection is
	// closed.
	KeepAliveTimeout TomlDuration `toml:"keep-alive-timeout" json:"keep-alive-timeout"`
}

MessagesConfig configs MessageServer and MessageClient.

func (*MessagesConfig) Clone

func (c *MessagesConfig) Clone() *MessagesConfig

Clone returns a deep copy of the configuration.

func (*MessagesConfig) ToMessageClientConfig

func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig

ToMessageClientConfig converts the MessagesConfig to a MessageClientConfig.

func (*MessagesConfig) ToMessageServerConfig

func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig

ToMessageServerConfig returns a MessageServerConfig that can be used to create a MessageServer.

func (*MessagesConfig) ValidateAndAdjust

func (c *MessagesConfig) ValidateAndAdjust() error

ValidateAndAdjust validates and adjusts the configs.

type MetaStoreConfiguration

type MetaStoreConfiguration struct {
	// URI is the address of the meta store.
	// for example:  "mysql://127.0.0.1:3306/test"
	URI string `toml:"uri" json:"uri"`
	// SSLCA is the path of the CA certificate file.
	SSLCa   string `toml:"ssl-ca" json:"ssl-ca"`
	SSLCert string `toml:"ssl-cert" json:"ssl-cert"`
	SSLKey  string `toml:"ssl-key" json:"ssl-key"`
}

MetaStoreConfiguration represents config for new meta store configurations

func (*MetaStoreConfiguration) GenDSN

func (cfg *MetaStoreConfiguration) GenDSN() (*dmysql.Config, error)

GenDSN generates a DSN from the given metastore config.
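
A sketch of generating a DSN from a metastore configuration; the URI reuses the example from the field doc, imports are omitted, and dmysql is assumed to be the go-sql-driver/mysql package:

meta := &config.MetaStoreConfiguration{
	URI: "mysql://127.0.0.1:3306/test",
}
dsnCfg, err := meta.GenDSN()
if err != nil {
	log.Fatal(err)
}
fmt.Println(dsnCfg.FormatDSN()) // *mysql.Config exposes FormatDSN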

type MounterConfig

type MounterConfig struct {
	WorkerNum int `toml:"worker-num" json:"worker-num"`
}

MounterConfig represents mounter config for a changefeed

type MySQLConfig

type MySQLConfig struct {
	WorkerCount                  *int    `toml:"worker-count" json:"worker-count,omitempty"`
	MaxTxnRow                    *int    `toml:"max-txn-row" json:"max-txn-row,omitempty"`
	MaxMultiUpdateRowSize        *int    `toml:"max-multi-update-row-size" json:"max-multi-update-row-size,omitempty"`
	MaxMultiUpdateRowCount       *int    `toml:"max-multi-update-row" json:"max-multi-update-row,omitempty"`
	TiDBTxnMode                  *string `toml:"tidb-txn-mode" json:"tidb-txn-mode,omitempty"`
	SSLCa                        *string `toml:"ssl-ca" json:"ssl-ca,omitempty"`
	SSLCert                      *string `toml:"ssl-cert" json:"ssl-cert,omitempty"`
	SSLKey                       *string `toml:"ssl-key" json:"ssl-key,omitempty"`
	TimeZone                     *string `toml:"time-zone" json:"time-zone,omitempty"`
	WriteTimeout                 *string `toml:"write-timeout" json:"write-timeout,omitempty"`
	ReadTimeout                  *string `toml:"read-timeout" json:"read-timeout,omitempty"`
	Timeout                      *string `toml:"timeout" json:"timeout,omitempty"`
	EnableBatchDML               *bool   `toml:"enable-batch-dml" json:"enable-batch-dml,omitempty"`
	EnableMultiStatement         *bool   `toml:"enable-multi-statement" json:"enable-multi-statement,omitempty"`
	EnableCachePreparedStatement *bool   `toml:"enable-cache-prepared-statement" json:"enable-cache-prepared-statement,omitempty"`
}

MySQLConfig represents a MySQL sink configuration

type OAuth2

type OAuth2 struct {
	// OAuth2IssuerURL is the URL of the authorization server.
	OAuth2IssuerURL string `toml:"oauth2-issuer-url" json:"oauth2-issuer-url,omitempty"`
	// OAuth2Audience is the URL of the resource server.
	OAuth2Audience string `toml:"oauth2-audience" json:"oauth2-audience,omitempty"`
	// OAuth2PrivateKey is the private key used to sign the server.
	OAuth2PrivateKey string `toml:"oauth2-private-key" json:"oauth2-private-key,omitempty"`
	// OAuth2ClientID is the client ID of the application.
	OAuth2ClientID string `toml:"oauth2-client-id" json:"oauth2-client-id,omitempty"`
	// OAuth2Scope is the OAuth2 scope.
	OAuth2Scope string `toml:"oauth2-scope" json:"oauth2-scope,omitempty"`
}

OAuth2 is the configuration for OAuth2

type OpenProtocolConfig

type OpenProtocolConfig struct {
	OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
}

OpenProtocolConfig represents the configurations for open protocol encoding

type Protocol

type Protocol int

Protocol is the protocol of the message.

const (
	ProtocolUnknown Protocol = iota
	ProtocolDefault
	ProtocolCanal
	ProtocolAvro
	ProtocolMaxwell
	ProtocolCanalJSON
	ProtocolCraft
	ProtocolOpen
	ProtocolCsv
	ProtocolDebezium
	ProtocolSimple
)

Enum types of the Protocol.

func ParseSinkProtocolFromString

func ParseSinkProtocolFromString(protocol string) (Protocol, error)

ParseSinkProtocolFromString converts the protocol from string to Protocol enum type.
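
A sketch of parsing a protocol name, assuming the string forms match the usual TiCDC protocol names (e.g. "canal-json") and that this package is imported as config:

p, err := config.ParseSinkProtocolFromString("canal-json") // protocol name assumed
if err != nil {
	log.Fatal(err)
}
fmt.Println(p.String(), p.IsBatchEncode())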

func (Protocol) IsBatchEncode

func (p Protocol) IsBatchEncode() bool

IsBatchEncode returns whether the protocol is a batch encoder.

func (Protocol) String

func (p Protocol) String() string

String converts the Protocol enum type to its string representation.

type PullerConfig

type PullerConfig struct {
	// EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection.
	EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"`
	// ResolvedTsStuckInterval is the interval of checking resolved ts stuck.
	ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"`
	// LogRegionDetails determines whether to log Region details in the puller and kv-client.
	LogRegionDetails bool `toml:"log-region-details" json:"log-region-details"`
}

PullerConfig represents config for puller

func NewDefaultPullerConfig

func NewDefaultPullerConfig() *PullerConfig

NewDefaultPullerConfig returns the default puller configuration

type PulsarCompressionType

type PulsarCompressionType string

PulsarCompressionType is the compression type for pulsar

func (*PulsarCompressionType) Value

Value returns the pulsar compression type

type PulsarConfig

type PulsarConfig struct {
	TLSKeyFilePath        *string `toml:"tls-key-file-path" json:"tls-key-file-path,omitempty"`
	TLSCertificateFile    *string `toml:"tls-certificate-file" json:"tls-certificate-file,omitempty"`
	TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"`

	// PulsarProducerCacheSize is the size of the cache of pulsar producers
	PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"`

	// PulsarVersion records the version of pulsar
	PulsarVersion *string `toml:"pulsar-version" json:"pulsar-version,omitempty"`

	// pulsar client compression
	CompressionType *PulsarCompressionType `toml:"compression-type" json:"compression-type,omitempty"`

	// AuthenticationToken is the token for the Pulsar server
	AuthenticationToken *string `toml:"authentication-token" json:"authentication-token,omitempty"`

	// ConnectionTimeout is the timeout for the establishment of a TCP connection (default: 5 seconds)
	ConnectionTimeout *TimeSec `toml:"connection-timeout" json:"connection-timeout,omitempty"`

	// OperationTimeout sets the operation timeout (default: 30 seconds).
	// Producer-create, subscribe, and unsubscribe operations are retried until this interval
	// elapses, after which the operation is marked as failed.
	OperationTimeout *TimeSec `toml:"operation-timeout" json:"operation-timeout,omitempty"`

	// BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000)
	BatchingMaxMessages *uint `toml:"batching-max-messages" json:"batching-max-messages,omitempty"`

	// BatchingMaxPublishDelay specifies the time period within which sent messages will be batched (default: 10ms)
	// if batching is enabled. If set to a non-zero value, messages will be queued until this time
	// interval elapses or until the batch limit (see BatchingMaxMessages) is reached.
	BatchingMaxPublishDelay *TimeMill `toml:"batching-max-publish-delay" json:"batching-max-publish-delay,omitempty"`

	// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since it was sent.
	// Send and SendAsync returns an error after timeout.
	// default: 30s
	SendTimeout *TimeSec `toml:"send-timeout" json:"send-timeout,omitempty"`

	// TokenFromFile is the path of a file containing the authentication token
	// (the third-priority authentication method).
	TokenFromFile *string `toml:"token-from-file" json:"token-from-file,omitempty"`

	// BasicUserName is the account name for Pulsar basic authentication (the second-priority authentication method).
	BasicUserName *string `toml:"basic-user-name" json:"basic-user-name,omitempty"`
	// BasicPassword is the password for the account.
	BasicPassword *string `toml:"basic-password" json:"basic-password,omitempty"`

	// AuthTLSCertificatePath is the TLS certificate used, together with AuthTLSPrivateKeyPath,
	// to create a new Pulsar authentication provider.
	AuthTLSCertificatePath *string `toml:"auth-tls-certificate-path" json:"auth-tls-certificate-path,omitempty"`
	// AuthTLSPrivateKeyPath is the path of the private key.
	AuthTLSPrivateKeyPath *string `toml:"auth-tls-private-key-path" json:"auth-tls-private-key-path,omitempty"`

	// OAuth2 includes oauth2-issuer-url, oauth2-audience, oauth2-private-key, and oauth2-client-id;
	// 'type' is always 'client_credentials'.
	OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"`

	// OutputRawChangeEvent controls whether to split the update pk/uk events.
	OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`

	// BrokerURL is used to configure service brokerUrl for the Pulsar service.
	// This parameter is a part of the `sink-uri`. Internal use only.
	BrokerURL string `toml:"-" json:"-"`
	// SinkURI is the parsed sinkURI. Internal use only.
	SinkURI *url.URL `toml:"-" json:"-"`
}

PulsarConfig represents a Pulsar sink configuration

func (*PulsarConfig) GetDefaultTopicName

func (c *PulsarConfig) GetDefaultTopicName() string

GetDefaultTopicName gets the default topic name

func (*PulsarConfig) GetOutputRawChangeEvent

func (c *PulsarConfig) GetOutputRawChangeEvent() bool

GetOutputRawChangeEvent returns the value of OutputRawChangeEvent

func (*PulsarConfig) MaskSensitiveData

func (c *PulsarConfig) MaskSensitiveData()

MaskSensitiveData masks sensitive data in PulsarConfig

type ReplicaConfig

type ReplicaConfig replicaConfig

ReplicaConfig represents some additional replication config for a changefeed

func GetDefaultReplicaConfig

func GetDefaultReplicaConfig() *ReplicaConfig

GetDefaultReplicaConfig returns the default replica config.

func (*ReplicaConfig) Clone

func (c *ReplicaConfig) Clone() *ReplicaConfig

Clone clones a replica config

func (*ReplicaConfig) FixMemoryQuota

func (c *ReplicaConfig) FixMemoryQuota()

FixMemoryQuota adjusts memory quota to default value

func (*ReplicaConfig) FixScheduler

func (c *ReplicaConfig) FixScheduler(inheritV66 bool)

FixScheduler adjusts scheduler to default value

func (*ReplicaConfig) Marshal

func (c *ReplicaConfig) Marshal() (string, error)

Marshal returns the json marshal format of a ReplicaConfig

func (*ReplicaConfig) MaskSensitiveData

func (c *ReplicaConfig) MaskSensitiveData()

MaskSensitiveData masks sensitive data in ReplicaConfig

func (*ReplicaConfig) Scan

func (c *ReplicaConfig) Scan(value interface{}) error

Scan implements the sql.Scanner interface

func (*ReplicaConfig) UnmarshalJSON

func (c *ReplicaConfig) UnmarshalJSON(data []byte) error

UnmarshalJSON unmarshals a json byte slice into a *ReplicaConfig

func (*ReplicaConfig) ValidateAndAdjust

func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error

ValidateAndAdjust verifies and adjusts the replica configuration.
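
A sketch of validating a replica configuration against a sink URI, assuming this package is imported as config; the URI and its parameters are illustrative and imports (net/url, fmt, log) are omitted:

replicaCfg := config.GetDefaultReplicaConfig()
sinkURI, err := url.Parse("kafka://127.0.0.1:9092/test-topic?protocol=canal-json")
if err != nil {
	log.Fatal(err)
}
if err := replicaCfg.ValidateAndAdjust(sinkURI); err != nil {
	log.Fatal(err)
}
jsonStr, _ := replicaCfg.Marshal()
fmt.Println(jsonStr)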

func (ReplicaConfig) Value

func (c ReplicaConfig) Value() (driver.Value, error)

Value implements the driver.Valuer interface

type SchedulerConfig

type SchedulerConfig struct {
	// HeartbeatTick is the number of owner ticks before initiating a heartbeat to captures.
	HeartbeatTick int `toml:"heartbeat-tick" json:"heartbeat-tick"`
	// CollectStatsTick is the number of owner ticks before collecting stats.
	CollectStatsTick int `toml:"collect-stats-tick" json:"collect-stats-tick"`
	// MaxTaskConcurrency is the maximum number of concurrently running schedule tasks.
	MaxTaskConcurrency int `toml:"max-task-concurrency" json:"max-task-concurrency"`
	// CheckBalanceInterval is the interval at which tables are balanced between captures.
	CheckBalanceInterval TomlDuration `toml:"check-balance-interval" json:"check-balance-interval"`
	// AddTableBatchSize is the batch size of adding tables on each tick,
	// used by the `BasicScheduler`.
	// When a new owner takes power, other captures may not be online yet and there may be
	// hundreds of tables to dispatch. Adding tables in batches prevents sudden resource usage
	// spikes and leaves time for other captures to join the cluster.
	// When there are only 2 captures and a large number of tables, this helps prevent
	// OOM caused by all tables being dispatched to a single capture.
	AddTableBatchSize int `toml:"add-table-batch-size" json:"add-table-batch-size"`

	// ChangefeedSettings is set per changefeed.
	ChangefeedSettings *ChangefeedSchedulerConfig `toml:"-" json:"-"`
}

SchedulerConfig configs TiCDC scheduler.

func NewDefaultSchedulerConfig

func NewDefaultSchedulerConfig() *SchedulerConfig

NewDefaultSchedulerConfig returns the default scheduler configuration.

func (*SchedulerConfig) ValidateAndAdjust

func (c *SchedulerConfig) ValidateAndAdjust() error

ValidateAndAdjust verifies that each parameter is valid.

type ServerConfig

type ServerConfig struct {
	Addr          string `toml:"addr" json:"addr"`
	AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`

	LogFile  string     `toml:"log-file" json:"log-file"`
	LogLevel string     `toml:"log-level" json:"log-level"`
	Log      *LogConfig `toml:"log" json:"log"`

	DataDir string `toml:"data-dir" json:"data-dir"`

	GcTTL int64  `toml:"gc-ttl" json:"gc-ttl"`
	TZ    string `toml:"tz" json:"tz"`

	CaptureSessionTTL int `toml:"capture-session-ttl" json:"capture-session-ttl"`

	OwnerFlushInterval     TomlDuration `toml:"owner-flush-interval" json:"owner-flush-interval"`
	ProcessorFlushInterval TomlDuration `toml:"processor-flush-interval" json:"processor-flush-interval"`

	Sorter                 *SorterConfig        `toml:"sorter" json:"sorter"`
	Security               *security.Credential `toml:"security" json:"security"`
	KVClient               *KVClientConfig      `toml:"kv-client" json:"kv-client"`
	Debug                  *DebugConfig         `toml:"debug" json:"debug"`
	ClusterID              string               `toml:"cluster-id" json:"cluster-id"`
	GcTunerMemoryThreshold uint64               `toml:"gc-tuner-memory-threshold" json:"gc-tuner-memory-threshold"`

	// Deprecated: we don't use this field anymore.
	PerTableMemoryQuota uint64 `toml:"per-table-memory-quota" json:"per-table-memory-quota"`
	// Deprecated: we don't use this field anymore.
	MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`
}

ServerConfig represents a config for server

func GetDefaultServerConfig

func GetDefaultServerConfig() *ServerConfig

GetDefaultServerConfig returns the default server config

func GetGlobalServerConfig

func GetGlobalServerConfig() *ServerConfig

GetGlobalServerConfig returns the global configuration for this server. It stores configuration from the command line and the configuration file. Other parts of the system can read the global configuration through this function.

func (*ServerConfig) Clone

func (c *ServerConfig) Clone() *ServerConfig

Clone clones a ServerConfig

func (*ServerConfig) Marshal

func (c *ServerConfig) Marshal() (string, error)

Marshal returns the json marshal format of a ServerConfig

func (*ServerConfig) String

func (c *ServerConfig) String() string

String implements the Stringer interface

func (*ServerConfig) Unmarshal

func (c *ServerConfig) Unmarshal(data []byte) error

Unmarshal unmarshals into *ServerConfig from json marshal byte slice

func (*ServerConfig) ValidateAndAdjust

func (c *ServerConfig) ValidateAndAdjust() error

ValidateAndAdjust validates and adjusts the server configuration

type SinkConfig

type SinkConfig struct {
	TxnAtomicity *AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity,omitempty"`
	// Protocol is NOT available when the downstream is DB.
	Protocol *string `toml:"protocol" json:"protocol,omitempty"`

	// DispatchRules is only available when the downstream is MQ.
	DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"`

	ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"`
	// SchemaRegistry is only available when the downstream is MQ using avro protocol.
	SchemaRegistry *string `toml:"schema-registry" json:"schema-registry,omitempty"`
	// EncoderConcurrency is only available when the downstream is MQ.
	EncoderConcurrency *int `toml:"encoder-concurrency" json:"encoder-concurrency,omitempty"`
	// Terminator is NOT available when the downstream is DB.
	Terminator *string `toml:"terminator" json:"terminator,omitempty"`
	// DateSeparator is only available when the downstream is Storage.
	DateSeparator *string `toml:"date-separator" json:"date-separator,omitempty"`
	// EnablePartitionSeparator is only available when the downstream is Storage.
	EnablePartitionSeparator *bool `toml:"enable-partition-separator" json:"enable-partition-separator,omitempty"`
	// FileIndexWidth is only available when the downstream is Storage
	FileIndexWidth *int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"`

	// EnableKafkaSinkV2, when enabled, means the kafka-go sink will be used.
	// It is only available when the downstream is MQ.
	EnableKafkaSinkV2 *bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2,omitempty"`

	// OnlyOutputUpdatedColumns is only available when the downstream is MQ.
	OnlyOutputUpdatedColumns *bool `toml:"only-output-updated-columns" json:"only-output-updated-columns,omitempty"`

	// DeleteOnlyOutputHandleKeyColumns is only available when the downstream is MQ.
	DeleteOnlyOutputHandleKeyColumns *bool `toml:"delete-only-output-handle-key-columns" json:"delete-only-output-handle-key-columns,omitempty"`

	// ContentCompatible is only available when the downstream is MQ.
	ContentCompatible *bool `toml:"content-compatible" json:"content-compatible,omitempty"`

	// TiDBSourceID is the source ID of the upstream TiDB,
	// which is used to set the `tidb_cdc_write_source` session variable.
	// Note: This field is only used internally and only used in the MySQL sink.
	TiDBSourceID uint64 `toml:"-" json:"-"`
	// SafeMode is only available when the downstream is DB.
	SafeMode           *bool               `toml:"safe-mode" json:"safe-mode,omitempty"`
	KafkaConfig        *KafkaConfig        `toml:"kafka-config" json:"kafka-config,omitempty"`
	PulsarConfig       *PulsarConfig       `toml:"pulsar-config" json:"pulsar-config,omitempty"`
	MySQLConfig        *MySQLConfig        `toml:"mysql-config" json:"mysql-config,omitempty"`
	CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"`

	// AdvanceTimeoutInSec is a duration in seconds. If a table sink's progress hasn't been
	// advanced for this duration, the sink will be canceled and re-established.
	// Deprecated since v8.1.1
	AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`

	// The following are Simple protocol only configs, used to control the behavior of sending bootstrap messages.
	// Note: when any of the following settings is set to a negative value,
	// the bootstrap sending function will be disabled.
	// SendBootstrapIntervalInSec is the interval in seconds to send bootstrap messages.
	SendBootstrapIntervalInSec *int64 `toml:"send-bootstrap-interval-in-sec" json:"send-bootstrap-interval-in-sec,omitempty"`
	// SendBootstrapInMsgCount means a bootstrap message is sent every SendBootstrapInMsgCount row change messages.
	SendBootstrapInMsgCount *int32 `toml:"send-bootstrap-in-msg-count" json:"send-bootstrap-in-msg-count,omitempty"`
	// SendBootstrapToAllPartition determines whether to send bootstrap message to all partitions.
	// If set to false, bootstrap message will only be sent to the first partition of each topic.
	// Default value is true.
	SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"`
	// SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.
	SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"`
	// Debezium only. Whether schema should be excluded in the output.
	DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"`

	// CSVConfig is only available when the downstream is Storage.
	CSVConfig *CSVConfig `toml:"csv" json:"csv,omitempty"`
	// OpenProtocol related configurations
	OpenProtocol *OpenProtocolConfig `toml:"open" json:"open,omitempty"`
	// DebeziumConfig related configurations
	Debezium *DebeziumConfig `toml:"debezium" json:"debezium,omitempty"`
}

SinkConfig represents sink config for a changefeed

func (*SinkConfig) CheckCompatibilityWithSinkURI

func (s *SinkConfig) CheckCompatibilityWithSinkURI(
	oldSinkConfig *SinkConfig, sinkURIStr string,
) error

CheckCompatibilityWithSinkURI check whether the sinkURI is compatible with the sink config.

func (*SinkConfig) MaskSensitiveData

func (s *SinkConfig) MaskSensitiveData()

MaskSensitiveData masks sensitive data in SinkConfig

func (*SinkConfig) ShouldSendAllBootstrapAtStart

func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool

ShouldSendAllBootstrapAtStart returns whether the sink should send bootstrap messages for all tables at changefeed start.

func (*SinkConfig) ShouldSendBootstrapMsg

func (s *SinkConfig) ShouldSendBootstrapMsg() bool

ShouldSendBootstrapMsg returns whether the sink should send bootstrap messages. Bootstrap sending is enabled only for the simple protocol and when both send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count are > 0.
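
A sketch of a sink configuration that keeps bootstrap sending enabled, reusing the documented defaults; the protocol string "simple" is assumed to be the string form of ProtocolSimple, and this package is assumed to be imported as config:

protocol := "simple"
interval := config.DefaultSendBootstrapIntervalInSec // int64(120)
count := config.DefaultSendBootstrapInMsgCount       // int32(10000)
sink := &config.SinkConfig{
	Protocol:                   &protocol,
	SendBootstrapIntervalInSec: &interval,
	SendBootstrapInMsgCount:    &count,
}
fmt.Println(sink.ShouldSendBootstrapMsg()) // expected true under these settings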

func (*SinkConfig) ValidateProtocol

func (s *SinkConfig) ValidateProtocol(scheme string) error

ValidateProtocol validates the protocol configuration.

type SorterConfig

type SorterConfig struct {
	// the directory used to store the temporary files generated by the sorter
	SortDir string `toml:"sort-dir" json:"sort-dir"`

	// Cache size of sorter in MB.
	CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"`

	// Deprecated: we don't use this field anymore.
	MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`
	// Deprecated: we don't use this field anymore.
	MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"`
	// Deprecated: we don't use this field anymore.
	NumWorkerPoolGoroutine int `toml:"num-workerpool-goroutine" json:"num-workerpool-goroutine"`
	// Deprecated: we don't use this field anymore.
	NumConcurrentWorker int `toml:"num-concurrent-worker" json:"num-concurrent-worker"`
	// Deprecated: we don't use this field anymore.
	ChunkSizeLimit uint64 `toml:"chunk-size-limit" json:"chunk-size-limit"`
}

SorterConfig represents sorter config for a changefeed

func (*SorterConfig) ValidateAndAdjust

func (c *SorterConfig) ValidateAndAdjust() error

ValidateAndAdjust validates and adjusts the sorter configuration

type SyncedStatusConfig

type SyncedStatusConfig struct {
	// The minimum interval between the latest synced ts and now required to reach synced state
	SyncedCheckInterval int64 `toml:"synced-check-interval" json:"synced-check-interval"`
	// The maximum interval between latest checkpoint ts and now or
	// between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state
	CheckpointInterval int64 `toml:"checkpoint-interval" json:"checkpoint-interval"`
}

SyncedStatusConfig represents synced check interval config for a changefeed

type TimeMill

type TimeMill int

TimeMill is the time in milliseconds

func NewTimeMill

func NewTimeMill(x int) *TimeMill

NewTimeMill returns a new time in milliseconds

func (*TimeMill) Duration

func (t *TimeMill) Duration() time.Duration

Duration returns the time in milliseconds as a duration

type TimeSec

type TimeSec int

TimeSec is the time in seconds

func NewTimeSec

func NewTimeSec(x int) *TimeSec

NewTimeSec returns a new time in seconds

func (*TimeSec) Duration

func (t *TimeSec) Duration() time.Duration

Duration returns the time in seconds as a duration
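
A sketch of the TimeSec/TimeMill helpers feeding a Pulsar configuration, assuming this package is imported as config; the chosen values mirror the defaults mentioned in the PulsarConfig field docs:

connTimeout := config.NewTimeSec(5)    // 5 seconds
publishDelay := config.NewTimeMill(10) // 10 milliseconds
fmt.Println(connTimeout.Duration(), publishDelay.Duration())

pulsarCfg := &config.PulsarConfig{
	ConnectionTimeout:       connTimeout,
	BatchingMaxPublishDelay: publishDelay,
}
_ = pulsarCfg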

type TomlDuration

type TomlDuration time.Duration

TomlDuration is a duration with a custom json decoder and toml decoder

func (*TomlDuration) UnmarshalJSON

func (d *TomlDuration) UnmarshalJSON(b []byte) error

UnmarshalJSON is the json decoder

func (*TomlDuration) UnmarshalText

func (d *TomlDuration) UnmarshalText(text []byte) error

UnmarshalText is the toml decoder
