helpers

package
v0.0.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2024 License: Apache-2.0 Imports: 46 Imported by: 0

README

Documentation

Index

Constants

View Source
const (
	AllCAs = `` /* 3760-byte string literal not displayed */

)

Variables

View Source
// The six fixture slices below share one positional layout of 22 entries:
// int, bool, int8, int16, int32, int64, uint8, uint16, uint32, uint64,
// float32, float64, two numeric strings, a byte slice, a text string,
// two string-keyed maps, three time.Time values and a time.Duration.
//
// NOTE(review): presumably these are the `values` arguments accepted by
// YDBStmtInsertValues / YDBStmtInsertValuesMultikey — confirm against callers.
var (
	// YDBTestValues1 is the first plain fixture set (timestamps in 2022).
	YDBTestValues1 = []any{
		2,
		false,
		int8(1),
		int16(2),
		int32(3),
		int64(4),
		uint8(5),
		uint16(6),
		uint32(8),
		uint64(9),
		float32(21.1),
		22.2,
		"234.000000001",
		"1.123e3",
		[]byte{2},
		"other_utf_8_string",
		map[string]any{"1": 1},
		map[string]any{"2": 2},
		time.Date(2022, time.February, 2, 0, 0, 0, 0, time.UTC),
		time.Date(2022, time.February, 2, 10, 2, 22, 0, time.UTC),
		time.Date(2022, time.February, 2, 10, 2, 22, 0, time.UTC),
		234 * time.Microsecond,
	}

	// YDBTestValues2 is the second plain fixture set (timestamps in 2023).
	YDBTestValues2 = []any{
		3,
		true,
		int8(4),
		int16(5),
		int32(6),
		int64(7),
		uint8(8),
		uint16(9),
		uint32(10),
		uint64(11),
		float32(21.1),
		32.2,
		"1234.000000001",
		".223e3",
		[]byte{4},
		"utf8_string",
		map[string]any{"3": 6},
		map[string]any{"4": 5},
		time.Date(2023, time.February, 2, 0, 0, 0, 0, time.UTC),
		time.Date(2023, time.February, 2, 10, 2, 22, 0, time.UTC),
		time.Date(2023, time.February, 2, 10, 2, 22, 0, time.UTC),
		423 * time.Microsecond,
	}

	// YDBTestValues3 is the third plain fixture set (timestamps in 2025).
	YDBTestValues3 = []any{
		4,
		false,
		int8(9),
		int16(11),
		int32(21),
		int64(31),
		uint8(41),
		uint16(51),
		uint32(71),
		uint64(81),
		float32(1.2),
		2.4,
		"4.000000000",
		"8.323e3",
		[]byte{9},
		"4_string_string",
		map[string]any{"8": 5},
		map[string]any{"7": 2},
		time.Date(2025, time.February, 2, 0, 0, 0, 0, time.UTC),
		time.Date(2025, time.February, 2, 10, 2, 22, 0, time.UTC),
		time.Date(2025, time.February, 2, 10, 2, 22, 0, time.UTC),
		321 * time.Microsecond,
	}

	// YDBTestMultikeyValues1 carries the maximum value of every fixed-width
	// integer type and a duration one nanosecond below 4291747200000000ns.
	YDBTestMultikeyValues1 = []any{
		1,
		false,
		int8(127),
		int16(32767),
		int32(2147483647),
		int64(9223372036854775807),
		uint8(255),
		uint16(65535),
		uint32(4294967295),
		uint64(18446744073709551615),
		float32(9999.9999),
		9999999999.999999,
		"99999999999999999999999.99999999999999999999999999999999999999999999999",
		"1.123e3",
		[]byte{8, 8, 0, 0, 5, 5, 5, 3, 5, 3, 5},
		"Bobr kurwa",
		map[string]any{"a": -1},
		map[string]any{"b": 2},
		time.Date(2024, time.April, 8, 18, 38, 0, 0, time.UTC),
		time.Date(2024, time.April, 8, 18, 38, 22, 0, time.UTC),
		time.Date(2024, time.April, 8, 18, 38, 44, 0, time.UTC),
		time.Duration(4291747200000000) - 1,
	}

	// YDBTestMultikeyValues2 carries the minimum value of every fixed-width
	// integer type (zero for the unsigned ones), timestamps a few nanoseconds
	// apart shortly after the Unix epoch, and a duration one nanosecond above
	// -4291747200000000ns.
	YDBTestMultikeyValues2 = []any{
		2,
		false,
		int8(-128),
		int16(-32768),
		int32(-2147483648),
		int64(-9223372036854775808),
		uint8(0),
		uint16(0),
		uint32(0),
		uint64(0),
		float32(-0.000001),
		-0.000000000000000001,
		"-0.0000000000000000000000000000000000000001",
		"1.123e3",
		[]byte{8, 80, 0, 55, 5, 35, 35},
		"Ja pierdole",
		map[string]any{"x": 1, "y": -2},
		map[string]any{"x": -2, "y": -1},
		time.Date(1970, time.January, 1, 1, 1, 1, 1, time.UTC),
		time.Date(1970, time.January, 1, 1, 1, 1, 2, time.UTC),
		time.Date(1970, time.January, 1, 1, 1, 1, 3, time.UTC),
		time.Duration(-4291747200000000) + 1,
	}

	// YDBTestMultikeyValues3 carries mid-range values; its leading int (2)
	// equals that of YDBTestMultikeyValues2.
	YDBTestMultikeyValues3 = []any{
		2,
		true,
		int8(8),
		int16(8),
		int32(0),
		int64(0),
		uint8(5),
		uint16(5),
		uint32(5),
		uint64(5),
		float32(3.5),
		3.5,
		"8800.5553535",
		"1.123e3",
		[]byte{8, 8, 0, 5, 55, 35, 35},
		"prosche pozvonit chem u kogo-to zanimat",
		map[string]any{"foo": 146, "bar": -238},
		map[string]any{"fizz": -64, "buzz": 63},
		time.Date(2022, time.June, 27, 0, 0, 0, 0, time.UTC),
		time.Date(2022, time.June, 28, 0, 2, 40, 0, time.UTC),
		time.Date(2022, time.June, 29, 0, 5, 20, 0, time.UTC),
		24*time.Hour + 2*time.Minute + 40*time.Second,
	}
)
View Source
var TransferID = "dtt"

Functions

func AddTransformer

func AddTransformer(t *testing.T, transfer *server.Transfer, transformer abstract.Transformer)

func CanonizeTableChangeItems

func CanonizeTableChangeItems(t *testing.T, storage abstract.Storage, table abstract.TableDescription)

func CheckConnections

func CheckConnections(labeledPorts ...LabeledPort) error

func CheckRowsCount

func CheckRowsCount(t *testing.T, serverModel interface{}, schema, tableName string, expectedRows uint64)

func CheckRowsGreaterOrEqual

func CheckRowsGreaterOrEqual(t *testing.T, serverModel interface{}, schema, tableName string, expectedMinumumRows uint64)

func CompareStorages

func CompareStorages(t *testing.T, sourceModel, targetModel interface{}, params *CompareStoragesParams) error

func Deactivate

func Deactivate(t *testing.T, transfer *server.Transfer, worker *Worker, onErrorCallback ...func(err error)) error

func DeactivateErr

func DeactivateErr(transfer *server.Transfer, worker *Worker, onErrorCallback ...func(err error)) error

func EmptyRegistry

func EmptyRegistry() metrics.Registry

func ExecuteMySQLStatement

func ExecuteMySQLStatement(t *testing.T, statement string, connectionParams *mysql.ConnectionParams)

func ExecuteMySQLStatementsLineByLine

func ExecuteMySQLStatementsLineByLine(t *testing.T, statements string, connectionParams *mysql.ConnectionParams)

func FilterTechnicalTables

func FilterTechnicalTables(tables abstract.TableMap) []abstract.TableDescription

func GenerateCanonCheckerValues

func GenerateCanonCheckerValues(t *testing.T, changeItem *abstract.ChangeItem)

func GetEnvOfFail

func GetEnvOfFail(t *testing.T, key string) string

func GetIntFromEnv

func GetIntFromEnv(varName string) int

func GetPortFromStr

func GetPortFromStr(s string) (int, error)

GetPortFromStr works only when the port is at the end of the string, preceded by a colon.

func GetSampleableStorageByModel

func GetSampleableStorageByModel(t *testing.T, serverModel interface{}) abstract.SampleableStorage

func InitSrcDst

func InitSrcDst(transferID string, src server.Source, dst server.Destination, transferType abstract.TransferType)

func LoadTable

func LoadTable(t *testing.T, storage abstract.Storage, table abstract.TableDescription) []abstract.ChangeItem

func MakeTransfer

func MakeTransfer(transferID string, src server.Source, dst server.Destination, transferType abstract.TransferType) *server.Transfer

func MakeTransferForIncrementalSnapshot

func MakeTransferForIncrementalSnapshot(transferID string, src server.Source, dst server.Destination, transferType abstract.TransferType,
	namespace, tableName, cursorField, initialState string, incrementDelay int64) *server.Transfer

func MySQLDump

func MySQLDump(t *testing.T, storageParams *mysql.MysqlStorageParams) string

func NewMySQLConnectionParams

func NewMySQLConnectionParams(t *testing.T, storageParams *mysql.MysqlStorageParams) *mysql.ConnectionParams

func NewMySQLStorageFromSource

func NewMySQLStorageFromSource(t *testing.T, src *mysql.MysqlSource) *mysql.Storage

func NewMySQLStorageFromTarget

func NewMySQLStorageFromTarget(t *testing.T, dst *mysql.MysqlDestination) *mysql.Storage

func PgDebeziumIgnoreTemporalAccuracyForArraysComparator

func PgDebeziumIgnoreTemporalAccuracyForArraysComparator(lVal interface{}, lSchema abstract.ColSchema, rVal interface{}, rSchema abstract.ColSchema, intoArray bool) (comparable bool, result bool, err error)

func RecipeMysqlSource

func RecipeMysqlSource() *mysql.MysqlSource

func RecipeMysqlTarget

func RecipeMysqlTarget() *mysql.MysqlDestination

func RemoveColumnsFromChangeItem

func RemoveColumnsFromChangeItem(item abstract.ChangeItem, columnsToRemove []string) abstract.ChangeItem

RemoveColumnsFromChangeItem removes ColumnNames[i] and ColumnValues[i] where ColumnNames[i] is in columnsToRemove.

func StrictEquality

func StrictEquality(l, r string) bool

StrictEquality is the default callback for checksum comparison; it simply compares the type names.

func TableIDFullName

func TableIDFullName(tableID abstract.TableID) string

func TimeWithPrecision

func TimeWithPrecision(t string, precision int) string

TimeWithPrecision takes the time in format `01:02:03[.123456]` and returns it with the given precision

func UnmarshalChangeItem

func UnmarshalChangeItem(t *testing.T, changeItemBuf []byte) *abstract.ChangeItem

func UnmarshalChangeItemStr

func UnmarshalChangeItemStr(t *testing.T, in string) *abstract.ChangeItem

func UnmarshalChangeItems

func UnmarshalChangeItems(t *testing.T, changeItemBuf []byte) []abstract.ChangeItem

func UnmarshalChangeItemsStr

func UnmarshalChangeItemsStr(t *testing.T, in string) []abstract.ChangeItem

func WaitCond

func WaitCond(maxDuration time.Duration, condFunc func() bool) error

func WaitDestinationEqualRowsCount

func WaitDestinationEqualRowsCount(
	destinationSchema, destinationTableName string,
	destination abstract.Storage,
	maxDuration time.Duration,
	sourceTableRowsCount uint64,
) error

func WaitEqualRowsCount

func WaitEqualRowsCount(
	t *testing.T,
	schema, tableName string,
	source, destination abstract.Storage,
	maxDuration time.Duration,
) error

WaitEqualRowsCount waits for the row counts to become equal in different DB servers.

func WaitEqualRowsCountDifferentSchemas

func WaitEqualRowsCountDifferentSchemas(
	t *testing.T,
	sourceSchema, destinationSchema, tableName string,
	source, destination abstract.Storage,
	maxDuration time.Duration,
) error

WaitEqualRowsCountDifferentSchemas waits for the row counts to become equal in different schemas. It may also be used to wait for row counts within the same DB.

func WaitEqualRowsCountDifferentTables

func WaitEqualRowsCountDifferentTables(
	t *testing.T,
	sourceSchema, sourceTableName, destinationSchema, destinationTableName string,
	source, destination abstract.Storage,
	maxDuration time.Duration,
) error

WaitEqualRowsCountDifferentTables waits for the row counts to become equal in different schemas and tables. It may also be used to wait for row counts within the same DB and the same schema but different tables.

func WaitStoragesSynced

func WaitStoragesSynced(t *testing.T, sourceModel, targetModel interface{}, retries uint64, compareParams *CompareStoragesParams) error

func WithLocalRuntime

func WithLocalRuntime(transfer *server.Transfer, jobCount int, processCount int) *server.Transfer

func WithMysqlInclude

func WithMysqlInclude(src *mysql.MysqlSource, regex []string) *mysql.MysqlSource

func YDBInitChangeItem

func YDBInitChangeItem(tablePath string) *abstract.ChangeItem

func YDBPullDataFromTable

func YDBPullDataFromTable(t *testing.T, token, database, instance, table string) []abstract.ChangeItem

func YDBStmtDelete

func YDBStmtDelete(t *testing.T, tablePath string, id int) *abstract.ChangeItem

func YDBStmtDeleteCompoundKey

func YDBStmtDeleteCompoundKey(t *testing.T, tablePath string, ids ...any) *abstract.ChangeItem

func YDBStmtInsert

func YDBStmtInsert(t *testing.T, tablePath string, id int) *abstract.ChangeItem

func YDBStmtInsertNulls

func YDBStmtInsertNulls(t *testing.T, tablePath string, id int) *abstract.ChangeItem

func YDBStmtInsertValues

func YDBStmtInsertValues(t *testing.T, tablePath string, values []interface{}, id int) *abstract.ChangeItem

Test values

func YDBStmtInsertValuesMultikey

func YDBStmtInsertValuesMultikey(t *testing.T, tablePath string, values []any, ids ...any) *abstract.ChangeItem

func YDBStmtUpdate

func YDBStmtUpdate(t *testing.T, tablePath string, id int, newInt32Val int) *abstract.ChangeItem

func YDBStmtUpdateTOAST

func YDBStmtUpdateTOAST(t *testing.T, tablePath string, id int, newInt32Val int) *abstract.ChangeItem

func YDBTwoTablesEqual

func YDBTwoTablesEqual(t *testing.T, token, database, instance, tableA, tableB string)

Types

type CanonTypedChangeItem

type CanonTypedChangeItem []row

func ToCanonTypedChangeItem

func ToCanonTypedChangeItem(item abstract.ChangeItem) CanonTypedChangeItem

func ToCanonTypedChangeItems

func ToCanonTypedChangeItems(items []abstract.ChangeItem) []CanonTypedChangeItem

type ChangeItemsBuilder

type ChangeItemsBuilder struct {
	Schema      string
	Table       string
	TableSchema *abstract.TableSchema
	// contains filtered or unexported fields
}

func NewChangeItemsBuilder

func NewChangeItemsBuilder(
	schema string,
	table string,
	tableSchema *abstract.TableSchema,
) ChangeItemsBuilder

func (*ChangeItemsBuilder) Deletes

func (c *ChangeItemsBuilder) Deletes(t *testing.T, oldKeys []map[string]interface{}) []abstract.ChangeItem

func (*ChangeItemsBuilder) DoneShardedTableLoad

func (c *ChangeItemsBuilder) DoneShardedTableLoad() []abstract.ChangeItem

func (*ChangeItemsBuilder) DoneTableLoad

func (c *ChangeItemsBuilder) DoneTableLoad() []abstract.ChangeItem

func (*ChangeItemsBuilder) InitShardedTableLoad

func (c *ChangeItemsBuilder) InitShardedTableLoad() []abstract.ChangeItem

func (*ChangeItemsBuilder) InitTableLoad

func (c *ChangeItemsBuilder) InitTableLoad() []abstract.ChangeItem

func (*ChangeItemsBuilder) Inserts

func (c *ChangeItemsBuilder) Inserts(t *testing.T, in []map[string]interface{}) []abstract.ChangeItem

func (*ChangeItemsBuilder) Updates

func (c *ChangeItemsBuilder) Updates(t *testing.T, in []map[string]interface{}, oldKeys []map[string]interface{}) []abstract.ChangeItem

type CompareStoragesParams

// CompareStoragesParams bundles the parameters passed to CompareStorages and
// WaitStoragesSynced. Each field has a matching With* setter that returns the
// receiver type, so values can be built by chaining.
type CompareStoragesParams struct {
	// EqualDataTypes decides whether a left-hand and a right-hand data type
	// name count as equal.
	// NOTE(review): StrictEquality has the same signature and is described as
	// the default callback — confirm that NewCompareStorageParams installs it.
	EqualDataTypes      func(lDataType, rDataType string) bool
	// TableFilter maps a table map to a list of table descriptions.
	// NOTE(review): presumably the returned tables are the ones compared;
	// FilterTechnicalTables has a compatible signature — confirm.
	TableFilter         func(tables abstract.TableMap) []abstract.TableDescription
	// PriorityComparators holds checksum comparators.
	// NOTE(review): the name suggests they run before the default comparison —
	// confirm against tasks.ChecksumComparator usage.
	PriorityComparators []tasks.ChecksumComparator
}

func NewCompareStorageParams

func NewCompareStorageParams() *CompareStoragesParams

func (*CompareStoragesParams) WithEqualDataTypes

func (p *CompareStoragesParams) WithEqualDataTypes(equalDataTypes func(lDataType, rDataType string) bool) *CompareStoragesParams

func (*CompareStoragesParams) WithPriorityComparators

func (p *CompareStoragesParams) WithPriorityComparators(comparators ...tasks.ChecksumComparator) *CompareStoragesParams

func (*CompareStoragesParams) WithTableFilter

func (p *CompareStoragesParams) WithTableFilter(tableFilter func(tables abstract.TableMap) []abstract.TableDescription) *CompareStoragesParams

type FakeStorage

type FakeStorage struct {
	// contains filtered or unexported fields
}

func NewFakeStorage

func NewFakeStorage(changeItems []abstract.ChangeItem) *FakeStorage

func (*FakeStorage) Close

func (f *FakeStorage) Close()

func (*FakeStorage) EstimateTableRowsCount

func (f *FakeStorage) EstimateTableRowsCount(_ abstract.TableID) (uint64, error)

func (*FakeStorage) ExactTableRowsCount

func (f *FakeStorage) ExactTableRowsCount(_ abstract.TableID) (uint64, error)

func (*FakeStorage) LoadTable

func (*FakeStorage) Ping

func (f *FakeStorage) Ping() error

func (*FakeStorage) TableExists

func (f *FakeStorage) TableExists(_ abstract.TableID) (bool, error)

func (*FakeStorage) TableList

func (*FakeStorage) TableSchema

type LabeledPort

// LabeledPort pairs a port number with a textual label. CheckConnections
// accepts a variadic list of these.
type LabeledPort struct {
	// Label is the text attached to the port.
	// NOTE(review): presumably used to identify the port in errors — confirm.
	Label string
	// Port is the port number.
	Port  int
}

type MockSink

// MockSink is a sink stub exposing Push and Close methods.
type MockSink struct {
	// PushCallback receives a batch of change items.
	// NOTE(review): presumably invoked by Push with its input — confirm in
	// the method body, which is not visible here.
	PushCallback func([]abstract.ChangeItem)
}

func (*MockSink) Close

func (s *MockSink) Close() error

func (*MockSink) Push

func (s *MockSink) Push(input []abstract.ChangeItem) error

type MySQL2YTTestFixture

type MySQL2YTTestFixture struct {
	YTDir      ypath.Path
	YTEnv      *yttest.Env
	Src        *mysql.MysqlSource
	Dst        yt.YtDestinationModel
	SrcStorage *mysql.Storage
	DstStorage *storage.Storage
	// contains filtered or unexported fields
}

func (*MySQL2YTTestFixture) Teardown

func (f *MySQL2YTTestFixture) Teardown(t *testing.T)

type SimpleTransformer

type SimpleTransformer struct {
	// contains filtered or unexported fields
}

func NewSimpleTransformer

func NewSimpleTransformer(t *testing.T, applyUdf SimpleTransformerApplyUDF, suitableUdf SimpleTransformerSuitableUDF) *SimpleTransformer

func (*SimpleTransformer) Apply

func (*SimpleTransformer) Description

func (s *SimpleTransformer) Description() string

func (*SimpleTransformer) ResultSchema

func (s *SimpleTransformer) ResultSchema(original *abstract.TableSchema) (*abstract.TableSchema, error)

func (*SimpleTransformer) Suitable

func (s *SimpleTransformer) Suitable(table abstract.TableID, schema *abstract.TableSchema) bool

func (*SimpleTransformer) Type

type SimpleTransformerApplyUDF

type SimpleTransformerApplyUDF func(*testing.T, []abstract.ChangeItem) abstract.TransformerResult

type SimpleTransformerSuitableUDF

type SimpleTransformerSuitableUDF func(abstract.TableID, abstract.TableColumns) bool

type TableSchema

type TableSchema struct {
	// contains filtered or unexported fields
}

func MakeTableSchema

func MakeTableSchema(changeItem *abstract.ChangeItem) *TableSchema

func (*TableSchema) NameToTableSchema

func (t *TableSchema) NameToTableSchema(tt *testing.T, colName string) *abstract.ColSchema

type TestCase

// TestCase describes a single scenario that can be registered in a
// TestCaseContainer via AddCase. The container exposes a parallel set of
// methods (Initialize, ExecStatement, AddChangeItem, IsEnoughChangeItems,
// Check).
// NOTE(review): presumably the container fans each call out to its cases —
// confirm in the container implementation.
type TestCase interface {
	// TableName returns the name of the table associated with the case.
	TableName() string
	// Initialize prepares the case before it is run.
	Initialize(t *testing.T)
	// ExecStatement runs the case's statement(s) through the given pgx pool.
	ExecStatement(ctx context.Context, t *testing.T, client *pgxpool.Pool)
	// AddChangeItem hands one change item to the case.
	AddChangeItem(in *abstract.ChangeItem)
	// IsEnoughChangeItems reports whether the case has received as many
	// change items as it needs.
	IsEnoughChangeItems() bool
	// Check verifies the case's expectations.
	Check(t *testing.T)
}

type TestCaseContainer

type TestCaseContainer struct {
	// contains filtered or unexported fields
}

func NewTestCaseContainer

func NewTestCaseContainer() *TestCaseContainer

func (*TestCaseContainer) AddCase

func (c *TestCaseContainer) AddCase(in TestCase)

func (*TestCaseContainer) AddChangeItem

func (c *TestCaseContainer) AddChangeItem(t *testing.T, in *abstract.ChangeItem)

func (*TestCaseContainer) Check

func (c *TestCaseContainer) Check(t *testing.T)

func (*TestCaseContainer) ExecStatement

func (c *TestCaseContainer) ExecStatement(ctx context.Context, t *testing.T, client *pgxpool.Pool)

func (*TestCaseContainer) Initialize

func (c *TestCaseContainer) Initialize(t *testing.T)

func (*TestCaseContainer) IsEnoughChangeItems

func (c *TestCaseContainer) IsEnoughChangeItems(t *testing.T) bool

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func Activate

func Activate(t *testing.T, transfer *server.Transfer, onErrorCallback ...func(err error)) *Worker

func ActivateErr

func ActivateErr(transfer *server.Transfer, onErrorCallback ...func(err error)) (*Worker, error)

func ActivateWithCP

func ActivateWithCP(transfer *server.Transfer, cp coordinator.Coordinator) (*Worker, error)

func (*Worker) Close

func (q *Worker) Close(t *testing.T)

func (*Worker) Restart

func (q *Worker) Restart(t *testing.T, transfer *server.Transfer)

Restart restarts the replication worker with the updated transfer.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL