workflow

package
v0.15.10
Published: Jul 31, 2024 License: Apache-2.0 Imports: 58 Imported by: 0

Documentation

Overview

Package workflow defines types and functions for working with Vitess workflows.

This is still a very rough sketch, far from a final API, but I want to document some things here as I go:

(1) The lines between package workflow and package workflow/vexec are, uh, blurry at best, and definitely need serious thinking and refinement. Maybe there shouldn't even be two separate packages at all. The reason I have the two packages right now is because I'm operating under the assumption that there are workflows that are vexec, and then there are other workflows. If it's true that all workflows are vexec workflows, then probably one single package could make more sense. For now, two packages seems the way to go, but like I said, the boundaries are blurry, and things that belong in one package are in the other, because I haven't gone back and moved things around.

(2) I'm aiming for this to be a drop-in replacement (more or less) for the function calls in go/vt/wrangler. However, I'd rather define a better abstraction if it means having to rewrite even significant portions of the existing wrangler code to adapt to it, than make a subpar API in the name of backwards compatibility. I'm not sure if that's a tradeoff I'll even need to consider in the future, but I'm putting a stake in the ground on which side of that tradeoff I intend to fall, should it come to it.

(3) Eventually we'll need to consider how the online schema migration workflows fit into this. I'm trying to at least be somewhat abstract in the vexec / queryplanner APIs to fit with the QueryParams thing that wrangler uses, which _should_ work, but who knows?? Time will tell.

Constants

const (
	MoveTablesWorkflow = VReplicationWorkflowType(iota)
	ReshardWorkflow
	MigrateWorkflow
)

VReplicationWorkflowType enums.

const (
	StreamTypeUnknown = StreamType(iota)
	StreamTypeSharded
	StreamTypeReference
)

StreamType values.

const (
	// Frozen is the message value of frozen vreplication streams.
	Frozen = "FROZEN"
	// Running is the state value of a vreplication stream in the
	// replicating state.
	Running = "RUNNING"
)
const (
	DirectionForward = TrafficSwitchDirection(iota)
	DirectionBackward
)

The following constants define the switching direction.

const (
	DropTable = TableRemovalType(iota)
	RenameTable
)

The following constants define whether DropSource will drop or rename the table.

Variables

var (
	// ErrInvalidWorkflow is a catchall error type for conditions that should be
	// impossible when operating on a workflow.
	ErrInvalidWorkflow = errors.New("invalid workflow")
	// ErrMultipleSourceKeyspaces occurs when a workflow somehow has multiple
	// source keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleSourceKeyspaces = errors.New("multiple source keyspaces for a single workflow")
	// ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple
	// target keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleTargetKeyspaces   = errors.New("multiple target keyspaces for a single workflow")
	ErrWorkflowNotFullySwitched  = errors.New("cannot complete workflow because you have not yet switched all read and write traffic")
	ErrWorkflowPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic")
)
var (
	// ErrNoStreams occurs when no target streams are found for a workflow in a
	// target keyspace.
	ErrNoStreams = errors.New("no streams found")
)

Functions

func CompareShards

func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ts *topo.Server) error

CompareShards compares the list of shards in a workflow with the shards in that keyspace according to the topo. It returns an error if they do not match.

This function is used to validate MoveTables workflows.

(TODO|@ajm188): This function is temporarily-exported until *wrangler.trafficSwitcher has been fully moved over to this package. Once that refactor is finished, this function should be unexported. Consequently, YOU SHOULD NOT DEPEND ON THIS FUNCTION EXTERNALLY.
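
The comparison described above can be pictured as an order-insensitive check of two shard lists. The following is only a sketch under simplifying assumptions: `compareShardNames` is a hypothetical helper that works on plain shard names, whereas CompareShards takes *topo.ShardInfo values and reads the keyspace's shards from the topo server.

```go
package main

import (
	"fmt"
	"sort"
)

// compareShardNames is a hypothetical helper: it reports an error when the
// two lists do not contain exactly the same shard names, ignoring order.
func compareShardNames(workflowShards, topoShards []string) error {
	// Copy before sorting so the callers' slices are left untouched.
	a := append([]string(nil), workflowShards...)
	b := append([]string(nil), topoShards...)
	sort.Strings(a)
	sort.Strings(b)

	mismatch := len(a) != len(b)
	for i := 0; !mismatch && i < len(a); i++ {
		mismatch = a[i] != b[i]
	}
	if mismatch {
		return fmt.Errorf("shard mismatch: workflow has %v, topo has %v", a, b)
	}
	return nil
}

func main() {
	fmt.Println(compareShardNames([]string{"80-", "-80"}, []string{"-80", "80-"}))
	fmt.Println(compareShardNames([]string{"-80"}, []string{"-80", "80-"}))
}
```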

func HashStreams

func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64

HashStreams produces a stable hash based on the target keyspace and migration targets.
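
Go randomizes map iteration order, so a stable hash over a map generally needs the entries flattened and sorted first. The sketch below illustrates that general idea with stand-in types. The names `target` and `stableHash`, the "shard:id" encoding, and the choice of FNV are assumptions for illustration and are not necessarily what HashStreams does.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
	"sort"
)

// target is a hypothetical stand-in for MigrationTarget; only the stream
// IDs matter for this illustration.
type target struct {
	StreamIDs []int32
}

// stableHash flattens the map into "shard:id" strings, sorts them, and
// hashes the result, so map iteration order cannot affect the output.
func stableHash(keyspace string, targets map[string]*target) int64 {
	var expanded []string
	for shard, t := range targets {
		for _, id := range t.StreamIDs {
			expanded = append(expanded, fmt.Sprintf("%s:%d", shard, id))
		}
	}
	sort.Strings(expanded)

	h := fnv.New64()
	h.Write([]byte(keyspace))
	for _, s := range expanded {
		h.Write([]byte(s))
	}
	// Clear the top bit so the value fits in a non-negative int64.
	return int64(h.Sum64() & math.MaxInt64)
}

func main() {
	a := map[string]*target{"-80": {StreamIDs: []int32{1, 2}}, "80-": {StreamIDs: []int32{3}}}
	b := map[string]*target{"80-": {StreamIDs: []int32{3}}, "-80": {StreamIDs: []int32{2, 1}}}
	fmt.Println(stableHash("customer", a) == stableHash("customer", b)) // true
}
```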

func ReverseWorkflowName

func ReverseWorkflowName(workflow string) string

ReverseWorkflowName returns the "reversed" name of a workflow. For a "forward" workflow, this is the workflow name with "_reverse" appended, and for a "reversed" workflow, this is the workflow name with the "_reverse" suffix removed.
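
The documented behavior can be sketched in a few lines. `reverseName` below is an illustrative re-implementation based only on the description above, not the package's actual code, and the workflow name is made up.

```go
package main

import (
	"fmt"
	"strings"
)

// reverseSuffix is the suffix named in the description above.
const reverseSuffix = "_reverse"

// reverseName strips the suffix from a "reversed" workflow name and appends
// it to a "forward" one.
func reverseName(workflow string) string {
	if strings.HasSuffix(workflow, reverseSuffix) {
		return strings.TrimSuffix(workflow, reverseSuffix)
	}
	return workflow + reverseSuffix
}

func main() {
	fmt.Println(reverseName("commerce2customer"))         // commerce2customer_reverse
	fmt.Println(reverseName("commerce2customer_reverse")) // commerce2customer
}
```

Applying the function twice returns the original name, which is what makes the forward and reverse workflows easy to pair up.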

func StreamMigratorFinalize

func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error

StreamMigratorFinalize finalizes the stream migration.

(TODO:@ajm188) in the original implementation, "it's a standalone function because it does not use the streamMigrater state". That's still true, but moving this to a method on StreamMigrator would provide a cleaner namespacing in package workflow. But, we would have to update more callers in order to do that (*wrangler.switcher's streamMigrateFinalize would need to change its signature to also take a *workflow.StreamMigrator), so we will do that in a second PR.

Types

type ITrafficSwitcher

type ITrafficSwitcher interface {
	TopoServer() *topo.Server
	TabletManagerClient() tmclient.TabletManagerClient
	Logger() logutil.Logger
	// VReplicationExec here is used when we want the (*wrangler.Wrangler)
	// implementation, which does a topo lookup on the tablet alias before
	// calling the underlying TabletManagerClient RPC.
	VReplicationExec(ctx context.Context, alias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)

	ExternalTopo() *topo.Server
	MigrationType() binlogdatapb.MigrationType
	ReverseWorkflowName() string
	SourceKeyspaceName() string
	SourceKeyspaceSchema() *vindexes.KeyspaceSchema
	Sources() map[string]*MigrationSource
	Tables() []string
	TargetKeyspaceName() string
	Targets() map[string]*MigrationTarget
	WorkflowName() string
	SourceTimeZone() string

	ForAllSources(f func(source *MigrationSource) error) error
	ForAllTargets(f func(target *MigrationTarget) error) error
	ForAllUIDs(f func(target *MigrationTarget, uid int32) error) error
	SourceShards() []*topo.ShardInfo
	TargetShards() []*topo.ShardInfo
}

ITrafficSwitcher is a hack to allow us to maintain the legacy wrangler package for vtctl/vtctlclient while migrating most of the TrafficSwitcher related code to the workflow package for vtctldclient usage.

After moving TrafficSwitcher to this package and removing the implementation in wrangler, this type should be removed, and StreamMigrator should be updated to contain a field of type *TrafficSwitcher instead of ITrafficSwitcher.

type LogRecorder

type LogRecorder struct {
	// contains filtered or unexported fields
}

LogRecorder is used to collect logs for a specific purpose. It is not thread-safe, since logs are expected to be generated in a repeatable sequence.

func NewLogRecorder

func NewLogRecorder() *LogRecorder

NewLogRecorder creates a new instance of LogRecorder.

func (*LogRecorder) GetLogs

func (lr *LogRecorder) GetLogs() []string

GetLogs returns all recorded logs in sequence.

func (*LogRecorder) Log

func (lr *LogRecorder) Log(log string)

Log records a new log message.

func (*LogRecorder) LogSlice

func (lr *LogRecorder) LogSlice(logs []string)

LogSlice sorts a given slice using natural sort, so that the result is predictable. This is useful when logging arrays or maps where the order of objects can vary.

func (*LogRecorder) Logf

func (lr *LogRecorder) Logf(log string, args ...any)

Logf records a new log message with interpolation parameters using fmt.Sprintf.
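
To show how the four methods fit together, here is a small stand-in type with the same method names. It is a sketch, not the package's implementation: `recorder` is hypothetical, it assumes LogSlice records each entry after sorting, and it uses plain lexical ordering as a stand-in for the natural sort mentioned above.

```go
package main

import (
	"fmt"
	"sort"
)

// recorder is a hypothetical stand-in for LogRecorder.
type recorder struct {
	logs []string
}

// Log appends one message.
func (r *recorder) Log(msg string) { r.logs = append(r.logs, msg) }

// Logf formats with fmt.Sprintf and appends the result.
func (r *recorder) Logf(format string, args ...any) {
	r.Log(fmt.Sprintf(format, args...))
}

// LogSlice sorts a copy of msgs (lexically, an assumption) and records each
// entry, so output does not depend on the caller's ordering.
func (r *recorder) LogSlice(msgs []string) {
	sorted := append([]string(nil), msgs...)
	sort.Strings(sorted)
	for _, m := range sorted {
		r.Log(m)
	}
}

// GetLogs returns the messages in the order they were recorded.
func (r *recorder) GetLogs() []string { return r.logs }

func main() {
	r := &recorder{}
	r.Log("Lock keyspace commerce")
	r.Logf("Switch reads for %d tables", 2)
	r.LogSlice([]string{"table orders", "table customers"})
	for _, line := range r.GetLogs() {
		fmt.Println(line)
	}
}
```

Because the recorded sequence is deterministic, output like this can be compared line by line in a test or shown to a user as a dry-run report.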

type MigrationSource

type MigrationSource struct {
	Position  string
	Journaled bool
	// contains filtered or unexported fields
}

MigrationSource contains the metadata for each migration source.

func NewMigrationSource

func NewMigrationSource(si *topo.ShardInfo, primary *topo.TabletInfo) *MigrationSource

NewMigrationSource returns a MigrationSource for the given shard and primary.

(TODO|@ajm188): do we always want to start with (position:"", journaled:false)?

func (*MigrationSource) GetPrimary

func (source *MigrationSource) GetPrimary() *topo.TabletInfo

GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration source.

func (*MigrationSource) GetShard

func (source *MigrationSource) GetShard() *topo.ShardInfo

GetShard returns the *topo.ShardInfo for the migration source.

type MigrationTarget

type MigrationTarget struct {
	Sources  map[int32]*binlogdatapb.BinlogSource
	Position string
	// contains filtered or unexported fields
}

MigrationTarget contains the metadata for each migration target.

func (*MigrationTarget) GetPrimary

func (target *MigrationTarget) GetPrimary() *topo.TabletInfo

GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration target.

func (*MigrationTarget) GetShard

func (target *MigrationTarget) GetShard() *topo.ShardInfo

GetShard returns the *topo.ShardInfo for the migration target.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server provides an API to work with Vitess workflows, like vreplication workflows (MoveTables, Reshard, etc) and schema migration workflows.

func NewServer

func NewServer(env *vtenv.Environment, ts *topo.Server, tmc tmclient.TabletManagerClient) *Server

NewServer returns a new server instance with the given vtenv.Environment, topo.Server, and TabletManagerClient.

func (*Server) CheckReshardingJournalExistsOnTablet

func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error)

CheckReshardingJournalExistsOnTablet returns the journal (or an empty journal) and a boolean to indicate if the resharding_journal table exists on the given tablet.

(TODO:@ajm188) This should not be part of the final public API, and should be un-exported after all places in package wrangler that call this have been migrated over.

func (*Server) CopySchemaShard

func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool, destKeyspace, destShard string, waitReplicasTimeout time.Duration, skipVerify bool) error

CopySchemaShard copies the schema from a source tablet to the specified shard. The schema is applied directly on the primary of the destination shard, and is propagated to the replicas through binlogs.

func (*Server) DeleteShard

func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recursive, evenIfServing bool) error

DeleteShard will do all the necessary changes in the topology server to entirely remove a shard.

func (*Server) DropTargets

func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, keepRoutingRules, dryRun bool) (*[]string, error)

DropTargets cleans up target tables, shards and denied tables if a MoveTables/Reshard is cancelled.

func (*Server) GetCellsWithShardReadsSwitched

func (s *Server) GetCellsWithShardReadsSwitched(
	ctx context.Context,
	keyspace string,
	si *topo.ShardInfo,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)

GetCellsWithShardReadsSwitched returns the topo cells partitioned into two slices: one with the cells where shard reads have been switched for the given tablet type and one with the cells where shard reads have not been switched for the given tablet type.

This function is for use in Reshard. Reads count as "switched" if any one of the source shards has the query service disabled in its tablet control record.

func (*Server) GetCellsWithTableReadsSwitched

func (s *Server) GetCellsWithTableReadsSwitched(
	ctx context.Context,
	keyspace string,
	table string,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)

GetCellsWithTableReadsSwitched returns the topo cells partitioned into two slices: one with the cells where table reads have been switched for the given tablet type and one with the cells where table reads have not been switched for the given tablet type.

This function is for use in MoveTables. Reads count as "switched" if the routing rule for a (table, tablet_type) points to the target keyspace.
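
Both GetCellsWithShardReadsSwitched and GetCellsWithTableReadsSwitched return the same shape of result: every cell lands in exactly one of two slices. A minimal sketch of that partitioning follows. `partitionCells` and the `isSwitched` callback are hypothetical; in the real functions the decision comes from topo data (tablet control records or routing rules), not from a map.

```go
package main

import "fmt"

// partitionCells splits cells into those for which isSwitched reports true
// and those for which it reports false. isSwitched is a hypothetical
// callback standing in for the topo lookup.
func partitionCells(cells []string, isSwitched func(cell string) bool) (switched, notSwitched []string) {
	for _, cell := range cells {
		if isSwitched(cell) {
			switched = append(switched, cell)
		} else {
			notSwitched = append(notSwitched, cell)
		}
	}
	return switched, notSwitched
}

func main() {
	// Made-up data: whether each cell's routing rule points at the target.
	pointsToTarget := map[string]bool{"zone1": true, "zone2": false}
	switched, notSwitched := partitionCells(
		[]string{"zone1", "zone2"},
		func(cell string) bool { return pointsToTarget[cell] },
	)
	fmt.Println(switched, notSwitched) // [zone1] [zone2]
}
```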

func (*Server) GetCopyProgress

func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error)

GetCopyProgress returns the progress of all tables being copied in the workflow.

func (*Server) GetWorkflow

func (s *Server) GetWorkflow(ctx context.Context, keyspace, workflow string, includeLogs bool, shards []string) (*vtctldatapb.Workflow, error)

func (*Server) GetWorkflows

GetWorkflows returns a list of all workflows that exist in a given keyspace, with some additional filtering depending on the request parameters (for example, ActiveOnly=true restricts the search to only workflows that are currently running).

It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows rpc, and grpcvtctldserver delegates to this function.

func (*Server) LookupVindexCreate

LookupVindexCreate creates the lookup vindex in the specified keyspace and creates a VReplication workflow to backfill that vindex from the keyspace to the target/lookup table specified.

func (*Server) LookupVindexExternalize

LookupVindexExternalize externalizes a lookup vindex that's finished backfilling or has caught up. If the vindex has an owner then the workflow will also be deleted.

func (*Server) Materialize

func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error

Materialize performs the steps needed to materialize a list of tables based on the materialization specs.

func (*Server) MountList

func (*Server) MountShow

func (*Server) MoveTablesComplete

MoveTablesComplete is part of the vtctlservicepb.VtctldServer interface. It cleans up a successful MoveTables workflow and its related artifacts. Note: this is currently re-used for Reshard as well.

func (*Server) MoveTablesCreate

MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface. It passes the embedded TabletRequest object to the given keyspace's target primary tablets that will be executing the workflow.

func (*Server) ReshardCreate

ReshardCreate is part of the vtctlservicepb.VtctldServer interface.

func (*Server) SQLParser

func (s *Server) SQLParser() *sqlparser.Parser

func (*Server) VDiffCreate

VDiffCreate is part of the vtctlservicepb.VtctldServer interface. It passes on the request to the target primary tablets that are participating in the given workflow and VDiff.

func (*Server) VDiffDelete

VDiffDelete is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VDiffResume

VDiffResume is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VDiffShow

VDiffShow is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VDiffStop

VDiffStop is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VReplicationExec

func (s *Server) VReplicationExec(ctx context.Context, tabletAlias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)

VReplicationExec executes a query remotely using the DBA pool.

func (*Server) WorkflowDelete

WorkflowDelete is part of the vtctlservicepb.VtctldServer interface. It passes on the request to the target primary tablets that are participating in the given workflow.

func (*Server) WorkflowSwitchTraffic

WorkflowSwitchTraffic switches traffic in the direction passed for specified tablet types.

func (*Server) WorkflowUpdate

WorkflowUpdate is part of the vtctlservicepb.VtctldServer interface. It passes the embedded TabletRequest object to the given keyspace's target primary tablets that are participating in the given workflow.

type State

type State struct {
	Workflow       string
	SourceKeyspace string
	TargetKeyspace string
	WorkflowType   Type

	ReplicaCellsSwitched    []string
	ReplicaCellsNotSwitched []string

	RdonlyCellsSwitched    []string
	RdonlyCellsNotSwitched []string

	WritesSwitched bool

	// Partial MoveTables info
	IsPartialMigration    bool
	ShardsAlreadySwitched []string
	ShardsNotYetSwitched  []string
}

State represents the state of a workflow.

func (*State) String

func (s *State) String() string

type StreamMigrator

type StreamMigrator struct {
	// contains filtered or unexported fields
}

StreamMigrator contains information needed to migrate VReplication streams during Reshard workflows when the keyspace's VReplication workflows need to be migrated from one set of shards to another.

func BuildLegacyStreamMigrator

func BuildLegacyStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool, parser *sqlparser.Parser) (*StreamMigrator, error)

BuildLegacyStreamMigrator creates a new StreamMigrator based on the given TrafficSwitcher using the legacy VReplicationExec method. Note: this should be removed along with the vtctl client code / wrangler.

func BuildStreamMigrator

func BuildStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool, parser *sqlparser.Parser) (*StreamMigrator, error)

BuildStreamMigrator creates a new StreamMigrator based on the given TrafficSwitcher.

func (*StreamMigrator) CancelStreamMigrations

func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context)

CancelStreamMigrations cancels the stream migrations.

func (*StreamMigrator) LegacyStopStreams

func (sm *StreamMigrator) LegacyStopStreams(ctx context.Context) ([]string, error)

LegacyStopStreams stops streams using the legacy VReplicationExec method. Note: this should be removed along with the vtctl client code / wrangler.

func (*StreamMigrator) MigrateStreams

func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error

MigrateStreams migrates N streams.

func (*StreamMigrator) StopStreams

func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error)

StopStreams stops streams.

func (*StreamMigrator) Streams

func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream

Streams returns a deep-copy of the StreamMigrator's streams map.

func (*StreamMigrator) Templates

func (sm *StreamMigrator) Templates() []*VReplicationStream

Templates returns a copy of the StreamMigrator's template streams.

type StreamType

type StreamType int

StreamType is an enum representing the kind of stream.

(TODO:@ajm188) This should be made package-private once the last references in package wrangler are removed.

type TableRemovalType

type TableRemovalType int

TableRemovalType specifies the way a table will be removed during a DropSource for a MoveTables workflow.

func (TableRemovalType) String

func (trt TableRemovalType) String() string

String returns a string representation of a TableRemovalType
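
The enum types in this package (TableRemovalType, TrafficSwitchDirection) follow the common Go pattern of an iota-based integer type with a String method. The sketch below shows that pattern in isolation. `removalType` and the strings "drop", "rename", and "unknown" are hypothetical and may differ from what the package actually returns.

```go
package main

import "fmt"

// removalType is a hypothetical stand-in for TableRemovalType.
type removalType int

const (
	dropTable = removalType(iota)
	renameTable
)

// removalTypeNames maps each value to a label; the labels are assumptions.
var removalTypeNames = [...]string{"drop", "rename"}

// String implements fmt.Stringer, with a fallback for out-of-range values.
func (t removalType) String() string {
	if t < 0 || int(t) >= len(removalTypeNames) {
		return "unknown"
	}
	return removalTypeNames[t]
}

func main() {
	fmt.Printf("%v %v %v\n", dropTable, renameTable, removalType(7)) // drop rename unknown
}
```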

type TargetInfo

type TargetInfo struct {
	Targets         map[string]*MigrationTarget
	Frozen          bool
	OptCells        string
	OptTabletTypes  string
	WorkflowType    binlogdatapb.VReplicationWorkflowType
	WorkflowSubType binlogdatapb.VReplicationWorkflowSubType
	Options         *vtctldatapb.WorkflowOptions
}

TargetInfo contains the metadata for a set of targets involved in a workflow.

func BuildTargets

func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error)

BuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace.

It returns ErrNoStreams if there are no targets found for the workflow.

func LegacyBuildTargets

func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string,
	targetShards []string) (*TargetInfo, error)

LegacyBuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace. It uses VReplicationExec to get the workflow details rather than the new TabletManager ReadVReplicationWorkflow RPC. This is being used to slowly transition all of the older code, including unit tests, over to the new RPC and limit the impact of the new implementation to vtctldclient. You can see how the unit tests were being migrated here: https://gist.github.com/mattlord/738c12befe951f8d09304ff7fdc47c46

New callers should instead use the new BuildTargets function.

It returns ErrNoStreams if there are no targets found for the workflow.

type TrafficSwitchDirection

type TrafficSwitchDirection int

TrafficSwitchDirection specifies the switching direction.

func (TrafficSwitchDirection) String

func (tsd TrafficSwitchDirection) String() string

type Type

type Type string

Type is the type of a workflow as a string and maps directly to what is provided and presented to the user.

const (
	TypeMoveTables Type = "MoveTables"
	TypeReshard    Type = "Reshard"
	TypeMigrate    Type = "Migrate"
)

Workflow string types.

type VReplicationStream

type VReplicationStream struct {
	ID                 int32
	Workflow           string
	BinlogSource       *binlogdatapb.BinlogSource
	Position           replication.Position
	WorkflowType       binlogdatapb.VReplicationWorkflowType
	WorkflowSubType    binlogdatapb.VReplicationWorkflowSubType
	DeferSecondaryKeys bool
}

VReplicationStream represents a single stream of a vreplication workflow.

type VReplicationStreams

type VReplicationStreams []*VReplicationStream

VReplicationStreams wraps a slice of VReplicationStream objects to provide some aggregate functionality.

func (VReplicationStreams) Copy

Copy returns a copy of the list of streams. All fields except .Position are copied.

func (VReplicationStreams) IDs

func (streams VReplicationStreams) IDs() []int32

IDs returns the IDs of the VReplicationStreams.

func (VReplicationStreams) ToSlice

func (streams VReplicationStreams) ToSlice() []*VReplicationStream

ToSlice unwraps a VReplicationStreams object into the underlying slice of VReplicationStream objects.

func (VReplicationStreams) Values

func (streams VReplicationStreams) Values() string

Values returns a string representing the IDs of the VReplicationStreams for use in an IN clause.

(TODO|@ajm188) This currently returns the literal ")" if len(streams) == 0. We should probably update this function to return the full "IN" clause, and then if len(streams) == 0, return "1 != 1" so that we can still execute a valid SQL query.
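
The change proposed in the TODO can be sketched as a helper that builds the whole predicate and falls back to an always-false condition for an empty list. `inClause` is hypothetical and is not part of the package; the column name and the exact formatting of the list are assumptions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// inClause is a hypothetical helper sketching the TODO's proposal: it
// returns a full "column in (...)" predicate, or "1 != 1" when ids is empty
// so that the surrounding query stays valid SQL and matches no rows.
func inClause(column string, ids []int32) string {
	if len(ids) == 0 {
		return "1 != 1"
	}
	parts := make([]string, len(ids))
	for i, id := range ids {
		parts[i] = strconv.FormatInt(int64(id), 10)
	}
	return fmt.Sprintf("%s in (%s)", column, strings.Join(parts, ", "))
}

func main() {
	fmt.Println(inClause("id", []int32{1, 2, 3})) // id in (1, 2, 3)
	fmt.Println(inClause("id", nil))              // 1 != 1
}
```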

func (VReplicationStreams) Workflows

func (streams VReplicationStreams) Workflows() []string

Workflows returns a list of unique workflow names in the list of vreplication streams.
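
Collecting unique names from a list of streams is typically done with a set. The sketch below uses a stand-in `stream` type and a hypothetical `uniqueWorkflows` function; sorting the result is an assumption made here for determinism, and the ordering of the real method's output is not stated above.

```go
package main

import (
	"fmt"
	"sort"
)

// stream is a hypothetical stand-in for VReplicationStream.
type stream struct {
	ID       int32
	Workflow string
}

// uniqueWorkflows returns each workflow name once, sorted lexically.
func uniqueWorkflows(streams []*stream) []string {
	seen := make(map[string]struct{}, len(streams))
	for _, s := range streams {
		seen[s.Workflow] = struct{}{}
	}
	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	streams := []*stream{
		{ID: 1, Workflow: "reshard1"},
		{ID: 2, Workflow: "materialize1"},
		{ID: 3, Workflow: "reshard1"},
	}
	fmt.Println(uniqueWorkflows(streams)) // [materialize1 reshard1]
}
```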

type VReplicationWorkflowType

type VReplicationWorkflowType int

VReplicationWorkflowType specifies whether a workflow is MoveTables, Reshard, or Migrate, and maps directly to what is stored in the backend database.
