changeitem

package
v0.0.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// Table name constants.
// NOTE(review): presumably these are service tables that IsSystemTable treats as system ones — confirm.
const (
	TableConsumerKeeper = "__consumer_keeper"
	TableLSN            = "__data_transfer_lsn"
)
View Source
// Kinds of change items (values of ChangeItem.Kind).
const (
	DropTableKind     = Kind("drop_table")
	TruncateTableKind = Kind("truncate")

	// InitShardedTableLoad is the table header.
	// All Init/DoneTableLoad and data items for the same table should be sent strictly after this kind of item.
	InitShardedTableLoad = Kind("init_sharded_table_load")
	// InitTableLoad is the table part header. It should be sent for each table part before uploading any data for that part.
	// If the table is not sharded, it is considered to have a single part, so InitTableLoad should be sent
	// before uploading the whole table.
	InitTableLoad = Kind("init_load_table")
	// DoneTableLoad is the table part trailer. It should be sent for each table part after all data events have been sent.
	// If the table is not sharded, it is considered to have a single part, so DoneTableLoad should be sent
	// after uploading the whole table.
	DoneTableLoad = Kind("done_load_table")
	// DoneShardedTableLoad is the table trailer. No table control or row items should be sent for the table after this item.
	DoneShardedTableLoad = Kind("done_sharded_table_load")

	InsertKind                   = Kind("insert")
	UpdateKind                   = Kind("update")
	DeleteKind                   = Kind("delete")
	PgDDLKind                    = Kind("pg:DDL")
	DDLKind                      = Kind("DDL")
	MongoUpdateDocumentKind      = Kind("mongo:update_document")
	MongoCreateKind              = Kind("mongo:create")
	MongoDropKind                = Kind("mongo:drop")
	MongoRenameKind              = Kind("mongo:rename")
	MongoDropDatabaseKind        = Kind("mongo:dropDatabase")
	MongoNoop                    = Kind("mongo:noop")
	ChCreateTableDistributedKind = Kind("ch:createTableDistributed")
	ChCreateTableKind            = Kind("ch:createTable")
	ClickhouseDDLBuilderKind     = Kind("ch:DDLBuilder")
	ElasticsearchDumpIndexKind   = Kind("es:dumpIndex")
	// SynchronizeKind is the empty Kind (its value is the empty string).
	SynchronizeKind              = Kind("")
)
View Source
// Names and types used by the raw message schema (RawDataSchema).
const (
	// RawMessage* are the column names of RawDataSchema.
	RawMessageTopic     = "topic"
	RawMessagePartition = "partition"
	RawMessageSeqNo     = "seq_no"
	RawMessageWriteTime = "write_time"
	RawMessageData      = "data"

	// OriginalTypeMirrorBinary is the OriginalType set on the RawMessageData column of RawDataSchema.
	OriginalTypeMirrorBinary = "mirror:binary"
)
View Source
const (
	// DefaultPropertyKey is the "default" key.
	// NOTE(review): presumably meant to be used as a PropertyKey in ColSchema.Properties — confirm.
	DefaultPropertyKey = "default"
)

Variables

View Source
// Schema and column descriptions of raw messages.
var (
	// RawDataSchema is the table schema built from the RawMessage* column names:
	// topic, partition, seq_no and write_time are required primary key columns;
	// data is a string column whose original type is OriginalTypeMirrorBinary.
	RawDataSchema = NewTableSchema([]ColSchema{
		{ColumnName: RawMessageTopic, DataType: string(schema.TypeString), PrimaryKey: true, Required: true},
		{ColumnName: RawMessagePartition, DataType: string(schema.TypeUint32), PrimaryKey: true, Required: true},
		{ColumnName: RawMessageSeqNo, DataType: string(schema.TypeUint64), PrimaryKey: true, Required: true},
		{ColumnName: RawMessageWriteTime, DataType: string(schema.TypeDatetime), PrimaryKey: true, Required: true},
		{ColumnName: RawMessageData, DataType: string(schema.TypeString), OriginalType: OriginalTypeMirrorBinary},
	})
	// RawDataColumns lists the raw message column names in the same order as the columns of RawDataSchema.
	RawDataColumns = []string{RawMessageTopic, RawMessagePartition, RawMessageSeqNo, RawMessageWriteTime, RawMessageData}
	// RawDataColsIDX is the result of ColIDX applied to the columns of RawDataSchema.
	// NOTE(review): presumably a column name -> index mapping — confirm against ColIDX.
	RawDataColsIDX = ColIDX(RawDataSchema.Columns())
)

Functions

func ColIDX

func ColIDX(schema []ColSchema) map[string]int

func ContainsNonRowItem

func ContainsNonRowItem(s []ChangeItem) bool

func Dump

func Dump(input []ChangeItem)

func GetRawMessageData

func GetRawMessageData(r ChangeItem) ([]byte, error)

func IsSystemTable

func IsSystemTable(in string) bool

func KeyNames

func KeyNames(in []ColSchema) []string

func MakeMapColNameToIndex

func MakeMapColNameToIndex(in []ColSchema) map[string]int

MakeMapColNameToIndex returns a mapping of a column name to an index in the given slice

func PgName

func PgName(schemaName, tableName string) string

func RegisterSystemTables

func RegisterSystemTables(tableNames ...string)

func Sniff

func Sniff(input []ChangeItem) string

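Sniff examines the given change items and returns a string describing them (presumably a human-readable sample of the data; see the function definition to confirm).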

func SplitByTableID

func SplitByTableID(batch []ChangeItem) map[TableID][]ChangeItem

func SplitUpdatedPKeys

func SplitUpdatedPKeys(input []ChangeItem) [][]ChangeItem

SplitUpdatedPKeys takes a list of changes and splits them into sublists based on updated PKeys. Each sublist includes all the previous changes up to the change with an updated PKey. That change is split into two sub-changes: one delete (for the old row) and one insert. Note:

  • The order of changes within each sublist is maintained.

func SystemTables

func SystemTables() []string

Types

type ChangeItem

// ChangeItem is a single item of a change stream: a row change (insert/update/delete)
// or a control item such as truncate, DDL or a table load marker (see Kind).
type ChangeItem struct {
	ID         uint32 // Transaction ID. For snapshot: id==0, for mongo on replication is also 0.
	LSN        uint64 // It is OK for this to be 0 for snapshot.
	CommitTime uint64
	Counter    int
	// This field isn't used a lot in code,
	// but in __WAL txPosition is used as part of the composite primary key,
	// so it is a very important field!
	// NOTE(review): this note presumably describes Counter above rather than Kind below — confirm.
	Kind   Kind   // insert/update/delete/truncate/DDL/pg:DDL
	Schema string // for pg: filled by wal2json - contains "public", for ydb: always empty string
	Table  string // for pg: filled by wal2json - contains tableName, for ydb: depends on 'UseFullPaths'
	// PartID is used to identify a part of a sharded upload to make it possible to distinguish items which belong to different parts.
	// PartIDs must be unique within a single table. Each source may use its own PartID generation rules.
	// Sinker should interpret PartID as a plain string to be used only to distinguish one part from another and should not attempt to parse or decode it somehow.
	// PartID should contain only characters that may be used in table identifiers (latin characters, digits, '-' and '_' chars).
	PartID string

	// ColumnNames is a write-once field. Its value may be reused across multiple ChangeItems.
	// Do not use this field in new code! Use column names from the table schema instead.
	// This field may be empty when the table schema is not.
	ColumnNames  []string
	ColumnValues []interface{}

	// TableSchema is a write-once field. Its value may be reused across multiple ChangeItems.
	// The schema is filled on all row events and on init / done table load items.
	TableSchema *TableSchema

	// OldKeys is a set of (PRIMARY) keys identifying the previous version of the tuple which this item represents.
	//
	// There are also special cases when this field contains different data:
	//   - PG source: contains PRIMARY KEY fields as well as other fields for which UNIQUE indexes exist. Note that when there is no PRIMARY KEY, PG source considers one of the UNIQUE indexes to be the PRIMARY KEY. When REPLICA IDENTITY FULL is set, contains all fields.
	//   - MySQL source: contains the same fields as columnNames or keys, depending on binlog_row_image.
	//
	// There may be other specific cases in addition to the ones mentioned above. In any case, one must rely on TableSchema, not on the presence or absence of particular columns inside this field.
	OldKeys OldKeysType

	TxID string // this is only valid for mysql so far; for now we will not store it in YT

	Query string    // optional, can be filled only for items from mysql binlogs with binlog_rows_query_log_events enabled
	Size  EventSize // optional event size
}

func ChangeItemFromMap

func ChangeItemFromMap(input map[string]interface{}, schema *TableSchema, table string, kind string) ChangeItem

func Collapse

func Collapse(input []ChangeItem) []ChangeItem

Collapse collapses (possibly) multiple items in the input into a single item (or none) in the output. Currently, it preserves the order of items in the result. It should only be applied by sinks which support PRIMARY KEYs. For them the order of items is considered to be of no importance.

func FindItemOfKind

func FindItemOfKind(s []ChangeItem, kinds ...Kind) *ChangeItem

FindItemOfKind returns an item in the slice whose kind is among the given ones, or nil if there is no such item in it

func MakeRawMessage

func MakeRawMessage(table string, commitTime time.Time, topic string, shard int, offset int64, data []byte) ChangeItem

func (*ChangeItem) AddTableColumn

func (c *ChangeItem) AddTableColumn(column ColSchema)

func (*ChangeItem) AsMap

func (c *ChangeItem) AsMap() map[string]interface{}

func (*ChangeItem) ColumnNameIndex

func (c *ChangeItem) ColumnNameIndex(columnName string) int

func (*ChangeItem) ColumnNameIndices

func (c *ChangeItem) ColumnNameIndices() map[string]int

func (*ChangeItem) CurrentKeysString

func (c *ChangeItem) CurrentKeysString(keyColumns map[string]bool) string

func (*ChangeItem) EnsureSanity

func (c *ChangeItem) EnsureSanity() error

EnsureSanity ensures the given item is a sound and sane one which follows the contract between item's fields.

It should be removed one day when the contract is ingrained in the item's structure.

func (*ChangeItem) Fqtn

func (c *ChangeItem) Fqtn() string

func (*ChangeItem) IsMirror

func (c *ChangeItem) IsMirror() bool

IsMirror reports whether the item is in the mirror format: a special format with a hardcoded schema, used for "mirroring" (queue->queue), i.e. transferring messages between queues without changes.

func (*ChangeItem) IsRowEvent

func (c *ChangeItem) IsRowEvent() bool

func (*ChangeItem) IsSystemKind

func (c *ChangeItem) IsSystemKind() bool

func (*ChangeItem) IsSystemTable

func (c *ChangeItem) IsSystemTable() bool

func (*ChangeItem) IsToasted

func (c *ChangeItem) IsToasted() bool

IsToasted checks whether the change item contains all old values for update/delete kinds.

func (*ChangeItem) IsTxDone

func (c *ChangeItem) IsTxDone() bool

func (*ChangeItem) KeyCols

func (c *ChangeItem) KeyCols() []string

func (*ChangeItem) KeyVals

func (c *ChangeItem) KeyVals() []string

func (*ChangeItem) KeysAsMap

func (c *ChangeItem) KeysAsMap() map[string]interface{}

func (*ChangeItem) KeysChanged

func (c *ChangeItem) KeysChanged() bool

KeysChanged checks whether an update changes primary keys. NOTE: this method is quite inefficient.

func (*ChangeItem) MakeMapKeys

func (c *ChangeItem) MakeMapKeys() map[string]bool

func (ChangeItem) MarshalJSON

func (c ChangeItem) MarshalJSON() ([]byte, error)

func (ChangeItem) MarshalYSON

func (c ChangeItem) MarshalYSON() ([]byte, error)

func (*ChangeItem) Offset

func (c *ChangeItem) Offset() (offset uint64, found bool)

func (*ChangeItem) OldOrCurrentKeysString

func (c *ChangeItem) OldOrCurrentKeysString(keyColumns map[string]bool) string

OldOrCurrentKeysString returns a string representing the values of the columns from the given set of columns, extracted from OldKeys or, if OldKeys are absent, from ColumnValues.

func (*ChangeItem) Part

func (c *ChangeItem) Part() string

func (*ChangeItem) PgName

func (c *ChangeItem) PgName() string

func (*ChangeItem) RemoveColumns

func (c *ChangeItem) RemoveColumns(cols ...string)

RemoveColumns mutates the change item to drop the given columns from it: they are removed from both the column names and the column values.

func (*ChangeItem) SetTableID

func (c *ChangeItem) SetTableID(tableID TableID)

func (*ChangeItem) SetTableSchema

func (c *ChangeItem) SetTableSchema(tableSchema *TableSchema)

func (*ChangeItem) TableID

func (c *ChangeItem) TableID() TableID

func (*ChangeItem) TablePartID

func (c *ChangeItem) TablePartID() TablePartID

func (*ChangeItem) ToJSONString

func (c *ChangeItem) ToJSONString() string

func (*ChangeItem) UnmarshalJSON

func (c *ChangeItem) UnmarshalJSON(jsonData []byte) error

func (*ChangeItem) UnmarshalYSON

func (c *ChangeItem) UnmarshalYSON(data []byte) error

type ColSchema

// ColSchema describes a single column of a table.
type ColSchema struct {
	TableSchema  string `json:"table_schema"` // table namespace - for SQL it's the database name
	TableName    string `json:"table_name"`
	Path         string `json:"path"`
	ColumnName   string `json:"name"`
	DataType     string `json:"type"` // string with YT type from arcadia/yt/go/schema/schema.go
	PrimaryKey   bool   `json:"key"`
	FakeKey      bool   `json:"fake_key"`      // TODO: remove after migration (TM-1225)
	Required     bool   `json:"required"`      // Required is about whether the field can contain nil
	Expression   string `json:"expression"`    // expression for generated columns
	OriginalType string `json:"original_type"` // db-prefix:db-specific-type. Example: "mysql:bigint(20) unsigned"

	// Properties carries additional optional info.
	// It is either nil or a set of key-value pairs. A value can be nil, if that is meaningful.
	Properties map[PropertyKey]interface{} `json:"properties,omitempty"`
}

func MakeOriginallyTypedColSchema

func MakeOriginallyTypedColSchema(colName, dataType string, origType string) ColSchema

func MakeTypedColSchema

func MakeTypedColSchema(colName, dataType string, primaryKey bool) ColSchema

func NewColSchema

func NewColSchema(columnName string, dataType schema.Type, isPrimaryKey bool) ColSchema

func (*ColSchema) AddProperty

func (c *ColSchema) AddProperty(key PropertyKey, val interface{})

func (*ColSchema) ColPath

func (c *ColSchema) ColPath() string

func (*ColSchema) Copy

func (c *ColSchema) Copy() *ColSchema

func (*ColSchema) Fqtn

func (c *ColSchema) Fqtn() string

func (*ColSchema) IsKey

func (c *ColSchema) IsKey() bool

func (*ColSchema) IsNestedKey

func (c *ColSchema) IsNestedKey() bool

func (*ColSchema) Numeric

func (c *ColSchema) Numeric() bool

func (*ColSchema) String

func (c *ColSchema) String() string

func (*ColSchema) TableID

func (c *ColSchema) TableID() TableID

type ColumnName

type ColumnName string

ColumnName is an explicitly declared column name

type DBSchema

// DBSchema maps a table ID to the schema of that table.
type DBSchema map[TableID]*TableSchema

func (DBSchema) CheckPrimaryKeys

func (s DBSchema) CheckPrimaryKeys(filter includeable) error

func (DBSchema) FakePkeyTables

func (s DBSchema) FakePkeyTables(filter includeable) []TableID

func (*DBSchema) String

func (s *DBSchema) String(withSchema bool, filter includeable) string

type EventSize

// EventSize holds the sizes associated with an event (see ChangeItem.Size).
type EventSize struct {
	// Read is the byte length of data read to produce an item
	Read uint64
	// Values is the size of ColumnValues
	Values uint64
}

func EmptyEventSize

func EmptyEventSize() EventSize

func RawEventSize

func RawEventSize(readBytes uint64) EventSize

type FastTableSchema

type FastTableSchema map[ColumnName]ColSchema

FastTableSchema is a mapping from column name to its schema

func MakeFastTableSchema

func MakeFastTableSchema(slowTableSchema []ColSchema) FastTableSchema

MakeFastTableSchema produces a fast table schema from an array of schemas of each column. Column names are taken from these schemas.

type Kind

// Kind is the kind of a ChangeItem, e.g. insert/update/delete/truncate/DDL.
type Kind string

type OldKeysType

// OldKeysType is the type of ChangeItem.OldKeys: names, types and values of keys.
// NOTE(review): presumably the three slices are index-aligned (i-th name, type and value belong together) — confirm.
type OldKeysType struct {
	KeyNames  []string      `json:"keynames,omitempty"`
	KeyTypes  []string      `json:"keytypes,omitempty"`
	KeyValues []interface{} `json:"keyvalues,omitempty"`
}

func EmptyOldKeys

func EmptyOldKeys() OldKeysType

type Partition

// Partition groups a cluster, a topic and a partition number.
// NOTE(review): presumably identifies a partition of a queue topic — confirm against NewPartition.
type Partition struct {
	Cluster   string `json:"cluster"`
	Partition uint32 `json:"partition"`
	Topic     string `json:"topic"`
}

func NewPartition

func NewPartition(topicWithCluster string, partition uint32) Partition

func (Partition) LegacyShittyString

func (p Partition) LegacyShittyString() string

func (Partition) String

func (p Partition) String() string

type PropertyKey

// PropertyKey is the key type of ColSchema.Properties.
type PropertyKey string

type TableColumns

// TableColumns is a list of column schemas.
type TableColumns []ColSchema

func (TableColumns) ColumnNames

func (s TableColumns) ColumnNames() []string

func (TableColumns) Copy

func (s TableColumns) Copy() TableColumns

func (TableColumns) HasFakeKeys

func (s TableColumns) HasFakeKeys() bool

func (TableColumns) HasPrimaryKey

func (s TableColumns) HasPrimaryKey() bool

func (TableColumns) KeysNum

func (s TableColumns) KeysNum() int

type TableID

// TableID identifies a table by its namespace and name.
type TableID struct {
	Namespace string // Schema for PostgreSQL, database name for MySQL and MongoDB; empty string for YT/YDB
	Name      string // Table name for relational databases, collection name for MongoDB. For YDB - full path
}

func NewTableID

func NewTableID(namespace string, name string) *TableID

func (TableID) Equals

func (t TableID) Equals(other TableID) bool

func (TableID) Fqtn

func (t TableID) Fqtn() string

Fqtn returns a "fully qualified" table name / FQTN. FQTN is a string with special properties - see definition of this function to figure them out.

func (TableID) Includes

func (t TableID) Includes(sub TableID) bool

Includes returns true if the given ID identifies a subset of tables identified by the current ID.

In other words, it is a result of an operation "is a superset of"

func (*TableID) IsSystemTable

func (t *TableID) IsSystemTable() bool

func (TableID) Less

func (t TableID) Less(other TableID) int

func (TableID) String

func (t TableID) String() string

type TablePartID

// TablePartID is a TableID together with a PartID.
type TablePartID struct {
	TableID
	// PartID is the same as ChangeItem.PartID
	PartID string
}

TablePartID describes a table part for sharded upload. Sinker may use this type to distinguish different upload parts of different tables.

func (*TablePartID) FqtnWithPartID

func (t *TablePartID) FqtnWithPartID() string

type TableSchema

// TableSchema is a table schema whose fields are not exported.
// NewTableSchema builds one from a slice of ColSchema.
type TableSchema struct {
	// contains filtered or unexported fields
}

func NewTableSchema

func NewTableSchema(columns []ColSchema) *TableSchema

func (*TableSchema) ColumnNames

func (s *TableSchema) ColumnNames() []string

func (*TableSchema) Columns

func (s *TableSchema) Columns() TableColumns

func (*TableSchema) Copy

func (s *TableSchema) Copy() *TableSchema

func (*TableSchema) FastColumns

func (s *TableSchema) FastColumns() FastTableSchema

func (*TableSchema) Hash

func (s *TableSchema) Hash() (string, error)

type TxBound

// TxBound is a pair of integer bounds.
// NOTE(review): presumably Left and Right are indexes into a batch delimiting the items of one
// transaction, as returned by TxBounds / SplitByID; whether Right is inclusive is not visible here — confirm.
type TxBound struct {
	Left, Right int
}

func SplitByID

func SplitByID(batch []ChangeItem) []TxBound

func TxBounds

func TxBounds(batch []ChangeItem) []TxBound

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL