Documentation ¶
Index ¶
- Constants
- Variables
- func AddTransformer(transformerName string, transformer SinkOption) error
- func BalanceBrackets(phrase string) bool
- func BuildIncludeMap(objects []string) (map[TableID]bool, error)
- func DefaultValue(c *changeitem.ColSchema) interface{}
- func GetTypeFromString(dataType string) (schema.Type, error)
- func IsMysqlBinaryType(in string) bool
- func KnownRuntime(r RuntimeType) bool
- func ParseFilter(rawFilter string) (*Table, *WhereStatement, error)
- func RegisterProviderName(providerType ProviderType, name string)
- func RegisterRuntime(r RuntimeType, f func(spec string) (Runtime, error))
- func Restore(column ColSchema, value interface{}) interface{}
- func RestoreChangeItems(batch []ChangeItem)
- func Rows(metrics metrics.Registry, table string, rows int)
- func ToYtSchema(original []ColSchema, fixAnyTypeInPrimaryKey bool) []schema.Column
- func TrimMySQLType(rawColumnType string) string
- func ValidateChangeItem(changeItem *ChangeItem) error
- func ValidateChangeItems(changeItems []ChangeItem) error
- func ValidateChangeItemsPtrs(changeItems []*ChangeItem) error
- type Activate
- type AddTables
- type AsyncMiddleware
- type AsyncSink
- type Asyncer
- type Block
- type ChangeItem
- func MakeDoneTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, ...) []ChangeItem
- func MakeInitTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, ...) []ChangeItem
- func MakeSynchronizeEvent() ChangeItem
- func MakeTxDone(txSequence uint32, lsn uint64, execTS time.Time, ...) ChangeItem
- func UnmarshalChangeItem(changeItemBuf []byte) (*ChangeItem, error)
- func UnmarshalChangeItems(changeItemsBuf []byte) ([]ChangeItem, error)
- type CheckResult
- type CheckType
- type Checksum
- type CleanupResource
- type Closeable
- type ColSchema
- type ColumnInfo
- type ColumnName
- type ColumnSchema
- type ColumnType
- type Committable
- type DBSchema
- type DateColumn
- type Deactivate
- type EmptyIoCloser
- type EndpointDelete
- type EphemeralTask
- type EventSize
- type FakeTask
- type FakeVisitor
- type FastTableSchema
- type Fetchable
- type Filters
- type HomoValuer
- type IncludeTableList
- type Includeable
- type IncrementalStorage
- type IncrementalTable
- type Kind
- type LfLineSplitter
- type LimitedResourceRuntime
- type LoadProgress
- type LocalRuntime
- func (l *LocalRuntime) CurrentJobIndex() int
- func (l *LocalRuntime) IsMain() bool
- func (l *LocalRuntime) NeedRestart(runtime Runtime) bool
- func (l *LocalRuntime) SetVersion(runtimeSpecificVersion string, versionProperties *string) error
- func (l *LocalRuntime) ThreadsNumPerWorker() int
- func (*LocalRuntime) Type() RuntimeType
- func (l *LocalRuntime) Validate() error
- func (l *LocalRuntime) WithDefaults()
- func (l *LocalRuntime) WorkersNum() int
- type LogPosition
- type Middleware
- type MonitorableSlot
- type Movable
- type OldKeysType
- type Partition
- type PositionalStorage
- type Progress
- type PropertyKey
- type ProviderType
- type Pusher
- type ReUpload
- type RegularSnapshot
- type RemoveTables
- type Replication
- type Restart
- type RetriablePartUploadError
- type RunnableTask
- type RunnableVisitor
- type Runtime
- type RuntimeType
- type SampleableStorage
- type ScheduledTask
- type SchemaStorage
- type ShardUploadParams
- type ShardableTask
- type ShardingContextStorage
- type ShardingStorage
- type ShardingTaskRuntime
- type SinkOption
- type Sinker
- type SlotKiller
- type SnapshotableStorage
- type Source
- type SourceReader
- type Start
- type Stop
- type Storage
- type StubSlotKiller
- type Table
- type TableColumns
- type TableDescription
- type TableID
- type TableInfo
- type TableMap
- type TablePartID
- type TableSchema
- type TableSplitter
- type TableUploadError
- type Task
- type TaskType
- func (t *TaskType) DecodeText(ci *pgtype.ConnInfo, src []byte) error
- func (t TaskType) Description(taskParams interface{}) string
- func (t TaskType) EncodeText(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error)
- func (t *TaskType) GobDecode(data []byte) error
- func (t TaskType) GobEncode() ([]byte, error)
- func (t TaskType) MarshalJSON() ([]byte, error)
- func (t TaskType) NewParams() interface{}
- func (t TaskType) String() string
- func (t *TaskType) UnmarshalJSON(data []byte) error
- type TaskTypeName
- type TaskVisitor
- type Termination
- type TestEndpoint
- type TestResult
- type TimeoutableTask
- type TimestampCol
- type Transfer
- type TransferCreate
- type TransferDelete
- type TransferType
- type TransferVersionFreeze
- type TransferVersionFreezeParams
- type TransferVersionUnfreeze
- type TransferVersionUnfreezeParams
- type TransferVersionUpdate
- type TransferVersionUpdateParams
- type Transformation
- type TransformationRuntimeOpts
- type Transformer
- type TransformerError
- type TransformerResult
- type TransformerType
- type TxBound
- type TypedChangeItem
- type TypedValue
- type UpdateTransfer
- type UpdateTransferParams
- type Upload
- type Verify
- type WhereStatement
- type YtCluster
Constants ¶
const ( ProviderTypeMock = ProviderType("mock") ProviderTypeNone = ProviderType("none") )
const ( TransferTypeNone = TransferType("TRANSFER_TYPE_UNSPECIFIED") TransferTypeSnapshotAndIncrement = TransferType("SNAPSHOT_AND_INCREMENT") TransferTypeSnapshotOnly = TransferType("SNAPSHOT_ONLY") TransferTypeIncrementOnly = TransferType("INCREMENT_ONLY") )
const (
LocalRuntimeType = RuntimeType("local")
)
const OriginalTypeMirrorBinary = changeitem.OriginalTypeMirrorBinary
const TableConsumerKeeper = changeitem.TableConsumerKeeper
const TableLSN = changeitem.TableLSN
const TestVersion = 2
TestVersion is the version of unit tests that this will pass
Variables ¶
var AllTasks []Task = makeAllTasks()
var AsyncPushConcurrencyErr = xerrors.NewSentinel("AsyncPush is called after Close")
AsyncPushConcurrencyErr indicates a Push has been called on an already closed AsyncSink. This must not happen and means there are concurrency issues in the implementation of a source.
var ChCreateTableDistributedKind = changeitem.ChCreateTableDistributedKind
var ChCreateTableKind = changeitem.ChCreateTableKind
var ChangeItemFromMap = changeitem.ChangeItemFromMap
var CheckErrorWrapping = dterrors.CheckErrorWrapping
var CheckOpaqueErrorWrapping = dterrors.CheckOpaqueErrorWrapping
var ClickhouseDDLBuilderKind = changeitem.ClickhouseDDLBuilderKind
var ColIDX = changeitem.ColIDX
var Collapse = changeitem.Collapse
var ContainsNonRowItem = changeitem.ContainsNonRowItem
var DDLKind = changeitem.DDLKind
var DeleteKind = changeitem.DeleteKind
var DoneShardedTableLoad = changeitem.DoneShardedTableLoad
var DoneTableLoad = changeitem.DoneTableLoad
var DropTableKind = changeitem.DropTableKind
var Dump = changeitem.Dump
var ElasticsearchDumpIndexKind = changeitem.ElasticsearchDumpIndexKind
var EmptyEventSize = changeitem.EmptyEventSize
var EmptyOldKeys = changeitem.EmptyOldKeys
var FakeTasks []FakeTask = makeFakeTasks()
var FindItemOfKind = changeitem.FindItemOfKind
var GetRawMessageData = changeitem.GetRawMessageData
var InitShardedTableLoad = changeitem.InitShardedTableLoad
var InitTableLoad = changeitem.InitTableLoad
var InsertKind = changeitem.InsertKind
var IsFatal = dterrors.IsFatal
var IsNonShardableError = dterrors.IsNonShardableError
var IsRetriablePartUploadError = dterrors.IsRetriablePartUploadError
var IsSystemTable = changeitem.IsSystemTable
var IsTableUploadError = dterrors.IsTableUploadError
var KeyNames = changeitem.KeyNames
var MakeFastTableSchema = changeitem.MakeFastTableSchema
var MakeMapColNameToIndex = changeitem.MakeMapColNameToIndex
var MakeOriginallyTypedColSchema = changeitem.MakeOriginallyTypedColSchema
var MakeRawMessage = changeitem.MakeRawMessage
var MakeTypedColSchema = changeitem.MakeTypedColSchema
var MongoCreateKind = changeitem.MongoCreateKind
var MongoDropDatabaseKind = changeitem.MongoDropDatabaseKind
var MongoDropKind = changeitem.MongoDropKind
var MongoNoop = changeitem.MongoNoop
var MongoRenameKind = changeitem.MongoRenameKind
var MongoUpdateDocumentKind = changeitem.MongoUpdateDocumentKind
var NewColSchema = changeitem.NewColSchema
var NewFatalError = dterrors.NewFatalError
var NewNonShardableError = dterrors.NewNonShardableError
var NewPartition = changeitem.NewPartition
var NewRetriablePartUploadError = dterrors.NewRetriablePartUploadError
var NewStrictifyError = strictify.NewStrictifyError
var NewTableID = changeitem.NewTableID
var NewTableSchema = changeitem.NewTableSchema
var NewTableUploadError = dterrors.NewTableUploadError
var PgDDLKind = changeitem.PgDDLKind
var PgName = changeitem.PgName
var RawDataColsIDX = changeitem.RawDataColsIDX
var RawDataSchema = changeitem.RawDataSchema
var RawEventSize = changeitem.RawEventSize
var RawMessagePartition = changeitem.RawMessagePartition
var RawMessageSeqNo = changeitem.RawMessageSeqNo
var RawMessageTopic = changeitem.RawMessageTopic
var RawMessageWriteTime = changeitem.RawMessageWriteTime
var RegisterSystemTables = changeitem.RegisterSystemTables
var RunnableTasks []RunnableTask = makeRunnableTasks()
var Sniff = changeitem.Sniff
var SplitByID = changeitem.SplitByID
var SplitByTableID = changeitem.SplitByTableID
var SplitUpdatedPKeys = changeitem.SplitUpdatedPKeys
var SynchronizeKind = changeitem.SynchronizeKind
var TaskTypeByName map[TaskTypeName]TaskType = makeTaskTypeByNameMap()
var TruncateTableKind = changeitem.TruncateTableKind
var UpdateKind = changeitem.UpdateKind
Functions ¶
func AddTransformer ¶
func AddTransformer(transformerName string, transformer SinkOption) error
func BalanceBrackets ¶
func DefaultValue ¶
func DefaultValue(c *changeitem.ColSchema) interface{}
DefaultValue returns a default instance of the type represented by this schema. This method only works safely in heterogeneous transfers.
func IsMysqlBinaryType ¶
func KnownRuntime ¶
func KnownRuntime(r RuntimeType) bool
func ParseFilter ¶
func ParseFilter(rawFilter string) (*Table, *WhereStatement, error)
func RegisterProviderName ¶
func RegisterProviderName(providerType ProviderType, name string)
func RegisterRuntime ¶
func RegisterRuntime(r RuntimeType, f func(spec string) (Runtime, error))
func RestoreChangeItems ¶
func RestoreChangeItems(batch []ChangeItem)
func ToYtSchema ¶
func TrimMySQLType ¶
func ValidateChangeItem ¶
func ValidateChangeItem(changeItem *ChangeItem) error
func ValidateChangeItems ¶
func ValidateChangeItems(changeItems []ChangeItem) error
func ValidateChangeItemsPtrs ¶
func ValidateChangeItemsPtrs(changeItems []*ChangeItem) error
Types ¶
type Activate ¶
type Activate struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Activate) Visit ¶
func (t Activate) Visit(v TaskVisitor) interface{}
func (Activate) VisitRunnable ¶
func (t Activate) VisitRunnable(v RunnableVisitor) interface{}
type AddTables ¶
type AddTables struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (AddTables) Visit ¶
func (t AddTables) Visit(v TaskVisitor) interface{}
func (AddTables) VisitRunnable ¶
func (t AddTables) VisitRunnable(v RunnableVisitor) interface{}
type AsyncMiddleware ¶
func ComposeAsyncMiddleware ¶
func ComposeAsyncMiddleware(mw ...AsyncMiddleware) AsyncMiddleware
ComposeAsyncMiddleware builds a pipeline of AsyncMiddlewares. Arguments order should be the same as the desired data flow. ComposeAsyncMiddleware(A, B, C)(sink) call is equivalent to A(B(C(sink)))
type AsyncSink ¶
type AsyncSink interface { io.Closer // AsyncPush writes items asynchronously. The error for the given batch of items will be written into the resulting channel when an underlying (synchronous) Push actually happens. // Note, that AsyncPush takes ownership on slice `items`, so it shouldn't be further used. AsyncPush(items []ChangeItem) chan error }
AsyncSink provides asynchronous Push operation, which should be a wrapper over synchronous Push implemented by sink.
All of its methods may be called concurrently.
type ChangeItem ¶
type ChangeItem = changeitem.ChangeItem
func MakeDoneTableLoad ¶
func MakeDoneTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, tableSchema *TableSchema) []ChangeItem
func MakeInitTableLoad ¶
func MakeInitTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, tableSchema *TableSchema) []ChangeItem
func MakeSynchronizeEvent ¶
func MakeSynchronizeEvent() ChangeItem
func MakeTxDone ¶
func UnmarshalChangeItem ¶
func UnmarshalChangeItem(changeItemBuf []byte) (*ChangeItem, error)
func UnmarshalChangeItems ¶
func UnmarshalChangeItems(changeItemsBuf []byte) ([]ChangeItem, error)
type CheckResult ¶
CheckResult describes the result of a particular check that was performed against an endpoint or transfer.
type Checksum ¶
type Checksum struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Checksum) Visit ¶
func (t Checksum) Visit(v TaskVisitor) interface{}
func (Checksum) VisitRunnable ¶
func (t Checksum) VisitRunnable(v RunnableVisitor) interface{}
type CleanupResource ¶
type CleanupResource struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (CleanupResource) Visit ¶
func (t CleanupResource) Visit(v TaskVisitor) interface{}
func (CleanupResource) VisitRunnable ¶
func (t CleanupResource) VisitRunnable(v RunnableVisitor) interface{}
type ColSchema ¶
type ColSchema = changeitem.ColSchema
type ColumnInfo ¶
type ColumnInfo struct { Name string Type *ColumnType Nullable bool }
type ColumnName ¶
type ColumnName = changeitem.ColumnName
type ColumnSchema ¶
type ColumnType ¶
type ColumnType interface {
GetRepresentation() string
}
type Committable ¶
type Committable interface { Sinker // Commit commits all changes made by all sinks during the operation of this transfer. // This can be implemented in the future for CH and S3. To implement atomicity, you need to use a transaction that // captures all actions with destination DB during the snapshot. This transaction must be committed when calling Commit(). Commit() error }
Committable is a sinker that needs to be completed after snapshot.
type DBSchema ¶
type DBSchema = changeitem.DBSchema
type DateColumn ¶
type DateColumn struct {
From, To, Nano string
}
type Deactivate ¶
type Deactivate struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Deactivate) Visit ¶
func (t Deactivate) Visit(v TaskVisitor) interface{}
func (Deactivate) VisitRunnable ¶
func (t Deactivate) VisitRunnable(v RunnableVisitor) interface{}
type EmptyIoCloser ¶
type EmptyIoCloser struct{} // stub implementation of io.Closer
func NewEmptyIoCloser ¶
func NewEmptyIoCloser() *EmptyIoCloser
func (*EmptyIoCloser) Close ¶
func (s *EmptyIoCloser) Close() error
type EndpointDelete ¶
type EndpointDelete struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (EndpointDelete) Visit ¶
func (t EndpointDelete) Visit(v TaskVisitor) interface{}
func (EndpointDelete) VisitFake ¶
func (t EndpointDelete) VisitFake(v FakeVisitor) interface{}
type EphemeralTask ¶
type EphemeralTask interface { Task // contains filtered or unexported methods }
type EventSize ¶
type EventSize = changeitem.EventSize
type FakeTask ¶
type FakeTask interface { Task VisitFake(visitor FakeVisitor) interface{} // contains filtered or unexported methods }
type FakeVisitor ¶
type FakeVisitor interface { OnEndpointDelete(t EndpointDelete) interface{} OnTransferCreate(t TransferCreate) interface{} OnTransferDelete(t TransferDelete) interface{} OnReplication(t Replication) interface{} OnTermination(t Termination) interface{} OnTransferVersionUpdate(t TransferVersionUpdate) interface{} OnTransferVersionFreeze(t TransferVersionFreeze) interface{} OnTransferVersionUnfreeze(t TransferVersionUnfreeze) interface{} }
type FastTableSchema ¶
type FastTableSchema = changeitem.FastTableSchema
type Fetchable ¶
type Fetchable interface {
Fetch() ([]ChangeItem, error)
}
type HomoValuer ¶
type HomoValuer interface {
HomoValue() any
}
HomoValuer is the same as sql/driver.Valuer, but for homogeneous values.
type IncludeTableList ¶
type IncludeTableList interface { Includeable IncludeTableList() ([]TableID, error) }
type Includeable ¶
type Includeable interface { // Include returns true if the given table is included Include(tID TableID) bool }
func NewIntersectionIncludeable ¶
func NewIntersectionIncludeable(a, b Includeable) Includeable
type IncrementalStorage ¶
type IncrementalStorage interface { GetIncrementalState(ctx context.Context, incremental []IncrementalTable) ([]TableDescription, error) SetInitialState(tables []TableDescription, incremental []IncrementalTable) }
type IncrementalTable ¶
type IncrementalTable struct { Name string `yaml:"name"` Namespace string `yaml:"namespace"` CursorField string `yaml:"cursor_field"` InitialState string `yaml:"initial_state"` }
func (IncrementalTable) Initialized ¶
func (t IncrementalTable) Initialized() bool
func (IncrementalTable) TableID ¶
func (t IncrementalTable) TableID() TableID
type Kind ¶
type Kind = changeitem.Kind
type LfLineSplitter ¶
type LfLineSplitter string
const ( LfLineSplitterNewLine LfLineSplitter = "\n" LfLineSplitterDoNotSplit LfLineSplitter = "do-not-split" LfLineSplitterProtoseq LfLineSplitter = "protoseq" )
type LimitedResourceRuntime ¶
type LoadProgress ¶
type LoadProgress func(current, progress, total uint64)
type LocalRuntime ¶
type LocalRuntime struct { Host string CurrentJob int ShardingUpload ShardUploadParams }
func (*LocalRuntime) CurrentJobIndex ¶
func (l *LocalRuntime) CurrentJobIndex() int
func (*LocalRuntime) IsMain ¶
func (l *LocalRuntime) IsMain() bool
func (*LocalRuntime) NeedRestart ¶
func (l *LocalRuntime) NeedRestart(runtime Runtime) bool
func (*LocalRuntime) SetVersion ¶
func (l *LocalRuntime) SetVersion(runtimeSpecificVersion string, versionProperties *string) error
func (*LocalRuntime) ThreadsNumPerWorker ¶
func (l *LocalRuntime) ThreadsNumPerWorker() int
func (*LocalRuntime) Type ¶
func (*LocalRuntime) Type() RuntimeType
func (*LocalRuntime) Validate ¶
func (l *LocalRuntime) Validate() error
func (*LocalRuntime) WithDefaults ¶
func (l *LocalRuntime) WithDefaults()
func (*LocalRuntime) WorkersNum ¶
func (l *LocalRuntime) WorkersNum() int
type LogPosition ¶
type Middleware ¶
type MonitorableSlot ¶
type Movable ¶
type Movable interface { Sinker // Move moves (renames) the given source table into the given destination table Move(ctx context.Context, src, dst TableID) error }
Movable is a sinker which can move tables. This interface allows the temporator middleware to be used.
type OldKeysType ¶
type OldKeysType = changeitem.OldKeysType
type Partition ¶
type Partition = changeitem.Partition
type PositionalStorage ¶
type PositionalStorage interface { // Position provide info about snapshot read position Position(ctx context.Context) (*LogPosition, error) }
PositionalStorage is implemented by storages that can provide a specific position for snapshot consistency.
type PropertyKey ¶
type PropertyKey = changeitem.PropertyKey
type ProviderType ¶
type ProviderType string
func (ProviderType) Name ¶
func (p ProviderType) Name() string
type Pusher ¶
type Pusher func(items []ChangeItem) error
func PusherFromAsyncSink ¶
PusherFromAsyncSink wraps the given sink into a (synchronous) pusher interface
type ReUpload ¶
type ReUpload struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (ReUpload) Visit ¶
func (t ReUpload) Visit(v TaskVisitor) interface{}
func (ReUpload) VisitRunnable ¶
func (t ReUpload) VisitRunnable(v RunnableVisitor) interface{}
type RegularSnapshot ¶
type RegularSnapshot struct { Enabled bool `json:"Enabled" yaml:"enabled"` Interval time.Duration `json:"Interval" yaml:"interval"` CronExpression string `json:"CronExpression" yaml:"cron_expression"` IncrementDelaySeconds int64 `json:"IncrementDelaySeconds" yaml:"increment_delay_seconds"` Incremental []IncrementalTable `json:"Incremental" yaml:"incremental"` }
type RemoveTables ¶
type RemoveTables struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (RemoveTables) Visit ¶
func (t RemoveTables) Visit(v TaskVisitor) interface{}
func (RemoveTables) VisitRunnable ¶
func (t RemoveTables) VisitRunnable(v RunnableVisitor) interface{}
type Replication ¶
type Replication struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (Replication) Visit ¶
func (t Replication) Visit(v TaskVisitor) interface{}
func (Replication) VisitFake ¶
func (t Replication) VisitFake(v FakeVisitor) interface{}
type Restart ¶
type Restart struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Restart) Visit ¶
func (t Restart) Visit(v TaskVisitor) interface{}
func (Restart) VisitRunnable ¶
func (t Restart) VisitRunnable(v RunnableVisitor) interface{}
type RetriablePartUploadError ¶
type RetriablePartUploadError = dterrors.RetriablePartUploadError
type RunnableTask ¶
type RunnableTask interface { Task VisitRunnable(visitor RunnableVisitor) interface{} // contains filtered or unexported methods }
type RunnableVisitor ¶
type RunnableVisitor interface { OnActivate(t Activate) interface{} OnAddTables(t AddTables) interface{} OnUpdateTransfer(t UpdateTransfer) interface{} OnChecksum(t Checksum) interface{} OnDeactivate(t Deactivate) interface{} OnCleanupResource(t CleanupResource) interface{} OnReUpload(t ReUpload) interface{} OnRemoveTables(t RemoveTables) interface{} OnRestart(t Restart) interface{} OnStart(t Start) interface{} OnStop(t Stop) interface{} OnUpload(t Upload) interface{} OnVerify(t Verify) interface{} OnTestEndpoint(t TestEndpoint) interface{} }
type Runtime ¶
type Runtime interface { NeedRestart(runtime Runtime) bool WithDefaults() Validate() error Type() RuntimeType SetVersion(runtimeSpecificVersion string, versionProperties *string) error }
func NewRuntime ¶
func NewRuntime(runtime RuntimeType, runtimeSpec string) (Runtime, error)
type RuntimeType ¶
type RuntimeType string
type SampleableStorage ¶
type SampleableStorage interface { Storage TableSizeInBytes(table TableID) (uint64, error) LoadTopBottomSample(table TableDescription, pusher Pusher) error LoadRandomSample(table TableDescription, pusher Pusher) error LoadSampleBySet(table TableDescription, keySet []map[string]interface{}, pusher Pusher) error TableAccessible(table TableDescription) bool }
SampleableStorage is for dataplane tests
type ScheduledTask ¶
type ScheduledTask interface { Stop() Runtime() Runtime }
type SchemaStorage ¶
SchemaStorage allows resolving the DB schema from a storage.
type ShardUploadParams ¶
Parallelism params
func DefaultShardUploadParams ¶
func DefaultShardUploadParams() *ShardUploadParams
func NewShardUploadParams ¶
func NewShardUploadParams(jobCount int, processCount int) *ShardUploadParams
type ShardableTask ¶
type ShardableTask interface { Task // contains filtered or unexported methods }
type ShardingContextStorage ¶
type ShardingContextStorage interface { // ShardingContext Return shared data, used on *MAIN* worker; // Take care, method return OperationState_ShardedUploadState, but only fill field Context; // Because type of Context is private, this is protoc thing; ShardingContext() ([]byte, error) // SetShardingContext for storage, used on *SECONDARY* worker SetShardingContext(shardedState []byte) error }
ShardingContextStorage is a storage whose data needs to be shared with all workers.
type ShardingStorage ¶
type ShardingStorage interface {
ShardTable(ctx context.Context, table TableDescription) ([]TableDescription, error)
}
ShardingStorage is for in-table sharding.
type ShardingTaskRuntime ¶
type SinkOption ¶
TODO: Drop by making transformers a common middleware
func GetTransformers ¶
func GetTransformers(middlewareNames []string) ([]SinkOption, error)
type Sinker ¶
type Sinker interface { io.Closer // Push writes the given items into destination synchronously. If its result is nil, the items are considered to be successfully written to the destination. // The method must be retriable: it can be called again after it returns an error, except for fatal errors (these must be wrapped in a particular struct) Push(items []ChangeItem) error }
Sinker is the destination's data writer interface.
All its methods are guaranteed to be called non-concurrently (synchronously).
TODO: rename to Sink
type SlotKiller ¶
type SlotKiller interface {
KillSlot() error
}
func MakeStubSlotKiller ¶
func MakeStubSlotKiller() SlotKiller
type SnapshotableStorage ¶
type SourceReader ¶
type Start ¶
type Start struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Start) Visit ¶
func (t Start) Visit(v TaskVisitor) interface{}
func (Start) VisitRunnable ¶
func (t Start) VisitRunnable(v RunnableVisitor) interface{}
type Stop ¶
type Stop struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Stop) Visit ¶
func (t Stop) Visit(v TaskVisitor) interface{}
func (Stop) VisitRunnable ¶
func (t Stop) VisitRunnable(v RunnableVisitor) interface{}
type Storage ¶
type Storage interface { Closeable Ping() error LoadTable(ctx context.Context, table TableDescription, pusher Pusher) error TableSchema(ctx context.Context, table TableID) (*TableSchema, error) TableList(filter IncludeTableList) (TableMap, error) ExactTableRowsCount(table TableID) (uint64, error) EstimateTableRowsCount(table TableID) (uint64, error) TableExists(table TableID) (bool, error) }
Storage is for simple storage implementations. For extra functionality, implement the storages below.
type StubSlotKiller ¶
type StubSlotKiller struct { }
func (*StubSlotKiller) KillSlot ¶
func (k *StubSlotKiller) KillSlot() error
type TableColumns ¶
type TableColumns = changeitem.TableColumns
type TableDescription ¶
type TableDescription struct { Name string Schema string // for example - for mysql here are database name Filter WhereStatement EtaRow uint64 // estimated number of rows in the table Offset uint64 // offset (in rows) along the ordering key (not necessary primary key) }
func (*TableDescription) Fqtn ¶
func (t *TableDescription) Fqtn() string
func (*TableDescription) ID ¶
func (t *TableDescription) ID() TableID
func (*TableDescription) PartID ¶
func (t *TableDescription) PartID() string
func (*TableDescription) Same ¶
func (t *TableDescription) Same(table string) bool
func (*TableDescription) String ¶
func (t *TableDescription) String() string
type TableID ¶
type TableID = changeitem.TableID
var NonExistentTableID TableID = *NewTableID("", "")
func NewTableIDFromStringPg ¶
NewTableIDFromStringPg parses the given FQTN in PostgreSQL syntax to construct a TableID.
func ParseTableID ¶
func ParseTableIDs ¶
func TableIDsIntersection ¶
TableIDsIntersection returns an intersection of two lists of TableIDs
type TableInfo ¶
type TableInfo struct { EtaRow uint64 IsView bool Schema *TableSchema }
type TableMap ¶
func (*TableMap) ConvertToTableDescriptions ¶
func (m *TableMap) ConvertToTableDescriptions() []TableDescription
func (*TableMap) FakePkeyTables ¶
func (*TableMap) NoKeysTables ¶
func (*TableMap) ToDBSchema ¶
type TablePartID ¶
type TablePartID = changeitem.TablePartID
type TableSchema ¶
type TableSchema = changeitem.TableSchema
type TableSplitter ¶
type TableSplitter struct {
Columns []string
}
type TableUploadError ¶
type TableUploadError = dterrors.TableUploadError
type Task ¶
type Task interface { Visit(visitor TaskVisitor) interface{} // contains filtered or unexported methods }
type TaskType ¶
type TaskType struct {
Task
}
func (TaskType) Description ¶
func (TaskType) EncodeText ¶
func (TaskType) MarshalJSON ¶
func (*TaskType) UnmarshalJSON ¶
type TaskTypeName ¶
type TaskTypeName = string
type Termination ¶
type Termination struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (Termination) Visit ¶
func (t Termination) Visit(v TaskVisitor) interface{}
func (Termination) VisitFake ¶
func (t Termination) VisitFake(v FakeVisitor) interface{}
type TestEndpoint ¶
type TestEndpoint struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (TestEndpoint) HasTimedOut ¶
func (TestEndpoint) Visit ¶
func (t TestEndpoint) Visit(v TaskVisitor) interface{}
func (TestEndpoint) VisitRunnable ¶
func (t TestEndpoint) VisitRunnable(v RunnableVisitor) interface{}
type TestResult ¶
type TestResult struct { Checks map[CheckType]CheckResult Schema TableMap Preview map[TableID][]ChangeItem }
TestResult is the aggregated result of a test for an endpoint.
func NewTestResult ¶
func NewTestResult(checks ...CheckType) *TestResult
func (*TestResult) Add ¶
func (t *TestResult) Add(extraChecks ...CheckType)
func (*TestResult) Err ¶
func (t *TestResult) Err() error
func (*TestResult) NotOk ¶
func (t *TestResult) NotOk(checkType CheckType, err error) *TestResult
func (*TestResult) Ok ¶
func (t *TestResult) Ok(checkType CheckType) *TestResult
type TimeoutableTask ¶
type TimestampCol ¶
type Transfer ¶
type Transfer interface { // Start the transfer. Retry endlessly when errors occur, until stopped with Stop(). // This method does not block, the work is done in the background. Start() // Stop the transfer. May be called multiple times even without prior Start() to clean // up external resources, e.g. terminate YT operations. Synchronous, i.e. blocks // until either all resources are released or an error occurs. Stop() error Runtime() Runtime Error() error }
type TransferCreate ¶
type TransferCreate struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferCreate) Visit ¶
func (t TransferCreate) Visit(v TaskVisitor) interface{}
func (TransferCreate) VisitFake ¶
func (t TransferCreate) VisitFake(v FakeVisitor) interface{}
type TransferDelete ¶
type TransferDelete struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferDelete) Visit ¶
func (t TransferDelete) Visit(v TaskVisitor) interface{}
func (TransferDelete) VisitFake ¶
func (t TransferDelete) VisitFake(v FakeVisitor) interface{}
type TransferType ¶
type TransferType string
func (*TransferType) Expand ¶
func (t *TransferType) Expand() []TransferType
type TransferVersionFreeze ¶
type TransferVersionFreeze struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferVersionFreeze) Visit ¶
func (t TransferVersionFreeze) Visit(v TaskVisitor) interface{}
func (TransferVersionFreeze) VisitFake ¶
func (t TransferVersionFreeze) VisitFake(v FakeVisitor) interface{}
type TransferVersionFreezeParams ¶
type TransferVersionFreezeParams struct {
CurrentVersion string `json:"current_version"`
}
type TransferVersionUnfreeze ¶
type TransferVersionUnfreeze struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferVersionUnfreeze) Visit ¶
func (t TransferVersionUnfreeze) Visit(v TaskVisitor) interface{}
func (TransferVersionUnfreeze) VisitFake ¶
func (t TransferVersionUnfreeze) VisitFake(v FakeVisitor) interface{}
type TransferVersionUnfreezeParams ¶
type TransferVersionUnfreezeParams struct {
CurrentVersion string `json:"current_version"`
}
type TransferVersionUpdate ¶
type TransferVersionUpdate struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferVersionUpdate) Visit ¶
func (t TransferVersionUpdate) Visit(v TaskVisitor) interface{}
func (TransferVersionUpdate) VisitFake ¶
func (t TransferVersionUpdate) VisitFake(v FakeVisitor) interface{}
type Transformation ¶
type Transformation interface { MakeSinkMiddleware() SinkOption AddTransformer(transformer Transformer) error RuntimeOpts() TransformationRuntimeOpts }
type TransformationRuntimeOpts ¶
type TransformationRuntimeOpts struct {
JobIndex int
}
type Transformer ¶
type Transformer interface { Apply(input []ChangeItem) TransformerResult Suitable(table TableID, schema *TableSchema) bool ResultSchema(original *TableSchema) (*TableSchema, error) Description() string Type() TransformerType }
type TransformerError ¶
type TransformerError struct { Input ChangeItem Error error }
type TransformerResult ¶
type TransformerResult struct { Transformed []ChangeItem Errors []TransformerError }
type TransformerType ¶
type TransformerType string
type TxBound ¶
type TxBound = changeitem.TxBound
type TypedChangeItem ¶
type TypedChangeItem ChangeItem
func (*TypedChangeItem) MarshalJSON ¶
func (t *TypedChangeItem) MarshalJSON() ([]byte, error)
func (*TypedChangeItem) UnmarshalJSON ¶
func (t *TypedChangeItem) UnmarshalJSON(data []byte) error
type TypedValue ¶
type UpdateTransfer ¶
type UpdateTransfer struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (UpdateTransfer) Visit ¶
func (t UpdateTransfer) Visit(v TaskVisitor) interface{}
func (UpdateTransfer) VisitRunnable ¶
func (t UpdateTransfer) VisitRunnable(v RunnableVisitor) interface{}
type UpdateTransferParams ¶
type UpdateTransferParams struct { OldObjects []string `json:"old_objects"` NewObjects []string `json:"new_objects"` }
func (UpdateTransferParams) AddedTables ¶
func (p UpdateTransferParams) AddedTables() ([]TableDescription, error)
type Upload ¶
type Upload struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Upload) Visit ¶
func (t Upload) Visit(v TaskVisitor) interface{}
func (Upload) VisitRunnable ¶
func (t Upload) VisitRunnable(v RunnableVisitor) interface{}
type Verify ¶
type Verify struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Verify) HasTimedOut ¶
func (Verify) Visit ¶
func (t Verify) Visit(v TaskVisitor) interface{}
func (Verify) VisitRunnable ¶
func (t Verify) VisitRunnable(v RunnableVisitor) interface{}
type WhereStatement ¶
type WhereStatement string
const NoFilter WhereStatement = WhereStatement("")
func FiltersIntersection ¶
func FiltersIntersection(a WhereStatement, b WhereStatement) WhereStatement
func NotStatement ¶
func NotStatement(a WhereStatement) WhereStatement
Source Files ¶
- async_sink.go
- change_item.go
- change_item_builders.go
- closeable.go
- committable.go
- errors.go
- filter.go
- homo_valuer.go
- includeable.go
- local_runtime.go
- metrics.go
- middleware.go
- model.go
- movable.go
- operations.go
- parsers.go
- provider_type.go
- regular_snapshot.go
- restore.go
- runtime.go
- sink.go
- slot_monitor.go
- source.go
- storage.go
- strictify.go
- task_type.go
- test_result.go
- transfer.go
- transfer_type.go
- transformer.go
- type.go
- typed_change_item.go
- validator.go