workflow

package
v0.18.3
Warning

This package is not in the latest version of its module.

Published: Mar 14, 2024 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Overview

Package workflow defines types and functions for working with Vitess workflows.

This is still a very rough sketch, far from a final API, but I want to document some things here as I go:

(1) The lines between package workflow and package workflow/vexec are, uh, blurry at best, and definitely need serious thinking and refinement. Maybe there shouldn't even be two separate packages at all. The reason I have the two packages right now is because I'm operating under the assumption that there are workflows that are vexec, and then there are other workflows. If it's true that all workflows are vexec workflows, then probably one single package could make more sense. For now, two packages seems the way to go, but like I said, the boundaries are blurry, and things that belong in one package are in the other, because I haven't gone back and moved things around.

(2) I'm aiming for this to be a drop-in replacement (more or less) for the function calls in go/vt/wrangler. However, I'd rather define a better abstraction if it means having to rewrite even significant portions of the existing wrangler code to adapt to it, than make a subpar API in the name of backwards compatibility. I'm not sure if that's a tradeoff I'll even need to consider in the future, but I'm putting a stake in the ground on which side of that tradeoff I intend to fall, should it come to it.

(3) Eventually we'll need to consider how the online schema migration workflows fit into this. I'm trying to at least be somewhat abstract in the vexec / queryplanner APIs to fit with the QueryParams thing that wrangler uses, which _should_ work, but who knows?? Time will tell.


Constants

const (
	MoveTablesWorkflow = VReplicationWorkflowType(iota)
	ReshardWorkflow
	MigrateWorkflow
)

VReplicationWorkflowType enums.

const (
	StreamTypeUnknown = StreamType(iota)
	StreamTypeSharded
	StreamTypeReference
)

StreamType values.

const (
	// Frozen is the message value of frozen vreplication streams.
	Frozen = "FROZEN"
	// Running is the state value of a vreplication stream in the
	// replicating state.
	Running = "RUNNING"
)

const (
	DirectionForward = TrafficSwitchDirection(iota)
	DirectionBackward
)

The following constants define the switching direction.

const (
	DropTable = TableRemovalType(iota)
	RenameTable
)

The following constants define whether DropSource drops or renames the table.

Variables

var (
	// ErrInvalidWorkflow is a catchall error type for conditions that should be
	// impossible when operating on a workflow.
	ErrInvalidWorkflow = errors.New("invalid workflow")
	// ErrMultipleSourceKeyspaces occurs when a workflow somehow has multiple
	// source keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleSourceKeyspaces = errors.New("multiple source keyspaces for a single workflow")
	// ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple
	// target keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleTargetKeyspaces   = errors.New("multiple target keyspaces for a single workflow")
	ErrWorkflowNotFullySwitched  = errors.New("cannot complete workflow because you have not yet switched all read and write traffic")
	ErrWorkflowPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic")
)

var (
	// ErrNoStreams occurs when no target streams are found for a workflow in a
	// target keyspace.
	ErrNoStreams = errors.New("no streams found")
)

Functions

func CompareShards added in v0.13.0

func CompareShards(ctx context.Context, keyspace string, shards []*topo.ShardInfo, ts *topo.Server) error

CompareShards compares the list of shards in a workflow with the shards in that keyspace according to the topo. It returns an error if they do not match.

This function is used to validate MoveTables workflows.

(TODO|@ajm188): This function is temporarily-exported until *wrangler.trafficSwitcher has been fully moved over to this package. Once that refactor is finished, this function should be unexported. Consequently, YOU SHOULD NOT DEPEND ON THIS FUNCTION EXTERNALLY.

func HashStreams added in v0.11.0

func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64

HashStreams produces a stable hash based on the target keyspace and migration targets.
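What "stable" means here can be illustrated with a small self-contained sketch. This is not the package's implementation: `stableHash`, the use of FNV-1a, and the simplified input (shard name to stream IDs instead of `*MigrationTarget`) are all assumptions made for illustration. The point is only that sorting the inputs before hashing makes the result independent of Go's randomized map iteration order.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
	"sort"
)

// stableHash is a hypothetical stand-in for HashStreams. It hashes the
// keyspace plus every (shard, stream ID) pair in sorted order, so the same
// logical input always yields the same value.
func stableHash(keyspace string, streamIDsByShard map[string][]int32) int64 {
	// Map iteration order is random in Go, so sort the shard names first.
	shards := make([]string, 0, len(streamIDsByShard))
	for shard := range streamIDsByShard {
		shards = append(shards, shard)
	}
	sort.Strings(shards)

	h := fnv.New64a()
	h.Write([]byte(keyspace))
	for _, shard := range shards {
		// Copy before sorting so the caller's slice is left untouched.
		ids := append([]int32(nil), streamIDsByShard[shard]...)
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
		fmt.Fprintf(h, "|%s", shard)
		for _, id := range ids {
			fmt.Fprintf(h, ":%d", id)
		}
	}
	// Clear the sign bit so the result is never negative.
	return int64(h.Sum64() & math.MaxInt64)
}

func main() {
	a := map[string][]int32{"-80": {1, 2}, "80-": {3}}
	b := map[string][]int32{"80-": {3}, "-80": {2, 1}}
	fmt.Println(stableHash("customer", a) == stableHash("customer", b)) // true
}
```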

func Materialize added in v0.18.0

func ReverseWorkflowName added in v0.11.0

func ReverseWorkflowName(workflow string) string

ReverseWorkflowName returns the "reversed" name of a workflow. For a "forward" workflow, this is the workflow name with "_reverse" appended, and for a "reversed" workflow, this is the workflow name with the "_reverse" suffix removed.
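The naming rule described above can be sketched as follows. `reverseName` is a hypothetical local function written from the description, not the package's source; the workflow names are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// reverseSuffix is the suffix named in the documentation above.
const reverseSuffix = "_reverse"

// reverseName mirrors the documented behavior of ReverseWorkflowName:
// strip the suffix if it is present, otherwise append it.
func reverseName(workflow string) string {
	if strings.HasSuffix(workflow, reverseSuffix) {
		return strings.TrimSuffix(workflow, reverseSuffix)
	}
	return workflow + reverseSuffix
}

func main() {
	fmt.Println(reverseName("commerce2customer"))         // commerce2customer_reverse
	fmt.Println(reverseName("commerce2customer_reverse")) // commerce2customer
}
```

Under this rule, applying the function twice returns the original name.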

func StreamMigratorFinalize added in v0.11.0

func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error

StreamMigratorFinalize finalizes the stream migration.

(TODO:@ajm188) in the original implementation, "it's a standalone function because it does not use the streamMigrater state". That's still true, but moving this to a method on StreamMigrator would provide cleaner namespacing in package workflow. But, we would have to update more callers in order to do that (*wrangler.switcher's streamMigrateFinalize would need to change its signature to also take a *workflow.StreamMigrator), so we will do that in a second PR.

Types

type ITrafficSwitcher added in v0.11.0

type ITrafficSwitcher interface {
	TopoServer() *topo.Server
	TabletManagerClient() tmclient.TabletManagerClient
	Logger() logutil.Logger
	// VReplicationExec here is used when we want the (*wrangler.Wrangler)
	// implementation, which does a topo lookup on the tablet alias before
	// calling the underlying TabletManagerClient RPC.
	VReplicationExec(ctx context.Context, alias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)

	ExternalTopo() *topo.Server
	MigrationType() binlogdatapb.MigrationType
	ReverseWorkflowName() string
	SourceKeyspaceName() string
	SourceKeyspaceSchema() *vindexes.KeyspaceSchema
	Sources() map[string]*MigrationSource
	Tables() []string
	TargetKeyspaceName() string
	Targets() map[string]*MigrationTarget
	WorkflowName() string
	SourceTimeZone() string

	ForAllSources(f func(source *MigrationSource) error) error
	ForAllTargets(f func(target *MigrationTarget) error) error
	ForAllUIDs(f func(target *MigrationTarget, uid int32) error) error
	SourceShards() []*topo.ShardInfo
	TargetShards() []*topo.ShardInfo
}

ITrafficSwitcher is a hack to allow us to maintain the legacy wrangler package for vtctl/vtctlclient while migrating most of the TrafficSwitcher-related code to the workflow package for vtctldclient usage.

After moving TrafficSwitcher to this package and removing the implementation in wrangler, this type should be removed, and StreamMigrator should be updated to contain a field of type *TrafficSwitcher instead of ITrafficSwitcher.
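The ForAllSources / ForAllTargets methods take a callback and return a single error. One plausible way to implement that shape is sketched below. Running the callbacks concurrently and combining failures with `errors.Join` (Go 1.20+) are assumptions of this sketch, and `source`, `forAllSources`, and `errUnhealthy` are hypothetical names, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// source is a simplified, hypothetical stand-in for *MigrationSource.
type source struct{ Shard string }

// errUnhealthy is a made-up error used only by this example.
var errUnhealthy = errors.New("source is unhealthy")

// forAllSources runs f once per source and returns the combined errors,
// or nil if every call succeeded.
func forAllSources(sources map[string]*source, f func(*source) error) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, src := range sources {
		wg.Add(1)
		go func(src *source) {
			defer wg.Done()
			if err := f(src); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(src)
	}
	wg.Wait()
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	sources := map[string]*source{"-80": {Shard: "-80"}, "80-": {Shard: "80-"}}
	err := forAllSources(sources, func(s *source) error {
		if s.Shard == "80-" {
			return fmt.Errorf("shard %s: %w", s.Shard, errUnhealthy)
		}
		return nil
	})
	fmt.Println(err) // shard 80-: source is unhealthy
}
```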

type LogRecorder added in v0.18.0

type LogRecorder struct {
	// contains filtered or unexported fields
}

LogRecorder is used to collect logs for a specific purpose. It is not thread-safe, since the logs are expected to be generated in a repeatable sequence.

func NewLogRecorder added in v0.18.0

func NewLogRecorder() *LogRecorder

NewLogRecorder creates a new instance of LogRecorder.

func (*LogRecorder) GetLogs added in v0.18.0

func (lr *LogRecorder) GetLogs() []string

GetLogs returns all recorded logs in sequence.

func (*LogRecorder) Log added in v0.18.0

func (lr *LogRecorder) Log(log string)

Log records a new log message.

func (*LogRecorder) LogSlice added in v0.18.0

func (lr *LogRecorder) LogSlice(logs []string)

LogSlice sorts a given slice using natural sort, so that the result is predictable. This is useful when logging arrays or maps where the order of objects can vary.

func (*LogRecorder) Logf added in v0.18.0

func (lr *LogRecorder) Logf(log string, args ...any)

Logf records a new log message with interpolation parameters using fmt.Sprintf.
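To show how the four LogRecorder methods fit together, here is a minimal self-contained sketch. The `recorder` type is a hypothetical reimplementation written from the descriptions above, not the package's code. In particular, it uses `sort.Strings` (plain lexical order) where the documentation says "natural sort", and it sorts a copy rather than the caller's slice; both are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// recorder is a hypothetical stand-in for LogRecorder. Like the original,
// it is not safe for concurrent use.
type recorder struct{ logs []string }

// Log appends one message.
func (r *recorder) Log(msg string) { r.logs = append(r.logs, msg) }

// Logf formats a message with fmt.Sprintf and appends it.
func (r *recorder) Logf(format string, args ...any) {
	r.Log(fmt.Sprintf(format, args...))
}

// LogSlice appends the messages in sorted order so the output is
// deterministic even when the input order is not.
func (r *recorder) LogSlice(msgs []string) {
	sorted := append([]string(nil), msgs...)
	sort.Strings(sorted)
	for _, m := range sorted {
		r.Log(m)
	}
}

// GetLogs returns everything recorded so far, in order.
func (r *recorder) GetLogs() []string { return r.logs }

func main() {
	r := &recorder{}
	r.Log("start")
	r.Logf("switching %d shards", 2)
	r.LogSlice([]string{"shard 80-", "shard -80"})
	for _, line := range r.GetLogs() {
		fmt.Println(line)
	}
}
```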

type MigrationSource added in v0.11.0

type MigrationSource struct {
	Position  string
	Journaled bool
	// contains filtered or unexported fields
}

MigrationSource contains the metadata for each migration source.

func NewMigrationSource added in v0.11.0

func NewMigrationSource(si *topo.ShardInfo, primary *topo.TabletInfo) *MigrationSource

NewMigrationSource returns a MigrationSource for the given shard and primary.

(TODO|@ajm188): do we always want to start with (position:"", journaled:false)?

func (*MigrationSource) GetPrimary added in v0.11.0

func (source *MigrationSource) GetPrimary() *topo.TabletInfo

GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration source.

func (*MigrationSource) GetShard added in v0.11.0

func (source *MigrationSource) GetShard() *topo.ShardInfo

GetShard returns the *topo.ShardInfo for the migration source.

type MigrationTarget added in v0.11.0

type MigrationTarget struct {
	Sources  map[int32]*binlogdatapb.BinlogSource
	Position string
	// contains filtered or unexported fields
}

MigrationTarget contains the metadata for each migration target.

func (*MigrationTarget) GetPrimary added in v0.11.0

func (target *MigrationTarget) GetPrimary() *topo.TabletInfo

GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration target.

func (*MigrationTarget) GetShard added in v0.11.0

func (target *MigrationTarget) GetShard() *topo.ShardInfo

GetShard returns the *topo.ShardInfo for the migration target.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server provides an API to work with Vitess workflows, like vreplication workflows (MoveTables, Reshard, etc) and schema migration workflows.

func NewServer

func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server

NewServer returns a new server instance with the given topo.Server and TabletManagerClient.

func (*Server) CheckReshardingJournalExistsOnTablet added in v0.11.0

func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error)

CheckReshardingJournalExistsOnTablet returns the journal (or an empty journal) and a boolean to indicate if the resharding_journal table exists on the given tablet.

(TODO:@ajm188) This should not be part of the final public API, and should be un-exported after all places in package wrangler that call this have been migrated over.

func (*Server) CopySchemaShard added in v0.18.0

func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool, destKeyspace, destShard string, waitReplicasTimeout time.Duration, skipVerify bool) error

CopySchemaShard copies the schema from a source tablet to the specified shard. The schema is applied directly on the primary of the destination shard, and is propagated to the replicas through binlogs.

func (*Server) DeleteShard added in v0.18.0

func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recursive, evenIfServing bool) error

DeleteShard will do all the necessary changes in the topology server to entirely remove a shard.

func (*Server) DropTargets added in v0.18.0

func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, keepRoutingRules, dryRun bool) (*[]string, error)

DropTargets cleans up target tables, shards and denied tables if a MoveTables/Reshard is cancelled.

func (*Server) GetCellsWithShardReadsSwitched added in v0.11.0

func (s *Server) GetCellsWithShardReadsSwitched(
	ctx context.Context,
	keyspace string,
	si *topo.ShardInfo,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)

GetCellsWithShardReadsSwitched returns the topo cells partitioned into two slices: one with the cells where shard reads have been switched for the given tablet type and one with the cells where shard reads have not been switched for the given tablet type.

This function is for use in Reshard. Reads are considered "switched" if any one of the source shards has the query service disabled in its tablet control record.
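The two-slice return shape can be sketched with a plain partition over cell names. `partitionCells` and the example data are hypothetical; the real method reads topo records, which this sketch replaces with a caller-supplied predicate.

```go
package main

import "fmt"

// partitionCells splits cells into those for which switched(cell) is true
// and those for which it is false, preserving the input order.
func partitionCells(cells []string, switched func(cell string) bool) (cellsSwitched, cellsNotSwitched []string) {
	for _, cell := range cells {
		if switched(cell) {
			cellsSwitched = append(cellsSwitched, cell)
		} else {
			cellsNotSwitched = append(cellsNotSwitched, cell)
		}
	}
	return cellsSwitched, cellsNotSwitched
}

func main() {
	// Assumed example data: whether the query service is disabled per cell.
	disabled := map[string]bool{"zone1": true, "zone2": false, "zone3": true}
	yes, no := partitionCells([]string{"zone1", "zone2", "zone3"}, func(c string) bool {
		return disabled[c]
	})
	fmt.Println(yes, no) // [zone1 zone3] [zone2]
}
```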

func (*Server) GetCellsWithTableReadsSwitched added in v0.11.0

func (s *Server) GetCellsWithTableReadsSwitched(
	ctx context.Context,
	keyspace string,
	table string,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)

GetCellsWithTableReadsSwitched returns the topo cells partitioned into two slices: one with the cells where table reads have been switched for the given tablet type and one with the cells where table reads have not been switched for the given tablet type.

This function is for use in MoveTables. Reads are considered "switched" when the routing rule for a (table, tablet_type) points to the target keyspace.

func (*Server) GetCopyProgress added in v0.18.0

func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error)

GetCopyProgress returns the progress of all tables being copied in the workflow.

func (*Server) GetWorkflow added in v0.18.0

func (s *Server) GetWorkflow(ctx context.Context, keyspace, workflow string, includeLogs bool) (*vtctldatapb.Workflow, error)

func (*Server) GetWorkflows

GetWorkflows returns a list of all workflows that exist in a given keyspace, with some additional filtering depending on the request parameters (for example, ActiveOnly=true restricts the search to only workflows that are currently running).

It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows rpc, and grpcvtctldserver delegates to this function.

func (*Server) LookupVindexCreate added in v0.18.0

LookupVindexCreate creates the lookup vindex in the specified keyspace and creates a VReplication workflow to backfill that vindex from the keyspace to the target/lookup table specified.

func (*Server) LookupVindexExternalize added in v0.18.0

LookupVindexExternalize externalizes a lookup vindex that's finished backfilling or has caught up. If the vindex has an owner then the workflow will also be deleted.

func (*Server) Materialize added in v0.18.0

func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error

Materialize performs the steps needed to materialize a list of tables based on the materialization specs.

func (*Server) MigrateCreate added in v0.18.0

func (*Server) MountList added in v0.18.0

func (*Server) MountRegister added in v0.18.0

func (*Server) MountShow added in v0.18.0

func (*Server) MountUnregister added in v0.18.0

func (*Server) MoveTablesComplete added in v0.18.0

MoveTablesComplete is part of the vtctlservicepb.VtctldServer interface. It cleans up a successful MoveTables workflow and its related artifacts. Note: this is currently re-used for Reshard as well.

func (*Server) MoveTablesCreate added in v0.18.0

MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface. It passes the embedded TabletRequest object to the given keyspace's target primary tablets that will be executing the workflow.

func (*Server) ReshardCreate added in v0.18.0

ReshardCreate is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VDiffCreate added in v0.18.0

VDiffCreate is part of the vtctlservicepb.VtctldServer interface. It passes on the request to the target primary tablets that are participating in the given workflow and VDiff.

func (*Server) VDiffDelete added in v0.18.0

VDiffDelete is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VDiffResume added in v0.18.0

VDiffResume is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VDiffShow added in v0.18.0

VDiffShow is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VDiffStop added in v0.18.0

VDiffStop is part of the vtctlservicepb.VtctldServer interface.

func (*Server) VReplicationExec added in v0.18.0

func (s *Server) VReplicationExec(ctx context.Context, tabletAlias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)

VReplicationExec executes a query remotely using the DBA pool.

func (*Server) WorkflowDelete added in v0.18.0

WorkflowDelete is part of the vtctlservicepb.VtctldServer interface. It passes on the request to the target primary tablets that are participating in the given workflow.

func (*Server) WorkflowStatus added in v0.18.0

func (*Server) WorkflowSwitchTraffic added in v0.18.0

WorkflowSwitchTraffic switches traffic in the direction passed for specified tablet types.

func (*Server) WorkflowUpdate added in v0.17.0

WorkflowUpdate is part of the vtctlservicepb.VtctldServer interface. It passes the embedded TabletRequest object to the given keyspace's target primary tablets that are participating in the given workflow.

type State added in v0.11.0

type State struct {
	Workflow       string
	SourceKeyspace string
	TargetKeyspace string
	WorkflowType   Type

	ReplicaCellsSwitched    []string
	ReplicaCellsNotSwitched []string

	RdonlyCellsSwitched    []string
	RdonlyCellsNotSwitched []string

	WritesSwitched bool

	// Partial MoveTables info
	IsPartialMigration    bool
	ShardsAlreadySwitched []string
	ShardsNotYetSwitched  []string
}

State represents the state of a workflow.
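The State fields line up with the ErrWorkflowNotFullySwitched and ErrWorkflowPartiallySwitched messages listed under Variables. The sketch below shows one way such guards could be derived from a state value. The `state` type copies only a few fields, and the exact conditions in `checkComplete` and `checkCancel` are assumptions made for illustration; the package may decide this differently.

```go
package main

import (
	"errors"
	"fmt"
)

// state is a trimmed, hypothetical copy of a few State fields.
type state struct {
	ReplicaCellsSwitched    []string
	ReplicaCellsNotSwitched []string
	RdonlyCellsSwitched     []string
	RdonlyCellsNotSwitched  []string
	WritesSwitched          bool
}

var (
	errNotFullySwitched  = errors.New("cannot complete workflow because you have not yet switched all read and write traffic")
	errPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic")
)

// checkComplete allows completion only when writes and all reads are switched.
func checkComplete(s state) error {
	if !s.WritesSwitched || len(s.ReplicaCellsNotSwitched) > 0 || len(s.RdonlyCellsNotSwitched) > 0 {
		return errNotFullySwitched
	}
	return nil
}

// checkCancel allows cancellation only when nothing has been switched yet.
func checkCancel(s state) error {
	if s.WritesSwitched || len(s.ReplicaCellsSwitched) > 0 || len(s.RdonlyCellsSwitched) > 0 {
		return errPartiallySwitched
	}
	return nil
}

func main() {
	partial := state{
		ReplicaCellsSwitched:   []string{"zone1"},
		RdonlyCellsNotSwitched: []string{"zone1"},
	}
	fmt.Println(checkComplete(partial) != nil, checkCancel(partial) != nil) // true true
}
```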

func (*State) String added in v0.18.0

func (s *State) String() string

type StreamMigrator added in v0.11.0

type StreamMigrator struct {
	// contains filtered or unexported fields
}

StreamMigrator contains the information needed to migrate a stream.

func BuildStreamMigrator added in v0.11.0

func BuildStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool) (*StreamMigrator, error)

BuildStreamMigrator creates a new StreamMigrator based on the given TrafficSwitcher.

func (*StreamMigrator) CancelMigration added in v0.11.0

func (sm *StreamMigrator) CancelMigration(ctx context.Context)

CancelMigration cancels a migration.

func (*StreamMigrator) MigrateStreams added in v0.11.0

func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error

MigrateStreams migrates N streams.

func (*StreamMigrator) StopStreams added in v0.11.0

func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error)

StopStreams stops streams.

func (*StreamMigrator) Streams added in v0.11.0

func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream

Streams returns a deep-copy of the StreamMigrator's streams map.
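"Deep copy" matters here because the map values are slices of pointers: copying only the map would still share the underlying stream structs with the caller. A minimal sketch, using a hypothetical `stream` type with two of the VReplicationStream fields:

```go
package main

import "fmt"

// stream is a simplified, hypothetical stand-in for VReplicationStream.
type stream struct {
	ID       int32
	Workflow string
}

// deepCopyStreams copies the map, each slice, and each pointed-to struct,
// so the result shares no memory with the input.
func deepCopyStreams(in map[string][]*stream) map[string][]*stream {
	out := make(map[string][]*stream, len(in))
	for shard, streams := range in {
		copied := make([]*stream, len(streams))
		for i, s := range streams {
			c := *s // copy the struct value, not just the pointer
			copied[i] = &c
		}
		out[shard] = copied
	}
	return out
}

func main() {
	orig := map[string][]*stream{"-80": {{ID: 1, Workflow: "wf"}}}
	cp := deepCopyStreams(orig)
	cp["-80"][0].Workflow = "changed"
	fmt.Println(orig["-80"][0].Workflow) // wf
}
```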

func (*StreamMigrator) Templates added in v0.11.0

func (sm *StreamMigrator) Templates() []*VReplicationStream

Templates returns a copy of the StreamMigrator's template streams.

type StreamType added in v0.11.0

type StreamType int

StreamType is an enum representing the kind of stream.

(TODO:@ajm188) This should be made package-private once the last references in package wrangler are removed.

type TableRemovalType added in v0.11.0

type TableRemovalType int

TableRemovalType specifies the way a table will be removed during a DropSource for a MoveTables workflow.

func (TableRemovalType) String added in v0.11.0

func (trt TableRemovalType) String() string

String returns a string representation of a TableRemovalType.
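For readers unfamiliar with the iota-plus-String pattern used by this type, here is a self-contained sketch. `removalType` is a hypothetical local type, and the exact strings ("DROP TABLE", "RENAME TABLE", "Unknown") are assumptions of this example rather than values confirmed by the documentation above.

```go
package main

import "fmt"

// removalType is a hypothetical stand-in for TableRemovalType.
type removalType int

const (
	dropTable removalType = iota
	renameTable
)

// removalTypeNames is indexed by the enum value. The strings are assumed.
var removalTypeNames = [...]string{"DROP TABLE", "RENAME TABLE"}

// String implements fmt.Stringer and guards against out-of-range values.
func (rt removalType) String() string {
	if rt < dropTable || rt > renameTable {
		return "Unknown"
	}
	return removalTypeNames[rt]
}

func main() {
	fmt.Println(dropTable, renameTable, removalType(7)) // DROP TABLE RENAME TABLE Unknown
}
```

The range check keeps an unexpected value from indexing past the end of the name table.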

type TargetInfo added in v0.11.0

type TargetInfo struct {
	Targets         map[string]*MigrationTarget
	Frozen          bool
	OptCells        string
	OptTabletTypes  string
	WorkflowType    binlogdatapb.VReplicationWorkflowType
	WorkflowSubType binlogdatapb.VReplicationWorkflowSubType
}

TargetInfo contains the metadata for a set of targets involved in a workflow.

func BuildTargets added in v0.11.0

func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error)

BuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace.

It returns ErrNoStreams if there are no targets found for the workflow.
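Because ErrNoStreams is a sentinel error, callers can test for it with `errors.Is` even if it has been wrapped along the way. The sketch below uses local stand-ins (`errNoStreams`, `lookupTargets`, `isNoStreams`) rather than the real package, and whether the real function wraps the error is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoStreams is a local stand-in for workflow.ErrNoStreams.
var errNoStreams = errors.New("no streams found")

// lookupTargets is a hypothetical stand-in for BuildTargets: it returns the
// shards that have streams for the workflow, or a wrapped errNoStreams.
func lookupTargets(shardsByWorkflow map[string][]string, keyspace, workflow string) ([]string, error) {
	shards := shardsByWorkflow[workflow]
	if len(shards) == 0 {
		return nil, fmt.Errorf("workflow %q in keyspace %q: %w", workflow, keyspace, errNoStreams)
	}
	return shards, nil
}

// isNoStreams reports whether err is, or wraps, errNoStreams.
func isNoStreams(err error) bool { return errors.Is(err, errNoStreams) }

func main() {
	known := map[string][]string{"commerce2customer": {"-80", "80-"}}

	if _, err := lookupTargets(known, "customer", "missing"); isNoStreams(err) {
		fmt.Println("nothing to do:", err)
	}
	shards, _ := lookupTargets(known, "customer", "commerce2customer")
	fmt.Println(shards) // [-80 80-]
}
```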

func LegacyBuildTargets added in v0.18.0

func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error)

LegacyBuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace. It uses VReplicationExec to get the workflow details rather than the new TabletManager ReadVReplicationWorkflow RPC. This is being used to slowly transition all of the older code, including unit tests, over to the new RPC and limit the impact of the new implementation to vtctldclient. You can see how the unit tests were being migrated here: https://gist.github.com/mattlord/738c12befe951f8d09304ff7fdc47c46

New callers should instead use the new BuildTargets function.

It returns ErrNoStreams if there are no targets found for the workflow.

type TrafficSwitchDirection added in v0.11.0

type TrafficSwitchDirection int

TrafficSwitchDirection specifies the switching direction.

type Type added in v0.11.0

type Type string

Type is the type of a workflow as a string and maps directly to what is provided and presented to the user.

const (
	TypeMoveTables Type = "MoveTables"
	TypeReshard    Type = "Reshard"
	TypeMigrate    Type = "Migrate"
)

Workflow string types.

type VReplicationStream added in v0.11.0

type VReplicationStream struct {
	ID                 int32
	Workflow           string
	BinlogSource       *binlogdatapb.BinlogSource
	Position           replication.Position
	WorkflowType       binlogdatapb.VReplicationWorkflowType
	WorkflowSubType    binlogdatapb.VReplicationWorkflowSubType
	DeferSecondaryKeys bool
}

VReplicationStream represents a single stream of a vreplication workflow.

type VReplicationStreams added in v0.11.0

type VReplicationStreams []*VReplicationStream

VReplicationStreams wraps a slice of VReplicationStream objects to provide some aggregate functionality.

func (VReplicationStreams) Copy added in v0.11.0

Copy returns a copy of the list of streams. All fields except .Position are copied.

func (VReplicationStreams) ToSlice added in v0.11.0

func (streams VReplicationStreams) ToSlice() []*VReplicationStream

ToSlice unwraps a VReplicationStreams object into the underlying slice of VReplicationStream objects.

func (VReplicationStreams) Values added in v0.11.0

func (streams VReplicationStreams) Values() string

Values returns a string representing the IDs of the VReplicationStreams for use in an IN clause.

(TODO|@ajm188) This currently returns the literal ")" if len(streams) == 0. We should probably update this function to return the full "IN" clause, and then if len(streams) == 0, return "1 != 1" so that we can still execute a valid SQL query.
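The change proposed in the TODO above could look roughly like this. `idInClause` is a hypothetical helper, not part of the package: it returns the whole predicate and falls back to "1 != 1" for an empty list, so the surrounding query stays valid SQL. The query text in `main` is illustrative only.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// idInClause builds "<column> IN (1, 2, 3)" from a list of IDs. For an empty
// list it returns an always-false predicate instead of malformed SQL.
func idInClause(column string, ids []int32) string {
	if len(ids) == 0 {
		return "1 != 1"
	}
	parts := make([]string, len(ids))
	for i, id := range ids {
		parts[i] = strconv.FormatInt(int64(id), 10)
	}
	return column + " IN (" + strings.Join(parts, ", ") + ")"
}

func main() {
	fmt.Println("select * from _vt.vreplication where " + idInClause("id", []int32{1, 2, 3}))
	fmt.Println("select * from _vt.vreplication where " + idInClause("id", nil))
}
```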

func (VReplicationStreams) Workflows added in v0.11.0

func (streams VReplicationStreams) Workflows() []string

Workflows returns a list of unique workflow names in the list of vreplication streams.
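A minimal sketch of collecting unique workflow names from a list of streams follows. The `stream` type and `uniqueWorkflows` are hypothetical, and returning the names in sorted order is an assumption; the documentation above only promises uniqueness.

```go
package main

import (
	"fmt"
	"sort"
)

// stream is a simplified, hypothetical stand-in for VReplicationStream.
type stream struct {
	ID       int32
	Workflow string
}

// uniqueWorkflows returns each workflow name once, sorted for determinism.
func uniqueWorkflows(streams []*stream) []string {
	seen := make(map[string]struct{}, len(streams))
	for _, s := range streams {
		seen[s.Workflow] = struct{}{}
	}
	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	streams := []*stream{
		{ID: 1, Workflow: "wf_b"},
		{ID: 2, Workflow: "wf_a"},
		{ID: 3, Workflow: "wf_b"},
	}
	fmt.Println(uniqueWorkflows(streams)) // [wf_a wf_b]
}
```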

type VReplicationWorkflowType added in v0.18.0

type VReplicationWorkflowType int

VReplicationWorkflowType specifies whether a workflow is MoveTables, Reshard, or Migrate, and maps directly to what is stored in the backend database.
