Documentation ¶
Index ¶
- Constants
- Variables
- func AddTransformer(t *testing.T, transfer *model.Transfer, transformer abstract.Transformer)
- func CanonizeTableChangeItems(t *testing.T, storage abstract.Storage, table abstract.TableDescription)
- func CheckConnections(labeledPorts ...LabeledPort) error
- func CheckRowsCount(t *testing.T, serverModel interface{}, schema, tableName string, ...)
- func CheckRowsGreaterOrEqual(t *testing.T, serverModel interface{}, schema, tableName string, ...)
- func CompareStorages(t *testing.T, sourceModel, targetModel interface{}, ...) error
- func Deactivate(t *testing.T, transfer *model.Transfer, worker *Worker, ...) error
- func DeactivateErr(transfer *model.Transfer, worker *Worker, onErrorCallback ...func(err error)) error
- func EmptyRegistry() metrics.Registry
- func ExecuteMySQLStatement(t *testing.T, statement string, connectionParams *mysql.ConnectionParams)
- func ExecuteMySQLStatementsLineByLine(t *testing.T, statements string, connectionParams *mysql.ConnectionParams)
- func FilterTechnicalTables(tables abstract.TableMap) []abstract.TableDescription
- func GenerateCanonCheckerValues(t *testing.T, changeItem *abstract.ChangeItem)
- func GetEnvOfFail(t *testing.T, key string) string
- func GetIntFromEnv(varName string) int
- func GetPortFromStr(s string) (int, error)
- func GetSampleableStorageByModel(t *testing.T, serverModel interface{}) abstract.SampleableStorage
- func InitConnectionResolver(connections map[string]connection.ManagedConnection)
- func InitSrcDst(transferID string, src model.Source, dst model.Destination, ...)
- func LoadTable(t *testing.T, storage abstract.Storage, table abstract.TableDescription) []abstract.ChangeItem
- func MakeTransfer(transferID string, src model.Source, dst model.Destination, ...) *model.Transfer
- func MakeTransferForIncrementalSnapshot(transferID string, src model.Source, dst model.Destination, ...) *model.Transfer
- func ManagedConnection(port int, host, dbName, user, password string) *connection.ConnectionMySQL
- func MySQLDump(t *testing.T, storageParams *mysql.MysqlStorageParams) string
- func NewMySQLConnectionParams(t *testing.T, storageParams *mysql.MysqlStorageParams) *mysql.ConnectionParams
- func NewMySQLStorageFromSource(t *testing.T, src *mysql.MysqlSource) *mysql.Storage
- func NewMySQLStorageFromTarget(t *testing.T, dst *mysql.MysqlDestination) *mysql.Storage
- func PgDebeziumIgnoreTemporalAccuracyForArraysComparator(lVal interface{}, lSchema abstract.ColSchema, rVal interface{}, ...) (comparable bool, result bool, err error)
- func RecipeMysqlSource() *mysql.MysqlSource
- func RecipeMysqlSourceWithConnection(connID string) (*mysql.MysqlSource, *connection.ConnectionMySQL)
- func RecipeMysqlTarget() *mysql.MysqlDestination
- func RecipeMysqlTargetWithConnection(connID string) (*mysql.MysqlDestination, *connection.ConnectionMySQL)
- func RemoveColumnsFromChangeItem(item abstract.ChangeItem, columnsToRemove []string) abstract.ChangeItem
- func StrictEquality(l, r string) bool
- func TableIDFullName(tableID abstract.TableID) string
- func TimeWithPrecision(t string, precision int) string
- func UnmarshalChangeItem(t *testing.T, changeItemBuf []byte) *abstract.ChangeItem
- func UnmarshalChangeItemStr(t *testing.T, in string) *abstract.ChangeItem
- func UnmarshalChangeItems(t *testing.T, changeItemBuf []byte) []abstract.ChangeItem
- func UnmarshalChangeItemsStr(t *testing.T, in string) []abstract.ChangeItem
- func WaitCond(maxDuration time.Duration, condFunc func() bool) error
- func WaitDestinationEqualRowsCount(destinationSchema, destinationTableName string, destination abstract.Storage, ...) error
- func WaitEqualRowsCount(t *testing.T, schema, tableName string, source, destination abstract.Storage, ...) error
- func WaitEqualRowsCountDifferentSchemas(t *testing.T, sourceSchema, destinationSchema, tableName string, ...) error
- func WaitEqualRowsCountDifferentTables(t *testing.T, ...) error
- func WaitStoragesSynced(t *testing.T, sourceModel, targetModel interface{}, retries uint64, ...) error
- func WithLocalRuntime(transfer *model.Transfer, jobCount int, processCount int) *model.Transfer
- func WithMysqlInclude(src *mysql.MysqlSource, regex []string) *mysql.MysqlSource
- func YDBInitChangeItem(tablePath string) *abstract.ChangeItem
- func YDBPullDataFromTable(t *testing.T, token, database, instance, table string) []abstract.ChangeItem
- func YDBStmtDelete(t *testing.T, tablePath string, id int) *abstract.ChangeItem
- func YDBStmtDeleteCompoundKey(t *testing.T, tablePath string, ids ...any) *abstract.ChangeItem
- func YDBStmtInsert(t *testing.T, tablePath string, id int) *abstract.ChangeItem
- func YDBStmtInsertNulls(t *testing.T, tablePath string, id int) *abstract.ChangeItem
- func YDBStmtInsertValues(t *testing.T, tablePath string, values []interface{}, id int) *abstract.ChangeItem
- func YDBStmtInsertValuesMultikey(t *testing.T, tablePath string, values []any, ids ...any) *abstract.ChangeItem
- func YDBStmtUpdate(t *testing.T, tablePath string, id int, newInt32Val int) *abstract.ChangeItem
- func YDBStmtUpdateTOAST(t *testing.T, tablePath string, id int, newInt32Val int) *abstract.ChangeItem
- func YDBTwoTablesEqual(t *testing.T, token, database, instance, tableA, tableB string)
- type CanonTypedChangeItem
- type ChangeItemsBuilder
- func (c *ChangeItemsBuilder) Deletes(t *testing.T, oldKeys []map[string]interface{}) []abstract.ChangeItem
- func (c *ChangeItemsBuilder) DoneShardedTableLoad() []abstract.ChangeItem
- func (c *ChangeItemsBuilder) DoneTableLoad() []abstract.ChangeItem
- func (c *ChangeItemsBuilder) InitShardedTableLoad() []abstract.ChangeItem
- func (c *ChangeItemsBuilder) InitTableLoad() []abstract.ChangeItem
- func (c *ChangeItemsBuilder) Inserts(t *testing.T, in []map[string]interface{}) []abstract.ChangeItem
- func (c *ChangeItemsBuilder) Updates(t *testing.T, in []map[string]interface{}, oldKeys []map[string]interface{}) []abstract.ChangeItem
- type CompareStoragesParams
- func (p *CompareStoragesParams) WithEqualDataTypes(equalDataTypes func(lDataType, rDataType string) bool) *CompareStoragesParams
- func (p *CompareStoragesParams) WithPriorityComparators(comparators ...tasks.ChecksumComparator) *CompareStoragesParams
- func (p *CompareStoragesParams) WithTableFilter(tableFilter func(tables abstract.TableMap) []abstract.TableDescription) *CompareStoragesParams
- type FakeStorage
- func (f *FakeStorage) Close()
- func (f *FakeStorage) EstimateTableRowsCount(_ abstract.TableID) (uint64, error)
- func (f *FakeStorage) ExactTableRowsCount(_ abstract.TableID) (uint64, error)
- func (f *FakeStorage) LoadTable(_ context.Context, _ abstract.TableDescription, pusher abstract.Pusher) error
- func (f *FakeStorage) Ping() error
- func (f *FakeStorage) TableExists(_ abstract.TableID) (bool, error)
- func (f *FakeStorage) TableList(_ abstract.IncludeTableList) (abstract.TableMap, error)
- func (f *FakeStorage) TableSchema(_ context.Context, _ abstract.TableID) (*abstract.TableSchema, error)
- type LabeledPort
- type MockSink
- type MySQL2YTTestFixture
- type SimpleTransformer
- func (s *SimpleTransformer) Apply(items []abstract.ChangeItem) abstract.TransformerResult
- func (s *SimpleTransformer) Description() string
- func (s *SimpleTransformer) ResultSchema(original *abstract.TableSchema) (*abstract.TableSchema, error)
- func (s *SimpleTransformer) Suitable(table abstract.TableID, schema *abstract.TableSchema) bool
- func (s *SimpleTransformer) Type() abstract.TransformerType
- type SimpleTransformerApplyUDF
- type SimpleTransformerSuitableUDF
- type TableSchema
- type TestCase
- type TestCaseContainer
- func (c *TestCaseContainer) AddCase(in TestCase)
- func (c *TestCaseContainer) AddChangeItem(t *testing.T, in *abstract.ChangeItem)
- func (c *TestCaseContainer) Check(t *testing.T)
- func (c *TestCaseContainer) ExecStatement(ctx context.Context, t *testing.T, client *pgxpool.Pool)
- func (c *TestCaseContainer) Initialize(t *testing.T)
- func (c *TestCaseContainer) IsEnoughChangeItems(t *testing.T) bool
- type Worker
Constants ¶
View Source
const (
AllCAs = `` /* 3760-byte string literal not displayed */
)
Variables ¶
View Source
var ( YDBTestValues1 = []interface{}{ 2, false, int8(1), int16(2), int32(3), int64(4), uint8(5), uint16(6), uint32(8), uint64(9), float32(21.1), 22.2, "234.000000001", "1.123e3", []byte{2}, "other_utf_8_string", map[string]interface{}{"1": 1}, map[string]interface{}{"2": 2}, time.Date(2022, 2, 2, 0, 0, 0, 0, time.UTC), time.Date(2022, 2, 2, 10, 2, 22, 0, time.UTC), time.Date(2022, 2, 2, 10, 2, 22, 0, time.UTC), time.Duration(234000), } YDBTestValues2 = []interface{}{ 3, true, int8(4), int16(5), int32(6), int64(7), uint8(8), uint16(9), uint32(10), uint64(11), float32(21.1), 32.2, "1234.000000001", ".223e3", []byte{4}, "utf8_string", map[string]interface{}{"3": 6}, map[string]interface{}{"4": 5}, time.Date(2023, 2, 2, 0, 0, 0, 0, time.UTC), time.Date(2023, 2, 2, 10, 2, 22, 0, time.UTC), time.Date(2023, 2, 2, 10, 2, 22, 0, time.UTC), time.Duration(423000), } YDBTestValues3 = []interface{}{ 4, false, int8(9), int16(11), int32(21), int64(31), uint8(41), uint16(51), uint32(71), uint64(81), float32(1.2), 2.4, "4.000000000", "8.323e3", []byte{9}, "4_string_string", map[string]interface{}{"8": 5}, map[string]interface{}{"7": 2}, time.Date(2025, 2, 2, 0, 0, 0, 0, time.UTC), time.Date(2025, 2, 2, 10, 2, 22, 0, time.UTC), time.Date(2025, 2, 2, 10, 2, 22, 0, time.UTC), time.Duration(321000), } YDBTestMultikeyValues1 = []interface{}{ 1, false, int8(127), int16(32767), int32(2147483647), int64(9223372036854775807), uint8(255), uint16(65535), uint32(4294967295), uint64(18446744073709551615), float32(9999.9999), 9999999999.999999, "99999999999999999999999.99999999999999999999999999999999999999999999999", "1.123e3", []byte{8, 8, 0, 0, 5, 5, 5, 3, 5, 3, 5}, "Bobr kurwa", map[string]interface{}{"a": -1}, map[string]interface{}{"b": 2}, time.Date(2024, 4, 8, 18, 38, 0, 0, time.UTC), time.Date(2024, 4, 8, 18, 38, 22, 0, time.UTC), time.Date(2024, 4, 8, 18, 38, 44, 0, time.UTC), time.Duration(4291747200000000 - 1), } YDBTestMultikeyValues2 = []interface{}{ 2, false, int8(-128), int16(-32768), int32(-2147483648), int64(-9223372036854775808), uint8(0), uint16(0), uint32(0), uint64(0), float32(-0.000001), -0.000000000000000001, "-0.0000000000000000000000000000000000000001", "1.123e3", []byte{8, 80, 0, 55, 5, 35, 35}, "Ja pierdole", map[string]interface{}{"x": 1, "y": -2}, map[string]interface{}{"x": -2, "y": -1}, time.Date(1970, 1, 1, 1, 1, 1, 1, time.UTC), time.Date(1970, 1, 1, 1, 1, 1, 2, time.UTC), time.Date(1970, 1, 1, 1, 1, 1, 3, time.UTC), time.Duration(-4291747200000000 + 1), } YDBTestMultikeyValues3 = []interface{}{ 2, true, int8(8), int16(8), int32(0), int64(0), uint8(5), uint16(5), uint32(5), uint64(5), float32(3.5), 3.5, "8800.5553535", "1.123e3", []byte{8, 8, 00, 5, 55, 35, 35}, "prosche pozvonit chem u kogo-to zanimat", map[string]interface{}{"foo": 146, "bar": -238}, map[string]interface{}{"fizz": -64, "buzz": 63}, time.Date(2022, 6, 27, 0, 0, 0, 0, time.UTC), time.Date(2022, 6, 28, 0, 2, 40, 0, time.UTC), time.Date(2022, 6, 29, 0, 5, 20, 0, time.UTC), 24*time.Hour + 2*time.Minute + 40*time.Second, } )
View Source
var TransferID = "dtt"
Functions ¶
func AddTransformer ¶
func CheckConnections ¶
func CheckConnections(labeledPorts ...LabeledPort) error
func CheckRowsCount ¶
func CheckRowsGreaterOrEqual ¶
func CompareStorages ¶
func CompareStorages(t *testing.T, sourceModel, targetModel interface{}, params *CompareStoragesParams) error
func Deactivate ¶
func DeactivateErr ¶
func EmptyRegistry ¶
func ExecuteMySQLStatement ¶
func ExecuteMySQLStatement(t *testing.T, statement string, connectionParams *mysql.ConnectionParams)
func ExecuteMySQLStatementsLineByLine ¶
func ExecuteMySQLStatementsLineByLine(t *testing.T, statements string, connectionParams *mysql.ConnectionParams)
func FilterTechnicalTables ¶
func FilterTechnicalTables(tables abstract.TableMap) []abstract.TableDescription
func GenerateCanonCheckerValues ¶
func GenerateCanonCheckerValues(t *testing.T, changeItem *abstract.ChangeItem)
func GetIntFromEnv ¶
func GetPortFromStr ¶
GetPortFromStr - works when the port is in the end of the string, preceded by a colon
func GetSampleableStorageByModel ¶
func GetSampleableStorageByModel(t *testing.T, serverModel interface{}) abstract.SampleableStorage
func InitConnectionResolver ¶
func InitConnectionResolver(connections map[string]connection.ManagedConnection)
func InitSrcDst ¶
func InitSrcDst(transferID string, src model.Source, dst model.Destination, transferType abstract.TransferType)
func LoadTable ¶
func LoadTable(t *testing.T, storage abstract.Storage, table abstract.TableDescription) []abstract.ChangeItem
func MakeTransfer ¶
func MakeTransfer(transferID string, src model.Source, dst model.Destination, transferType abstract.TransferType) *model.Transfer
func ManagedConnection ¶
func ManagedConnection(port int, host, dbName, user, password string) *connection.ConnectionMySQL
func NewMySQLConnectionParams ¶
func NewMySQLConnectionParams(t *testing.T, storageParams *mysql.MysqlStorageParams) *mysql.ConnectionParams
func RecipeMysqlSource ¶
func RecipeMysqlSource() *mysql.MysqlSource
func RecipeMysqlSourceWithConnection ¶
func RecipeMysqlSourceWithConnection(connID string) (*mysql.MysqlSource, *connection.ConnectionMySQL)
func RecipeMysqlTarget ¶
func RecipeMysqlTarget() *mysql.MysqlDestination
func RecipeMysqlTargetWithConnection ¶
func RecipeMysqlTargetWithConnection(connID string) (*mysql.MysqlDestination, *connection.ConnectionMySQL)
func RemoveColumnsFromChangeItem ¶
func RemoveColumnsFromChangeItem(item abstract.ChangeItem, columnsToRemove []string) abstract.ChangeItem
RemoveColumnsFromChangeItem removes ColumnNames[i] and ColumnValues[i] where ColumnNames[i] is in columnsToRemove.
func StrictEquality ¶
StrictEquality - default callback for checksum - just compare typeNames
func TableIDFullName ¶
func TimeWithPrecision ¶
TimeWithPrecision takes the time in format `01:02:03[.123456]` and returns it with the given precision
func UnmarshalChangeItem ¶
func UnmarshalChangeItem(t *testing.T, changeItemBuf []byte) *abstract.ChangeItem
func UnmarshalChangeItemStr ¶
func UnmarshalChangeItemStr(t *testing.T, in string) *abstract.ChangeItem
func UnmarshalChangeItems ¶
func UnmarshalChangeItems(t *testing.T, changeItemBuf []byte) []abstract.ChangeItem
func UnmarshalChangeItemsStr ¶
func UnmarshalChangeItemsStr(t *testing.T, in string) []abstract.ChangeItem
func WaitEqualRowsCount ¶
func WaitEqualRowsCount( t *testing.T, schema, tableName string, source, destination abstract.Storage, maxDuration time.Duration, ) error
Wait for equals rows count in different DB servers.
func WaitEqualRowsCountDifferentSchemas ¶
func WaitEqualRowsCountDifferentSchemas( t *testing.T, sourceSchema, destinationSchema, tableName string, source, destination abstract.Storage, maxDuration time.Duration, ) error
Wait for equals rows count in different schemas. May be used for wait rows count in same DB.
func WaitEqualRowsCountDifferentTables ¶
func WaitEqualRowsCountDifferentTables( t *testing.T, sourceSchema, sourceTableName, destinationSchema, destinationTableName string, source, destination abstract.Storage, maxDuration time.Duration, ) error
Wait for equals rows count in different schemas and tables. May be used for wait rows count in same DB, same schema and different tables.
func WaitStoragesSynced ¶
func WaitStoragesSynced(t *testing.T, sourceModel, targetModel interface{}, retries uint64, compareParams *CompareStoragesParams) error
func WithLocalRuntime ¶
func WithMysqlInclude ¶
func WithMysqlInclude(src *mysql.MysqlSource, regex []string) *mysql.MysqlSource
func YDBInitChangeItem ¶
func YDBInitChangeItem(tablePath string) *abstract.ChangeItem
func YDBPullDataFromTable ¶
func YDBPullDataFromTable(t *testing.T, token, database, instance, table string) []abstract.ChangeItem
func YDBStmtDelete ¶
func YDBStmtInsert ¶
func YDBStmtInsertNulls ¶
func YDBStmtInsertValues ¶
func YDBStmtInsertValues(t *testing.T, tablePath string, values []interface{}, id int) *abstract.ChangeItem
Test values
func YDBStmtUpdate ¶
func YDBStmtUpdateTOAST ¶
func YDBTwoTablesEqual ¶
Types ¶
type CanonTypedChangeItem ¶
type CanonTypedChangeItem []row
func ToCanonTypedChangeItem ¶
func ToCanonTypedChangeItem(item abstract.ChangeItem) CanonTypedChangeItem
func ToCanonTypedChangeItems ¶
func ToCanonTypedChangeItems(items []abstract.ChangeItem) []CanonTypedChangeItem
type ChangeItemsBuilder ¶
type ChangeItemsBuilder struct { Schema string Table string TableSchema *abstract.TableSchema // contains filtered or unexported fields }
func NewChangeItemsBuilder ¶
func NewChangeItemsBuilder( schema string, table string, tableSchema *abstract.TableSchema, ) ChangeItemsBuilder
func (*ChangeItemsBuilder) Deletes ¶
func (c *ChangeItemsBuilder) Deletes(t *testing.T, oldKeys []map[string]interface{}) []abstract.ChangeItem
func (*ChangeItemsBuilder) DoneShardedTableLoad ¶
func (c *ChangeItemsBuilder) DoneShardedTableLoad() []abstract.ChangeItem
func (*ChangeItemsBuilder) DoneTableLoad ¶
func (c *ChangeItemsBuilder) DoneTableLoad() []abstract.ChangeItem
func (*ChangeItemsBuilder) InitShardedTableLoad ¶
func (c *ChangeItemsBuilder) InitShardedTableLoad() []abstract.ChangeItem
func (*ChangeItemsBuilder) InitTableLoad ¶
func (c *ChangeItemsBuilder) InitTableLoad() []abstract.ChangeItem
func (*ChangeItemsBuilder) Inserts ¶
func (c *ChangeItemsBuilder) Inserts(t *testing.T, in []map[string]interface{}) []abstract.ChangeItem
func (*ChangeItemsBuilder) Updates ¶
func (c *ChangeItemsBuilder) Updates(t *testing.T, in []map[string]interface{}, oldKeys []map[string]interface{}) []abstract.ChangeItem
type CompareStoragesParams ¶
type CompareStoragesParams struct { EqualDataTypes func(lDataType, rDataType string) bool TableFilter func(tables abstract.TableMap) []abstract.TableDescription PriorityComparators []tasks.ChecksumComparator }
func NewCompareStorageParams ¶
func NewCompareStorageParams() *CompareStoragesParams
func (*CompareStoragesParams) WithEqualDataTypes ¶
func (p *CompareStoragesParams) WithEqualDataTypes(equalDataTypes func(lDataType, rDataType string) bool) *CompareStoragesParams
func (*CompareStoragesParams) WithPriorityComparators ¶
func (p *CompareStoragesParams) WithPriorityComparators(comparators ...tasks.ChecksumComparator) *CompareStoragesParams
func (*CompareStoragesParams) WithTableFilter ¶
func (p *CompareStoragesParams) WithTableFilter(tableFilter func(tables abstract.TableMap) []abstract.TableDescription) *CompareStoragesParams
type FakeStorage ¶
type FakeStorage struct {
// contains filtered or unexported fields
}
func NewFakeStorage ¶
func NewFakeStorage(changeItems []abstract.ChangeItem) *FakeStorage
func (*FakeStorage) Close ¶
func (f *FakeStorage) Close()
func (*FakeStorage) EstimateTableRowsCount ¶
func (f *FakeStorage) EstimateTableRowsCount(_ abstract.TableID) (uint64, error)
func (*FakeStorage) ExactTableRowsCount ¶
func (f *FakeStorage) ExactTableRowsCount(_ abstract.TableID) (uint64, error)
func (*FakeStorage) LoadTable ¶
func (f *FakeStorage) LoadTable(_ context.Context, _ abstract.TableDescription, pusher abstract.Pusher) error
func (*FakeStorage) Ping ¶
func (f *FakeStorage) Ping() error
func (*FakeStorage) TableExists ¶
func (f *FakeStorage) TableExists(_ abstract.TableID) (bool, error)
func (*FakeStorage) TableList ¶
func (f *FakeStorage) TableList(_ abstract.IncludeTableList) (abstract.TableMap, error)
func (*FakeStorage) TableSchema ¶
func (f *FakeStorage) TableSchema(_ context.Context, _ abstract.TableID) (*abstract.TableSchema, error)
type LabeledPort ¶
type MockSink ¶
type MockSink struct {
PushCallback func([]abstract.ChangeItem)
}
type MySQL2YTTestFixture ¶
type MySQL2YTTestFixture struct { YTDir ypath.Path YTEnv *yttest.Env Src *mysql.MysqlSource Dst yt.YtDestinationModel SrcStorage *mysql.Storage DstStorage *storage.Storage // contains filtered or unexported fields }
func SetupMySQL2YTTest ¶
func SetupMySQL2YTTest(t *testing.T, src *mysql.MysqlSource, dst yt.YtDestinationModel) *MySQL2YTTestFixture
func (*MySQL2YTTestFixture) Teardown ¶
func (f *MySQL2YTTestFixture) Teardown(t *testing.T)
type SimpleTransformer ¶
type SimpleTransformer struct {
// contains filtered or unexported fields
}
func NewSimpleTransformer ¶
func NewSimpleTransformer(t *testing.T, applyUdf SimpleTransformerApplyUDF, suitableUdf SimpleTransformerSuitableUDF) *SimpleTransformer
func (*SimpleTransformer) Apply ¶
func (s *SimpleTransformer) Apply(items []abstract.ChangeItem) abstract.TransformerResult
func (*SimpleTransformer) Description ¶
func (s *SimpleTransformer) Description() string
func (*SimpleTransformer) ResultSchema ¶
func (s *SimpleTransformer) ResultSchema(original *abstract.TableSchema) (*abstract.TableSchema, error)
func (*SimpleTransformer) Suitable ¶
func (s *SimpleTransformer) Suitable(table abstract.TableID, schema *abstract.TableSchema) bool
func (*SimpleTransformer) Type ¶
func (s *SimpleTransformer) Type() abstract.TransformerType
type SimpleTransformerApplyUDF ¶
type SimpleTransformerApplyUDF func(*testing.T, []abstract.ChangeItem) abstract.TransformerResult
type SimpleTransformerSuitableUDF ¶
type SimpleTransformerSuitableUDF func(abstract.TableID, abstract.TableColumns) bool
type TableSchema ¶
type TableSchema struct {
// contains filtered or unexported fields
}
func MakeTableSchema ¶
func MakeTableSchema(changeItem *abstract.ChangeItem) *TableSchema
func (*TableSchema) NameToTableSchema ¶
type TestCaseContainer ¶
type TestCaseContainer struct {
// contains filtered or unexported fields
}
func NewTestCaseContainer ¶
func NewTestCaseContainer() *TestCaseContainer
func (*TestCaseContainer) AddCase ¶
func (c *TestCaseContainer) AddCase(in TestCase)
func (*TestCaseContainer) AddChangeItem ¶
func (c *TestCaseContainer) AddChangeItem(t *testing.T, in *abstract.ChangeItem)
func (*TestCaseContainer) Check ¶
func (c *TestCaseContainer) Check(t *testing.T)
func (*TestCaseContainer) ExecStatement ¶
func (*TestCaseContainer) Initialize ¶
func (c *TestCaseContainer) Initialize(t *testing.T)
func (*TestCaseContainer) IsEnoughChangeItems ¶
func (c *TestCaseContainer) IsEnoughChangeItems(t *testing.T) bool
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func ActivateErr ¶
func ActivateWithCP ¶
func ActivateWithCP(transfer *model.Transfer, cp coordinator.Coordinator) (*Worker, error)
Source Files ¶
- abstract.go
- activate_delivery_wrapper.go
- canon_typed_changeitems.go
- canonization.go
- changeitem_helpers.go
- compare_storages.go
- connections.go
- deactivate_delivery_wrapper.go
- debezium_pg_array_comparator.go
- fake_storage.go
- gp_helpers.go
- load_table.go
- mock_sink.go
- mysql_helpers.go
- mysql_yt_helpers.go
- replication.go
- table_schema.go
- test_case.go
- transformers.go
- utils.go
- ydb.go
Click to show internal directories.
Click to hide internal directories.