Documentation ¶
Index ¶
- Constants
- Variables
- func ColIDX(schema []ColSchema) map[string]int
- func ContainsNonRowItem(s []ChangeItem) bool
- func Dump(input []ChangeItem)
- func GetRawMessageData(r ChangeItem) ([]byte, error)
- func IsSystemTable(in string) bool
- func KeyNames(in []ColSchema) []string
- func MakeMapColNameToIndex(in []ColSchema) map[string]int
- func PgName(schemaName, tableName string) string
- func RegisterSystemTables(tableNames ...string)
- func Sniff(input []ChangeItem) string
- func SplitByTableID(batch []ChangeItem) map[TableID][]ChangeItem
- func SplitUpdatedPKeys(input []ChangeItem) [][]ChangeItem
- func SystemTables() []string
- type ChangeItem
- func ChangeItemFromMap(input map[string]interface{}, schema *TableSchema, table string, kind string) ChangeItem
- func Collapse(input []ChangeItem) []ChangeItem
- func FindItemOfKind(s []ChangeItem, kinds ...Kind) *ChangeItem
- func MakeRawMessage(table string, commitTime time.Time, topic string, shard int, offset int64, ...) ChangeItem
- func (c *ChangeItem) AddTableColumn(column ColSchema)
- func (c *ChangeItem) AsMap() map[string]interface{}
- func (c *ChangeItem) ColumnNameIndex(columnName string) int
- func (c *ChangeItem) ColumnNameIndices() map[string]int
- func (c *ChangeItem) CurrentKeysString(keyColumns map[string]bool) string
- func (c *ChangeItem) EnsureSanity() error
- func (c *ChangeItem) Fqtn() string
- func (c *ChangeItem) IsMirror() bool
- func (c *ChangeItem) IsRowEvent() bool
- func (c *ChangeItem) IsSystemKind() bool
- func (c *ChangeItem) IsSystemTable() bool
- func (c *ChangeItem) IsToasted() bool
- func (c *ChangeItem) IsTxDone() bool
- func (c *ChangeItem) KeyCols() []string
- func (c *ChangeItem) KeyVals() []string
- func (c *ChangeItem) KeysAsMap() map[string]interface{}
- func (c *ChangeItem) KeysChanged() bool
- func (c *ChangeItem) MakeMapKeys() map[string]bool
- func (c ChangeItem) MarshalJSON() ([]byte, error)
- func (c ChangeItem) MarshalYSON() ([]byte, error)
- func (c *ChangeItem) Offset() (offset uint64, found bool)
- func (c *ChangeItem) OldOrCurrentKeysString(keyColumns map[string]bool) string
- func (c *ChangeItem) Part() string
- func (c *ChangeItem) PgName() string
- func (c *ChangeItem) RemoveColumns(cols ...string)
- func (c *ChangeItem) SetTableID(tableID TableID)
- func (c *ChangeItem) SetTableSchema(tableSchema *TableSchema)
- func (c *ChangeItem) TableID() TableID
- func (c *ChangeItem) TablePartID() TablePartID
- func (c *ChangeItem) ToJSONString() string
- func (c *ChangeItem) UnmarshalJSON(jsonData []byte) error
- func (c *ChangeItem) UnmarshalYSON(data []byte) error
- type ColSchema
- func (c *ColSchema) AddProperty(key PropertyKey, val interface{})
- func (c *ColSchema) ColPath() string
- func (c *ColSchema) Copy() *ColSchema
- func (c *ColSchema) Fqtn() string
- func (c *ColSchema) IsKey() bool
- func (c *ColSchema) IsNestedKey() bool
- func (c *ColSchema) Numeric() bool
- func (c *ColSchema) String() string
- func (c *ColSchema) TableID() TableID
- type ColumnName
- type DBSchema
- type EventSize
- type FastTableSchema
- type Kind
- type OldKeysType
- type Partition
- type PropertyKey
- type TableColumns
- type TableID
- type TablePartID
- type TableSchema
- type TxBound
Constants ¶
const ( TableConsumerKeeper = "__consumer_keeper" TableLSN = "__data_transfer_lsn" )
const ( DropTableKind = Kind("drop_table") TruncateTableKind = Kind("truncate") // InitShardedTableLoad is table header. // All Init/DoneTableLoad and data items for the same table should be sent strictly after this kind of item InitShardedTableLoad = Kind("init_sharded_table_load") // InitTableLoad is table part header. It should be sent for each table part before uploading any data for that part. // If the table is not sharded, it is considered to have a single part so InitTableLoad should be sent // before uploading the whole table. InitTableLoad = Kind("init_load_table") // DoneTableLoad is table part trailer. It should be sent for each table part after all data events has been sent. // If the table is not sharded, it is considered to have a single part so DoneTableLoad should be sent // after uploading the whole table. DoneTableLoad = Kind("done_load_table") // DoneShardedTableLoad is table trailer. No table control or row items should be sent for the table after this item. DoneShardedTableLoad = Kind("done_sharded_table_load") InsertKind = Kind("insert") UpdateKind = Kind("update") DeleteKind = Kind("delete") PgDDLKind = Kind("pg:DDL") DDLKind = Kind("DDL") MongoUpdateDocumentKind = Kind("mongo:update_document") MongoCreateKind = Kind("mongo:create") MongoDropKind = Kind("mongo:drop") MongoRenameKind = Kind("mongo:rename") MongoDropDatabaseKind = Kind("mongo:dropDatabase") MongoNoop = Kind("mongo:noop") ChCreateTableDistributedKind = Kind("ch:createTableDistributed") ChCreateTableKind = Kind("ch:createTable") ClickhouseDDLBuilderKind = Kind("ch:DDLBuilder") ElasticsearchDumpIndexKind = Kind("es:dumpIndex") SynchronizeKind = Kind("") )
const ( RawMessageTopic = "topic" RawMessagePartition = "partition" RawMessageSeqNo = "seq_no" RawMessageWriteTime = "write_time" RawMessageData = "data" OriginalTypeMirrorBinary = "mirror:binary" )
const (
DefaultPropertyKey = "default"
)
Variables ¶
var ( RawDataSchema = NewTableSchema([]ColSchema{ {ColumnName: RawMessageTopic, DataType: string(schema.TypeString), PrimaryKey: true, Required: true}, {ColumnName: RawMessagePartition, DataType: string(schema.TypeUint32), PrimaryKey: true, Required: true}, {ColumnName: RawMessageSeqNo, DataType: string(schema.TypeUint64), PrimaryKey: true, Required: true}, {ColumnName: RawMessageWriteTime, DataType: string(schema.TypeDatetime), PrimaryKey: true, Required: true}, {ColumnName: RawMessageData, DataType: string(schema.TypeString), OriginalType: OriginalTypeMirrorBinary}, }) RawDataColumns = []string{RawMessageTopic, RawMessagePartition, RawMessageSeqNo, RawMessageWriteTime, RawMessageData} RawDataColsIDX = ColIDX(RawDataSchema.Columns()) )
var RowEventKinds = set.New(InsertKind, UpdateKind, DeleteKind, MongoUpdateDocumentKind)
var SystemKinds = set.New( InitTableLoad, InitShardedTableLoad, DoneTableLoad, DoneShardedTableLoad, DropTableKind, TruncateTableKind, SynchronizeKind, )
Functions ¶
func ContainsNonRowItem ¶
func ContainsNonRowItem(s []ChangeItem) bool
func Dump ¶
func Dump(input []ChangeItem)
func GetRawMessageData ¶
func GetRawMessageData(r ChangeItem) ([]byte, error)
func IsSystemTable ¶
func MakeMapColNameToIndex ¶
MakeMapColNameToIndex returns a mapping of a column name to an index in the given slice
func RegisterSystemTables ¶
func RegisterSystemTables(tableNames ...string)
func SplitByTableID ¶
func SplitByTableID(batch []ChangeItem) map[TableID][]ChangeItem
func SplitUpdatedPKeys ¶
func SplitUpdatedPKeys(input []ChangeItem) [][]ChangeItem
SplitUpdatedPKeys takes a list of changes and splits them into sublists based on updated PKeys. Each sublist includes all the previous changes up to the change with an updated PKey. That change is split into two sub-changes: one delete (for the old row) and one insert. Note:
- The order of changes within each sublist is maintained.
func SystemTables ¶
func SystemTables() []string
Types ¶
type ChangeItem ¶
type ChangeItem struct { ID uint32 // Transaction ID. For snapshot: id==0, for mongo on replication is also 0. LSN uint64 // it's ok to be 0 for snapshot CommitTime uint64 Counter int // This field isn't used a lot in code // but in __WAL txPosition used as part of composite primary key // so, it's very important field! Kind Kind // insert/update/delete/truncate/DDL/pg:DDL Schema string // for pg: filled by wal2json - contains "public", for ydb: always empty string Table string // for pg: filled by wal2json - contains tableName, for ydb: depends on 'UseFullPaths' // PartID is used to identify part of sharded upload to make it possible to distinguish items which belong to different parts. // PartIDs must be unique within a single table. Each source may use its own PartID generation rules. // Sinker should interpret PartID as a plain string to be used only to distinguish one part from another and should not attempt to parse or decode it somehow. // PartID should contain only characters that may be used in table identifiers (latin characters, digits, '-' and '_' chars) PartID string // ColumnNames is a write-once field. Its value may be reused across multiple ChangeItems. // Do not use this field in new code! Use column name from table schema instead. // This field may be empty when the table schema is not. ColumnNames []string ColumnValues []interface{} // TableSchema is a write-once field. Its value may be reused across multiple ChangeItems // schema filled on all rows event + init / done table load TableSchema *TableSchema // OldKeys is a set of (PRIMARY) keys identifying the previous version of the tuple which this item represents. // // There are also special cases when this field contains different data: // - PG source: contains PRIMARY KEY fields as well as other fields for which UNIQUE indexes exist. Note that when there is no PRIMARY KEY, PG source considers one of the UNIQUE indexes to be the PRIMARY KEY. 
When REPLICA IDENTITY FULL is set, contains all fields. // - MySQL source: contains the same fields as columnNames or keys, depends on binlog_row_image. // // There may be other specific cases in addition to the ones mentioned above. In any case, one must rely on TableSchema, not on the presence or absence of particular columns inside this field. OldKeys OldKeysType TxID string // this only valid for mysql so far, for now we will not store it in YT Query string // optional, can be filled only for items from mysql binlogs with binlog_rows_query_log_events enabled Size EventSize // optional event size }
func ChangeItemFromMap ¶
func ChangeItemFromMap(input map[string]interface{}, schema *TableSchema, table string, kind string) ChangeItem
func Collapse ¶
func Collapse(input []ChangeItem) []ChangeItem
Collapse collapses (possibly) multiple items in the input into a single item (or none) in the output. Currently, it preserves the order of items in the result. It should only be applied by sinks which support PRIMARY KEYs. For them the order of items is considered to be of no importance.
func FindItemOfKind ¶
func FindItemOfKind(s []ChangeItem, kinds ...Kind) *ChangeItem
FindItemOfKind returns an item in the slice whose kind is among the given ones, or nil if there is no such item in it
func MakeRawMessage ¶
func (*ChangeItem) AddTableColumn ¶
func (c *ChangeItem) AddTableColumn(column ColSchema)
func (*ChangeItem) AsMap ¶
func (c *ChangeItem) AsMap() map[string]interface{}
func (*ChangeItem) ColumnNameIndex ¶
func (c *ChangeItem) ColumnNameIndex(columnName string) int
func (*ChangeItem) ColumnNameIndices ¶
func (c *ChangeItem) ColumnNameIndices() map[string]int
func (*ChangeItem) CurrentKeysString ¶
func (c *ChangeItem) CurrentKeysString(keyColumns map[string]bool) string
func (*ChangeItem) EnsureSanity ¶
func (c *ChangeItem) EnsureSanity() error
EnsureSanity ensures the given item is a sound and sane one which follows the contract between item's fields.
It should be removed one day when the contract is ingrained in the item's structure.
func (*ChangeItem) Fqtn ¶
func (c *ChangeItem) Fqtn() string
func (*ChangeItem) IsMirror ¶
func (c *ChangeItem) IsMirror() bool
IsMirror - "mirror" is a special format with a hardcoded schema, used for "mirroring" (queue->queue): transferring messages between queues without changes.
func (*ChangeItem) IsRowEvent ¶
func (c *ChangeItem) IsRowEvent() bool
func (*ChangeItem) IsSystemKind ¶
func (c *ChangeItem) IsSystemKind() bool
func (*ChangeItem) IsSystemTable ¶
func (c *ChangeItem) IsSystemTable() bool
func (*ChangeItem) IsToasted ¶
func (c *ChangeItem) IsToasted() bool
IsToasted checks whether the change item contains all old values for update/delete kinds.
func (*ChangeItem) IsTxDone ¶
func (c *ChangeItem) IsTxDone() bool
func (*ChangeItem) KeyCols ¶
func (c *ChangeItem) KeyCols() []string
func (*ChangeItem) KeyVals ¶
func (c *ChangeItem) KeyVals() []string
func (*ChangeItem) KeysAsMap ¶
func (c *ChangeItem) KeysAsMap() map[string]interface{}
func (*ChangeItem) KeysChanged ¶
func (c *ChangeItem) KeysChanged() bool
KeysChanged checks whether an update changes primary keys. NOTE: this method is quite inefficient.
func (*ChangeItem) MakeMapKeys ¶
func (c *ChangeItem) MakeMapKeys() map[string]bool
func (ChangeItem) MarshalJSON ¶
func (c ChangeItem) MarshalJSON() ([]byte, error)
func (ChangeItem) MarshalYSON ¶
func (c ChangeItem) MarshalYSON() ([]byte, error)
func (*ChangeItem) Offset ¶
func (c *ChangeItem) Offset() (offset uint64, found bool)
func (*ChangeItem) OldOrCurrentKeysString ¶
func (c *ChangeItem) OldOrCurrentKeysString(keyColumns map[string]bool) string
OldOrCurrentKeysString returns a string representing the values of the columns from the given set of columns, extracted from OldKeys, or from ColumnValues if OldKeys are absent.
func (*ChangeItem) Part ¶
func (c *ChangeItem) Part() string
func (*ChangeItem) PgName ¶
func (c *ChangeItem) PgName() string
func (*ChangeItem) RemoveColumns ¶
func (c *ChangeItem) RemoveColumns(cols ...string)
RemoveColumns mutates the change item to drop the given columns: it removes them from the column names and column values.
func (*ChangeItem) SetTableID ¶
func (c *ChangeItem) SetTableID(tableID TableID)
func (*ChangeItem) SetTableSchema ¶
func (c *ChangeItem) SetTableSchema(tableSchema *TableSchema)
func (*ChangeItem) TableID ¶
func (c *ChangeItem) TableID() TableID
func (*ChangeItem) TablePartID ¶
func (c *ChangeItem) TablePartID() TablePartID
func (*ChangeItem) ToJSONString ¶
func (c *ChangeItem) ToJSONString() string
func (*ChangeItem) UnmarshalJSON ¶
func (c *ChangeItem) UnmarshalJSON(jsonData []byte) error
func (*ChangeItem) UnmarshalYSON ¶
func (c *ChangeItem) UnmarshalYSON(data []byte) error
type ColSchema ¶
type ColSchema struct { TableSchema string `json:"table_schema"` // table namespace - for SQL it's database name TableName string `json:"table_name"` Path string `json:"path"` ColumnName string `json:"name"` DataType string `json:"type"` // string with YT type from arcadia/yt/go/schema/schema.go PrimaryKey bool `json:"key"` FakeKey bool `json:"fake_key"` // TODO: remove after migration (TM-1225) Required bool `json:"required"` // Required - it's about 'can field contains nil' Expression string `json:"expression"` // expression for generated columns OriginalType string `json:"original_type"` // db-prefix:db-specific-type. example: "mysql:bigint(20) unsigned" // field to carry additional optional info. // It's either nil or pair key-value. Value can be nil, if it's meaningful Properties map[PropertyKey]interface{} `json:"properties,omitempty"` }
func MakeTypedColSchema ¶
func NewColSchema ¶
func (*ColSchema) AddProperty ¶
func (c *ColSchema) AddProperty(key PropertyKey, val interface{})
func (*ColSchema) IsNestedKey ¶
type DBSchema ¶
type DBSchema map[TableID]*TableSchema
func (DBSchema) CheckPrimaryKeys ¶
func (DBSchema) FakePkeyTables ¶
type EventSize ¶
type EventSize struct { // Read is the byte length of data read to produce an item Read uint64 // Values is the size of ColumnValues Values uint64 }
func EmptyEventSize ¶
func EmptyEventSize() EventSize
func RawEventSize ¶
type FastTableSchema ¶
type FastTableSchema map[ColumnName]ColSchema
FastTableSchema is a mapping from column name to its schema
func MakeFastTableSchema ¶
func MakeFastTableSchema(slowTableSchema []ColSchema) FastTableSchema
MakeFastTableSchema produces a fast table schema from an array of schemas of each column. Column names are taken from these schemas.
type OldKeysType ¶
type OldKeysType struct { KeyNames []string `json:"keynames,omitempty"` KeyTypes []string `json:"keytypes,omitempty"` KeyValues []interface{} `json:"keyvalues,omitempty"` }
func EmptyOldKeys ¶
func EmptyOldKeys() OldKeysType
type Partition ¶
type Partition struct { Cluster string `json:"cluster"` Partition uint32 `json:"partition"` Topic string `json:"topic"` }
func NewPartition ¶
func (Partition) LegacyShittyString ¶
type PropertyKey ¶
type PropertyKey string
type TableColumns ¶
type TableColumns []ColSchema
func (TableColumns) ColumnNames ¶
func (s TableColumns) ColumnNames() []string
func (TableColumns) Copy ¶
func (s TableColumns) Copy() TableColumns
func (TableColumns) HasFakeKeys ¶
func (s TableColumns) HasFakeKeys() bool
func (TableColumns) HasPrimaryKey ¶
func (s TableColumns) HasPrimaryKey() bool
func (TableColumns) KeysNum ¶
func (s TableColumns) KeysNum() int
type TableID ¶
type TableID struct { Namespace string // Schema for PostgreSQL, database name for MySQL and MongoDB; empty string for YT/YDB Name string // Table name for relational databases, collection name for MongoDB. For YDB - full path }
func NewTableID ¶
func (TableID) Fqtn ¶
Fqtn returns a "fully qualified" table name / FQTN. FQTN is a string with special properties - see definition of this function to figure them out.
func (TableID) Includes ¶
Includes returns true if the given ID identifies a subset of tables identified by the current ID.
In other words, it is a result of an operation "is a superset of"
func (*TableID) IsSystemTable ¶
type TablePartID ¶
TablePartID describes table part for sharded upload. Sinker may use this type to distinguish different upload parts of different tables.
func (*TablePartID) FqtnWithPartID ¶
func (t *TablePartID) FqtnWithPartID() string
type TableSchema ¶
type TableSchema struct {
// contains filtered or unexported fields
}
func NewTableSchema ¶
func NewTableSchema(columns []ColSchema) *TableSchema
func (*TableSchema) ColumnNames ¶
func (s *TableSchema) ColumnNames() []string
func (*TableSchema) Columns ¶
func (s *TableSchema) Columns() TableColumns
func (*TableSchema) Copy ¶
func (s *TableSchema) Copy() *TableSchema
func (*TableSchema) FastColumns ¶
func (s *TableSchema) FastColumns() FastTableSchema
func (*TableSchema) Hash ¶
func (s *TableSchema) Hash() (string, error)