Documentation ¶
Index ¶
- Constants
- Variables
- func ReportFakePKey(cp Coordinator, transferID string, category string, ...) error
- type Coordinator
- type CoordinatorInMemory
- func (f *CoordinatorInMemory) GetTransferState(id string) (map[string]*TransferStateData, error)
- func (f *CoordinatorInMemory) Progress() []*model.OperationTablePart
- func (f *CoordinatorInMemory) RemoveTransferState(transferID string, stateKeys []string) error
- func (f *CoordinatorInMemory) SetTransferState(transferID string, state map[string]*TransferStateData) error
- func (f *CoordinatorInMemory) UpdateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error
- type CoordinatorNoOp
- func (f *CoordinatorNoOp) AssignOperationTablePart(operationID string, workerIndex int) (*model.OperationTablePart, error)
- func (f *CoordinatorNoOp) ClearAssignedTablesParts(ctx context.Context, operationID string, workerIndex int) (int64, error)
- func (f *CoordinatorNoOp) CloseStatusMessagesForCategory(transferID string, category string) error
- func (f *CoordinatorNoOp) CloseStatusMessagesForTransfer(transferID string) error
- func (f *CoordinatorNoOp) CreateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error
- func (f *CoordinatorNoOp) CreateOperationWorkers(operationID string, workersCount int) error
- func (f *CoordinatorNoOp) FailReplication(transferID string, err error) error
- func (f *CoordinatorNoOp) FinishOperation(taskID string, shardIndex int, taskErr error) error
- func (f *CoordinatorNoOp) GetEndpoint(transferID string, isSource bool) (model.EndpointParams, error)
- func (f *CoordinatorNoOp) GetEndpointTransfers(transferID string, isSource bool) ([]*model.Transfer, error)
- func (f *CoordinatorNoOp) GetOperationProgress(operationID string) (*model.AggregatedProgress, error)
- func (f *CoordinatorNoOp) GetOperationState(taskID string) (string, error)
- func (f *CoordinatorNoOp) GetOperationTablesParts(operationID string) ([]*model.OperationTablePart, error)
- func (f *CoordinatorNoOp) GetOperationWorkers(operationID string) ([]*model.OperationWorker, error)
- func (f *CoordinatorNoOp) GetOperationWorkersCount(operationID string, completed bool) (int, error)
- func (f *CoordinatorNoOp) GetTransferState(id string) (map[string]*TransferStateData, error)
- func (f *CoordinatorNoOp) GetTransfers(statuses []model.TransferStatus, transferID string) ([]*model.Transfer, error)
- func (f *CoordinatorNoOp) OpenStatusMessage(transferID string, category string, content *StatusMessage) error
- func (f *CoordinatorNoOp) OperationHealth(ctx context.Context, operationID string, workerIndex int, workerTime time.Time) error
- func (f *CoordinatorNoOp) RemoveTransferState(transferID string, stateKeys []string) error
- func (f *CoordinatorNoOp) SetOperationState(taskID string, state string) error
- func (f *CoordinatorNoOp) SetStatus(transferID string, status model.TransferStatus) error
- func (f *CoordinatorNoOp) SetTransferState(transferID string, state map[string]*TransferStateData) error
- func (f *CoordinatorNoOp) TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error
- func (f *CoordinatorNoOp) UpdateEndpoint(transferID string, endpoint model.EndpointParams) error
- func (f *CoordinatorNoOp) UpdateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error
- func (f *CoordinatorNoOp) UpdateTestResults(id string, request *abstract.TestResult) error
- func (f *CoordinatorNoOp) UploadTable(transferID string, tables []abstract.TableDescription) error
- type Editor (deprecated)
- type MysqlBinlogPositionState
- type MysqlGtidState
- type OperationState
- type OperationStatus
- type OraclePositionState
- type Progressable
- type Sharding
- type StatusMessage
- type StatusMessageProvider
- type StatusMessageType
- type TestReporter
- type TransferHeartbeat
- type TransferState
- type TransferStateData
- func (s *TransferStateData) GetGeneric() any
- func (s *TransferStateData) GetIncrementalTables() []abstract.TableDescription
- func (s *TransferStateData) GetMysqlBinlogPosition() *MysqlBinlogPositionState
- func (s *TransferStateData) GetMysqlGtid() *MysqlGtidState
- func (s *TransferStateData) GetOraclePosition() *OraclePositionState
- type TransferStatus
- type YtStaticPartState
Constants ¶
const ( WarningStatusMessageType = StatusMessageType("TYPE_WARNING") ErrorStatusMessageType = StatusMessageType("TYPE_ERROR") )
const FakePKeyStatusMessageCategory string = "fake_primary_key"
Variables ¶
var (
NoPKey = coded.Register("generic", "no_primary_key")
)
var OperationStateNotFoundError = xerrors.New("state is not found")
Functions ¶
func ReportFakePKey ¶
Types ¶
type Coordinator ¶
type Coordinator interface { Editor TransferStatus StatusMessageProvider TransferState TestReporter OperationStatus OperationState Sharding }
Coordinator is an interface encapsulating (remote) procedure calls of the Coordinator of the dataplane nodes. This is an interface of an internal transport facility. Coordinator implementation MUST be thread-safe.
type CoordinatorInMemory ¶
type CoordinatorInMemory struct { *CoordinatorNoOp // contains filtered or unexported fields }
func NewStatefulFakeClient ¶
func NewStatefulFakeClient() *CoordinatorInMemory
func (*CoordinatorInMemory) GetTransferState ¶
func (f *CoordinatorInMemory) GetTransferState(id string) (map[string]*TransferStateData, error)
func (*CoordinatorInMemory) Progress ¶
func (f *CoordinatorInMemory) Progress() []*model.OperationTablePart
func (*CoordinatorInMemory) RemoveTransferState ¶
func (f *CoordinatorInMemory) RemoveTransferState(transferID string, stateKeys []string) error
func (*CoordinatorInMemory) SetTransferState ¶
func (f *CoordinatorInMemory) SetTransferState(transferID string, state map[string]*TransferStateData) error
func (*CoordinatorInMemory) UpdateOperationTablesParts ¶
func (f *CoordinatorInMemory) UpdateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error
type CoordinatorNoOp ¶
type CoordinatorNoOp struct {
// contains filtered or unexported fields
}
func NewFakeClient ¶
func NewFakeClient() *CoordinatorNoOp
func NewFakeClientWithOpts ¶
func NewFakeClientWithOpts( getTransferState func(id string) (map[string]*TransferStateData, error), ) *CoordinatorNoOp
func (*CoordinatorNoOp) AssignOperationTablePart ¶
func (f *CoordinatorNoOp) AssignOperationTablePart(operationID string, workerIndex int) (*model.OperationTablePart, error)
func (*CoordinatorNoOp) ClearAssignedTablesParts ¶
func (*CoordinatorNoOp) CloseStatusMessagesForCategory ¶
func (f *CoordinatorNoOp) CloseStatusMessagesForCategory(transferID string, category string) error
func (*CoordinatorNoOp) CloseStatusMessagesForTransfer ¶
func (f *CoordinatorNoOp) CloseStatusMessagesForTransfer(transferID string) error
func (*CoordinatorNoOp) CreateOperationTablesParts ¶
func (f *CoordinatorNoOp) CreateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error
func (*CoordinatorNoOp) CreateOperationWorkers ¶
func (f *CoordinatorNoOp) CreateOperationWorkers(operationID string, workersCount int) error
func (*CoordinatorNoOp) FailReplication ¶
func (f *CoordinatorNoOp) FailReplication(transferID string, err error) error
func (*CoordinatorNoOp) FinishOperation ¶
func (f *CoordinatorNoOp) FinishOperation(taskID string, shardIndex int, taskErr error) error
func (*CoordinatorNoOp) GetEndpoint ¶
func (f *CoordinatorNoOp) GetEndpoint(transferID string, isSource bool) (model.EndpointParams, error)
func (*CoordinatorNoOp) GetEndpointTransfers ¶
func (*CoordinatorNoOp) GetOperationProgress ¶
func (f *CoordinatorNoOp) GetOperationProgress(operationID string) (*model.AggregatedProgress, error)
func (*CoordinatorNoOp) GetOperationState ¶
func (f *CoordinatorNoOp) GetOperationState(taskID string) (string, error)
func (*CoordinatorNoOp) GetOperationTablesParts ¶
func (f *CoordinatorNoOp) GetOperationTablesParts(operationID string) ([]*model.OperationTablePart, error)
func (*CoordinatorNoOp) GetOperationWorkers ¶
func (f *CoordinatorNoOp) GetOperationWorkers(operationID string) ([]*model.OperationWorker, error)
func (*CoordinatorNoOp) GetOperationWorkersCount ¶
func (f *CoordinatorNoOp) GetOperationWorkersCount(operationID string, completed bool) (int, error)
func (*CoordinatorNoOp) GetTransferState ¶
func (f *CoordinatorNoOp) GetTransferState(id string) (map[string]*TransferStateData, error)
func (*CoordinatorNoOp) GetTransfers ¶
func (f *CoordinatorNoOp) GetTransfers(statuses []model.TransferStatus, transferID string) ([]*model.Transfer, error)
func (*CoordinatorNoOp) OpenStatusMessage ¶
func (f *CoordinatorNoOp) OpenStatusMessage(transferID string, category string, content *StatusMessage) error
func (*CoordinatorNoOp) OperationHealth ¶
func (*CoordinatorNoOp) RemoveTransferState ¶
func (f *CoordinatorNoOp) RemoveTransferState(transferID string, stateKeys []string) error
func (*CoordinatorNoOp) SetOperationState ¶
func (f *CoordinatorNoOp) SetOperationState(taskID string, state string) error
func (*CoordinatorNoOp) SetStatus ¶
func (f *CoordinatorNoOp) SetStatus(transferID string, status model.TransferStatus) error
func (*CoordinatorNoOp) SetTransferState ¶
func (f *CoordinatorNoOp) SetTransferState(transferID string, state map[string]*TransferStateData) error
func (*CoordinatorNoOp) TransferHealth ¶
func (f *CoordinatorNoOp) TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error
func (*CoordinatorNoOp) UpdateEndpoint ¶
func (f *CoordinatorNoOp) UpdateEndpoint(transferID string, endpoint model.EndpointParams) error
func (*CoordinatorNoOp) UpdateOperationTablesParts ¶
func (f *CoordinatorNoOp) UpdateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error
func (*CoordinatorNoOp) UpdateTestResults ¶
func (f *CoordinatorNoOp) UpdateTestResults(id string, request *abstract.TestResult) error
func (*CoordinatorNoOp) UploadTable ¶
func (f *CoordinatorNoOp) UploadTable(transferID string, tables []abstract.TableDescription) error
type Editor
deprecated
type Editor interface { // GetEndpointTransfers get all *other* linked transfer to either source or target of provider *transferID* GetEndpointTransfers(transferID string, isSource bool) ([]*model.Transfer, error) // GetTransfers return *related* transfers to *transferID* in desired statuses GetTransfers(statuses []model.TransferStatus, transferID string) ([]*model.Transfer, error) // GetEndpoint get source or target *server.EndpointParams* for provided *transferID* GetEndpoint(transferID string, isSource bool) (model.EndpointParams, error) // UpdateEndpoint update *server.EndpointParams* for provided *transferID* UpdateEndpoint(transferID string, endpoint model.EndpointParams) error }
Editor is an obsolete interface for updating entities on the coordinator.
Deprecated: do not rely on it anymore
type MysqlGtidState ¶
type OperationState ¶
type OperationState interface { // SetOperationState called by *main* worker to store coordinator info // for example: // postgres can store `pg_lsn` so secondary worker will read same data SetOperationState(taskID string, state string) error // GetOperationState called by *secondary* workers, so they know when to start and what to do GetOperationState(taskID string) (string, error) }
type OperationStatus ¶
type OperationStatus interface { // OperationHealth used by *main* and *secondary* workers to signal worker alive OperationHealth(ctx context.Context, operationID string, workerIndex int, workerTime time.Time) error // FinishOperation used by *main* and *secondary* workers to signal completeness of work // if worker had a failure `taskErr` will be filled up with this error FinishOperation(taskID string, shardIndex int, taskErr error) error }
type OraclePositionState ¶
type Progressable ¶
type Progressable interface {
Progress() []*model.OperationTablePart
}
Progressable is an opt-in interface to show total progress over an upload once it has completed.
type Sharding ¶
type Sharding interface { // GetOperationProgress called by *main* worker to track progress over secondary workers GetOperationProgress(operationID string) (*model.AggregatedProgress, error) // CreateOperationWorkers init secondary workers state inside coordinator CreateOperationWorkers(operationID string, workersCount int) error // GetOperationWorkers return all secondary workers to *main* worker GetOperationWorkers(operationID string) ([]*model.OperationWorker, error) // GetOperationWorkersCount return number of registered secondary operation workers to *main* worker GetOperationWorkersCount(operationID string, completed bool) (int, error) // CreateOperationTablesParts store operation parts (or shards or splits). // each part is either full table, or some part of table defined by predicate // called by *main* worker CreateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error // GetOperationTablesParts return list of part needed to be uploaded // called by *secondary* workers GetOperationTablesParts(operationID string) ([]*model.OperationTablePart, error) // AssignOperationTablePart assign next part to defined *secondary* worker // each worker indexed, and use this index to assign parts // act as iterator. // if no more parts left - return `nil, nil` AssignOperationTablePart(operationID string, workerIndex int) (*model.OperationTablePart, error) // ClearAssignedTablesParts clear all assignments for worker // and return the number of table parts for which the assignment was cleared ClearAssignedTablesParts(ctx context.Context, operationID string, workerIndex int) (int64, error) // UpdateOperationTablesParts update tables parts for operation // used to track more granular part progress // // Deprecated: used only in A2 UpdateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error }
Sharding coordinates multiple workers for transfer operations. Transfers utilize an MPP approach, in which there is a main (or leader) worker. The main worker coordinates secondary workers via a single coordinator (API or remote storage). The main worker is usually the first instance of a multi-node transfer operation worker.
type StatusMessage ¶
type StatusMessageProvider ¶
type StatusMessageProvider interface { // OpenStatusMessage open new line of error status message // for example: add timeout error for SOURCE db category OpenStatusMessage(transferID string, category string, content *StatusMessage) error // CloseStatusMessagesForCategory close line of error status message for certain category // for example: close timeout error for SOURCE db (once connectivity reached) CloseStatusMessagesForCategory(transferID string, category string) error // CloseStatusMessagesForTransfer remove all error lines for all categories CloseStatusMessagesForTransfer(transferID string) error }
StatusMessageProvider sets a time-based status for a particular transfer.
type StatusMessageType ¶
type StatusMessageType string
type TestReporter ¶
type TestReporter interface { // UpdateTestResults save transfer test results, used by test operation UpdateTestResults(transferID string, request *abstract.TestResult) error }
type TransferHeartbeat ¶
type TransferState ¶
type TransferState interface { // GetTransferState return known transfer state GetTransferState(id string) (map[string]*TransferStateData, error) // SetTransferState set certain keys to transfer state. // it will add keys to exists state SetTransferState(transferID string, state map[string]*TransferStateData) error // RemoveTransferState remove certain state keys from state RemoveTransferState(transferID string, state []string) error }
TransferState manages transfer state, which is a simple key-value structure.
type TransferStateData ¶
type TransferStateData struct { // Generic is recommended way, you can put anything json serializable here Generic any // IncrementalTables store current cursor progress for incremental tables IncrementalTables []abstract.TableDescription // Obsolete states, per-db, do not add new OraclePosition *OraclePositionState MysqlGtid *MysqlGtidState MysqlBinlogPosition *MysqlBinlogPositionState YtStaticPart *YtStaticPartState }
TransferStateData contains transfer state that is shared across retries / restarts. It can contain any generic information about transfer progress.
func (*TransferStateData) GetGeneric ¶
func (s *TransferStateData) GetGeneric() any
func (*TransferStateData) GetIncrementalTables ¶
func (s *TransferStateData) GetIncrementalTables() []abstract.TableDescription
func (*TransferStateData) GetMysqlBinlogPosition ¶
func (s *TransferStateData) GetMysqlBinlogPosition() *MysqlBinlogPositionState
func (*TransferStateData) GetMysqlGtid ¶
func (s *TransferStateData) GetMysqlGtid() *MysqlGtidState
func (*TransferStateData) GetOraclePosition ¶
func (s *TransferStateData) GetOraclePosition() *OraclePositionState
type TransferStatus ¶
type TransferStatus interface { // SetStatus move transfer to certain status SetStatus(transferID string, status model.TransferStatus) error // FailReplication stop replication with error FailReplication(transferID string, err error) error // TransferHealth add heartbeat for replication instance TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error }
TransferStatus is the main coordinator interface, used by a transfer just to start / stop the transfer and move it through the transfer workflow.