common

package
v0.0.0-...-91902aa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2024 License: Apache-2.0 Imports: 29 Imported by: 13

Documentation

Index

Constants

View Source
// Handling-mode names for decimal and unsigned bigint values.
// NOTE(review): presumably these are the accepted values of
// Config.AvroDecimalHandlingMode and Config.AvroBigintUnsignedHandlingMode — confirm.
// DecimalHandlingModeString and BigintUnsignedHandlingModeString share the
// same literal, "string".
const (
	// DecimalHandlingModeString is the string mode for decimal handling
	DecimalHandlingModeString = "string"
	// DecimalHandlingModePrecise is the precise mode for decimal handling
	DecimalHandlingModePrecise = "precise"
	// BigintUnsignedHandlingModeString is the string mode for unsigned bigint handling
	BigintUnsignedHandlingModeString = "string"
	// BigintUnsignedHandlingModeLong is the long mode for unsigned bigint handling
	BigintUnsignedHandlingModeLong = "long"
)
View Source
// Schema registry type names.
// NOTE(review): presumably these are the values returned by
// Config.SchemaRegistryType — confirm.
const (
	// SchemaRegistryTypeConfluent is the type of Confluent Schema Registry
	SchemaRegistryTypeConfluent = "confluent"
	// SchemaRegistryTypeGlue is the type of AWS Glue Schema Registry
	SchemaRegistryTypeGlue = "glue"
)
View Source
// MaxRecordOverhead is the sum of five maximum 32-bit varint lengths, one
// maximum 64-bit varint length, and one extra byte. With encoding/binary's
// MaxVarintLen32 = 5 and MaxVarintLen64 = 10 this evaluates to 36.
const MaxRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

MaxRecordOverhead is used to calculate the message size as the sarama Kafka client does. Reference: https://github.com/IBM/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 For TiCDC, the minimum supported Kafka version is `0.11.0.2`, which the sarama producer treats as `version = 2`.

Variables

This section is empty.

Functions

func CleanMetrics

func CleanMetrics(changefeedID model.ChangeFeedID)

CleanMetrics removes the metrics that belong to the given changefeed.

func Compress

func Compress(changefeedID model.ChangeFeedID, cc string, data []byte) ([]byte, error)

Compress compresses the given data with the given compression and also records the compression ratio metric.

func Decompress

func Decompress(cc string, data []byte) ([]byte, error)

Decompress the given data by the given compression.

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file

func MustBinaryLiteralToInt

func MustBinaryLiteralToInt(bytes []byte) uint64

MustBinaryLiteralToInt converts bytes into a uint64, following https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/types/binary_literal.go#L105

func MustQueryTimezone

func MustQueryTimezone(ctx context.Context, db *sql.DB) string

MustQueryTimezone queries the timezone from the upstream database.

func SanitizeTopicName

func SanitizeTopicName(name string) string

SanitizeTopicName escapes characters that are not permitted in a topic name. See https://github.com/debezium/debezium/blob/main/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java

func UnsafeBytesToString

func UnsafeBytesToString(b []byte) string

UnsafeBytesToString creates a string from a byte slice without copying.

func UnsafeStringToBytes

func UnsafeStringToBytes(s string) []byte

UnsafeStringToBytes creates a byte slice from a string without copying.

func VerifyChecksum

func VerifyChecksum(event *model.RowChangedEvent, db *sql.DB) error

VerifyChecksum calculates the checksum value and compares it with the expected one; it returns an error if they are not identical.

Types

type ClaimCheckMessage

// ClaimCheckMessage is the message sent to the claim-check external storage.
type ClaimCheckMessage struct {
	// Key is tagged with the JSON name "key".
	// NOTE(review): presumably the key of the original message — confirm.
	Key   []byte `json:"key"`
	// Value is tagged with the JSON name "value".
	// NOTE(review): presumably the value of the original message — confirm.
	Value []byte `json:"value"`
}

ClaimCheckMessage is the message sent to the claim-check external storage.

func UnmarshalClaimCheckMessage

func UnmarshalClaimCheckMessage(data []byte) (*ClaimCheckMessage, error)

UnmarshalClaimCheckMessage unmarshals bytes into a ClaimCheckMessage.

type ColumnsHolder

// ColumnsHolder holds the columns read from sql.Rows.
type ColumnsHolder struct {
	// NOTE(review): presumably Values and ValuePointers hold one entry per
	// column, with ValuePointers[i] pointing at Values[i] so the slice can
	// be handed to a scan call — confirm against the constructor.
	Values        []interface{}
	ValuePointers []interface{}
	// Types holds the *sql.ColumnType entries.
	// NOTE(review): presumably one per column, in column order — confirm.
	Types         []*sql.ColumnType
}

ColumnsHolder reads columns from sql.Rows.

func MustSnapshotQuery

func MustSnapshotQuery(
	ctx context.Context, db *sql.DB, commitTs uint64, schema, table string, conditions map[string]interface{},
) *ColumnsHolder

MustSnapshotQuery queries the db using a snapshot read at the given commitTs.

func (*ColumnsHolder) Length

func (h *ColumnsHolder) Length() int

Length returns the column count.

type Config

// Config is used to create the encoder.
type Config struct {
	ChangefeedID model.ChangeFeedID

	Protocol config.Protocol

	// control batch behavior, only for `open-protocol` and `craft` at the moment.
	MaxMessageBytes int
	MaxBatchSize    int

	// DeleteOnlyHandleKeyColumns, when true, makes delete events output only
	// the handle key columns.
	DeleteOnlyHandleKeyColumns bool

	LargeMessageHandle *config.LargeMessageHandleConfig

	EnableTiDBExtension bool
	EnableRowChecksum   bool

	// avro only
	AvroConfluentSchemaRegistry    string
	AvroDecimalHandlingMode        string
	AvroBigintUnsignedHandlingMode string
	AvroGlueSchemaRegistry         *config.GlueSchemaRegistryConfig
	// AvroEnableWatermark, when set to true, makes avro encode DDL and
	// checkpoint events and send them to the downstream kafka. They cannot be
	// consumed by the confluent official consumer and would cause errors, so
	// this is only used for ticdc internal testing purposes and should not be
	// exposed to outside users.
	AvroEnableWatermark bool

	// canal-json only
	ContentCompatible bool

	// for sinking to cloud storage
	Delimiter            string
	Quote                string
	NullString           string
	IncludeCommitTs      bool
	Terminator           string
	BinaryEncodingMethod string
	OutputOldValue       bool
	OutputHandleKey      bool

	// for open protocol, and canal-json
	OnlyOutputUpdatedColumns bool
	// Whether old value should be excluded in the output.
	// NOTE(review): the field name suggests that true means the old value is
	// output (included) — confirm whether "excluded" is intended here.
	OpenOutputOldValue bool

	// for the simple protocol, can be "json" and "avro", default to "json"
	EncodingFormat EncodingFormatType

	// Currently only Debezium protocol is aware of the time zone
	TimeZone *time.Location

	// Debezium only. Whether schema should be excluded in the output.
	DebeziumDisableSchema bool
	// Debezium only. Whether before value should be included in the output.
	DebeziumOutputOldValue bool
}

Config is used to create the encoder.

func NewConfig

func NewConfig(protocol config.Protocol) *Config

NewConfig returns a Config for the codec.

func (*Config) Apply

func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) error

Apply fills the Config from the given sink URI and replica config.

func (*Config) SchemaRegistryType

func (c *Config) SchemaRegistryType() string

SchemaRegistryType returns the type of schema registry

func (*Config) Validate

func (c *Config) Validate() error

Validate the Config

func (*Config) WithChangefeedID

func (c *Config) WithChangefeedID(id model.ChangeFeedID) *Config

WithChangefeedID sets the `changefeedID`.

func (*Config) WithMaxMessageBytes

func (c *Config) WithMaxMessageBytes(bytes int) *Config

WithMaxMessageBytes sets the `maxMessageBytes`.

type EncodingFormatType

// EncodingFormatType is the type of encoding format; its underlying type is string.
type EncodingFormatType string

EncodingFormatType is the type of encoding format

// Values of EncodingFormatType. The comment on Config.EncodingFormat lists
// "json" and "avro" as the choices for the simple protocol, with "json" as
// the default.
const (
	// EncodingFormatJSON is the json format
	EncodingFormatJSON EncodingFormatType = "json"
	// EncodingFormatAvro is the avro format
	EncodingFormatAvro EncodingFormatType = "avro"
)

type Message

// Message represents a message to the sink.
type Message struct {
	Key      []byte
	Value    []byte
	Ts       uint64            // reserved for possible output sorting; PhysicalTime exposes its physical time part
	Schema   *string           // schema; GetSchema returns it as a string
	Table    *string           // table; GetTable returns it as a string
	Type     model.MessageType // type
	Protocol config.Protocol   // protocol

	Callback func() // Callback function will be called when the message is sent to the sink.

	// PartitionKey for pulsar, route messages to one or different partitions
	PartitionKey *string
	// contains filtered or unexported fields
}

Message represents a message to the sink.

func NewDDLMsg

func NewDDLMsg(proto config.Protocol, key, value []byte, event *model.DDLEvent) *Message

NewDDLMsg creates a DDL message.

func NewMsg

func NewMsg(
	proto config.Protocol,
	key []byte,
	value []byte,
	ts uint64,
	ty model.MessageType,
	schema, table *string,
) *Message

NewMsg should be used when creating a Message struct. It copies the input byte slices to avoid any surprises in asynchronous MQ writes.

func NewResolvedMsg

func NewResolvedMsg(proto config.Protocol, key, value []byte, ts uint64) *Message

NewResolvedMsg creates a resolved ts message.

func (*Message) GetPartitionKey

func (m *Message) GetPartitionKey() string

GetPartitionKey returns the PartitionKey.

func (*Message) GetRowsCount

func (m *Message) GetRowsCount() int

GetRowsCount returns the number of rows batched in one Message

func (*Message) GetSchema

func (m *Message) GetSchema() string

GetSchema returns schema string

func (*Message) GetTable

func (m *Message) GetTable() string

GetTable returns the Table string

func (*Message) IncRowsCount

func (m *Message) IncRowsCount()

IncRowsCount increases the number of rows.

func (*Message) Length

func (m *Message) Length() int

Length returns the expected size of the Kafka message. We do not append any `Headers` when sending the message, so the calculations related to them are ignored. If the `Headers` field of `ProducerMessage` is ever used, this method should be adjusted as well.

func (*Message) PhysicalTime

func (m *Message) PhysicalTime() time.Time

PhysicalTime returns physical time part of Ts in time.Time

func (*Message) SetPartitionKey

func (m *Message) SetPartitionKey(key string)

SetPartitionKey sets the PartitionKey for a message. PartitionKey is used by the pulsar producer to route messages to one or different partitions.

func (*Message) SetRowsCount

func (m *Message) SetRowsCount(cnt int)

SetRowsCount sets the number of rows.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL