Documentation ¶
Overview ¶
Package workflow defines types and functions for working with Vitess workflows.
This is still a very rough sketch, far from a final API, but I want to document some things here as I go:
(1) The lines between package workflow and package workflow/vexec are, uh, blurry at best, and definitely need serious thinking and refinement. Maybe there shouldn't even be two separate packages at all. The reason I have the two packages right now is that I'm operating under the assumption that there are workflows that are vexec, and then there are other workflows. If it's true that all workflows are vexec workflows, then one single package would probably make more sense. For now, two packages seem the way to go, but like I said, the boundaries are blurry, and things that belong in one package are in the other, because I haven't gone back and moved things around.
(2) I'm aiming for this to be a drop-in replacement (more or less) for the function calls in go/vt/wrangler. However, I'd rather define a better abstraction, even if it means having to rewrite significant portions of the existing wrangler code to adapt to it, than make a subpar API in the name of backwards compatibility. I'm not sure if that's a tradeoff I'll even need to consider in the future, but I'm putting a stake in the ground on which side of that tradeoff I intend to fall, should it come to it.
(3) Eventually we'll need to consider how the online schema migration workflows fit into this. I'm trying to at least be somewhat abstract in the vexec / queryplanner APIs to fit with the QueryParams thing that wrangler uses, which _should_ work, but who knows?? Time will tell.
Index ¶
- Constants
- Variables
- func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ...) error
- func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64
- func ReverseWorkflowName(workflow string) string
- func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error
- type ITrafficSwitcher
- type MigrationSource
- type MigrationTarget
- type Server
- func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error)
- func (s *Server) GetCellsWithShardReadsSwitched(ctx context.Context, keyspace string, si *topo.ShardInfo, ...) (cellsSwitched []string, cellsNotSwitched []string, err error)
- func (s *Server) GetCellsWithTableReadsSwitched(ctx context.Context, keyspace string, table string, ...) (cellsSwitched []string, cellsNotSwitched []string, err error)
- func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error)
- type State
- type StreamMigrator
- func (sm *StreamMigrator) CancelMigration(ctx context.Context)
- func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error
- func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error)
- func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream
- func (sm *StreamMigrator) Templates() []*VReplicationStream
- type StreamType
- type TableRemovalType
- type TargetInfo
- type TrafficSwitchDirection
- type Type
- type VReplicationStream
- type VReplicationStreams
Constants ¶
const (
	StreamTypeUnknown = StreamType(iota)
	StreamTypeSharded
	StreamTypeReference
)
StreamType values.
const (
	DirectionForward = TrafficSwitchDirection(iota)
	DirectionBackward
)
The following constants define the switching direction.
const (
	DropTable = TableRemovalType(iota)
	RenameTable
)
The following constants define whether DropSource will drop or rename the table.
const (
// Frozen is the message value of frozen vreplication streams.
Frozen = "FROZEN"
)
Variables ¶
var (
	// ErrInvalidWorkflow is a catchall error type for conditions that should be
	// impossible when operating on a workflow.
	ErrInvalidWorkflow = errors.New("invalid workflow")
	// ErrMultipleSourceKeyspaces occurs when a workflow somehow has multiple
	// source keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleSourceKeyspaces = errors.New("multiple source keyspaces for a single workflow")
	// ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple
	// target keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")
)
var (
	// ErrNoStreams occurs when no target streams are found for a workflow in a
	// target keyspace.
	ErrNoStreams = errors.New("no streams found")
)
Functions ¶
func CompareShards ¶ added in v0.13.0
func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ts *topo.Server) error
CompareShards compares the list of shards in a workflow with the shards in that keyspace according to the topo. It returns an error if they do not match.
This function is used to validate MoveTables workflows.
(TODO|@ajm188): This function is temporarily exported until *wrangler.trafficSwitcher has been fully moved over to this package. Once that refactor is finished, this function should be unexported. Consequently, YOU SHOULD NOT DEPEND ON THIS FUNCTION EXTERNALLY.
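The comparison described above can be sketched without any Vitess dependencies. This is only an illustration of a set-equality check over shard names, not the package's implementation: compareShardNames is a hypothetical helper, and plain string slices stand in for []*topo.ShardInfo and for the shard names read from the topo.

```go
package main

import (
	"fmt"
	"sort"
)

// compareShardNames is a hypothetical helper. It returns an error when the
// shard names used by a workflow and the shard names recorded in the topo
// differ as sets; ordering and duplicates are ignored.
func compareShardNames(keyspace string, workflowShards, topoShards []string) error {
	inWorkflow := make(map[string]bool, len(workflowShards))
	for _, s := range workflowShards {
		inWorkflow[s] = true
	}
	inTopo := make(map[string]bool, len(topoShards))
	for _, s := range topoShards {
		inTopo[s] = true
	}

	var wfExtra, topoExtra []string
	for s := range inWorkflow {
		if !inTopo[s] {
			wfExtra = append(wfExtra, s)
		}
	}
	for s := range inTopo {
		if !inWorkflow[s] {
			topoExtra = append(topoExtra, s)
		}
	}
	if len(wfExtra) == 0 && len(topoExtra) == 0 {
		return nil
	}

	// Sort so the error message does not depend on map iteration order.
	sort.Strings(wfExtra)
	sort.Strings(topoExtra)
	return fmt.Errorf("mismatched shards for keyspace %s: only in workflow %v, only in topo %v",
		keyspace, wfExtra, topoExtra)
}

func main() {
	fmt.Println(compareShardNames("commerce", []string{"-80", "80-"}, []string{"80-", "-80"}))
	fmt.Println(compareShardNames("commerce", []string{"-80"}, []string{"-80", "80-"}))
}
```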
func HashStreams ¶ added in v0.11.0
func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64
HashStreams produces a stable hash based on the target keyspace and migration targets.
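Go randomizes map iteration order, so a hash over a map is only stable if the entries are put into a fixed order first. The sketch below shows one way to do that. It is an assumption-laden illustration rather than the package's code: the target type is a hypothetical, trimmed-down MigrationTarget that keeps only stream IDs, and the choice of FNV and of the "shard:id" encoding is made here for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
	"sort"
)

// target is a hypothetical stand-in for MigrationTarget that keeps only the
// stream IDs (the keys of the Sources map in the real type).
type target struct {
	StreamIDs []uint32
}

// hashStreams flattens the map into "shard:id" strings, sorts them, and
// feeds them to an FNV hash so the result does not depend on map order.
func hashStreams(targetKeyspace string, targets map[string]*target) int64 {
	var expanded []string
	for shard, t := range targets {
		for _, uid := range t.StreamIDs {
			expanded = append(expanded, fmt.Sprintf("%s:%d", shard, uid))
		}
	}
	sort.Strings(expanded)

	hasher := fnv.New64()
	hasher.Write([]byte(targetKeyspace))
	for _, s := range expanded {
		hasher.Write([]byte(s))
	}
	// Clear the top bit so the value is non-negative as an int64.
	return int64(hasher.Sum64() & math.MaxInt64)
}

func main() {
	targets := map[string]*target{
		"-80": {StreamIDs: []uint32{1, 2}},
		"80-": {StreamIDs: []uint32{3}},
	}
	// Repeated calls agree even though map iteration order may differ.
	fmt.Println(hashStreams("customer", targets) == hashStreams("customer", targets))
}
```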
func ReverseWorkflowName ¶ added in v0.11.0
func ReverseWorkflowName(workflow string) string
ReverseWorkflowName returns the "reversed" name of a workflow. For a "forward" workflow, this is the workflow name with "_reversed" appended, and for a "reversed" workflow, this is the workflow name with the "_reversed" suffix removed.
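A minimal sketch of the naming rule, for illustration only. reverseWorkflowName is a hypothetical local function, and the suffix is copied from the description above; the constant the package actually uses is not shown on this page and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// reverseSuffix is taken from the prose description; treat the exact value
// as an assumption.
const reverseSuffix = "_reversed"

// reverseWorkflowName toggles the suffix: it strips it when present and
// appends it otherwise, so applying it twice returns the original name.
func reverseWorkflowName(workflow string) string {
	if strings.HasSuffix(workflow, reverseSuffix) {
		return strings.TrimSuffix(workflow, reverseSuffix)
	}
	return workflow + reverseSuffix
}

func main() {
	forward := "commerce2customer"
	reversed := reverseWorkflowName(forward)
	fmt.Println(reversed)
	fmt.Println(reverseWorkflowName(reversed))
}
```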
func StreamMigratorFinalize ¶ added in v0.11.0
func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error
StreamMigratorFinalize finalizes the stream migration.
(TODO:@ajm188) In the original implementation, "it's a standalone function because it does not use the streamMigrater state". That's still true, but moving this to a method on StreamMigrator would provide cleaner namespacing in package workflow. However, doing that would require updating more callers (*wrangler.switcher's streamMigrateFinalize would need to change its signature to also take a *workflow.StreamMigrator), so we will do that in a second PR.
Types ¶
type ITrafficSwitcher ¶ added in v0.11.0
type ITrafficSwitcher interface {
	TopoServer() *topo.Server
	TabletManagerClient() tmclient.TabletManagerClient
	Logger() logutil.Logger
	// VReplicationExec here is used when we want the (*wrangler.Wrangler)
	// implementation, which does a topo lookup on the tablet alias before
	// calling the underlying TabletManagerClient RPC.
	VReplicationExec(ctx context.Context, alias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)
	ExternalTopo() *topo.Server
	MigrationType() binlogdatapb.MigrationType
	ReverseWorkflowName() string
	SourceKeyspaceName() string
	SourceKeyspaceSchema() *vindexes.KeyspaceSchema
	Sources() map[string]*MigrationSource
	Tables() []string
	TargetKeyspaceName() string
	Targets() map[string]*MigrationTarget
	WorkflowName() string
	SourceTimeZone() string
	ForAllSources(f func(source *MigrationSource) error) error
	ForAllTargets(f func(target *MigrationTarget) error) error
	ForAllUIDs(f func(target *MigrationTarget, uid uint32) error) error
	SourceShards() []*topo.ShardInfo
	TargetShards() []*topo.ShardInfo
}
ITrafficSwitcher is a temporary hack to allow us to move streamMigrater out of package wrangler without also needing to move trafficSwitcher in the same changeset.
After moving TrafficSwitcher to this package, this type should be removed, and StreamMigrator should be updated to contain a field of type *TrafficSwitcher instead of ITrafficSwitcher.
type MigrationSource ¶ added in v0.11.0
type MigrationSource struct {
	Position  string
	Journaled bool
	// contains filtered or unexported fields
}
MigrationSource contains the metadata for each migration source.
func NewMigrationSource ¶ added in v0.11.0
func NewMigrationSource(si *topo.ShardInfo, primary *topo.TabletInfo) *MigrationSource
NewMigrationSource returns a MigrationSource for the given shard and primary.
(TODO|@ajm188): do we always want to start with (position:"", journaled:false)?
func (*MigrationSource) GetPrimary ¶ added in v0.11.0
func (source *MigrationSource) GetPrimary() *topo.TabletInfo
GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration source.
func (*MigrationSource) GetShard ¶ added in v0.11.0
func (source *MigrationSource) GetShard() *topo.ShardInfo
GetShard returns the *topo.ShardInfo for the migration source.
type MigrationTarget ¶ added in v0.11.0
type MigrationTarget struct {
	Sources  map[uint32]*binlogdatapb.BinlogSource
	Position string
	// contains filtered or unexported fields
}
MigrationTarget contains the metadata for each migration target.
func (*MigrationTarget) GetPrimary ¶ added in v0.11.0
func (target *MigrationTarget) GetPrimary() *topo.TabletInfo
GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration target.
func (*MigrationTarget) GetShard ¶ added in v0.11.0
func (target *MigrationTarget) GetShard() *topo.ShardInfo
GetShard returns the *topo.ShardInfo for the migration target.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server provides an API to work with Vitess workflows, like vreplication workflows (MoveTables, Reshard, etc) and schema migration workflows.
NB: This is in alpha, and you probably don't want to depend on it (yet!). Currently, it provides only a read-only API to vreplication workflows. Write actions on vreplication workflows, and schema migration workflows entirely, are not yet supported, but planned.
func NewServer ¶
func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server
NewServer returns a new server instance with the given topo.Server and TabletManagerClient.
func (*Server) CheckReshardingJournalExistsOnTablet ¶ added in v0.11.0
func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error)
CheckReshardingJournalExistsOnTablet returns the journal (or an empty journal) and a boolean to indicate if the resharding_journal table exists on the given tablet.
(TODO:@ajm188) This should not be part of the final public API, and should be un-exported after all places in package wrangler that call this have been migrated over.
func (*Server) GetCellsWithShardReadsSwitched ¶ added in v0.11.0
func (s *Server) GetCellsWithShardReadsSwitched(
	ctx context.Context,
	keyspace string,
	si *topo.ShardInfo,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)
GetCellsWithShardReadsSwitched returns the topo cells partitioned into two slices: one with the cells where shard reads have been switched for the given tablet type and one with the cells where shard reads have not been switched for the given tablet type.
This function is for use in Reshard. Here, "switched reads" means that any one of the source shards has the query service disabled in its tablet control record.
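The two-slice return shape can be illustrated with a small partitioning helper. This is a sketch under stated assumptions: partitionCells and the predicate it takes are hypothetical, and the map in main stands in for the topo lookup that decides whether reads are switched in a cell.

```go
package main

import "fmt"

// partitionCells is a hypothetical helper that splits cells into those for
// which the predicate reports "switched" and those for which it does not.
// It stops at the first error returned by the predicate.
func partitionCells(cells []string, switched func(cell string) (bool, error)) ([]string, []string, error) {
	var cellsSwitched, cellsNotSwitched []string
	for _, cell := range cells {
		ok, err := switched(cell)
		if err != nil {
			return nil, nil, err
		}
		if ok {
			cellsSwitched = append(cellsSwitched, cell)
		} else {
			cellsNotSwitched = append(cellsNotSwitched, cell)
		}
	}
	return cellsSwitched, cellsNotSwitched, nil
}

func main() {
	// Stand-in for topo state: cells where the query service is disabled.
	disabled := map[string]bool{"zone1": true}

	switched, notSwitched, err := partitionCells(
		[]string{"zone1", "zone2"},
		func(cell string) (bool, error) { return disabled[cell], nil },
	)
	fmt.Println(switched, notSwitched, err)
}
```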
func (*Server) GetCellsWithTableReadsSwitched ¶ added in v0.11.0
func (s *Server) GetCellsWithTableReadsSwitched(
	ctx context.Context,
	keyspace string,
	table string,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)
GetCellsWithTableReadsSwitched returns the topo cells partitioned into two slices: one with the cells where table reads have been switched for the given tablet type and one with the cells where table reads have not been switched for the given tablet type.
This function is for use in MoveTables. Here, "switched reads" means that the routing rule for a (table, tablet_type) pair points to the target keyspace.
func (*Server) GetWorkflows ¶
func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error)
GetWorkflows returns a list of all workflows that exist in a given keyspace, with some additional filtering depending on the request parameters (for example, ActiveOnly=true restricts the search to only workflows that are currently running).
It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows RPC, and grpcvtctldserver delegates to this function.
type State ¶ added in v0.11.0
type State struct {
	Workflow                string
	SourceKeyspace          string
	TargetKeyspace          string
	WorkflowType            Type
	ReplicaCellsSwitched    []string
	ReplicaCellsNotSwitched []string
	RdonlyCellsSwitched     []string
	RdonlyCellsNotSwitched  []string
	WritesSwitched          bool
}
State represents the state of a workflow.
type StreamMigrator ¶ added in v0.11.0
type StreamMigrator struct {
// contains filtered or unexported fields
}
StreamMigrator contains information needed to migrate a stream.
func BuildStreamMigrator ¶ added in v0.11.0
func BuildStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool) (*StreamMigrator, error)
BuildStreamMigrator creates a new StreamMigrator based on the given TrafficSwitcher.
func (*StreamMigrator) CancelMigration ¶ added in v0.11.0
func (sm *StreamMigrator) CancelMigration(ctx context.Context)
CancelMigration cancels a migration.
func (*StreamMigrator) MigrateStreams ¶ added in v0.11.0
func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error
MigrateStreams migrates N streams.
func (*StreamMigrator) StopStreams ¶ added in v0.11.0
func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error)
StopStreams stops streams.
func (*StreamMigrator) Streams ¶ added in v0.11.0
func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream
Streams returns a deep copy of the StreamMigrator's streams map.
func (*StreamMigrator) Templates ¶ added in v0.11.0
func (sm *StreamMigrator) Templates() []*VReplicationStream
Templates returns a copy of the StreamMigrator's template streams.
type StreamType ¶ added in v0.11.0
type StreamType int
StreamType is an enum representing the kind of stream.
(TODO:@ajm188) This should be made package-private once the last references in package wrangler are removed.
type TableRemovalType ¶ added in v0.11.0
type TableRemovalType int
TableRemovalType specifies the way a table will be removed during a DropSource for a MoveTables workflow.
func (TableRemovalType) String ¶ added in v0.11.0
func (trt TableRemovalType) String() string
String returns a string representation of a TableRemovalType.
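The iota-based enum plus String method follows a common Go pattern, sketched below with local lowercase names. The display strings and the fallback for out-of-range values are assumptions made for this example; this page does not show what the real method returns.

```go
package main

import "fmt"

// tableRemovalType mirrors the shape of TableRemovalType for illustration.
type tableRemovalType int

const (
	dropTable = tableRemovalType(iota)
	renameTable
)

// removalNames holds assumed display strings, indexed by enum value.
var removalNames = [...]string{"DROP TABLE", "RENAME TABLE"}

// String implements fmt.Stringer and guards against out-of-range values.
func (trt tableRemovalType) String() string {
	if trt < dropTable || trt > renameTable {
		return "Unknown"
	}
	return removalNames[trt]
}

func main() {
	// fmt uses the String method automatically for %s and %v.
	fmt.Printf("%s / %v / %s\n", dropTable, renameTable, tableRemovalType(42))
}
```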
type TargetInfo ¶ added in v0.11.0
type TargetInfo struct {
	Targets        map[string]*MigrationTarget
	Frozen         bool
	OptCells       string
	OptTabletTypes string
}
TargetInfo contains the metadata for a set of targets involved in a workflow.
func BuildTargets ¶ added in v0.11.0
func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error)
BuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace.
It returns ErrNoStreams if there are no targets found for the workflow.
type TrafficSwitchDirection ¶ added in v0.11.0
type TrafficSwitchDirection int
TrafficSwitchDirection specifies the switching direction.
type VReplicationStream ¶ added in v0.11.0
type VReplicationStream struct {
	ID           uint32
	Workflow     string
	BinlogSource *binlogdatapb.BinlogSource
	Position     mysql.Position
}
VReplicationStream represents a single stream of a vreplication workflow.
type VReplicationStreams ¶ added in v0.11.0
type VReplicationStreams []*VReplicationStream
VReplicationStreams wraps a slice of VReplicationStream objects to provide some aggregate functionality.
func (VReplicationStreams) Copy ¶ added in v0.11.0
func (streams VReplicationStreams) Copy() VReplicationStreams
Copy returns a copy of the list of streams. All fields except .Position are copied.
func (VReplicationStreams) ToSlice ¶ added in v0.11.0
func (streams VReplicationStreams) ToSlice() []*VReplicationStream
ToSlice unwraps a VReplicationStreams object into the underlying slice of VReplicationStream objects.
func (VReplicationStreams) Values ¶ added in v0.11.0
func (streams VReplicationStreams) Values() string
Values returns a string representing the IDs of the VReplicationStreams for use in an IN clause.
(TODO|@ajm188) This currently returns the literal ")" if len(streams) == 0. We should probably update this function to return the full "IN" clause, and then if len(streams) == 0, return "1 != 1" so that we can still execute a valid SQL query.
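The change proposed in the TODO could look roughly like the sketch below. It is not the current behavior of Values: inClause is a hypothetical helper, stream is a trimmed-down stand-in for VReplicationStream, and the column name is passed in by the caller.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// stream is a hypothetical stand-in that keeps only the stream ID.
type stream struct {
	ID uint32
}

// inClause builds a complete "<column> IN (...)" predicate. For an empty
// list it returns an always-false predicate so the surrounding SQL stays
// valid, as the TODO suggests.
func inClause(column string, streams []*stream) string {
	if len(streams) == 0 {
		return "1 != 1"
	}
	ids := make([]string, 0, len(streams))
	for _, s := range streams {
		ids = append(ids, strconv.FormatUint(uint64(s.ID), 10))
	}
	return fmt.Sprintf("%s IN (%s)", column, strings.Join(ids, ", "))
}

func main() {
	fmt.Println(inClause("id", []*stream{{ID: 1}, {ID: 2}}))
	fmt.Println(inClause("id", nil))
}
```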
func (VReplicationStreams) Workflows ¶ added in v0.11.0
func (streams VReplicationStreams) Workflows() []string
Workflows returns a list of unique workflow names in the list of vreplication streams.
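Collecting unique names is a small set-based pass over the streams. The sketch below uses a hypothetical stream type and helper, and it preserves first-seen order; the ordering of the real method's result is not stated on this page, so treat that detail as an assumption.

```go
package main

import "fmt"

// stream is a hypothetical stand-in that keeps only the workflow name.
type stream struct {
	Workflow string
}

// uniqueWorkflows returns each workflow name once, in first-seen order.
func uniqueWorkflows(streams []*stream) []string {
	seen := make(map[string]bool, len(streams))
	var names []string
	for _, s := range streams {
		if !seen[s.Workflow] {
			seen[s.Workflow] = true
			names = append(names, s.Workflow)
		}
	}
	return names
}

func main() {
	streams := []*stream{{Workflow: "wf1"}, {Workflow: "wf2"}, {Workflow: "wf1"}}
	fmt.Println(uniqueWorkflows(streams))
}
```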