tasks

package
v0.0.0-rc9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2024 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ResolveStorageErrorText string = "failed to resolve storage: %w"
	TableListErrorText      string = "failed to list tables and their schemas: %w"
)
View Source
const (
	CredentialsCheckType   = abstract.CheckType("credentials")
	DataSampleCheckType    = abstract.CheckType("data-sample")
	ConfigCheckType        = abstract.CheckType("config-valid-check")
	WriteableCheckType     = abstract.CheckType("writeable")
	LoadTablesCheckType    = abstract.CheckType("load-all-tables")
	EstimateTableCheckType = abstract.CheckType("estimate-table")
	LoadSampleCheckType    = abstract.CheckType("load-sample")
)
View Source
const MaxTableStatCount = 1000
View Source
const TablesFilterStateKey = "tables_filter"

Variables

View Source
var ErrNoActiveOperation = xerrors.NewSentinel("TM: missed operation id")
View Source
var NoTablesError = xerrors.New("Unable to find any tables")

Functions

func ActivateDelivery

func ActivateDelivery(ctx context.Context, task *model.TransferOperation, cp coordinator.Coordinator, transfer model.Transfer, registry metrics.Registry) error

func AddExtraTransformers

func AddExtraTransformers(ctx context.Context, transfer *model.Transfer, registry metrics.Registry) error

func AddTables

func AddTables(ctx context.Context, cp coordinator.Coordinator, transfer model.Transfer, task model.TransferOperation, tables []string, registry metrics.Registry) error

func Checksum

func Checksum(transfer model.Transfer, lgr log.Logger, registry metrics.Registry, params *ChecksumParameters) error

func CleanupResource

func CleanupResource(ctx context.Context, task model.TransferOperation, transfer model.Transfer, logger log.Logger, cp coordinator.Coordinator) error

func CompareChecksum

func CompareChecksum(
	src abstract.SampleableStorage,
	dst abstract.SampleableStorage,
	tables []abstract.TableDescription,
	lgr log.Logger,
	registry metrics.Registry,
	equalDataTypes func(lDataType, rDataType string) bool,
	params *ChecksumParameters,
) error

func Deactivate

func Deactivate(ctx context.Context, cp coordinator.Coordinator, transfer model.Transfer, task model.TransferOperation, registry metrics.Registry) error

func GetLeftTerminalSrcEndpoints

func GetLeftTerminalSrcEndpoints(cp coordinator.Coordinator, transfer model.Transfer) ([]model.Source, error)

func GetLeftTerminalTransfers

func GetLeftTerminalTransfers(cp coordinator.Coordinator, transfer model.Transfer) ([]*model.Transfer, error)

func GetRightTerminalDstEndpoints

func GetRightTerminalDstEndpoints(cp coordinator.Coordinator, transfer model.Transfer) ([]model.Destination, error)

func GetRightTerminalTransfers

func GetRightTerminalTransfers(cp coordinator.Coordinator, transfer model.Transfer) ([]*model.Transfer, error)

func IntersectFilter

func IntersectFilter(transfer *model.Transfer, basic base.DataObjectFilter) (base.DataObjectFilter, error)

func NewLoadProgress

func NewLoadProgress(workerIndex int, part *model.OperationTablePart, progressUpdateMutex *sync.Mutex) *loadProgress

func ObtainAllSrcTables

func ObtainAllSrcTables(transfer *model.Transfer, registry metrics.Registry) (abstract.TableMap, error)

ObtainAllSrcTables uses a temporary Storage for transfer source to obtain a list of tables

func PrepareSourceChecks

func PrepareSourceChecks(param *TestEndpointParams, tr *abstract.TestResult) (*abstract.TestResult, error)

PrepareSourceChecks registers all source specific check. These checks also contain the providers specific checks returned by the ToTest method.

func PrepareTargetChecks

func PrepareTargetChecks(param *TestEndpointParams, tr *abstract.TestResult) (*abstract.TestResult, error)

PrepareTargetChecks registers all target specific check.

func RemoveTables

func RemoveTables(ctx context.Context, cp coordinator.Coordinator, transfer model.Transfer, task model.TransferOperation, tables []string) error

func Reupload

func Reupload(ctx context.Context, cp coordinator.Coordinator, transfer model.Transfer, task model.TransferOperation, registry metrics.Registry) error

func Run

func Run(ctx context.Context, task model.TransferOperation, command abstract.RunnableTask, cp coordinator.Coordinator, transfer model.Transfer, params interface{}, registry metrics.Registry) error

func SniffReplicationData

func SniffReplicationData(
	ctx context.Context,
	sniffer abstract.Fetchable,
	tr *abstract.TestResult,
	transfer *model.Transfer,
) *abstract.TestResult

func SniffSnapshotData

func SniffSnapshotData(ctx context.Context, tr *abstract.TestResult, transfer *model.Transfer) *abstract.TestResult

func StopJob

func StopJob(cp coordinator.Coordinator, transfer model.Transfer) error

func TestDestinationEndpoint

func TestDestinationEndpoint(ctx context.Context, param *TestEndpointParams, tr *abstract.TestResult) *abstract.TestResult

func TestTargetEndpoint

func TestTargetEndpoint(transfer *model.Transfer) error

func TransitReupload

func TransitReupload(ctx context.Context, cp coordinator.Coordinator, transfer model.Transfer, task model.TransferOperation, registry metrics.Registry) error

TransitReupload is shitty method mainly for transfers with LB in the middle, same as TransitUpload

func TransitUpload

func TransitUpload(ctx context.Context, cp coordinator.Coordinator, transfer model.Transfer, task *model.TransferOperation, spec UploadSpec, registry metrics.Registry) error

TransitUpload is shitty method mainly for transfers with LB in the middle, so we could make @lupach happy and isolate crappy code in separate func.

func TransitionalAddTables

func TransitionalAddTables(ctx context.Context, cp coordinator.Coordinator, transfer model.Transfer, task model.TransferOperation, tables []string, registry metrics.Registry) error

TransitionalAddTables same as above

func UpdateTransfer

UpdateTransfer is system task, generated by transfer update for `Running` transfers with active replication `control plane` will prepare `UpdateDataObjectsParams` update transfer and start operation `data plane` for all new object we will generate homo-DDL and upload data this operation similar to `AddTables` but without endpoint mutation

func Upload

func VerifyDelivery

func VerifyDelivery(transfer model.Transfer, lgr log.Logger, registry metrics.Registry) error

Types

type ChecksumComparator

type ChecksumComparator func(lVal interface{}, lSchema abstract.ColSchema, rVal interface{}, rSchema abstract.ColSchema, intoArray bool) (comparable bool, result bool, err error)

type ChecksumParameters

type ChecksumParameters struct {
	TableSizeThreshold  int64
	Tables              []abstract.TableDescription
	PriorityComparators []ChecksumComparator
}

func (*ChecksumParameters) GetPriorityComparators

func (p *ChecksumParameters) GetPriorityComparators() []ChecksumComparator

func (*ChecksumParameters) GetTableSizeThreshold

func (p *ChecksumParameters) GetTableSizeThreshold() uint64

type EndpointParam

type EndpointParam struct {
	Type  abstract.ProviderType
	Param string
}

type SingleStorageSchema

type SingleStorageSchema interface {
	DatabaseSchema() string
}

type SnapshotLoader

type SnapshotLoader struct {
	// contains filtered or unexported fields
}

func NewSnapshotLoader

func NewSnapshotLoader(cp coordinator.Coordinator, operationID string, transfer *model.Transfer, registry metrics.Registry) *SnapshotLoader

func (*SnapshotLoader) CheckIncludeDirectives

func (l *SnapshotLoader) CheckIncludeDirectives(tables []abstract.TableDescription) error

func (*SnapshotLoader) CleanupSinker

func (l *SnapshotLoader) CleanupSinker(tables abstract.TableMap) error

CleanupSinker cleans up the sinker when non-incremental transfer is activated.

This method changes the sinker database' contents, thus it should be called only after the checks on source (that ensure a transfer is possible) are completed.

func (*SnapshotLoader) DoUploadTables

func (l *SnapshotLoader) DoUploadTables(ctx context.Context, source abstract.Storage, nextTablePartProvider TablePartProvider) error

func (*SnapshotLoader) GetLocalTablePartProvider

func (l *SnapshotLoader) GetLocalTablePartProvider(tables ...*model.OperationTablePart) TablePartProvider

func (*SnapshotLoader) GetRemoteTablePartProvider

func (l *SnapshotLoader) GetRemoteTablePartProvider() TablePartProvider

func (*SnapshotLoader) GetShardState

func (l *SnapshotLoader) GetShardState(ctx context.Context) error

func (*SnapshotLoader) GetShardedStateFromSource

func (l *SnapshotLoader) GetShardedStateFromSource(source interface{}) error

func (*SnapshotLoader) LoadSnapshot

func (l *SnapshotLoader) LoadSnapshot(ctx context.Context) error

func (*SnapshotLoader) NewServicePusher

func (l *SnapshotLoader) NewServicePusher() (abstract.Pusher, *util.Rollbacks, error)

NewServicePusher returns pusher for sink that provides sinker functionality for `UploadTables()` itself, but without middlewares. If no error returned by NewServicePusher you should defer Rollbacks.Do() to close created sink.

func (*SnapshotLoader) OperationStateExists

func (l *SnapshotLoader) OperationStateExists(ctx context.Context) (bool, error)

OperationStateExists returns true if the state of the operation of the given task exists (is not nil)

func (*SnapshotLoader) SetShardedStateToCP

func (l *SnapshotLoader) SetShardedStateToCP(logger log.Logger) error

func (*SnapshotLoader) SetShardedStateToSource

func (l *SnapshotLoader) SetShardedStateToSource(source interface{}) error

func (*SnapshotLoader) SplitTables

func (l *SnapshotLoader) SplitTables(
	ctx context.Context,
	logger log.Logger,
	tables []abstract.TableDescription,
	source abstract.Storage,
) ([]*model.OperationTablePart, error)

func (*SnapshotLoader) UploadTables

func (l *SnapshotLoader) UploadTables(ctx context.Context, tables []abstract.TableDescription, updateIncrementalState bool) error

func (*SnapshotLoader) UploadV2

func (l *SnapshotLoader) UploadV2(ctx context.Context, snapshotProvider base.SnapshotProvider, tables []abstract.TableDescription) error

func (*SnapshotLoader) WaitWorkersCompleted

func (l *SnapshotLoader) WaitWorkersCompleted(ctx context.Context, workersCount int) error

func (*SnapshotLoader) WaitWorkersInitiated

func (l *SnapshotLoader) WaitWorkersInitiated(ctx context.Context) error

type SnapshotTableMetricsTracker

type SnapshotTableMetricsTracker struct {
	// contains filtered or unexported fields
}

func NewNotShardedSnapshotTableMetricsTracker

func NewNotShardedSnapshotTableMetricsTracker(
	ctx context.Context,
	transfer *model.Transfer,
	registry metrics.Registry,
	parts []*model.OperationTablePart,
	progressUpdateMutex *sync.Mutex,
) (*SnapshotTableMetricsTracker, error)

func NewShardedSnapshotTableMetricsTracker

func NewShardedSnapshotTableMetricsTracker(
	ctx context.Context,
	transfer *model.Transfer,
	registry metrics.Registry,
	operationID string,
	cpClient coordinator.Coordinator,
) (*SnapshotTableMetricsTracker, error)

func (*SnapshotTableMetricsTracker) Close

func (t *SnapshotTableMetricsTracker) Close()

Close Safe to close few time, not thread safe; But safe to use with defer and standalone call in same time;

type SnapshotTableProgressTracker

type SnapshotTableProgressTracker struct {
	// contains filtered or unexported fields
}

func NewSnapshotTableProgressTracker

func NewSnapshotTableProgressTracker(
	ctx context.Context,
	operationID string,
	cpClient coordinator.Coordinator,
	progressUpdateMutex *sync.Mutex,
) *SnapshotTableProgressTracker

func (*SnapshotTableProgressTracker) Add

func (*SnapshotTableProgressTracker) AddGetProgress

func (t *SnapshotTableProgressTracker) AddGetProgress(part *model.OperationTablePart, progressFunc func())

AddGetProgress TODO: Remove, A2 thing

func (*SnapshotTableProgressTracker) Close

func (t *SnapshotTableProgressTracker) Close()

Close Safe to close few time, not thread safe; But safe to use with defer and standalone call in same time;

func (*SnapshotTableProgressTracker) Flush

func (t *SnapshotTableProgressTracker) Flush()

func (*SnapshotTableProgressTracker) RemoveGetProgress

func (t *SnapshotTableProgressTracker) RemoveGetProgress(part *model.OperationTablePart)

RemoveGetProgress TODO: Remove, A2 thing

type TablePartProvider

type TablePartProvider func() (*model.OperationTablePart, error)

type TestEndpointParams

type TestEndpointParams struct {
	Transfer             *model.Transfer
	TransformationConfig []byte
	ParamsSrc            *EndpointParam
	ParamsDst            *EndpointParam
}

type UploadSpec

type UploadSpec struct {
	Tables []abstract.TableDescription
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL