Documentation ¶
Overview ¶
Package workflow defines types and functions for working with Vitess workflows.
This is still a very rough sketch, far from a final API, but I want to document some things here as I go:
(1) The lines between package workflow and package workflow/vexec are, uh,
blurry at best, and definitely need serious thinking and refinement. Maybe there shouldn't even be two separate packages at all. The reason I have the two packages right now is because I'm operating under the assumption that there are workflows that are vexec, and then there are other workflows. If it's true that all workflows are vexec workflows, then probably one single package could make more sense. For now, two packages seems the way to go, but like I said, the boundaries are blurry, and things that belong in one package are in the other, because I haven't gone back and moved things around.
(2) I'm aiming for this to be a drop-in replacement (more or less) for the
function calls in go/vt/wrangler. However, I'd rather define a better abstraction if it means having to rewrite even significant portions of the existing wrangler code to adapt to it, than make a subpar API in the name of backwards compatibility. I'm not sure if that's a tradeoff I'll even need to consider in the future, but I'm putting a stake in the ground on which side of that tradeoff I intend to fall, should it come to it.
(3) Eventually we'll need to consider how the online schema migration workflows
fit into this. I'm trying to at least be somewhat abstract in the vexec / queryplanner APIs to fit with the QueryParams thing that wrangler uses, which _should_ work, but who knows?? Time will tell.
Index ¶
- Constants
- Variables
- func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ...) error
- func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64
- func ReverseWorkflowName(workflow string) string
- func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error
- type ITrafficSwitcher
- type LogRecorder
- type MigrationSource
- type MigrationTarget
- type Server
- func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error)
- func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, ...) error
- func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recursive, evenIfServing bool) error
- func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, ...) (*[]string, error)
- func (s *Server) GetCellsWithShardReadsSwitched(ctx context.Context, keyspace string, si *topo.ShardInfo, ...) (cellsSwitched []string, cellsNotSwitched []string, err error)
- func (s *Server) GetCellsWithTableReadsSwitched(ctx context.Context, keyspace string, table string, ...) (cellsSwitched []string, cellsNotSwitched []string, err error)
- func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error)
- func (s *Server) GetWorkflow(ctx context.Context, keyspace, workflow string, includeLogs bool, ...) (*vtctldatapb.Workflow, error)
- func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error)
- func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.LookupVindexCreateRequest) (*vtctldatapb.LookupVindexCreateResponse, error)
- func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.LookupVindexExternalizeRequest) (*vtctldatapb.LookupVindexExternalizeResponse, error)
- func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error
- func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error)
- func (s *Server) MountList(ctx context.Context, req *vtctldatapb.MountListRequest) (*vtctldatapb.MountListResponse, error)
- func (s *Server) MountRegister(ctx context.Context, req *vtctldatapb.MountRegisterRequest) (*vtctldatapb.MountRegisterResponse, error)
- func (s *Server) MountShow(ctx context.Context, req *vtctldatapb.MountShowRequest) (*vtctldatapb.MountShowResponse, error)
- func (s *Server) MountUnregister(ctx context.Context, req *vtctldatapb.MountUnregisterRequest) (*vtctldatapb.MountUnregisterResponse, error)
- func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTablesCompleteRequest) (*vtctldatapb.MoveTablesCompleteResponse, error)
- func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (res *vtctldatapb.WorkflowStatusResponse, err error)
- func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error)
- func (s *Server) SQLParser() *sqlparser.Parser
- func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error)
- func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error)
- func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error)
- func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error)
- func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error)
- func (s *Server) VReplicationExec(ctx context.Context, tabletAlias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)
- func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDeleteRequest) (*vtctldatapb.WorkflowDeleteResponse, error)
- func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowStatusRequest) (*vtctldatapb.WorkflowStatusResponse, error)
- func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest) (*vtctldatapb.WorkflowSwitchTrafficResponse, error)
- func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUpdateRequest) (*vtctldatapb.WorkflowUpdateResponse, error)
- type State
- type StreamMigrator
- func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context)
- func (sm *StreamMigrator) LegacyStopStreams(ctx context.Context) ([]string, error)
- func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error
- func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error)
- func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream
- func (sm *StreamMigrator) Templates() []*VReplicationStream
- type StreamType
- type TableRemovalType
- type TargetInfo
- type TrafficSwitchDirection
- type Type
- type VReplicationStream
- type VReplicationStreams
- type VReplicationWorkflowType
Constants ¶
const ( MoveTablesWorkflow = VReplicationWorkflowType(iota) ReshardWorkflow MigrateWorkflow )
VReplicationWorkflowType enums.
const ( StreamTypeUnknown = StreamType(iota) StreamTypeSharded StreamTypeReference )
StreamType values.
const ( // Frozen is the message value of frozen vreplication streams. Frozen = "FROZEN" // Running is the state value of a vreplication stream in the // replicating state. Running = "RUNNING" )
const ( DirectionForward = TrafficSwitchDirection(iota) DirectionBackward )
The following constants define the switching direction.
const ( DropTable = TableRemovalType(iota) RenameTable )
The following consts define if DropSource will drop or rename the table.
Variables ¶
var ( // ErrInvalidWorkflow is a catchall error type for conditions that should be // impossible when operating on a workflow. ErrInvalidWorkflow = errors.New("invalid workflow") // ErrMultipleSourceKeyspaces occurs when a workflow somehow has multiple // source keyspaces across different shard primaries. This should be // impossible. ErrMultipleSourceKeyspaces = errors.New("multiple source keyspaces for a single workflow") // ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple // target keyspaces across different shard primaries. This should be // impossible. ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow") ErrWorkflowNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic") ErrWorkflowPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic") )
var ( // ErrNoStreams occurs when no target streams are found for a workflow in a // target keyspace. ErrNoStreams = errors.New("no streams found") )
var TypeIntMap = map[Type]VReplicationWorkflowType{ TypeMoveTables: MoveTablesWorkflow, TypeReshard: ReshardWorkflow, TypeMigrate: MigrateWorkflow, }
var TypeStrMap = map[VReplicationWorkflowType]Type{ MoveTablesWorkflow: TypeMoveTables, ReshardWorkflow: TypeReshard, MigrateWorkflow: TypeMigrate, }
Functions ¶
func CompareShards ¶
func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ts *topo.Server) error
CompareShards compares the list of shards in a workflow with the shards in that keyspace according to the topo. It returns an error if they do not match.
This function is used to validate MoveTables workflows.
(TODO|@ajm188): This function is temporarily-exported until *wrangler.trafficSwitcher has been fully moved over to this package. Once that refactor is finished, this function should be unexported. Consequently, YOU SHOULD NOT DEPEND ON THIS FUNCTION EXTERNALLY.
func HashStreams ¶
func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64
HashStreams produces a stable hash based on the target keyspace and migration targets.
func ReverseWorkflowName ¶
ReverseWorkflowName returns the "reversed" name of a workflow. For a "forward" workflow, this is the workflow name with "_reverse" appended, and for a "reversed" workflow, this is the workflow name with the "_reverse" suffix removed.
func StreamMigratorFinalize ¶
func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error
StreamMigratorFinalize finalizes the stream migration.
(TODO:@ajm88) in the original implementation, "it's a standalone function because it does not use the streamMigrater state". That's still true, but moving this to a method on StreamMigrator would provide a cleaner namespacing in package workflow. But, we would have to update more callers in order to do that (*wrangler.switcher's streamMigrateFinalize would need to change its signature to also take a *workflow.StreamMigrator), so we will do that in a second PR.
Types ¶
type ITrafficSwitcher ¶
type ITrafficSwitcher interface { TopoServer() *topo.Server TabletManagerClient() tmclient.TabletManagerClient Logger() logutil.Logger // VReplicationExec here is used when we want the (*wrangler.Wrangler) // implementation, which does a topo lookup on the tablet alias before // calling the underlying TabletManagerClient RPC. VReplicationExec(ctx context.Context, alias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error) ExternalTopo() *topo.Server MigrationType() binlogdatapb.MigrationType ReverseWorkflowName() string SourceKeyspaceName() string SourceKeyspaceSchema() *vindexes.KeyspaceSchema Sources() map[string]*MigrationSource Tables() []string TargetKeyspaceName() string Targets() map[string]*MigrationTarget WorkflowName() string SourceTimeZone() string ForAllSources(f func(source *MigrationSource) error) error ForAllTargets(f func(target *MigrationTarget) error) error ForAllUIDs(f func(target *MigrationTarget, uid int32) error) error SourceShards() []*topo.ShardInfo TargetShards() []*topo.ShardInfo }
ITrafficSwitcher is a hack to allow us to maintain the legacy wrangler package for vtctl/vtctlclient while migrating most of the TrafficSwitcher related code to the workflow package for vtctldclient usage.
After moving TrafficSwitcher to this package and removing the implementation in wrangler, this type should be removed, and StreamMigrator should be updated to contain a field of type *TrafficSwitcher instead of ITrafficSwitcher.
type LogRecorder ¶
type LogRecorder struct {
// contains filtered or unexported fields
}
LogRecorder is used to collect logs for a specific purpose. Not thread-safe since it is expected to be generated in repeatable sequence
func NewLogRecorder ¶
func NewLogRecorder() *LogRecorder
NewLogRecorder creates a new instance of LogRecorder
func (*LogRecorder) GetLogs ¶
func (lr *LogRecorder) GetLogs() []string
GetLogs returns all recorded logs in sequence
func (*LogRecorder) LogSlice ¶
func (lr *LogRecorder) LogSlice(logs []string)
LogSlice sorts a given slice using natural sort, so that the result is predictable. Useful when logging arrays or maps where order of objects can vary
func (*LogRecorder) Logf ¶
func (lr *LogRecorder) Logf(log string, args ...any)
Logf records a new log message with interpolation parameters using fmt.Sprintf.
type MigrationSource ¶
type MigrationSource struct { Position string Journaled bool // contains filtered or unexported fields }
MigrationSource contains the metadata for each migration source.
func NewMigrationSource ¶
func NewMigrationSource(si *topo.ShardInfo, primary *topo.TabletInfo) *MigrationSource
NewMigrationSource returns a MigrationSource for the given shard and primary.
(TODO|@ajm188): do we always want to start with (position:"", journaled:false)?
func (*MigrationSource) GetPrimary ¶
func (source *MigrationSource) GetPrimary() *topo.TabletInfo
GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration source.
func (*MigrationSource) GetShard ¶
func (source *MigrationSource) GetShard() *topo.ShardInfo
GetShard returns the *topo.ShardInfo for the migration source.
type MigrationTarget ¶
type MigrationTarget struct { Sources map[int32]*binlogdatapb.BinlogSource Position string // contains filtered or unexported fields }
MigrationTarget contains the metadata for each migration target.
func (*MigrationTarget) GetPrimary ¶
func (target *MigrationTarget) GetPrimary() *topo.TabletInfo
GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration target.
func (*MigrationTarget) GetShard ¶
func (target *MigrationTarget) GetShard() *topo.ShardInfo
GetShard returns the *topo.ShardInfo for the migration target.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server provides an API to work with Vitess workflows, like vreplication workflows (MoveTables, Reshard, etc) and schema migration workflows.
func NewServer ¶
func NewServer(env *vtenv.Environment, ts *topo.Server, tmc tmclient.TabletManagerClient) *Server
NewServer returns a new server instance with the given topo.Server and TabletManagerClient.
func (*Server) CheckReshardingJournalExistsOnTablet ¶
func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error)
CheckReshardingJournalExistsOnTablet returns the journal (or an empty journal) and a boolean to indicate if the resharding_journal table exists on the given tablet.
(TODO:@ajm188) This should not be part of the final public API, and should be un-exported after all places in package wrangler that call this have been migrated over.
func (*Server) CopySchemaShard ¶
func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool, destKeyspace, destShard string, waitReplicasTimeout time.Duration, skipVerify bool) error
CopySchemaShard copies the schema from a source tablet to the specified shard. The schema is applied directly on the primary of the destination shard, and is propagated to the replicas through binlogs.
func (*Server) DeleteShard ¶
func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recursive, evenIfServing bool) error
DeleteShard will do all the necessary changes in the topology server to entirely remove a shard.
func (*Server) DropTargets ¶
func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, keepRoutingRules, dryRun bool) (*[]string, error)
DropTargets cleans up target tables, shards and denied tables if a MoveTables/Reshard is cancelled.
func (*Server) GetCellsWithShardReadsSwitched ¶
func (s *Server) GetCellsWithShardReadsSwitched( ctx context.Context, keyspace string, si *topo.ShardInfo, tabletType topodatapb.TabletType, ) (cellsSwitched []string, cellsNotSwitched []string, err error)
GetCellsWithShardReadsSwitched returns the topo cells partitioned into two slices: one with the cells where shard reads have been switched for the given tablet type and one with the cells where shard reads have not been switched for the given tablet type.
This function is for use in Reshard, and "switched reads" is defined as if any one of the source shards has the query service disabled in its tablet control record.
func (*Server) GetCellsWithTableReadsSwitched ¶
func (s *Server) GetCellsWithTableReadsSwitched( ctx context.Context, keyspace string, table string, tabletType topodatapb.TabletType, ) (cellsSwitched []string, cellsNotSwitched []string, err error)
GetCellsWithTableReadsSwitched returns the topo cells partitioned into two slices: one with the cells where table reads have been switched for the given tablet type and one with the cells where table reads have not been switched for the given tablet type.
This function is for use in MoveTables, and "switched reads" is defined as if the routing rule for a (table, tablet_type) is pointing to the target keyspace.
func (*Server) GetCopyProgress ¶
func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error)
GetCopyProgress returns the progress of all tables being copied in the workflow.
func (*Server) GetWorkflow ¶
func (*Server) GetWorkflows ¶
func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error)
GetWorkflows returns a list of all workflows that exist in a given keyspace, with some additional filtering depending on the request parameters (for example, ActiveOnly=true restricts the search to only workflows that are currently running).
It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows rpc, and grpcvtctldserver delegates to this function.
func (*Server) LookupVindexCreate ¶
func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.LookupVindexCreateRequest) (*vtctldatapb.LookupVindexCreateResponse, error)
LookupVindexCreate creates the lookup vindex in the specified keyspace and creates a VReplication workflow to backfill that vindex from the keyspace to the target/lookup table specified.
func (*Server) LookupVindexExternalize ¶
func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.LookupVindexExternalizeRequest) (*vtctldatapb.LookupVindexExternalizeResponse, error)
LookupVindexExternalize externalizes a lookup vindex that's finished backfilling or has caught up. If the vindex has an owner then the workflow will also be deleted.
func (*Server) Materialize ¶
func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error
Materialize performs the steps needed to materialize a list of tables based on the materialization specs.
func (*Server) MigrateCreate ¶
func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error)
func (*Server) MountList ¶
func (s *Server) MountList(ctx context.Context, req *vtctldatapb.MountListRequest) (*vtctldatapb.MountListResponse, error)
func (*Server) MountRegister ¶
func (s *Server) MountRegister(ctx context.Context, req *vtctldatapb.MountRegisterRequest) (*vtctldatapb.MountRegisterResponse, error)
func (*Server) MountShow ¶
func (s *Server) MountShow(ctx context.Context, req *vtctldatapb.MountShowRequest) (*vtctldatapb.MountShowResponse, error)
func (*Server) MountUnregister ¶
func (s *Server) MountUnregister(ctx context.Context, req *vtctldatapb.MountUnregisterRequest) (*vtctldatapb.MountUnregisterResponse, error)
func (*Server) MoveTablesComplete ¶
func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTablesCompleteRequest) (*vtctldatapb.MoveTablesCompleteResponse, error)
MoveTablesComplete is part of the vtctlservicepb.VtctldServer interface. It cleans up a successful MoveTables workflow and its related artifacts. Note: this is currently re-used for Reshard as well.
func (*Server) MoveTablesCreate ¶
func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (res *vtctldatapb.WorkflowStatusResponse, err error)
MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface. It passes the embedded TabletRequest object to the given keyspace's target primary tablets that will be executing the workflow.
func (*Server) ReshardCreate ¶
func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error)
ReshardCreate is part of the vtctlservicepb.VtctldServer interface.
func (*Server) VDiffCreate ¶
func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error)
VDiffCreate is part of the vtctlservicepb.VtctldServer interface. It passes on the request to the target primary tablets that are participating in the given workflow and VDiff.
func (*Server) VDiffDelete ¶
func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error)
VDiffDelete is part of the vtctlservicepb.VtctldServer interface.
func (*Server) VDiffResume ¶
func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error)
VDiffResume is part of the vtctlservicepb.VtctldServer interface.
func (*Server) VDiffShow ¶
func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error)
VDiffShow is part of the vtctlservicepb.VtctldServer interface.
func (*Server) VDiffStop ¶
func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error)
VDiffStop is part of the vtctlservicepb.VtctldServer interface.
func (*Server) VReplicationExec ¶
func (s *Server) VReplicationExec(ctx context.Context, tabletAlias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)
VReplicationExec executes a query remotely using the DBA pool.
func (*Server) WorkflowDelete ¶
func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDeleteRequest) (*vtctldatapb.WorkflowDeleteResponse, error)
WorkflowDelete is part of the vtctlservicepb.VtctldServer interface. It passes on the request to the target primary tablets that are participating in the given workflow.
func (*Server) WorkflowStatus ¶
func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowStatusRequest) (*vtctldatapb.WorkflowStatusResponse, error)
func (*Server) WorkflowSwitchTraffic ¶
func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest) (*vtctldatapb.WorkflowSwitchTrafficResponse, error)
WorkflowSwitchTraffic switches traffic in the direction passed for specified tablet types.
func (*Server) WorkflowUpdate ¶
func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUpdateRequest) (*vtctldatapb.WorkflowUpdateResponse, error)
WorkflowUpdate is part of the vtctlservicepb.VtctldServer interface. It passes the embedded TabletRequest object to the given keyspace's target primary tablets that are participating in the given workflow.
type State ¶
type State struct { Workflow string SourceKeyspace string TargetKeyspace string WorkflowType Type ReplicaCellsSwitched []string ReplicaCellsNotSwitched []string RdonlyCellsSwitched []string RdonlyCellsNotSwitched []string WritesSwitched bool // Partial MoveTables info IsPartialMigration bool ShardsAlreadySwitched []string ShardsNotYetSwitched []string }
State represents the state of a workflow.
type StreamMigrator ¶
type StreamMigrator struct {
// contains filtered or unexported fields
}
StreamMigrator contains information needed to migrate VReplication streams during Reshard workflows when the keyspace's VReplication workflows need to be migrated from one set of shards to another.
func BuildLegacyStreamMigrator ¶
func BuildLegacyStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool, parser *sqlparser.Parser) (*StreamMigrator, error)
BuildLegacyStreamMigrator creates a new StreamMigrator based on the given TrafficSwitcher using the legacy VReplicationExec method. Note: this should be removed along with the vtctl client code / wrangler.
func BuildStreamMigrator ¶
func BuildStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool, parser *sqlparser.Parser) (*StreamMigrator, error)
BuildStreamMigrator creates a new StreamMigrator based on the given TrafficSwitcher.
func (*StreamMigrator) CancelStreamMigrations ¶
func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context)
CancelStreamMigrations cancels the stream migrations.
func (*StreamMigrator) LegacyStopStreams ¶
func (sm *StreamMigrator) LegacyStopStreams(ctx context.Context) ([]string, error)
LegacyStopStreams stops streams using the legacy VReplicationExec method. Note: this should be removed along with the vtctl client code / wrangler.
func (*StreamMigrator) MigrateStreams ¶
func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error
MigrateStreams migrates N streams
func (*StreamMigrator) StopStreams ¶
func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error)
StopStreams stops streams.
func (*StreamMigrator) Streams ¶
func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream
Streams returns a deep-copy of the StreamMigrator's streams map.
func (*StreamMigrator) Templates ¶
func (sm *StreamMigrator) Templates() []*VReplicationStream
Templates returns a copy of the StreamMigrator's template streams.
type StreamType ¶
type StreamType int
StreamType is an enum representing the kind of stream.
(TODO:@ajm188) This should be made package-private once the last references in package wrangler are removed.
type TableRemovalType ¶
type TableRemovalType int
TableRemovalType specifies the way the a table will be removed during a DropSource for a MoveTables workflow.
func (TableRemovalType) String ¶
func (trt TableRemovalType) String() string
String returns a string representation of a TableRemovalType
type TargetInfo ¶
type TargetInfo struct { Targets map[string]*MigrationTarget Frozen bool OptCells string OptTabletTypes string WorkflowType binlogdatapb.VReplicationWorkflowType WorkflowSubType binlogdatapb.VReplicationWorkflowSubType Options *vtctldatapb.WorkflowOptions }
TargetInfo contains the metadata for a set of targets involved in a workflow.
func BuildTargets ¶
func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error)
BuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace.
It returns ErrNoStreams if there are no targets found for the workflow.
func LegacyBuildTargets ¶
func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string, targetShards []string) (*TargetInfo, error)
LegacyBuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace. It uses VReplicationExec to get the workflow details rather than the new TabletManager ReadVReplicationWorkflow RPC. This is being used to slowly transition all of the older code, including unit tests, over to the new RPC and limit the impact of the new implementation to vtctldclient. You can see how the unit tests were being migrated here: https://gist.github.com/mattlord/738c12befe951f8d09304ff7fdc47c46
New callers should instead use the new BuildTargets function.
It returns ErrNoStreams if there are no targets found for the workflow.
type TrafficSwitchDirection ¶
type TrafficSwitchDirection int
TrafficSwitchDirection specifies the switching direction.
func (TrafficSwitchDirection) String ¶
func (tsd TrafficSwitchDirection) String() string
type Type ¶
type Type string
Type is the type of a workflow as a string and maps directly to what is provided and presented to the user.
type VReplicationStream ¶
type VReplicationStream struct { ID int32 Workflow string BinlogSource *binlogdatapb.BinlogSource Position replication.Position WorkflowType binlogdatapb.VReplicationWorkflowType WorkflowSubType binlogdatapb.VReplicationWorkflowSubType DeferSecondaryKeys bool }
VReplicationStream represents a single stream of a vreplication workflow.
type VReplicationStreams ¶
type VReplicationStreams []*VReplicationStream
VReplicationStreams wraps a slice of VReplicationStream objects to provide some aggregate functionality.
func (VReplicationStreams) Copy ¶
func (streams VReplicationStreams) Copy() VReplicationStreams
Copy returns a copy of the list of streams. All fields except .Position are copied.
func (VReplicationStreams) IDs ¶
func (streams VReplicationStreams) IDs() []int32
IDs returns the IDs of the VReplicationStreams.
func (VReplicationStreams) ToSlice ¶
func (streams VReplicationStreams) ToSlice() []*VReplicationStream
ToSlice unwraps a VReplicationStreams object into the underlying slice of VReplicationStream objects.
func (VReplicationStreams) Values ¶
func (streams VReplicationStreams) Values() string
Values returns a string representing the IDs of the VReplicationStreams for use in an IN clause.
(TODO|@ajm188) This currently returns the literal ")" if len(streams) == 0. We should probably update this function to return the full "IN" clause, and then if len(streams) == 0, return "1 != 1" so that we can still execute a valid SQL query.
func (VReplicationStreams) Workflows ¶
func (streams VReplicationStreams) Workflows() []string
Workflows returns a list of unique workflow names in the list of vreplication streams.
type VReplicationWorkflowType ¶
type VReplicationWorkflowType int
VReplicationWorkflowType specifies whether workflow is MoveTables or Reshard and maps directly to what is stored in the backend database.