Documentation ¶
Index ¶
- Constants
- Variables
- func ActivateDelivery(ctx context.Context, task *server.TransferOperation, ...) error
- func AddExtraTransformers(ctx context.Context, transfer *server.Transfer, registry metrics.Registry) error
- func AddTables(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func Checksum(transfer server.Transfer, lgr log.Logger, registry metrics.Registry, ...) error
- func CleanupResource(ctx context.Context, task server.TransferOperation, transfer server.Transfer, ...) error
- func CompareChecksum(src abstract.SampleableStorage, dst abstract.SampleableStorage, ...) error
- func Deactivate(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func GetLeftTerminalSrcEndpoints(cp coordinator.Coordinator, transfer server.Transfer) ([]server.Source, error)
- func GetLeftTerminalTransfers(cp coordinator.Coordinator, transfer server.Transfer) ([]*server.Transfer, error)
- func GetRightTerminalDstEndpoints(cp coordinator.Coordinator, transfer server.Transfer) ([]server.Destination, error)
- func GetRightTerminalTransfers(cp coordinator.Coordinator, transfer server.Transfer) ([]*server.Transfer, error)
- func IntersectFilter(transfer *server.Transfer, basic base.DataObjectFilter) (base.DataObjectFilter, error)
- func NewLoadProgress(workerIndex int, part *server.OperationTablePart, ...) *loadProgress
- func ObtainAllSrcTables(transfer *server.Transfer, registry metrics.Registry) (abstract.TableMap, error)
- func PrepareSourceChecks(param *TestEndpointParams, tr *abstract.TestResult) (*abstract.TestResult, error)
- func PrepareTargetChecks(param *TestEndpointParams, tr *abstract.TestResult) (*abstract.TestResult, error)
- func RemoveTables(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func Reupload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func Run(ctx context.Context, task server.TransferOperation, ...) error
- func SniffReplicationData(ctx context.Context, sniffer abstract.Fetchable, tr *abstract.TestResult, ...) *abstract.TestResult
- func SniffSnapshotData(ctx context.Context, tr *abstract.TestResult, transfer *model.Transfer) *abstract.TestResult
- func StartJob(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func StopJob(cp coordinator.Coordinator, transfer server.Transfer) error
- func TestDestinationEndpoint(ctx context.Context, param *TestEndpointParams, tr *abstract.TestResult) *abstract.TestResult
- func TestEndpoint(ctx context.Context, param *TestEndpointParams, tr *abstract.TestResult) *abstract.TestResult
- func TestSourceEndpoint(ctx context.Context, param *TestEndpointParams, tr *abstract.TestResult) *abstract.TestResult
- func TestTargetEndpoint(transfer *model.Transfer) error
- func TransitReupload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func TransitUpload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func TransitionalAddTables(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func UpdateTransfer(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func Upload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, ...) error
- func VerifyDelivery(transfer server.Transfer, lgr log.Logger, registry metrics.Registry) error
- type ChecksumComparator
- type ChecksumParameters
- type EndpointParam
- type SingleStorageSchema
- type SnapshotLoader
- func (l *SnapshotLoader) CheckIncludeDirectives(tables []abstract.TableDescription) error
- func (l *SnapshotLoader) CleanupSinker(tables abstract.TableMap) error
- func (l *SnapshotLoader) DoUploadTables(ctx context.Context, source abstract.Storage, ...) error
- func (l *SnapshotLoader) GetLocalTablePartProvider(tables ...*server.OperationTablePart) TablePartProvider
- func (l *SnapshotLoader) GetRemoteTablePartProvider() TablePartProvider
- func (l *SnapshotLoader) GetShardState(ctx context.Context) error
- func (l *SnapshotLoader) GetShardedStateFromSource(source interface{}) error
- func (l *SnapshotLoader) LoadSnapshot(ctx context.Context) error
- func (l *SnapshotLoader) NewServicePusher() (abstract.Pusher, *util.Rollbacks, error)
- func (l *SnapshotLoader) OperationStateExists(ctx context.Context) (bool, error)
- func (l *SnapshotLoader) SetShardedStateToCP(logger log.Logger) error
- func (l *SnapshotLoader) SetShardedStateToSource(source interface{}) error
- func (l *SnapshotLoader) SplitTables(ctx context.Context, logger log.Logger, tables []abstract.TableDescription, ...) ([]*server.OperationTablePart, error)
- func (l *SnapshotLoader) UploadTables(ctx context.Context, tables []abstract.TableDescription, ...) error
- func (l *SnapshotLoader) UploadV2(ctx context.Context, snapshotProvider base.SnapshotProvider, ...) error
- func (l *SnapshotLoader) WaitWorkersCompleted(ctx context.Context, workersCount int) error
- func (l *SnapshotLoader) WaitWorkersInitiated(ctx context.Context) error
- type SnapshotTableMetricsTracker
- func NewNotShardedSnapshotTableMetricsTracker(ctx context.Context, transfer *server.Transfer, registry metrics.Registry, ...) (*SnapshotTableMetricsTracker, error)
- func NewShardedSnapshotTableMetricsTracker(ctx context.Context, transfer *server.Transfer, registry metrics.Registry, ...) (*SnapshotTableMetricsTracker, error)
- type SnapshotTableProgressTracker
- func (t *SnapshotTableProgressTracker) Add(part *server.OperationTablePart)
- func (t *SnapshotTableProgressTracker) AddGetProgress(part *server.OperationTablePart, progressFunc func())
- func (t *SnapshotTableProgressTracker) Close()
- func (t *SnapshotTableProgressTracker) Flush()
- func (t *SnapshotTableProgressTracker) RemoveGetProgress(part *server.OperationTablePart)
- type TablePartProvider
- type TestEndpointParams
- type UploadSpec
Constants ¶
const ( ResolveStorageErrorText string = "failed to resolve storage: %w" TableListErrorText string = "failed to list tables and their schemas: %w" )
const ( CredentialsCheckType = abstract.CheckType("credentials") DataSampleCheckType = abstract.CheckType("data-sample") ConfigCheckType = abstract.CheckType("config-valid-check") InitCheckType = abstract.CheckType("init-check") PingCheckType = abstract.CheckType("ping-check") WriteableCheckType = abstract.CheckType("writeable") LoadTablesCheckType = abstract.CheckType("load-all-tables") EstimateTableCheckType = abstract.CheckType("estimate-table") LoadSampleCheckType = abstract.CheckType("load-sample") )
const MaxTableStatCount = 1000
const TablesFilterStateKey = "tables_filter"
Variables ¶
var ErrNoActiveOperation = xerrors.NewSentinel("TM: missed operation id")
var NoTablesError = xerrors.New("Unable to find any tables")
Functions ¶
func ActivateDelivery ¶
func ActivateDelivery(ctx context.Context, task *server.TransferOperation, cp coordinator.Coordinator, transfer server.Transfer, registry metrics.Registry) error
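func AddExtraTransformers ¶
func AddExtraTransformers(ctx context.Context, transfer *server.Transfer, registry metrics.Registry) error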
func AddTables ¶
func AddTables(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task server.TransferOperation, tables []string, registry metrics.Registry) error
func CleanupResource ¶
func CleanupResource(ctx context.Context, task server.TransferOperation, transfer server.Transfer, logger log.Logger, cp coordinator.Coordinator) error
func CompareChecksum ¶
func CompareChecksum( src abstract.SampleableStorage, dst abstract.SampleableStorage, tables []abstract.TableDescription, lgr log.Logger, registry metrics.Registry, equalDataTypes func(lDataType, rDataType string) bool, params *ChecksumParameters, ) error
func Deactivate ¶
func Deactivate(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task server.TransferOperation, registry metrics.Registry) error
func GetLeftTerminalSrcEndpoints ¶
func GetLeftTerminalSrcEndpoints(cp coordinator.Coordinator, transfer server.Transfer) ([]server.Source, error)
func GetLeftTerminalTransfers ¶
func GetLeftTerminalTransfers(cp coordinator.Coordinator, transfer server.Transfer) ([]*server.Transfer, error)
func GetRightTerminalDstEndpoints ¶
func GetRightTerminalDstEndpoints(cp coordinator.Coordinator, transfer server.Transfer) ([]server.Destination, error)
func GetRightTerminalTransfers ¶
func GetRightTerminalTransfers(cp coordinator.Coordinator, transfer server.Transfer) ([]*server.Transfer, error)
func IntersectFilter ¶
func IntersectFilter(transfer *server.Transfer, basic base.DataObjectFilter) (base.DataObjectFilter, error)
func NewLoadProgress ¶
func NewLoadProgress(workerIndex int, part *server.OperationTablePart, progressUpdateMutex *sync.Mutex) *loadProgress
func ObtainAllSrcTables ¶
func ObtainAllSrcTables(transfer *server.Transfer, registry metrics.Registry) (abstract.TableMap, error)
ObtainAllSrcTables uses a temporary Storage for transfer source to obtain a list of tables
func PrepareSourceChecks ¶
func PrepareSourceChecks(param *TestEndpointParams, tr *abstract.TestResult) (*abstract.TestResult, error)
PrepareSourceChecks registers all source-specific checks. These checks also include the provider-specific checks returned by the ToTest method.
func PrepareTargetChecks ¶
func PrepareTargetChecks(param *TestEndpointParams, tr *abstract.TestResult) (*abstract.TestResult, error)
PrepareTargetChecks registers all target-specific checks.
func RemoveTables ¶
func RemoveTables(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task server.TransferOperation, tables []string) error
func Reupload ¶
func Reupload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task server.TransferOperation, registry metrics.Registry) error
func Run ¶
func Run(ctx context.Context, task server.TransferOperation, command abstract.RunnableTask, cp coordinator.Coordinator, transfer server.Transfer, params interface{}, registry metrics.Registry) error
func SniffReplicationData ¶
func SniffReplicationData( ctx context.Context, sniffer abstract.Fetchable, tr *abstract.TestResult, transfer *model.Transfer, ) *abstract.TestResult
func SniffSnapshotData ¶
func SniffSnapshotData(ctx context.Context, tr *abstract.TestResult, transfer *model.Transfer) *abstract.TestResult
func StartJob ¶
func StartJob(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task *server.TransferOperation) error
func StopJob ¶
func StopJob(cp coordinator.Coordinator, transfer server.Transfer) error
func TestDestinationEndpoint ¶
func TestDestinationEndpoint(ctx context.Context, param *TestEndpointParams, tr *abstract.TestResult) *abstract.TestResult
func TestEndpoint ¶
func TestEndpoint(ctx context.Context, param *TestEndpointParams, tr *abstract.TestResult) *abstract.TestResult
func TestSourceEndpoint ¶
func TestSourceEndpoint(ctx context.Context, param *TestEndpointParams, tr *abstract.TestResult) *abstract.TestResult
func TestTargetEndpoint ¶
func TestTargetEndpoint(transfer *model.Transfer) error
func TransitReupload ¶
func TransitReupload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task server.TransferOperation, registry metrics.Registry) error
TransitReupload is a hacky method, mainly for transfers with LB in the middle; same as TransitUpload.
func TransitUpload ¶
func TransitUpload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task *server.TransferOperation, spec UploadSpec, registry metrics.Registry) error
TransitUpload is a hacky method, mainly for transfers with LB in the middle. It exists so that we could make @lupach happy and isolate the crappy code in a separate func.
func TransitionalAddTables ¶
func TransitionalAddTables(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task server.TransferOperation, tables []string, registry metrics.Registry) error
TransitionalAddTables is the same kind of transitional method as TransitUpload, but for adding tables.
func UpdateTransfer ¶
func UpdateTransfer(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task server.TransferOperation, registry metrics.Registry, objects abstract.UpdateTransferParams) error
UpdateTransfer is a system task, generated by a transfer update for `Running` transfers with active replication. The `control plane` will prepare `UpdateDataObjectsParams`, update the transfer and start the operation. The `data plane` will generate homo-DDL and upload data for all new objects. This operation is similar to `AddTables`, but without endpoint mutation.
func Upload ¶
func Upload(ctx context.Context, cp coordinator.Coordinator, transfer server.Transfer, task *server.TransferOperation, spec UploadSpec, registry metrics.Registry) error
Types ¶
type ChecksumComparator ¶
type ChecksumParameters ¶
type ChecksumParameters struct { TableSizeThreshold int64 Tables []abstract.TableDescription PriorityComparators []ChecksumComparator }
func (*ChecksumParameters) GetPriorityComparators ¶
func (p *ChecksumParameters) GetPriorityComparators() []ChecksumComparator
func (*ChecksumParameters) GetTableSizeThreshold ¶
func (p *ChecksumParameters) GetTableSizeThreshold() uint64
type EndpointParam ¶
type EndpointParam struct { Type abstract.ProviderType Param string }
type SingleStorageSchema ¶
type SingleStorageSchema interface {
DatabaseSchema() string
}
type SnapshotLoader ¶
type SnapshotLoader struct {
// contains filtered or unexported fields
}
func NewSnapshotLoader ¶
func NewSnapshotLoader(cp coordinator.Coordinator, operationID string, transfer *server.Transfer, registry metrics.Registry) *SnapshotLoader
func (*SnapshotLoader) CheckIncludeDirectives ¶
func (l *SnapshotLoader) CheckIncludeDirectives(tables []abstract.TableDescription) error
func (*SnapshotLoader) CleanupSinker ¶
func (l *SnapshotLoader) CleanupSinker(tables abstract.TableMap) error
CleanupSinker cleans up the sinker when non-incremental transfer is activated.
This method changes the sinker database's contents, so it should be called only after the checks on the source (which ensure that a transfer is possible) are completed.
func (*SnapshotLoader) DoUploadTables ¶
func (l *SnapshotLoader) DoUploadTables(ctx context.Context, source abstract.Storage, nextTablePartProvider TablePartProvider) error
func (*SnapshotLoader) GetLocalTablePartProvider ¶
func (l *SnapshotLoader) GetLocalTablePartProvider(tables ...*server.OperationTablePart) TablePartProvider
func (*SnapshotLoader) GetRemoteTablePartProvider ¶
func (l *SnapshotLoader) GetRemoteTablePartProvider() TablePartProvider
func (*SnapshotLoader) GetShardState ¶
func (l *SnapshotLoader) GetShardState(ctx context.Context) error
func (*SnapshotLoader) GetShardedStateFromSource ¶
func (l *SnapshotLoader) GetShardedStateFromSource(source interface{}) error
func (*SnapshotLoader) LoadSnapshot ¶
func (l *SnapshotLoader) LoadSnapshot(ctx context.Context) error
func (*SnapshotLoader) NewServicePusher ¶
func (l *SnapshotLoader) NewServicePusher() (abstract.Pusher, *util.Rollbacks, error)
NewServicePusher returns a pusher for the sink that provides sinker functionality for `UploadTables()` itself, but without middlewares. If no error is returned by NewServicePusher, you should defer Rollbacks.Do() to close the created sink.
func (*SnapshotLoader) OperationStateExists ¶
func (l *SnapshotLoader) OperationStateExists(ctx context.Context) (bool, error)
OperationStateExists returns true if the state of the operation of the given task exists (is not nil)
func (*SnapshotLoader) SetShardedStateToCP ¶
func (l *SnapshotLoader) SetShardedStateToCP(logger log.Logger) error
func (*SnapshotLoader) SetShardedStateToSource ¶
func (l *SnapshotLoader) SetShardedStateToSource(source interface{}) error
func (*SnapshotLoader) SplitTables ¶
func (l *SnapshotLoader) SplitTables( ctx context.Context, logger log.Logger, tables []abstract.TableDescription, source abstract.Storage, ) ([]*server.OperationTablePart, error)
func (*SnapshotLoader) UploadTables ¶
func (l *SnapshotLoader) UploadTables(ctx context.Context, tables []abstract.TableDescription, updateIncrementalState bool) error
func (*SnapshotLoader) UploadV2 ¶
func (l *SnapshotLoader) UploadV2(ctx context.Context, snapshotProvider base.SnapshotProvider, tables []abstract.TableDescription) error
func (*SnapshotLoader) WaitWorkersCompleted ¶
func (l *SnapshotLoader) WaitWorkersCompleted(ctx context.Context, workersCount int) error
func (*SnapshotLoader) WaitWorkersInitiated ¶
func (l *SnapshotLoader) WaitWorkersInitiated(ctx context.Context) error
type SnapshotTableMetricsTracker ¶
type SnapshotTableMetricsTracker struct {
// contains filtered or unexported fields
}
func NewShardedSnapshotTableMetricsTracker ¶
func NewShardedSnapshotTableMetricsTracker( ctx context.Context, transfer *server.Transfer, registry metrics.Registry, operationID string, cpClient coordinator.Coordinator, ) (*SnapshotTableMetricsTracker, error)
func (*SnapshotTableMetricsTracker) Close ¶
func (t *SnapshotTableMetricsTracker) Close()
Close is safe to call several times, but it is not thread-safe. It is safe to use both a deferred call and a standalone call at the same time.
type SnapshotTableProgressTracker ¶
type SnapshotTableProgressTracker struct {
// contains filtered or unexported fields
}
func NewSnapshotTableProgressTracker ¶
func NewSnapshotTableProgressTracker( ctx context.Context, operationID string, cpClient coordinator.Coordinator, progressUpdateMutex *sync.Mutex, ) *SnapshotTableProgressTracker
func (*SnapshotTableProgressTracker) Add ¶
func (t *SnapshotTableProgressTracker) Add(part *server.OperationTablePart)
func (*SnapshotTableProgressTracker) AddGetProgress ¶
func (t *SnapshotTableProgressTracker) AddGetProgress(part *server.OperationTablePart, progressFunc func())
AddGetProgress TODO: Remove, A2 thing
func (*SnapshotTableProgressTracker) Close ¶
func (t *SnapshotTableProgressTracker) Close()
Close is safe to call several times, but it is not thread-safe. It is safe to use both a deferred call and a standalone call at the same time.
func (*SnapshotTableProgressTracker) Flush ¶
func (t *SnapshotTableProgressTracker) Flush()
func (*SnapshotTableProgressTracker) RemoveGetProgress ¶
func (t *SnapshotTableProgressTracker) RemoveGetProgress(part *server.OperationTablePart)
RemoveGetProgress TODO: Remove, A2 thing
type TablePartProvider ¶
type TablePartProvider func() (*server.OperationTablePart, error)
type TestEndpointParams ¶
type TestEndpointParams struct { Transfer *model.Transfer TransformationConfig []byte ParamsSrc *EndpointParam ParamsDst *EndpointParam }
type UploadSpec ¶
type UploadSpec struct {
Tables []abstract.TableDescription
}
Source Files ¶
- activate_delivery.go
- add_tables.go
- asynchronous_snapshot_state.go
- checksum.go
- cleanup_resource.go
- cleanup_sinker.go
- data_chain.go
- deactivate.go
- load_progress.go
- load_sharded_snapshot.go
- load_snapshot.go
- load_snapshot_incremental.go
- load_snapshot_v2.go
- remove_tables.go
- reupload.go
- snapshot_table_metrics_tracker.go
- snapshot_table_progress_tracker.go
- start_job.go
- stop_job.go
- task_visitor.go
- test_endpoint.go
- transformation.go
- transitional_upload.go
- update_transfer.go
- upload_tables.go
- verify_delivery.go