coordinator

package
v0.0.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Known StatusMessageType values, as carried in StatusMessage.Type.
const (
	// WarningStatusMessageType is the "TYPE_WARNING" status message type.
	WarningStatusMessageType StatusMessageType = "TYPE_WARNING"
	// ErrorStatusMessageType is the "TYPE_ERROR" status message type.
	ErrorStatusMessageType StatusMessageType = "TYPE_ERROR"
)
View Source
// FakePKeyStatusMessageCategory is the status message category name
// "fake_primary_key"; it is a typed string constant.
const FakePKeyStatusMessageCategory = string("fake_primary_key")

Variables

View Source
var (
	// NoPKey is the value returned by coded.Register for the pair
	// ("generic", "no_primary_key").
	// NOTE(review): presumably an error code attached to status messages
	// about tables without a primary key — confirm against callers.
	NoPKey = coded.Register("generic", "no_primary_key")
)
View Source
// OperationStateNotFoundError is a sentinel error with the message
// "state is not found".
// NOTE(review): presumably returned by GetOperationState when no state has
// been stored for the operation — confirm against the implementations.
var OperationStateNotFoundError = xerrors.New("state is not found")

Functions

func ReportFakePKey

func ReportFakePKey(cp Coordinator, transferID string, category string, fakePkeyTables []abstract.TableID) error

Types

type Coordinator

Coordinator is an interface encapsulating (remote) procedure calls of the Coordinator of the dataplane nodes. This is an interface of an internal transport facility. Coordinator implementation MUST be thread-safe.

type Editor deprecated

// Editor groups calls that look up transfers and endpoints related to a
// transfer, plus a call that updates an endpoint.
//
// Deprecated: do not rely on it anymore.
type Editor interface {
	// GetEndpointTransfers returns all *other* transfers linked to either the source or the target of the provided *transferID*.
	// NOTE(review): isSource presumably selects the source (true) or the target (false) endpoint — confirm.
	GetEndpointTransfers(transferID string, isSource bool) ([]*server.Transfer, error)
	// GetTransfers returns the transfers *related* to *transferID* that are in one of the desired statuses.
	GetTransfers(statuses []server.TransferStatus, transferID string) ([]*server.Transfer, error)
	// GetEndpoint returns the source or target *server.EndpointParams* for the provided *transferID*.
	GetEndpoint(transferID string, isSource bool) (server.EndpointParams, error)
	// UpdateEndpoint updates the *server.EndpointParams* for the provided *transferID*.
	UpdateEndpoint(transferID string, endpoint server.EndpointParams) error
}

Editor is an obsolete interface for updating entities on the coordinator.

Deprecated: do not rely on it anymore

type FakeClient

// FakeClient is a struct whose fields are all unexported.
// NOTE(review): judging by its name and method set it looks like a test
// double for the coordinator interfaces of this package — confirm.
type FakeClient struct {
	// contains filtered or unexported fields
}

func NewFakeClient

func NewFakeClient() *FakeClient

func NewFakeClientWithOpts

func NewFakeClientWithOpts(
	getTransferState func(id string) (map[string]*TransferStateData, error),
) *FakeClient

func (*FakeClient) AssignOperationTablePart

func (f *FakeClient) AssignOperationTablePart(operationID string, workerIndex int) (*server.OperationTablePart, error)

func (*FakeClient) ClearAssignedTablesParts

func (f *FakeClient) ClearAssignedTablesParts(ctx context.Context, operationID string, workerIndex int) (int64, error)

func (*FakeClient) CloseStatusMessagesForCategory

func (f *FakeClient) CloseStatusMessagesForCategory(transferID string, category string) error

func (*FakeClient) CloseStatusMessagesForTransfer

func (f *FakeClient) CloseStatusMessagesForTransfer(transferID string) error

func (*FakeClient) CreateOperationTablesParts

func (f *FakeClient) CreateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error

func (*FakeClient) CreateOperationWorkers

func (f *FakeClient) CreateOperationWorkers(operationID string, workersCount int) error

func (*FakeClient) FailReplication

func (f *FakeClient) FailReplication(transferID string, err error) error

func (*FakeClient) FinishOperation

func (f *FakeClient) FinishOperation(taskID string, shardIndex int, taskErr error) error

func (*FakeClient) GetEndpoint

func (f *FakeClient) GetEndpoint(transferID string, isSource bool) (server.EndpointParams, error)

func (*FakeClient) GetEndpointTransfers

func (f *FakeClient) GetEndpointTransfers(transferID string, isSource bool) ([]*server.Transfer, error)

func (*FakeClient) GetOperationProgress

func (f *FakeClient) GetOperationProgress(operationID string) (*server.AggregatedProgress, error)

func (*FakeClient) GetOperationState

func (f *FakeClient) GetOperationState(taskID string) (string, error)

func (*FakeClient) GetOperationTablesParts

func (f *FakeClient) GetOperationTablesParts(operationID string) ([]*server.OperationTablePart, error)

func (*FakeClient) GetOperationWorkers

func (f *FakeClient) GetOperationWorkers(operationID string) ([]*server.OperationWorker, error)

func (*FakeClient) GetOperationWorkersCount

func (f *FakeClient) GetOperationWorkersCount(operationID string, completed bool) (int, error)

func (*FakeClient) GetTransferState

func (f *FakeClient) GetTransferState(id string) (map[string]*TransferStateData, error)

func (*FakeClient) GetTransfers

func (f *FakeClient) GetTransfers(statuses []server.TransferStatus, transferID string) ([]*server.Transfer, error)

func (*FakeClient) OpenStatusMessage

func (f *FakeClient) OpenStatusMessage(transferID string, category string, content *StatusMessage) error

func (*FakeClient) OperationHealth

func (f *FakeClient) OperationHealth(ctx context.Context, operationID string, workerIndex int, workerTime time.Time) error

func (*FakeClient) RemoveTransferState

func (f *FakeClient) RemoveTransferState(transferID string, stateKeys []string) error

func (*FakeClient) SetOperationState

func (f *FakeClient) SetOperationState(taskID string, state string) error

func (*FakeClient) SetStatus

func (f *FakeClient) SetStatus(transferID string, status server.TransferStatus) error

func (*FakeClient) SetTransferState

func (f *FakeClient) SetTransferState(transferID string, state map[string]*TransferStateData) error

func (*FakeClient) TransferHealth

func (f *FakeClient) TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error

func (*FakeClient) UpdateEndpoint

func (f *FakeClient) UpdateEndpoint(transferID string, endpoint server.EndpointParams) error

func (*FakeClient) UpdateOperationTablesParts

func (f *FakeClient) UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error

func (*FakeClient) UpdateTestResults

func (f *FakeClient) UpdateTestResults(id string, request *abstract.TestResult) error

func (*FakeClient) UploadTable

func (f *FakeClient) UploadTable(transferID string, tables []abstract.TableDescription) error

type MysqlBinlogPositionState

// MysqlBinlogPositionState is the payload type of the
// TransferStateData.MysqlBinlogPosition field, which TransferStateData lists
// among its obsolete per-database states.
type MysqlBinlogPositionState struct {
	// File is presumably the binlog file name — TODO confirm.
	File     string
	// Position is presumably the offset inside File — TODO confirm.
	Position int64
}

type MysqlGtidState

// MysqlGtidState is the payload type of the TransferStateData.MysqlGtid
// field, which TransferStateData lists among its obsolete per-database states.
type MysqlGtidState struct {
	// Gtid is presumably a serialized MySQL GTID (set) — TODO confirm the format.
	Gtid   string
	// Flavor is presumably the server flavor the GTID belongs to — TODO confirm.
	Flavor string
}

type OperationState

// OperationState lets workers of one operation share an opaque string state
// through the coordinator: the *main* worker writes it and *secondary*
// workers read it.
type OperationState interface {
	// SetOperationState is called by the *main* worker to store coordinator info.
	// For example:
	// 		postgres can store `pg_lsn` so that secondary workers read the same data.
	SetOperationState(taskID string, state string) error
	// GetOperationState is called by *secondary* workers, so that they know when to start and what to do.
	GetOperationState(taskID string) (string, error)
}

type OperationStatus

// OperationStatus is used by operation workers to report liveness and
// completion to the coordinator.
type OperationStatus interface {
	// OperationHealth is used by *main* and *secondary* workers to signal that the worker is alive.
	OperationHealth(ctx context.Context, operationID string, workerIndex int, workerTime time.Time) error
	// FinishOperation is used by *main* and *secondary* workers to signal that their work is complete.
	// If the worker had a failure, `taskErr` is filled with that error.
	FinishOperation(taskID string, shardIndex int, taskErr error) error
}

type OraclePositionState

// OraclePositionState is the payload type of the
// TransferStateData.OraclePosition field, which TransferStateData lists among
// its obsolete per-database states.
// NOTE(review): the field meanings are not visible here; Scn/RsID/SSN
// presumably mirror Oracle log position identifiers — confirm.
type OraclePositionState struct {
	Scn       uint64
	RsID      string
	SSN       uint64
	Type      string
	Timestamp time.Time
}

type Sharding

// Sharding is the set of coordinator calls used to split one transfer
// operation between a *main* worker and several *secondary* workers.
type Sharding interface {
	// GetOperationProgress is called by the *main* worker to track progress across secondary workers.
	GetOperationProgress(operationID string) (*server.AggregatedProgress, error)
	// CreateOperationWorkers initializes the state of secondary workers inside the coordinator.
	CreateOperationWorkers(operationID string, workersCount int) error
	// GetOperationWorkers returns all secondary workers to the *main* worker.
	GetOperationWorkers(operationID string) ([]*server.OperationWorker, error)
	// GetOperationWorkersCount returns the number of registered secondary operation workers to the *main* worker.
	// NOTE(review): `completed` presumably restricts the count to workers that have finished — confirm.
	GetOperationWorkersCount(operationID string, completed bool) (int, error)
	// CreateOperationTablesParts stores operation parts (also called shards or splits).
	// Each part is either a full table or a part of a table defined by a predicate.
	// Called by the *main* worker.
	CreateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
	// GetOperationTablesParts returns the list of parts that need to be uploaded.
	// Called by *secondary* workers.
	GetOperationTablesParts(operationID string) ([]*server.OperationTablePart, error)
	// AssignOperationTablePart assigns the next part to the given *secondary* worker.
	// Each worker is indexed, and this index is used to assign parts.
	// Acts as an iterator:
	// if no more parts are left, it returns `nil, nil`.
	AssignOperationTablePart(operationID string, workerIndex int) (*server.OperationTablePart, error)
	// ClearAssignedTablesParts clears all assignments for the worker
	// and returns the number of table parts for which the assignment was cleared.
	ClearAssignedTablesParts(ctx context.Context, operationID string, workerIndex int) (int64, error)
	// UpdateOperationTablesParts updates the table parts of an operation;
	// used to track more granular per-part progress.
	//
	// Deprecated: used only in A2
	UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error
}

Sharding coordinates multiple workers for transfer operations. A transfer utilizes an MPP approach, in which there is a main (or leader) worker. The main worker coordinates secondary workers via a single coordinator (API or remote storage). The main worker is usually the first instance of a multi-node transfer operation worker.

type StatefulFakeClient

// StatefulFakeClient embeds *FakeClient, so FakeClient's methods are promoted
// to it, and adds unexported fields of its own. It declares its own
// GetTransferState, SetTransferState, RemoveTransferState and
// UpdateOperationTablesParts methods, which take precedence over the
// promoted ones.
// NOTE(review): presumably it keeps transfer state in memory between calls —
// confirm against the implementation.
type StatefulFakeClient struct {
	*FakeClient
	// contains filtered or unexported fields
}

func NewStatefulFakeClient

func NewStatefulFakeClient() *StatefulFakeClient

func (*StatefulFakeClient) GetTransferState

func (f *StatefulFakeClient) GetTransferState(id string) (map[string]*TransferStateData, error)

func (*StatefulFakeClient) Progres

func (*StatefulFakeClient) RemoveTransferState

func (f *StatefulFakeClient) RemoveTransferState(transferID string, stateKeys []string) error

func (*StatefulFakeClient) SetTransferState

func (f *StatefulFakeClient) SetTransferState(transferID string, state map[string]*TransferStateData) error

func (*StatefulFakeClient) UpdateOperationTablesParts

func (f *StatefulFakeClient) UpdateOperationTablesParts(operationID string, tables []*server.OperationTablePart) error

type StatusMessage

// StatusMessage is the content passed to
// StatusMessageProvider.OpenStatusMessage. Its fields are serialized to JSON
// under the lower-case keys given in the struct tags.
type StatusMessage struct {
	// Type is the message type, e.g. WarningStatusMessageType or ErrorStatusMessageType.
	Type       StatusMessageType `json:"type"`
	Heading    string            `json:"heading"`
	Message    string            `json:"message"`
	Categories []string          `json:"categories"`
	Code       coded.Code        `json:"code"`
	ID         string            `json:"id"`
}

type StatusMessageProvider

// StatusMessageProvider opens and closes status messages of a transfer,
// grouped by category.
type StatusMessageProvider interface {
	// OpenStatusMessage opens a new line of error status message,
	// for example: adds a timeout error for the SOURCE db category.
	OpenStatusMessage(transferID string, category string, content *StatusMessage) error
	// CloseStatusMessagesForCategory closes the line of error status messages for a certain category,
	// for example: closes the timeout error for the SOURCE db (once connectivity is reached).
	CloseStatusMessagesForCategory(transferID string, category string) error
	// CloseStatusMessagesForTransfer removes all error lines for all categories.
	CloseStatusMessagesForTransfer(transferID string) error
}

StatusMessageProvider sets a time-based status for a particular transfer.

type StatusMessageType

// StatusMessageType is the string-based type of a StatusMessage (see
// StatusMessage.Type). The package declares WarningStatusMessageType and
// ErrorStatusMessageType as its values.
type StatusMessageType string

type TestReporter

// TestReporter stores the results of a transfer test.
type TestReporter interface {
	// UpdateTestResults saves transfer test results; used by the test operation.
	UpdateTestResults(transferID string, request *abstract.TestResult) error
}

type TransferHeartbeat

// TransferHeartbeat is the health payload passed to
// TransferStatus.TransferHealth.
type TransferHeartbeat struct {
	// RetryCount is presumably the number of retries performed so far — TODO confirm.
	RetryCount int
	// LastError is presumably the text of the most recent error — TODO confirm.
	LastError  string
}

type TransferState

// TransferState reads and modifies the per-transfer state, a map from string
// keys to *TransferStateData.
type TransferState interface {
	// GetTransferState returns the known transfer state.
	GetTransferState(id string) (map[string]*TransferStateData, error)
	// SetTransferState sets certain keys in the transfer state.
	// The keys are added to the existing state.
	SetTransferState(transferID string, state map[string]*TransferStateData) error
	// RemoveTransferState removes certain state keys from the state.
	RemoveTransferState(transferID string, state []string) error
}

TransferState manages transfer state as a simple key-value structure.

type TransferStateData

// TransferStateData is the value type of the transfer state map handled by
// the TransferState interface.
type TransferStateData struct {
	// Generic is the recommended way: you can put anything JSON-serializable here.
	Generic any
	// IncrementalTables stores the current cursor progress for incremental tables.
	IncrementalTables []abstract.TableDescription

	// Obsolete per-database states; do not add new ones.
	OraclePosition      *OraclePositionState
	MysqlGtid           *MysqlGtidState
	MysqlBinlogPosition *MysqlBinlogPositionState
	YtStaticPart        *YtStaticPartState
}

TransferStateData contains transfer state that is shared across retries / restarts. It can contain any generic information about transfer progress.

func (*TransferStateData) GetGeneric

func (s *TransferStateData) GetGeneric() any

func (*TransferStateData) GetIncrementalTables

func (s *TransferStateData) GetIncrementalTables() []abstract.TableDescription

func (*TransferStateData) GetMysqlBinlogPosition

func (s *TransferStateData) GetMysqlBinlogPosition() *MysqlBinlogPositionState

func (*TransferStateData) GetMysqlGtid

func (s *TransferStateData) GetMysqlGtid() *MysqlGtidState

func (*TransferStateData) GetOraclePosition

func (s *TransferStateData) GetOraclePosition() *OraclePositionState

type TransferStatus

// TransferStatus is the set of coordinator calls that change a transfer's
// status and report its health.
type TransferStatus interface {
	// SetStatus moves the transfer to a certain status.
	SetStatus(transferID string, status server.TransferStatus) error
	// FailReplication stops replication with an error.
	FailReplication(transferID string, err error) error
	// TransferHealth adds a heartbeat for the replication instance.
	TransferHealth(ctx context.Context, transferID string, health *TransferHeartbeat) error
}

TransferStatus is the main coordinator interface used by a transfer to start / stop the transfer and move it through the transfer workflow.

type YtStaticPartState

// YtStaticPartState is the payload type of the TransferStateData.YtStaticPart
// field, which TransferStateData lists among its obsolete per-database states.
// NOTE(review): the field meanings are not visible here; they presumably
// describe one table part and the YT paths used while uploading it — confirm.
type YtStaticPartState struct {
	SchemaName            string
	TableName             string
	PartID                string
	RotatedShardedTableID string
	YtTargetPath          string
	YtShardTargetPath     string
	YtShardTmpPath        string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL