abstract

package
v0.0.0-rc7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ProviderTypeMock = ProviderType("mock")
	ProviderTypeNone = ProviderType("none")
)
View Source
const (
	TransferTypeNone                 = TransferType("TRANSFER_TYPE_UNSPECIFIED")
	TransferTypeSnapshotAndIncrement = TransferType("SNAPSHOT_AND_INCREMENT")
	TransferTypeSnapshotOnly         = TransferType("SNAPSHOT_ONLY")
	TransferTypeIncrementOnly        = TransferType("INCREMENT_ONLY")
)
View Source
const (
	LocalRuntimeType = RuntimeType("local")
)
View Source
const OriginalTypeMirrorBinary = changeitem.OriginalTypeMirrorBinary
View Source
const TableConsumerKeeper = changeitem.TableConsumerKeeper
View Source
const TableLSN = changeitem.TableLSN
View Source
const TestVersion = 2

TestVersion is the version of unit tests that this will pass

Variables

View Source
var AllTasks []Task = makeAllTasks()
View Source
var AsyncPushConcurrencyErr = xerrors.NewSentinel("AsyncPush is called after Close")

AsyncPushConcurrencyErr indicates a Push has been called on an already closed AsyncSink. This must not happen and means there are concurrency issues in the implementation of a source.

View Source
var ChCreateTableDistributedKind = changeitem.ChCreateTableDistributedKind
View Source
var ChCreateTableKind = changeitem.ChCreateTableKind
View Source
var ChangeItemFromMap = changeitem.ChangeItemFromMap
View Source
var CheckErrorWrapping = dterrors.CheckErrorWrapping
View Source
var CheckOpaqueErrorWrapping = dterrors.CheckOpaqueErrorWrapping
View Source
var ClickhouseDDLBuilderKind = changeitem.ClickhouseDDLBuilderKind
View Source
var ContainsNonRowItem = changeitem.ContainsNonRowItem
View Source
var DeleteKind = changeitem.DeleteKind
View Source
var DoneShardedTableLoad = changeitem.DoneShardedTableLoad
View Source
var DoneTableLoad = changeitem.DoneTableLoad
View Source
var DropTableKind = changeitem.DropTableKind
View Source
var ElasticsearchDumpIndexKind = changeitem.ElasticsearchDumpIndexKind
View Source
var EmptyEventSize = changeitem.EmptyEventSize
View Source
var EmptyOldKeys = changeitem.EmptyOldKeys
View Source
var FakeTasks []FakeTask = makeFakeTasks()
View Source
var FindItemOfKind = changeitem.FindItemOfKind
View Source
var GetRawMessageData = changeitem.GetRawMessageData
View Source
var InitShardedTableLoad = changeitem.InitShardedTableLoad
View Source
var InitTableLoad = changeitem.InitTableLoad
View Source
var InsertKind = changeitem.InsertKind
View Source
var IsFatal = dterrors.IsFatal
View Source
var IsNonShardableError = dterrors.IsNonShardableError
View Source
var IsRetriablePartUploadError = dterrors.IsRetriablePartUploadError
View Source
var IsSystemTable = changeitem.IsSystemTable
View Source
var IsTableUploadError = dterrors.IsTableUploadError
View Source
var MakeFastTableSchema = changeitem.MakeFastTableSchema
View Source
var MakeMapColNameToIndex = changeitem.MakeMapColNameToIndex
View Source
var MakeOriginallyTypedColSchema = changeitem.MakeOriginallyTypedColSchema
View Source
var MakeRawMessage = changeitem.MakeRawMessage
View Source
var MakeTypedColSchema = changeitem.MakeTypedColSchema
View Source
var MongoCreateKind = changeitem.MongoCreateKind
View Source
var MongoDropDatabaseKind = changeitem.MongoDropDatabaseKind
View Source
var MongoDropKind = changeitem.MongoDropKind
View Source
var MongoNoop = changeitem.MongoNoop
View Source
var MongoRenameKind = changeitem.MongoRenameKind
View Source
var MongoUpdateDocumentKind = changeitem.MongoUpdateDocumentKind
View Source
var NewColSchema = changeitem.NewColSchema
View Source
var NewFatalError = dterrors.NewFatalError
View Source
var NewNonShardableError = dterrors.NewNonShardableError
View Source
var NewPartition = changeitem.NewPartition
View Source
var NewRetriablePartUploadError = dterrors.NewRetriablePartUploadError
View Source
var NewStrictifyError = strictify.NewStrictifyError
View Source
var NewTableID = changeitem.NewTableID
View Source
var NewTableSchema = changeitem.NewTableSchema
View Source
var NewTableUploadError = dterrors.NewTableUploadError
View Source
var PgDDLKind = changeitem.PgDDLKind
View Source
var RawDataColsIDX = changeitem.RawDataColsIDX
View Source
var RawDataSchema = changeitem.RawDataSchema
View Source
var RawEventSize = changeitem.RawEventSize
View Source
var RawMessagePartition = changeitem.RawMessagePartition
View Source
var RawMessageSeqNo = changeitem.RawMessageSeqNo
View Source
var RawMessageTopic = changeitem.RawMessageTopic
View Source
var RawMessageWriteTime = changeitem.RawMessageWriteTime
View Source
var RegisterSystemTables = changeitem.RegisterSystemTables
View Source
var RunnableTasks []RunnableTask = makeRunnableTasks()
View Source
var SplitByID = changeitem.SplitByID
View Source
var SplitByTableID = changeitem.SplitByTableID
View Source
var SplitUpdatedPKeys = changeitem.SplitUpdatedPKeys
View Source
var SynchronizeKind = changeitem.SynchronizeKind
View Source
var TaskTypeByName map[TaskTypeName]TaskType = makeTaskTypeByNameMap()
View Source
var TruncateTableKind = changeitem.TruncateTableKind
View Source
var UpdateKind = changeitem.UpdateKind

Functions

func AddTransformer

func AddTransformer(transformerName string, transformer SinkOption) error

func BalanceBrackets

func BalanceBrackets(phrase string) bool

func BuildIncludeMap

func BuildIncludeMap(objects []string) (map[TableID]bool, error)

func DefaultValue

func DefaultValue(c *changeitem.ColSchema) interface{}

DefaultValue returns a default instance of the type represented by this schema. This method only works safely in heterogenous transfers.

func GetTypeFromString

func GetTypeFromString(dataType string) (schema.Type, error)

func IsMysqlBinaryType

func IsMysqlBinaryType(in string) bool

func KnownRuntime

func KnownRuntime(r RuntimeType) bool

func ParseFilter

func ParseFilter(rawFilter string) (*Table, *WhereStatement, error)

func RegisterProviderName

func RegisterProviderName(providerType ProviderType, name string)

func RegisterRuntime

func RegisterRuntime(r RuntimeType, f func(spec string) (Runtime, error))

func Restore

func Restore(column ColSchema, value interface{}) interface{}

func RestoreChangeItems

func RestoreChangeItems(batch []ChangeItem)

func Rows

func Rows(metrics metrics.Registry, table string, rows int)

func ToYtSchema

func ToYtSchema(original []ColSchema, fixAnyTypeInPrimaryKey bool) []schema.Column

func TrimMySQLType

func TrimMySQLType(rawColumnType string) string

func ValidateChangeItem

func ValidateChangeItem(changeItem *ChangeItem) error

func ValidateChangeItems

func ValidateChangeItems(changeItems []ChangeItem) error

func ValidateChangeItemsPtrs

func ValidateChangeItemsPtrs(changeItems []*ChangeItem) error

Types

type Activate

type Activate struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Activate) Visit

func (t Activate) Visit(v TaskVisitor) interface{}

func (Activate) VisitRunnable

func (t Activate) VisitRunnable(v RunnableVisitor) interface{}

type AddTables

type AddTables struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (AddTables) Visit

func (t AddTables) Visit(v TaskVisitor) interface{}

func (AddTables) VisitRunnable

func (t AddTables) VisitRunnable(v RunnableVisitor) interface{}

type AsyncMiddleware

type AsyncMiddleware func(sink AsyncSink) AsyncSink

func ComposeAsyncMiddleware

func ComposeAsyncMiddleware(mw ...AsyncMiddleware) AsyncMiddleware

ComposeAsyncMiddleware builds a pipeline of AsyncMiddlewares. Arguments order should be the same as the desired data flow. ComposeAsyncMiddleware(A, B, C)(sink) call is equivalent to A(B(C(sink)))

type AsyncSink

type AsyncSink interface {
	io.Closer
	// AsyncPush writes items asynchronously. The error for the given batch of items will be written into the resulting channel when an underlying (synchronous) Push actually happens.
	// Note, that AsyncPush takes ownership on slice `items`, so it shouldn't be further used.
	AsyncPush(items []ChangeItem) chan error
}

AsyncSink provides asynchronous Push operation, which should be a wrapper over synchronous Push implemented by sink.

All of its methods may be called concurrently.

type Asyncer

type Asyncer func(sinker Sinker) AsyncSink

type Block

type Block interface {
	GetData() []map[string]interface{}
}

type ChangeItem

type ChangeItem = changeitem.ChangeItem

func MakeDoneTableLoad

func MakeDoneTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, tableSchema *TableSchema) []ChangeItem

func MakeInitTableLoad

func MakeInitTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, tableSchema *TableSchema) []ChangeItem

func MakeSynchronizeEvent

func MakeSynchronizeEvent() ChangeItem

func MakeTxDone

func MakeTxDone(txSequence uint32, lsn uint64, execTS time.Time, lastPushedGTID, gtidStr string) ChangeItem

func UnmarshalChangeItem

func UnmarshalChangeItem(changeItemBuf []byte) (*ChangeItem, error)

func UnmarshalChangeItems

func UnmarshalChangeItems(changeItemsBuf []byte) ([]ChangeItem, error)

type CheckResult

type CheckResult struct {
	Error   error
	Success bool
}

CheckResult describe particular check result that was performed against endpoint / transfer

type CheckType

type CheckType string

CheckType test check type

type Checksum

type Checksum struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Checksum) Visit

func (t Checksum) Visit(v TaskVisitor) interface{}

func (Checksum) VisitRunnable

func (t Checksum) VisitRunnable(v RunnableVisitor) interface{}

type CleanupResource

type CleanupResource struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (CleanupResource) Visit

func (t CleanupResource) Visit(v TaskVisitor) interface{}

func (CleanupResource) VisitRunnable

func (t CleanupResource) VisitRunnable(v RunnableVisitor) interface{}

type Closeable

type Closeable interface {
	// Must be safe for concurrent use
	Close()
}

type ColSchema

type ColSchema = changeitem.ColSchema

type ColumnInfo

type ColumnInfo struct {
	Name     string
	Type     *ColumnType
	Nullable bool
}

type ColumnName

type ColumnName = changeitem.ColumnName

type ColumnSchema

type ColumnSchema struct {
	Name    string      `yson:"name" json:"name"`
	YTType  schema.Type `yson:"type" json:"type"`
	Primary bool        `json:"primary"`
}

type ColumnType

type ColumnType interface {
	GetRepresentation() string
}

type Committable

type Committable interface {
	Sinker

	// Commit commits all changes made by all sinks during the operation of this transfer.
	// This can be implemented in the future for CH and S3. To implement atomicity, you need to use a transaction that
	// captures all actions with destination DB during the snapshot. This transaction must be committed when calling Commit().
	Commit() error
}

Committable is a sinker that needs to be completed after snapshot.

type DBSchema

type DBSchema = changeitem.DBSchema

func SchemaFilterByObjects

func SchemaFilterByObjects(schema DBSchema, objects []string) (DBSchema, error)

type DateColumn

type DateColumn struct {
	From, To, Nano string
}

type Deactivate

type Deactivate struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Deactivate) Visit

func (t Deactivate) Visit(v TaskVisitor) interface{}

func (Deactivate) VisitRunnable

func (t Deactivate) VisitRunnable(v RunnableVisitor) interface{}

type EmptyIoCloser

type EmptyIoCloser struct{} // stub implementation of io.Closer

func NewEmptyIoCloser

func NewEmptyIoCloser() *EmptyIoCloser

func (*EmptyIoCloser) Close

func (s *EmptyIoCloser) Close() error

type EndpointDelete

type EndpointDelete struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (EndpointDelete) Visit

func (t EndpointDelete) Visit(v TaskVisitor) interface{}

func (EndpointDelete) VisitFake

func (t EndpointDelete) VisitFake(v FakeVisitor) interface{}

type EphemeralTask

type EphemeralTask interface {
	Task
	// contains filtered or unexported methods
}

type EventSize

type EventSize = changeitem.EventSize

type FakeTask

type FakeTask interface {
	Task
	VisitFake(visitor FakeVisitor) interface{}
	// contains filtered or unexported methods
}

type FakeVisitor

type FakeVisitor interface {
	OnEndpointDelete(t EndpointDelete) interface{}
	OnTransferCreate(t TransferCreate) interface{}
	OnTransferDelete(t TransferDelete) interface{}
	OnReplication(t Replication) interface{}
	OnTermination(t Termination) interface{}
	OnTransferVersionUpdate(t TransferVersionUpdate) interface{}
	OnTransferVersionFreeze(t TransferVersionFreeze) interface{}
	OnTransferVersionUnfreeze(t TransferVersionUnfreeze) interface{}
}

type FastTableSchema

type FastTableSchema = changeitem.FastTableSchema

type Fetchable

type Fetchable interface {
	Fetch() ([]ChangeItem, error)
}

type Filters

type Filters map[Table]WhereStatement

func ParseFilterItems

func ParseFilterItems(rawFilters []string) (Filters, error)

type HomoValuer

type HomoValuer interface {
	HomoValue() any
}

HomoValuer is the same as sql/driver.Valuer, but for homogenous values

type IncludeTableList

type IncludeTableList interface {
	Includeable
	IncludeTableList() ([]TableID, error)
}

type Includeable

type Includeable interface {
	// Include returns true if the given table is included
	Include(tID TableID) bool
}

func NewIntersectionIncludeable

func NewIntersectionIncludeable(a, b Includeable) Includeable

type IncrementalStorage

type IncrementalStorage interface {
	GetIncrementalState(ctx context.Context, incremental []IncrementalTable) ([]TableDescription, error)
	SetInitialState(tables []TableDescription, incremental []IncrementalTable)
}

type IncrementalTable

type IncrementalTable struct {
	Name         string `yaml:"name"`
	Namespace    string `yaml:"namespace"`
	CursorField  string `yaml:"cursor_field"`
	InitialState string `yaml:"initial_state"`
}

func (IncrementalTable) Initialized

func (t IncrementalTable) Initialized() bool

func (IncrementalTable) TableID

func (t IncrementalTable) TableID() TableID

type Kind

type Kind = changeitem.Kind

type LfLineSplitter

type LfLineSplitter string
const (
	LfLineSplitterNewLine    LfLineSplitter = "\n"
	LfLineSplitterDoNotSplit LfLineSplitter = "do-not-split"
	LfLineSplitterProtoseq   LfLineSplitter = "protoseq"
)

type LimitedResourceRuntime

type LimitedResourceRuntime interface {
	RAMGuarantee() uint64
	GCPercentage() int
	ResourceLimiterEnabled() bool
}

type LoadProgress

type LoadProgress func(current, progress, total uint64)

type LocalRuntime

type LocalRuntime struct {
	Host           string
	CurrentJob     int
	ShardingUpload ShardUploadParams
}

func (*LocalRuntime) CurrentJobIndex

func (l *LocalRuntime) CurrentJobIndex() int

func (*LocalRuntime) IsMain

func (l *LocalRuntime) IsMain() bool

func (*LocalRuntime) NeedRestart

func (l *LocalRuntime) NeedRestart(runtime Runtime) bool

func (*LocalRuntime) SetVersion

func (l *LocalRuntime) SetVersion(runtimeSpecificVersion string, versionProperties *string) error

func (*LocalRuntime) ThreadsNumPerWorker

func (l *LocalRuntime) ThreadsNumPerWorker() int

func (*LocalRuntime) Type

func (*LocalRuntime) Type() RuntimeType

func (*LocalRuntime) Validate

func (l *LocalRuntime) Validate() error

func (*LocalRuntime) WithDefaults

func (l *LocalRuntime) WithDefaults()

func (*LocalRuntime) WorkersNum

func (l *LocalRuntime) WorkersNum() int

type LogPosition

type LogPosition struct {
	ID   uint32
	LSN  uint64
	TxID string
}

type Middleware

type Middleware func(sinker Sinker) Sinker

type MonitorableSlot

type MonitorableSlot interface {
	RunSlotMonitor(ctx context.Context, serverSource interface{}, registry metrics.Registry) (SlotKiller, <-chan error, error)
}

type Movable

type Movable interface {
	Sinker
	// Move moves (renames) the given source table into the given destination table
	Move(ctx context.Context, src, dst TableID) error
}

Movable is a sinker which can move tables. This interface allows to use temporator middleware.

type OldKeysType

type OldKeysType = changeitem.OldKeysType

type Partition

type Partition = changeitem.Partition

type PositionalStorage

type PositionalStorage interface {
	// Position provide info about snapshot read position
	Position(ctx context.Context) (*LogPosition, error)
}

PositionalStorage some storages may provide specific position for snapshot consistency

type Progress

type Progress struct {
	Completed int64
	Total     int64
}

type PropertyKey

type PropertyKey = changeitem.PropertyKey

type ProviderType

type ProviderType string

func (ProviderType) Name

func (p ProviderType) Name() string

type Pusher

type Pusher func(items []ChangeItem) error

func PusherFromAsyncSink

func PusherFromAsyncSink(asink AsyncSink) Pusher

PusherFromAsyncSink wraps the given sink into a (synchronous) pusher interface

type ReUpload

type ReUpload struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (ReUpload) Visit

func (t ReUpload) Visit(v TaskVisitor) interface{}

func (ReUpload) VisitRunnable

func (t ReUpload) VisitRunnable(v RunnableVisitor) interface{}

type RegularSnapshot

type RegularSnapshot struct {
	Enabled               bool               `json:"Enabled" yaml:"enabled"`
	Interval              time.Duration      `json:"Interval" yaml:"interval"`
	CronExpression        string             `json:"CronExpression" yaml:"cron_expression"`
	IncrementDelaySeconds int64              `json:"IncrementDelaySeconds" yaml:"increment_delay_seconds"`
	Incremental           []IncrementalTable `json:"Incremental" yaml:"incremental"`
}

type RemoveTables

type RemoveTables struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (RemoveTables) Visit

func (t RemoveTables) Visit(v TaskVisitor) interface{}

func (RemoveTables) VisitRunnable

func (t RemoveTables) VisitRunnable(v RunnableVisitor) interface{}

type Replication

type Replication struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (Replication) Visit

func (t Replication) Visit(v TaskVisitor) interface{}

func (Replication) VisitFake

func (t Replication) VisitFake(v FakeVisitor) interface{}

type Restart

type Restart struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Restart) Visit

func (t Restart) Visit(v TaskVisitor) interface{}

func (Restart) VisitRunnable

func (t Restart) VisitRunnable(v RunnableVisitor) interface{}

type RetriablePartUploadError

type RetriablePartUploadError = dterrors.RetriablePartUploadError

type RunnableTask

type RunnableTask interface {
	Task
	VisitRunnable(visitor RunnableVisitor) interface{}
	// contains filtered or unexported methods
}

type RunnableVisitor

type RunnableVisitor interface {
	OnActivate(t Activate) interface{}
	OnAddTables(t AddTables) interface{}
	OnUpdateTransfer(t UpdateTransfer) interface{}
	OnChecksum(t Checksum) interface{}
	OnDeactivate(t Deactivate) interface{}
	OnCleanupResource(t CleanupResource) interface{}
	OnReUpload(t ReUpload) interface{}
	OnRemoveTables(t RemoveTables) interface{}
	OnRestart(t Restart) interface{}
	OnStart(t Start) interface{}
	OnStop(t Stop) interface{}
	OnUpload(t Upload) interface{}
	OnVerify(t Verify) interface{}
	OnTestEndpoint(t TestEndpoint) interface{}
}

type Runtime

type Runtime interface {
	NeedRestart(runtime Runtime) bool
	WithDefaults()
	Validate() error
	Type() RuntimeType
	SetVersion(runtimeSpecificVersion string, versionProperties *string) error
}

func NewRuntime

func NewRuntime(runtime RuntimeType, runtimeSpec string) (Runtime, error)

type RuntimeType

type RuntimeType string

type SampleableStorage

type SampleableStorage interface {
	Storage

	TableSizeInBytes(table TableID) (uint64, error)
	LoadTopBottomSample(table TableDescription, pusher Pusher) error
	LoadRandomSample(table TableDescription, pusher Pusher) error
	LoadSampleBySet(table TableDescription, keySet []map[string]interface{}, pusher Pusher) error
	TableAccessible(table TableDescription) bool
}

SampleableStorage is for dataplane tests

type ScheduledTask

type ScheduledTask interface {
	Stop()
	Runtime() Runtime
}

type SchemaStorage

type SchemaStorage interface {
	LoadSchema() (DBSchema, error)
}

SchemaStorage allow to resolve DB Schema from storage

type ShardUploadParams

type ShardUploadParams struct {
	JobCount     int //Workers count
	ProcessCount int //Threads count
}

Parallelism params

func DefaultShardUploadParams

func DefaultShardUploadParams() *ShardUploadParams

func NewShardUploadParams

func NewShardUploadParams(jobCount int, processCount int) *ShardUploadParams

type ShardableTask

type ShardableTask interface {
	Task
	// contains filtered or unexported methods
}

type ShardingContextStorage

type ShardingContextStorage interface {
	// ShardingContext Return shared data, used on *MAIN* worker;
	// Take care, method return OperationState_ShardedUploadState, but only fill field Context;
	// Because type of Context is private, this is protoc thing;
	ShardingContext() ([]byte, error)

	// SetShardingContext for storage, used on *SECONDARY* worker
	SetShardingContext(shardedState []byte) error
}

Storage has data, that need to be shared with all workers

type ShardingStorage

type ShardingStorage interface {
	ShardTable(ctx context.Context, table TableDescription) ([]TableDescription, error)
}

ShardingStorage is for in table sharding

type ShardingTaskRuntime

type ShardingTaskRuntime interface {
	WorkersNum() int
	ThreadsNumPerWorker() int
	CurrentJobIndex() int
	IsMain() bool
}

type SinkOption

type SinkOption func(sinker Sinker) Sinker

TODO: Drop by making transformers a common middleware

func GetTransformers

func GetTransformers(middlewareNames []string) ([]SinkOption, error)

type Sinker

type Sinker interface {
	io.Closer
	// Push writes the given items into destination synchronously. If its result is nil, the items are considered to be successfully written to the destination.
	// The method must be retriable: it can be called again after it returns an error, except for fatal errors (these must be wrapped in a particular struct)
	Push(items []ChangeItem) error
}

Sinker is the destination's data writer interface.

All its methods are guaranteed to be called non-concurrently (synchronously).

TODO: rename to Sink

type SlotKiller

type SlotKiller interface {
	KillSlot() error
}

func MakeStubSlotKiller

func MakeStubSlotKiller() SlotKiller

type SnapshotableStorage

type SnapshotableStorage interface {
	BeginSnapshot(ctx context.Context) error
	EndSnapshot(ctx context.Context) error
}

type Source

type Source interface {
	Run(sink AsyncSink) error
	Stop()
}

type SourceReader

type SourceReader struct {
	TotalCount int
	Reader     chan map[string]interface{}
	Name       string
	RawSchema  []ColumnSchema
	Schema     string
	Table      string
	Lsn        uint64
	CommitTime uint64
}

type Start

type Start struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Start) Visit

func (t Start) Visit(v TaskVisitor) interface{}

func (Start) VisitRunnable

func (t Start) VisitRunnable(v RunnableVisitor) interface{}

type Stop

type Stop struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Stop) Visit

func (t Stop) Visit(v TaskVisitor) interface{}

func (Stop) VisitRunnable

func (t Stop) VisitRunnable(v RunnableVisitor) interface{}

type Storage

type Storage interface {
	Closeable

	Ping() error
	LoadTable(ctx context.Context, table TableDescription, pusher Pusher) error
	TableSchema(ctx context.Context, table TableID) (*TableSchema, error)
	TableList(filter IncludeTableList) (TableMap, error)

	ExactTableRowsCount(table TableID) (uint64, error)
	EstimateTableRowsCount(table TableID) (uint64, error)
	TableExists(table TableID) (bool, error)
}

Storage is for simple storage implementations For extra functionalities implement below storages.

type StubSlotKiller

type StubSlotKiller struct {
}

func (*StubSlotKiller) KillSlot

func (k *StubSlotKiller) KillSlot() error

type Table

type Table string

type TableColumns

type TableColumns = changeitem.TableColumns

type TableDescription

type TableDescription struct {
	Name   string
	Schema string // for example - for mysql here are database name
	Filter WhereStatement
	EtaRow uint64 // estimated number of rows in the table
	Offset uint64 // offset (in rows) along the ordering key (not necessary primary key)
}

func (*TableDescription) Fqtn

func (t *TableDescription) Fqtn() string

func (*TableDescription) ID

func (t *TableDescription) ID() TableID

func (*TableDescription) PartID

func (t *TableDescription) PartID() string

func (*TableDescription) Same

func (t *TableDescription) Same(table string) bool

func (*TableDescription) String

func (t *TableDescription) String() string

type TableID

type TableID = changeitem.TableID
var NonExistentTableID TableID = *NewTableID("", "")

func NewTableIDFromStringPg

func NewTableIDFromStringPg(fqtn string, replaceOmittedSchemaWithPublic bool) (*TableID, error)

NewTableIDFromStringPg parses the given FQTN in PostgreSQL syntax to construct a TableID.

func ParseTableID

func ParseTableID(object string) (*TableID, error)

func ParseTableIDs

func ParseTableIDs(objects ...string) ([]TableID, error)

func TableIDsIntersection

func TableIDsIntersection(a []TableID, b []TableID) []TableID

TableIDsIntersection returns an intersection of two lists of TableIDs

type TableInfo

type TableInfo struct {
	EtaRow uint64
	IsView bool
	Schema *TableSchema
}

type TableMap

type TableMap map[TableID]TableInfo

func (*TableMap) ConvertToTableDescriptions

func (m *TableMap) ConvertToTableDescriptions() []TableDescription

func (*TableMap) Copy

func (m *TableMap) Copy() TableMap

func (*TableMap) FakePkeyTables

func (m *TableMap) FakePkeyTables() []TableID

func (*TableMap) NoKeysTables

func (m *TableMap) NoKeysTables() []TableID

func (*TableMap) String

func (m *TableMap) String(withSchema bool) string

func (*TableMap) ToDBSchema

func (m *TableMap) ToDBSchema() DBSchema

type TablePartID

type TablePartID = changeitem.TablePartID

type TableSchema

type TableSchema = changeitem.TableSchema

type TableSplitter

type TableSplitter struct {
	Columns []string
}

type TableUploadError

type TableUploadError = dterrors.TableUploadError

type Task

type Task interface {
	Visit(visitor TaskVisitor) interface{}
	// contains filtered or unexported methods
}

type TaskType

type TaskType struct {
	Task
}

func (*TaskType) DecodeText

func (t *TaskType) DecodeText(ci *pgtype.ConnInfo, src []byte) error

func (TaskType) Description

func (t TaskType) Description(taskParams interface{}) string

func (TaskType) EncodeText

func (t TaskType) EncodeText(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error)

func (*TaskType) GobDecode

func (t *TaskType) GobDecode(data []byte) error

func (TaskType) GobEncode

func (t TaskType) GobEncode() ([]byte, error)

func (TaskType) MarshalJSON

func (t TaskType) MarshalJSON() ([]byte, error)

func (TaskType) NewParams

func (t TaskType) NewParams() interface{}

func (TaskType) String

func (t TaskType) String() string

func (*TaskType) UnmarshalJSON

func (t *TaskType) UnmarshalJSON(data []byte) error

type TaskTypeName

type TaskTypeName = string

type TaskVisitor

type TaskVisitor interface {
	RunnableVisitor
	FakeVisitor
}

Visitors {{{

type Termination

type Termination struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (Termination) Visit

func (t Termination) Visit(v TaskVisitor) interface{}

func (Termination) VisitFake

func (t Termination) VisitFake(v FakeVisitor) interface{}

type TestEndpoint

type TestEndpoint struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (TestEndpoint) HasTimedOut

func (t TestEndpoint) HasTimedOut(createdAt time.Time, pingedAt time.Time) bool

func (TestEndpoint) Visit

func (t TestEndpoint) Visit(v TaskVisitor) interface{}

func (TestEndpoint) VisitRunnable

func (t TestEndpoint) VisitRunnable(v RunnableVisitor) interface{}

type TestResult

type TestResult struct {
	Checks  map[CheckType]CheckResult
	Schema  TableMap
	Preview map[TableID][]ChangeItem
}

TestResult aggregated result of test for endpoint

func NewTestResult

func NewTestResult(checks ...CheckType) *TestResult

func (*TestResult) Add

func (t *TestResult) Add(extraChecks ...CheckType)

func (*TestResult) Combine

func (t *TestResult) Combine(partialResults *TestResult)

Combine combines the two checkResult maps into one

func (*TestResult) Err

func (t *TestResult) Err() error

func (*TestResult) NotOk

func (t *TestResult) NotOk(checkType CheckType, err error) *TestResult

func (*TestResult) Ok

func (t *TestResult) Ok(checkType CheckType) *TestResult

type TimeoutableTask

type TimeoutableTask interface {
	Task
	HasTimedOut(createdAt time.Time, pingedAt time.Time) bool
}

type TimestampCol

type TimestampCol struct {
	Col    string
	Format string
}

type Transfer

type Transfer interface {
	// Start the transfer. Retry endlessly when errors occur, until stopped with Stop().
	// This method does not block, the work is done in the background.
	Start()

	// Stop the transfer. May be called multiple times even without prior Start() to clean
	// up external resources, e.g. terminate YT operations. Synchronous, i.e. blocks
	// until either all resources are released or an error occurrs.
	Stop() error

	Runtime() Runtime

	Error() error
}

type TransferCreate

type TransferCreate struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (TransferCreate) Visit

func (t TransferCreate) Visit(v TaskVisitor) interface{}

func (TransferCreate) VisitFake

func (t TransferCreate) VisitFake(v FakeVisitor) interface{}

type TransferDelete

type TransferDelete struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (TransferDelete) Visit

func (t TransferDelete) Visit(v TaskVisitor) interface{}

func (TransferDelete) VisitFake

func (t TransferDelete) VisitFake(v FakeVisitor) interface{}

type TransferType

type TransferType string

func (*TransferType) Expand

func (t *TransferType) Expand() []TransferType

type TransferVersionFreeze

type TransferVersionFreeze struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (TransferVersionFreeze) Visit

func (t TransferVersionFreeze) Visit(v TaskVisitor) interface{}

func (TransferVersionFreeze) VisitFake

func (t TransferVersionFreeze) VisitFake(v FakeVisitor) interface{}

type TransferVersionFreezeParams

type TransferVersionFreezeParams struct {
	CurrentVersion string `json:"current_version"`
}

type TransferVersionUnfreeze

type TransferVersionUnfreeze struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (TransferVersionUnfreeze) Visit

func (t TransferVersionUnfreeze) Visit(v TaskVisitor) interface{}

func (TransferVersionUnfreeze) VisitFake

func (t TransferVersionUnfreeze) VisitFake(v FakeVisitor) interface{}

type TransferVersionUnfreezeParams

type TransferVersionUnfreezeParams struct {
	CurrentVersion string `json:"current_version"`
}

type TransferVersionUpdate

type TransferVersionUpdate struct {
	// contains filtered or unexported fields
}

Fake tasks {{{

func (TransferVersionUpdate) Visit

func (t TransferVersionUpdate) Visit(v TaskVisitor) interface{}

func (TransferVersionUpdate) VisitFake

func (t TransferVersionUpdate) VisitFake(v FakeVisitor) interface{}

type TransferVersionUpdateParams

type TransferVersionUpdateParams struct {
	PreviousVersion string `json:"previous_version"`
	CurrentVersion  string `json:"current_version"`
}

type Transformation

type Transformation interface {
	MakeSinkMiddleware() SinkOption
	AddTransformer(transformer Transformer) error
	RuntimeOpts() TransformationRuntimeOpts
}

type TransformationRuntimeOpts

type TransformationRuntimeOpts struct {
	JobIndex int
}

type Transformer

type Transformer interface {
	Apply(input []ChangeItem) TransformerResult
	Suitable(table TableID, schema *TableSchema) bool
	ResultSchema(original *TableSchema) (*TableSchema, error)
	Description() string
	Type() TransformerType
}

type TransformerError

type TransformerError struct {
	Input ChangeItem
	Error error
}

type TransformerResult

type TransformerResult struct {
	Transformed []ChangeItem
	Errors      []TransformerError
}

type TransformerType

type TransformerType string

type TxBound

type TxBound = changeitem.TxBound

type TypedChangeItem

type TypedChangeItem ChangeItem

func (*TypedChangeItem) MarshalJSON

func (t *TypedChangeItem) MarshalJSON() ([]byte, error)

func (*TypedChangeItem) UnmarshalJSON

func (t *TypedChangeItem) UnmarshalJSON(data []byte) error

type TypedValue

type TypedValue struct {
	Type  string `json:"type"`
	Value any    `json:"value"`
}

type UpdateTransfer

type UpdateTransfer struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (UpdateTransfer) Visit

func (t UpdateTransfer) Visit(v TaskVisitor) interface{}

func (UpdateTransfer) VisitRunnable

func (t UpdateTransfer) VisitRunnable(v RunnableVisitor) interface{}

type UpdateTransferParams

type UpdateTransferParams struct {
	OldObjects []string `json:"old_objects"`
	NewObjects []string `json:"new_objects"`
}

func (UpdateTransferParams) AddedTables

func (p UpdateTransferParams) AddedTables() ([]TableDescription, error)

type Upload

type Upload struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Upload) Visit

func (t Upload) Visit(v TaskVisitor) interface{}

func (Upload) VisitRunnable

func (t Upload) VisitRunnable(v RunnableVisitor) interface{}

type Verify

type Verify struct {
	// contains filtered or unexported fields
}

Runnable tasks {{{

func (Verify) HasTimedOut

func (t Verify) HasTimedOut(createdAt time.Time, pingedAt time.Time) bool

func (Verify) Visit

func (t Verify) Visit(v TaskVisitor) interface{}

func (Verify) VisitRunnable

func (t Verify) VisitRunnable(v RunnableVisitor) interface{}

type WhereStatement

type WhereStatement string
const NoFilter WhereStatement = WhereStatement("")

func FiltersIntersection

func FiltersIntersection(a WhereStatement, b WhereStatement) WhereStatement

func NotStatement

func NotStatement(a WhereStatement) WhereStatement

type YtCluster

type YtCluster string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL