Documentation ¶
Index ¶
- Constants
- func CleanMetrics(changefeedID model.ChangeFeedID)
- func Compress(changefeedID model.ChangeFeedID, cc string, data []byte) ([]byte, error)
- func Decompress(cc string, data []byte) ([]byte, error)
- func EscapeEnumAndSetOptions(option string) string
- func InitMetrics(registry *prometheus.Registry)
- func MustBinaryLiteralToInt(bytes []byte) uint64
- func MustQueryTimezone(ctx context.Context, db *sql.DB) string
- func SanitizeName(name string) string
- func SanitizeTopicName(name string) string
- func UnsafeBytesToString(b []byte) string
- func UnsafeStringToBytes(s string) []byte
- func VerifyChecksum(event *model.RowChangedEvent, db *sql.DB) error
- type ClaimCheckMessage
- type ColumnsHolder
- type Config
- type EncodingFormatType
- type Message
- func (m *Message) GetPartitionKey() string
- func (m *Message) GetRowsCount() int
- func (m *Message) GetSchema() string
- func (m *Message) GetTable() string
- func (m *Message) IncRowsCount()
- func (m *Message) Length() int
- func (m *Message) PhysicalTime() time.Time
- func (m *Message) SetPartitionKey(key string)
- func (m *Message) SetRowsCount(cnt int)
Constants ¶
const (
    // DecimalHandlingModeString is the string mode for decimal handling
    DecimalHandlingModeString = "string"
    // DecimalHandlingModePrecise is the precise mode for decimal handling
    DecimalHandlingModePrecise = "precise"
    // BigintUnsignedHandlingModeString is the string mode for unsigned bigint handling
    BigintUnsignedHandlingModeString = "string"
    // BigintUnsignedHandlingModeLong is the long mode for unsigned bigint handling
    BigintUnsignedHandlingModeLong = "long"
)
const (
    // SchemaRegistryTypeConfluent is the type of Confluent Schema Registry
    SchemaRegistryTypeConfluent = "confluent"
    // SchemaRegistryTypeGlue is the type of AWS Glue Schema Registry
    SchemaRegistryTypeGlue = "glue"
)
const MaxRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
MaxRecordOverhead is used to calculate the message size for the sarama Kafka client. Reference: https://github.com/IBM/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 For TiCDC, the minimum supported Kafka version is `0.11.0.2`, which is treated as `version = 2` by the sarama producer.
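As an illustration, a producer-side size check could compare a message against the broker limit. This is a minimal sketch, assuming this package is imported as `common` and that Message.Length (documented below) returns the full expected on-wire size including this overhead:

// fitsInKafkaMessage is a hypothetical helper, not part of this package.
func fitsInKafkaMessage(msg *common.Message, maxMessageBytes int) bool {
    // Assumption: msg.Length() accounts for key, value and MaxRecordOverhead.
    return msg.Length() <= maxMessageBytes
}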
Variables ¶
This section is empty.
Functions ¶
func CleanMetrics ¶
func CleanMetrics(changefeedID model.ChangeFeedID)
CleanMetrics removes the metrics belonging to the given changefeed.
func Compress ¶
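func Compress(changefeedID model.ChangeFeedID, cc string, data []byte) ([]byte, error)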
Compress compresses the given data with the given compression codec, and records the compression ratio metric.
func Decompress ¶
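func Decompress(cc string, data []byte) ([]byte, error)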
Decompress decompresses the given data with the given compression codec.
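A hedged round-trip sketch covering both Compress and Decompress; `changefeedID` and `data` are placeholder variables, and the codec name "snappy" passed as `cc` is an assumption for illustration, not a statement of the supported codec set:

compressed, err := common.Compress(changefeedID, "snappy", data) // "snappy" is an assumed codec name
if err != nil {
    return err
}
decompressed, err := common.Decompress("snappy", compressed)
if err != nil {
    return err
}
// decompressed is expected to equal the original data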
func EscapeEnumAndSetOptions ¶
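func EscapeEnumAndSetOptions(option string) string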
EscapeEnumAndSetOptions escapes ",", "\" and "'", see https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/MySqlAntlrDdlParser.java#L374
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in this file
func MustBinaryLiteralToInt ¶
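func MustBinaryLiteralToInt(bytes []byte) uint64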
MustBinaryLiteralToInt converts bytes into a uint64, following https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/types/binary_literal.go#L105
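A small hedged example; the big-endian interpretation is an assumption based on the linked TiDB implementation:

v := common.MustBinaryLiteralToInt([]byte{0x01, 0x00})
fmt.Println(v) // 256, assuming the bytes are read as a big-endian unsigned integer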
func MustQueryTimezone ¶
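func MustQueryTimezone(ctx context.Context, db *sql.DB) string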
MustQueryTimezone queries the timezone from the upstream database.
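A usage sketch; `ctx` and `db` are assumed to be set up by the caller, and it is an assumption that the returned string is resolvable by time.LoadLocation:

tz := common.MustQueryTimezone(ctx, db)
loc, err := time.LoadLocation(tz) // assumes the upstream reports a loadable zone name
if err != nil {
    return err
}
_ = loc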
func SanitizeName ¶
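func SanitizeName(name string) string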
SanitizeName escapes characters that are not permitted in Avro names, see https://avro.apache.org/docs/1.12.0/specification/#names and https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/schema/SchemaNameAdjuster.java
func SanitizeTopicName ¶
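func SanitizeTopicName(name string) string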
SanitizeTopicName escapes characters that are not permitted in a topic name, see https://github.com/debezium/debezium/blob/main/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java
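A hedged sketch of both sanitizers; the exact replacement rules follow the linked Avro and Debezium sources, so no concrete outputs are asserted here:

avroName := common.SanitizeName("my-field.1")    // escaped to satisfy Avro naming rules
topicName := common.SanitizeTopicName("db.t/v1") // escaped to satisfy topic naming rules
_, _ = avroName, topicName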
func UnsafeBytesToString ¶
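func UnsafeBytesToString(b []byte) string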
UnsafeBytesToString creates a string from a byte slice without copying.
func UnsafeStringToBytes ¶
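func UnsafeStringToBytes(s string) []byte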
UnsafeStringToBytes creates a byte slice from a string without copying.
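Both conversions alias the underlying memory, so the usual zero-copy caveats apply; a minimal sketch:

b := []byte("hello")
s := common.UnsafeBytesToString(b)
// s shares b's memory: mutating b afterwards would change s and break
// Go's string-immutability guarantee, so treat b as frozen from here on.
raw := common.UnsafeStringToBytes(s)
// raw likewise shares the string's memory and must be treated as read-only.
_ = raw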
func VerifyChecksum ¶
func VerifyChecksum(event *model.RowChangedEvent, db *sql.DB) error
VerifyChecksum calculates the checksum value, compares it with the expected one, and returns an error if they are not identical.
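Typical usage is as a guard after decoding a row event; a minimal sketch:

if err := common.VerifyChecksum(event, db); err != nil {
    // either the recalculated checksum did not match the expected one,
    // or the verification query itself failed
    return err
}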
Types ¶
type ClaimCheckMessage ¶
ClaimCheckMessage is the message sent to the claim-check external storage.
func UnmarshalClaimCheckMessage ¶
func UnmarshalClaimCheckMessage(data []byte) (*ClaimCheckMessage, error)
UnmarshalClaimCheckMessage unmarshals bytes into a ClaimCheckMessage.
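A hedged consumption sketch, assuming `data` holds the payload previously fetched from the claim-check external storage:

claimed, err := common.UnmarshalClaimCheckMessage(data)
if err != nil {
    return err
}
// claimed carries the original large message retrieved via claim-check
_ = claimed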
type ColumnsHolder ¶
type ColumnsHolder struct {
    Values        []interface{}
    ValuePointers []interface{}
    Types         []*sql.ColumnType
}
ColumnsHolder holds the columns read from sql.Rows.
func MustSnapshotQuery ¶
func MustSnapshotQuery(
    ctx context.Context, db *sql.DB, commitTs uint64,
    schema, table string, conditions map[string]interface{},
) *ColumnsHolder
MustSnapshotQuery queries the database via snapshot read at the given commitTs.
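A hedged call sketch; that `conditions` maps column names to values used to locate the row is an assumption:

holder := common.MustSnapshotQuery(ctx, db, commitTs, "test", "t1",
    map[string]interface{}{"id": 1}) // assumed: column name -> value filter
for i, typ := range holder.Types {
    fmt.Println(typ.Name(), holder.Values[i])
}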
type Config ¶
type Config struct {
    ChangefeedID model.ChangeFeedID

    Protocol config.Protocol

    // control batch behavior, only for `open-protocol` and `craft` at the moment.
    MaxMessageBytes int
    MaxBatchSize    int

    // DeleteOnlyHandleKeyColumns is true, for the delete event only output the handle key columns.
    DeleteOnlyHandleKeyColumns bool

    LargeMessageHandle *config.LargeMessageHandleConfig

    EnableTiDBExtension bool
    EnableRowChecksum   bool

    // avro only
    AvroConfluentSchemaRegistry    string
    AvroDecimalHandlingMode        string
    AvroBigintUnsignedHandlingMode string
    AvroGlueSchemaRegistry         *config.GlueSchemaRegistryConfig
    // EnableWatermarkEvent set to true, avro encode DDL and checkpoint event
    // and send to the downstream kafka, they cannot be consumed by the confluent official consumer
    // and would cause error, so this is only used for ticdc internal testing purpose, should not be
    // exposed to the outside users.
    AvroEnableWatermark bool

    // canal-json only
    ContentCompatible bool

    // for sinking to cloud storage
    Delimiter            string
    Quote                string
    NullString           string
    IncludeCommitTs      bool
    Terminator           string
    BinaryEncodingMethod string
    OutputOldValue       bool
    OutputHandleKey      bool

    // for open protocol, and canal-json
    OnlyOutputUpdatedColumns bool
    // Whether old value should be excluded in the output.
    OpenOutputOldValue bool

    // for the simple protocol, can be "json" and "avro", default to "json"
    EncodingFormat EncodingFormatType

    // Currently only Debezium protocol is aware of the time zone
    TimeZone *time.Location

    // Debezium only. Whether schema should be excluded in the output.
    DebeziumDisableSchema bool
    // Debezium only. Whether before value should be included in the output.
    DebeziumOutputOldValue bool
}
Config is used to create the encoder.
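A hedged construction sketch; the protocol constant name and the builder-style chaining of WithMaxMessageBytes are assumptions based on the methods listed below:

cfg := &common.Config{
    Protocol:       config.ProtocolCanalJSON, // assumed constant name
    MaxBatchSize:   16,
    EncodingFormat: common.EncodingFormatJSON,
}
// changefeedID is a placeholder; WithMaxMessageBytes is assumed to return *Config.
cfg = cfg.WithChangefeedID(changefeedID).WithMaxMessageBytes(10 * 1024 * 1024)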
func (*Config) SchemaRegistryType ¶
SchemaRegistryType returns the type of schema registry
func (*Config) WithChangefeedID ¶
func (c *Config) WithChangefeedID(id model.ChangeFeedID) *Config
WithChangefeedID sets the `changefeedID`.
func (*Config) WithMaxMessageBytes ¶
WithMaxMessageBytes sets the `maxMessageBytes`.
type EncodingFormatType ¶
type EncodingFormatType string
EncodingFormatType is the type of encoding format
const (
    // EncodingFormatJSON is the json format
    EncodingFormatJSON EncodingFormatType = "json"
    // EncodingFormatAvro is the avro format
    EncodingFormatAvro EncodingFormatType = "avro"
)
type Message ¶
type Message struct {
    Key      []byte
    Value    []byte
    Ts       uint64            // reserved for possible output sorting
    Schema   *string           // schema
    Table    *string           // table
    Type     model.MessageType // type
    Protocol config.Protocol   // protocol
    // Callback function will be called when the message is sent to the sink.
    Callback func()
    // PartitionKey for pulsar, route messages to one or different partitions
    PartitionKey *string
    // contains filtered or unexported fields
}
Message represents a message to the sink.
func NewMsg ¶
func NewMsg(
    proto config.Protocol,
    key []byte,
    value []byte,
    ts uint64,
    ty model.MessageType,
    schema, table *string,
) *Message
NewMsg should be used when creating a Message struct. It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
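A hedged sketch of creating a row message; `keyBytes`, `valueBytes` and `commitTs` are placeholder variables, and the protocol and message-type constant names are assumptions:

schema, table := "test", "t1"
msg := common.NewMsg(
    config.ProtocolOpen,  // assumed protocol constant
    keyBytes, valueBytes, // copied by NewMsg, per the note above
    commitTs,
    model.MessageTypeRow, // assumed message-type constant
    &schema, &table,
)
_ = msg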
func NewResolvedMsg ¶
NewResolvedMsg creates a resolved ts message.
func (*Message) GetPartitionKey ¶
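func (m *Message) GetPartitionKey() string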
GetPartitionKey returns the partition key.
func (*Message) GetRowsCount ¶
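func (m *Message) GetRowsCount() int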
GetRowsCount returns the number of rows batched in one Message
func (*Message) IncRowsCount ¶
func (m *Message) IncRowsCount()
IncRowsCount increases the number of rows.
func (*Message) Length ¶
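func (m *Message) Length() int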
Length returns the expected size of the Kafka message. We don't append any `Headers` when sending the message, so the calculation ignores them. If the `ProducerMessage` Headers field is used, this method should be adjusted accordingly.
func (*Message) PhysicalTime ¶
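func (m *Message) PhysicalTime() time.Time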
PhysicalTime returns the physical time part of Ts as a time.Time.
func (*Message) SetPartitionKey ¶
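func (m *Message) SetPartitionKey(key string)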
SetPartitionKey sets the PartitionKey for a message. The PartitionKey is used by the Pulsar producer to route messages to one or more partitions.
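A minimal routing sketch for Pulsar, using only the methods documented here:

msg.SetPartitionKey("orders-42") // messages sharing this key route to the same partition
if key := msg.GetPartitionKey(); key != "" {
    // hand key to the Pulsar producer's message routing
}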
func (*Message) SetRowsCount ¶
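func (m *Message) SetRowsCount(cnt int)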
SetRowsCount sets the number of rows.