Documentation ¶
Index ¶
- Constants
- Variables
- func ReportFakePKey(cp Coordinator, transferID string, category string, ...) error
- type Coordinator
- type Editor (deprecated)
- type FakeClient
- func (f *FakeClient) AssignOperationTablePart(operationID string, workerIndex int) (*server.OperationTablePart, error)
- func (f *FakeClient) ClearAssignedTablesParts(ctx context.Context, operationID string, workerIndex int) (int64, error)
- func (f *FakeClient) CloseStatusMessagesForCategory(transferID string, category string) error
- func (f *FakeClient) CloseStatusMessagesForTransfer(transferID string) error
- func (f *FakeClient) CreateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
- func (f *FakeClient) CreateOperationWorkers(operationID string, workersCount int) error
- func (f *FakeClient) FailReplication(transferID string, err error) error
- func (f *FakeClient) FinishOperation(taskID string, shardIndex int, taskErr error) error
- func (f *FakeClient) GetEndpoint(transferID string, isSource bool) (server.EndpointParams, error)
- func (f *FakeClient) GetEndpointTransfers(transferID string, isSource bool) ([]*server.Transfer, error)
- func (f *FakeClient) GetOperationProgress(operationID string) (*server.AggregatedProgress, error)
- func (f *FakeClient) GetOperationState(taskID string) (string, error)
- func (f *FakeClient) GetOperationTablesParts(operationID string) ([]*server.OperationTablePart, error)
- func (f *FakeClient) GetOperationWorkers(operationID string) ([]*server.OperationWorker, error)
- func (f *FakeClient) GetOperationWorkersCount(operationID string, completed bool) (int, error)
- func (f *FakeClient) GetTransferState(id string) (map[string]*TransferStateData, error)
- func (f *FakeClient) GetTransfers(statuses []server.TransferStatus, transferID string) ([]*server.Transfer, error)
- func (f *FakeClient) OpenStatusMessage(transferID string, category string, content *StatusMessage) error
- func (f *FakeClient) OperationHealth(ctx context.Context, operationID string, workerIndex int, workerTime time.Time) error
- func (f *FakeClient) RemoveTransferState(transferID string, stateKeys []string) error
- func (f *FakeClient) SetOperationState(taskID string, state string) error
- func (f *FakeClient) SetStatus(transferID string, status server.TransferStatus) error
- func (f *FakeClient) SetTransferState(transferID string, state map[string]*TransferStateData) error
- func (f *FakeClient) TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error
- func (f *FakeClient) UpdateEndpoint(transferID string, endpoint server.EndpointParams) error
- func (f *FakeClient) UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
- func (f *FakeClient) UpdateTestResults(id string, request *abstract.TestResult) error
- func (f *FakeClient) UploadTable(transferID string, tables []abstract.TableDescription) error
- type MysqlBinlogPositionState
- type MysqlGtidState
- type OperationState
- type OperationStatus
- type OraclePositionState
- type Sharding
- type StatefulFakeClient
- func (f *StatefulFakeClient) GetTransferState(id string) (map[string]*TransferStateData, error)
- func (f *StatefulFakeClient) Progres() []*server.OperationTablePart
- func (f *StatefulFakeClient) RemoveTransferState(transferID string, stateKeys []string) error
- func (f *StatefulFakeClient) SetTransferState(transferID string, state map[string]*TransferStateData) error
- func (f *StatefulFakeClient) UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
- type StatusMessage
- type StatusMessageProvider
- type StatusMessageType
- type TestReporter
- type TransferHeartbeat
- type TransferState
- type TransferStateData
- func (s *TransferStateData) GetGeneric() any
- func (s *TransferStateData) GetIncrementalTables() []abstract.TableDescription
- func (s *TransferStateData) GetMysqlBinlogPosition() *MysqlBinlogPositionState
- func (s *TransferStateData) GetMysqlGtid() *MysqlGtidState
- func (s *TransferStateData) GetOraclePosition() *OraclePositionState
- type TransferStatus
- type YtStaticPartState
Constants ¶
const ( WarningStatusMessageType = StatusMessageType("TYPE_WARNING") ErrorStatusMessageType = StatusMessageType("TYPE_ERROR") )
const FakePKeyStatusMessageCategory string = "fake_primary_key"
Variables ¶
var (
NoPKey = coded.Register("generic", "no_primary_key")
)
var OperationStateNotFoundError = xerrors.New("state is not found")
Functions ¶
func ReportFakePKey ¶
Types ¶
type Coordinator ¶
type Coordinator interface { Editor TransferStatus StatusMessageProvider TransferState TestReporter OperationStatus OperationState Sharding }
Coordinator is an interface encapsulating (remote) procedure calls of the Coordinator of the dataplane nodes. This is an interface of an internal transport facility. Coordinator implementation MUST be thread-safe.
type Editor
deprecated
type Editor interface { // GetEndpointTransfers get all *other* linked transfer to either source or target of provider *transferID* GetEndpointTransfers(transferID string, isSource bool) ([]*server.Transfer, error) // GetTransfers return *related* transfers to *transferID* in desired statuses GetTransfers(statuses []server.TransferStatus, transferID string) ([]*server.Transfer, error) // GetEndpoint get source or target *server.EndpointParams* for provided *transferID* GetEndpoint(transferID string, isSource bool) (server.EndpointParams, error) // UpdateEndpoint update *server.EndpointParams* for provided *transferID* UpdateEndpoint(transferID string, endpoint server.EndpointParams) error }
Editor is an obsolete interface for updating entities on the coordinator.
Deprecated: do not rely on it anymore
type FakeClient ¶
type FakeClient struct {
// contains filtered or unexported fields
}
func NewFakeClient ¶
func NewFakeClient() *FakeClient
func NewFakeClientWithOpts ¶
func NewFakeClientWithOpts( getTransferState func(id string) (map[string]*TransferStateData, error), ) *FakeClient
func (*FakeClient) AssignOperationTablePart ¶
func (f *FakeClient) AssignOperationTablePart(operationID string, workerIndex int) (*server.OperationTablePart, error)
func (*FakeClient) ClearAssignedTablesParts ¶
func (*FakeClient) CloseStatusMessagesForCategory ¶
func (f *FakeClient) CloseStatusMessagesForCategory(transferID string, category string) error
func (*FakeClient) CloseStatusMessagesForTransfer ¶
func (f *FakeClient) CloseStatusMessagesForTransfer(transferID string) error
func (*FakeClient) CreateOperationTablesParts ¶
func (f *FakeClient) CreateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
func (*FakeClient) CreateOperationWorkers ¶
func (f *FakeClient) CreateOperationWorkers(operationID string, workersCount int) error
func (*FakeClient) FailReplication ¶
func (f *FakeClient) FailReplication(transferID string, err error) error
func (*FakeClient) FinishOperation ¶
func (f *FakeClient) FinishOperation(taskID string, shardIndex int, taskErr error) error
func (*FakeClient) GetEndpoint ¶
func (f *FakeClient) GetEndpoint(transferID string, isSource bool) (server.EndpointParams, error)
func (*FakeClient) GetEndpointTransfers ¶
func (*FakeClient) GetOperationProgress ¶
func (f *FakeClient) GetOperationProgress(operationID string) (*server.AggregatedProgress, error)
func (*FakeClient) GetOperationState ¶
func (f *FakeClient) GetOperationState(taskID string) (string, error)
func (*FakeClient) GetOperationTablesParts ¶
func (f *FakeClient) GetOperationTablesParts(operationID string) ([]*server.OperationTablePart, error)
func (*FakeClient) GetOperationWorkers ¶
func (f *FakeClient) GetOperationWorkers(operationID string) ([]*server.OperationWorker, error)
func (*FakeClient) GetOperationWorkersCount ¶
func (f *FakeClient) GetOperationWorkersCount(operationID string, completed bool) (int, error)
func (*FakeClient) GetTransferState ¶
func (f *FakeClient) GetTransferState(id string) (map[string]*TransferStateData, error)
func (*FakeClient) GetTransfers ¶
func (f *FakeClient) GetTransfers(statuses []server.TransferStatus, transferID string) ([]*server.Transfer, error)
func (*FakeClient) OpenStatusMessage ¶
func (f *FakeClient) OpenStatusMessage(transferID string, category string, content *StatusMessage) error
func (*FakeClient) OperationHealth ¶
func (*FakeClient) RemoveTransferState ¶
func (f *FakeClient) RemoveTransferState(transferID string, stateKeys []string) error
func (*FakeClient) SetOperationState ¶
func (f *FakeClient) SetOperationState(taskID string, state string) error
func (*FakeClient) SetStatus ¶
func (f *FakeClient) SetStatus(transferID string, status server.TransferStatus) error
func (*FakeClient) SetTransferState ¶
func (f *FakeClient) SetTransferState(transferID string, state map[string]*TransferStateData) error
func (*FakeClient) TransferHealth ¶
func (f *FakeClient) TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error
func (*FakeClient) UpdateEndpoint ¶
func (f *FakeClient) UpdateEndpoint(transferID string, endpoint server.EndpointParams) error
func (*FakeClient) UpdateOperationTablesParts ¶
func (f *FakeClient) UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
func (*FakeClient) UpdateTestResults ¶
func (f *FakeClient) UpdateTestResults(id string, request *abstract.TestResult) error
func (*FakeClient) UploadTable ¶
func (f *FakeClient) UploadTable(transferID string, tables []abstract.TableDescription) error
type MysqlGtidState ¶
type OperationState ¶
type OperationState interface { // SetOperationState called by *main* worker to store coordinator info // for example: // postgres can store `pg_lsn` so secondary worker will read same data SetOperationState(taskID string, state string) error // GetOperationState called by *secondary* workers, so they know when to start and what to do GetOperationState(taskID string) (string, error) }
type OperationStatus ¶
type OperationStatus interface { // OperationHealth used by *main* and *secondary* workers to signal worker alive OperationHealth(ctx context.Context, operationID string, workerIndex int, workerTime time.Time) error // FinishOperation used by *main* and *secondary* workers to signal completeness of work // if worker had a failure `taskErr` will be filled up with this error FinishOperation(taskID string, shardIndex int, taskErr error) error }
type OraclePositionState ¶
type Sharding ¶
type Sharding interface { // GetOperationProgress called by *main* worker to track progress over secondary workers GetOperationProgress(operationID string) (*server.AggregatedProgress, error) // CreateOperationWorkers init secondary workers state inside coordinator CreateOperationWorkers(operationID string, workersCount int) error // GetOperationWorkers return all secondary workers to *main* worker GetOperationWorkers(operationID string) ([]*server.OperationWorker, error) // GetOperationWorkersCount return number of registered secondary operation workers to *main* worker GetOperationWorkersCount(operationID string, completed bool) (int, error) // CreateOperationTablesParts store operation parts (or shards or splits). // each part is either full table, or some part of table defined by predicate // called by *main* worker CreateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error // GetOperationTablesParts return list of part needed to be uploaded // called by *secondary* workers GetOperationTablesParts(operationID string) ([]*server.OperationTablePart, error) // AssignOperationTablePart assign next part to defined *secondary* worker // each worker indexed, and use this index to assign parts // act as iterator. // if no more parts left - return `nil, nil` AssignOperationTablePart(operationID string, workerIndex int) (*server.OperationTablePart, error) // ClearAssignedTablesParts clear all assignments for worker // and return the number of table parts for which the assignment was cleared ClearAssignedTablesParts(ctx context.Context, operationID string, workerIndex int) (int64, error) // UpdateOperationTablesParts update tables parts for operation // used to track more granular part progress // // Deprecated: used only in A2 UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error }
Sharding coordinates multiple workers for transfer operations. A transfer utilizes the MPP approach, in which there is a main (or leader) worker. The main worker coordinates secondary workers via a single coordinator (API or remote storage). The main worker is usually the first instance of a multi-node transfer operation worker.
type StatefulFakeClient ¶
type StatefulFakeClient struct { *FakeClient // contains filtered or unexported fields }
func NewStatefulFakeClient ¶
func NewStatefulFakeClient() *StatefulFakeClient
func (*StatefulFakeClient) GetTransferState ¶
func (f *StatefulFakeClient) GetTransferState(id string) (map[string]*TransferStateData, error)
func (*StatefulFakeClient) Progres ¶
func (f *StatefulFakeClient) Progres() []*server.OperationTablePart
func (*StatefulFakeClient) RemoveTransferState ¶
func (f *StatefulFakeClient) RemoveTransferState(transferID string, stateKeys []string) error
func (*StatefulFakeClient) SetTransferState ¶
func (f *StatefulFakeClient) SetTransferState(transferID string, state map[string]*TransferStateData) error
func (*StatefulFakeClient) UpdateOperationTablesParts ¶
func (f *StatefulFakeClient) UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
type StatusMessage ¶
type StatusMessageProvider ¶
type StatusMessageProvider interface { // OpenStatusMessage open new line of error status message // for example: add timeout error for SOURCE db category OpenStatusMessage(transferID string, category string, content *StatusMessage) error // CloseStatusMessagesForCategory close line of error status message for certain category // for example: close timeout error for SOURCE db (once connectivity reached) CloseStatusMessagesForCategory(transferID string, category string) error // CloseStatusMessagesForTransfer remove all error lines for all categories CloseStatusMessagesForTransfer(transferID string) error }
StatusMessageProvider sets a time-based status for a particular transfer.
type StatusMessageType ¶
type StatusMessageType string
type TestReporter ¶
type TestReporter interface { // UpdateTestResults save transfer test results, used by test operation UpdateTestResults(transferID string, request *abstract.TestResult) error }
type TransferHeartbeat ¶
type TransferState ¶
type TransferState interface { // GetTransferState return known transfer state GetTransferState(id string) (map[string]*TransferStateData, error) // SetTransferState set certain keys to transfer state. // it will add keys to exists state SetTransferState(transferID string, state map[string]*TransferStateData) error // RemoveTransferState remove certain state keys from state RemoveTransferState(transferID string, state []string) error }
TransferState manages transfer state, a simple key-value structure.
type TransferStateData ¶
type TransferStateData struct { // Generic is recommended way, you can put anything json serializable here Generic any // IncrementalTables store current cursor progress for incremental tables IncrementalTables []abstract.TableDescription // Obsolete states, per-db, do not add new OraclePosition *OraclePositionState MysqlGtid *MysqlGtidState MysqlBinlogPosition *MysqlBinlogPositionState YtStaticPart *YtStaticPartState }
TransferStateData contains transfer state shared across retries / restarts. It can contain any generic information about transfer progress.
func (*TransferStateData) GetGeneric ¶
func (s *TransferStateData) GetGeneric() any
func (*TransferStateData) GetIncrementalTables ¶
func (s *TransferStateData) GetIncrementalTables() []abstract.TableDescription
func (*TransferStateData) GetMysqlBinlogPosition ¶
func (s *TransferStateData) GetMysqlBinlogPosition() *MysqlBinlogPositionState
func (*TransferStateData) GetMysqlGtid ¶
func (s *TransferStateData) GetMysqlGtid() *MysqlGtidState
func (*TransferStateData) GetOraclePosition ¶
func (s *TransferStateData) GetOraclePosition() *OraclePositionState
type TransferStatus ¶
type TransferStatus interface { // SetStatus move transfer to certain status SetStatus(transferID string, status server.TransferStatus) error // FailReplication stop replication with error FailReplication(transferID string, err error) error // TransferHealth add heartbeat for replication instance TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error }
TransferStatus is the main coordinator interface, used by a transfer just to start / stop the transfer and move it through the transfer workflow.