Documentation ¶
Overview ¶
Package worker contains the framework, utility methods and core functions for long running actions. 'vtworker' binary will use these.
Index ¶
- Variables
- func AddCommand(groupName string, c Command)
- func CompareRows(fields []*querypb.Field, compareCount int, left, right []sqltypes.Value) (int, error)
- func FindChunks(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, ...) ([]string, error)
- func FindHealthyRdonlyEndPoint(ctx context.Context, wr *wrangler.Wrangler, cell, keyspace, shard string) (*topodatapb.TabletAlias, error)
- func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, ...) (*topodatapb.TabletAlias, error)
- func PrintAllCommands(logger logutil.Logger)
- func RowsEqual(left, right []sqltypes.Value) int
- type Command
- type DiffReport
- type Instance
- func (wi *Instance) CreateWrangler(logger logutil.Logger) *wrangler.Wrangler
- func (wi *Instance) InitInteractiveMode()
- func (wi *Instance) InitStatusHandling()
- func (wi *Instance) InstallSignalHandlers()
- func (wi *Instance) Reset() error
- func (wi *Instance) RunCommand(args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error)
- func (wi *Instance) WaitForCommand(wrk Worker, done chan struct{}) error
- type PanicWorker
- type PingWorker
- type QueryResultReader
- func NewQueryResultReaderForTablet(ctx context.Context, ts topo.Server, tabletAlias *topodatapb.TabletAlias, ...) (*QueryResultReader, error)
- func TableScan(ctx context.Context, log logutil.Logger, ts topo.Server, ...) (*QueryResultReader, error)
- func TableScanByKeyRange(ctx context.Context, log logutil.Logger, ts topo.Server, ...) (*QueryResultReader, error)
- type Resolver
- type RowDiffer
- type RowReader
- type RowSplitter
- type RowSubsetDiffer
- type SplitCloneWorker
- func (scw *SplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error)
- func (scw *SplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error
- func (scw *SplitCloneWorker) Run(ctx context.Context) error
- func (scw *SplitCloneWorker) StatusAsHTML() template.HTML
- func (scw *SplitCloneWorker) StatusAsText() string
- type SplitDiffWorker
- type StatusWorker
- type StatusWorkerState
- type VerticalSplitCloneWorker
- func (vscw *VerticalSplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error)
- func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error
- func (vscw *VerticalSplitCloneWorker) Run(ctx context.Context) error
- func (vscw *VerticalSplitCloneWorker) StatusAsHTML() template.HTML
- func (vscw *VerticalSplitCloneWorker) StatusAsText() string
- type VerticalSplitDiffWorker
- type Worker
- func NewPanicWorker(wr *wrangler.Wrangler) (Worker, error)
- func NewPingWorker(wr *wrangler.Wrangler, message string) (Worker, error)
- func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, ...) (Worker, error)
- func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string) Worker
- func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspace, destinationShard string, ...) (Worker, error)
- func NewVerticalSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker
Constants ¶
This section is empty.
Variables ¶
var ( // WaitForHealthyEndPointsTimeout intent is to wait for the // healthcheck to automatically return rdonly instances which // have been taken out by previous *Clone or *Diff runs. // Therefore, the default for this variable must be higher // than -health_check_interval. // (it is public for tests to override it) WaitForHealthyEndPointsTimeout = flag.Duration("wait_for_healthy_rdonly_endpoints_timeout", 60*time.Second, "maximum time to wait if less than --min_healthy_rdonly_endpoints are available") )
Functions ¶
func AddCommand ¶
AddCommand registers a command and makes it available.
func CompareRows ¶
func CompareRows(fields []*querypb.Field, compareCount int, left, right []sqltypes.Value) (int, error)
CompareRows returns: -1 if left is smaller than right 0 if left and right are equal +1 if left is bigger than right TODO: This can panic if types for left and right don't match.
func FindChunks ¶
func FindChunks(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, td *tabletmanagerdatapb.TableDefinition, minTableSizeForSplit uint64, sourceReaderCount int) ([]string, error)
FindChunks returns an array of chunks to use for splitting up a table into multiple data chunks. It only works for tables with a primary key (and the primary key first column is an integer type). The array will always look like: "", "value1", "value2", "" A non-split tablet will just return: "", ""
func FindHealthyRdonlyEndPoint ¶
func FindHealthyRdonlyEndPoint(ctx context.Context, wr *wrangler.Wrangler, cell, keyspace, shard string) (*topodatapb.TabletAlias, error)
FindHealthyRdonlyEndPoint returns a random healthy endpoint. Since we don't want to use them all, we require at least minHealthyEndPoints servers to be healthy. May block up to -wait_for_healthy_rdonly_endpoints_timeout.
func FindWorkerTablet ¶
func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, cell, keyspace, shard string) (*topodatapb.TabletAlias, error)
FindWorkerTablet will: - find a rdonly instance in the keyspace / shard - mark it as worker - tag it with our worker process
func PrintAllCommands ¶
PrintAllCommands prints a help text for all registered commands to the given Logger.
Types ¶
type Command ¶
type Command struct { Name string Method func(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error) Interactive func(ctx context.Context, wi *Instance, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) Params string Help string // if help is empty, won't list the command }
Command contains the detail of a command which can be run in vtworker. While "Method" is run from the command line or RPC, "Interactive" may contain special logic to parse a web form and return templated HTML output.
type DiffReport ¶
type DiffReport struct {
// contains filtered or unexported fields
}
DiffReport has the stats for a diff job
func (*DiffReport) ComputeQPS ¶
func (dr *DiffReport) ComputeQPS()
ComputeQPS fills in processingQPS
func (*DiffReport) HasDifferences ¶
func (dr *DiffReport) HasDifferences() bool
HasDifferences returns true if the diff job recorded any difference
func (*DiffReport) String ¶
func (dr *DiffReport) String() string
type Instance ¶
type Instance struct {
// contains filtered or unexported fields
}
Instance encapsulate the execution state of vtworker.
func NewInstance ¶
func NewInstance(ctx context.Context, ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance
NewInstance creates a new Instance.
func (*Instance) CreateWrangler ¶
CreateWrangler creates a new wrangler using the instance specific configuration.
func (*Instance) InitInteractiveMode ¶
func (wi *Instance) InitInteractiveMode()
InitInteractiveMode installs webserver handlers for each known command.
func (*Instance) InitStatusHandling ¶
func (wi *Instance) InitStatusHandling()
InitStatusHandling installs webserver handlers for global actions like /status, /reset and /cancel.
func (*Instance) InstallSignalHandlers ¶
func (wi *Instance) InstallSignalHandlers()
InstallSignalHandlers installs signal handler which exit vtworker gracefully.
func (*Instance) Reset ¶
Reset resets the state of a finished worker. It returns an error if the worker is still running.
func (*Instance) RunCommand ¶
func (wi *Instance) RunCommand(args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error)
RunCommand executes the vtworker command specified by "args". Use WaitForCommand() to block on the returned done channel. If wr is nil, the default wrangler will be used. If you pass a wr wrangler, note that a MemoryLogger will be added to its current logger. The returned worker and done channel may be nil if no worker was started e.g. in case of a "Reset".
func (*Instance) WaitForCommand ¶
WaitForCommand blocks until "done" is closed. In the meantime, it logs the status of "wrk".
type PanicWorker ¶
type PanicWorker struct { StatusWorker // contains filtered or unexported fields }
PanicWorker will run panic() when executed. For internal tests only.
func (*PanicWorker) Run ¶
func (pw *PanicWorker) Run(ctx context.Context) error
Run implements the Worker interface.
func (*PanicWorker) StatusAsHTML ¶
func (pw *PanicWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*PanicWorker) StatusAsText ¶
func (pw *PanicWorker) StatusAsText() string
StatusAsText implements the Worker interface.
type PingWorker ¶
type PingWorker struct { StatusWorker // contains filtered or unexported fields }
PingWorker will log a message with level CONSOLE.
func (*PingWorker) Run ¶
func (pw *PingWorker) Run(ctx context.Context) error
Run implements the Worker interface.
func (*PingWorker) StatusAsHTML ¶
func (pw *PingWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*PingWorker) StatusAsText ¶
func (pw *PingWorker) StatusAsText() string
StatusAsText implements the Worker interface.
type QueryResultReader ¶
type QueryResultReader struct { Output <-chan *sqltypes.Result Fields []*querypb.Field // contains filtered or unexported fields }
QueryResultReader will stream rows towards the output channel.
func NewQueryResultReaderForTablet ¶
func NewQueryResultReaderForTablet(ctx context.Context, ts topo.Server, tabletAlias *topodatapb.TabletAlias, sql string) (*QueryResultReader, error)
NewQueryResultReaderForTablet creates a new QueryResultReader for the provided tablet / sql query
func TableScan ¶
func TableScan(ctx context.Context, log logutil.Logger, ts topo.Server, tabletAlias *topodatapb.TabletAlias, tableDefinition *tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error)
TableScan returns a QueryResultReader that gets all the rows from a table, ordered by Primary Key. The returned columns are ordered with the Primary Key columns in front.
func TableScanByKeyRange ¶
func TableScanByKeyRange(ctx context.Context, log logutil.Logger, ts topo.Server, tabletAlias *topodatapb.TabletAlias, tableDefinition *tabletmanagerdatapb.TableDefinition, keyRange *topodatapb.KeyRange, keyspaceIDType topodatapb.KeyspaceIdType) (*QueryResultReader, error)
TableScanByKeyRange returns a QueryResultReader that gets all the rows from a table that match the supplied KeyRange, ordered by Primary Key. The returned columns are ordered with the Primary Key columns in front.
func (*QueryResultReader) Close ¶
func (qrr *QueryResultReader) Close()
Close closes the connection to the tablet.
func (*QueryResultReader) Error ¶
func (qrr *QueryResultReader) Error() error
type Resolver ¶
type Resolver interface { // ResolveDestinationMasters forces the worker to (re)resolve the topology and update // the destination masters that it knows about. ResolveDestinationMasters(ctx context.Context) error // GetDestinationMaster returns the most recently resolved destination master for a particular shard. GetDestinationMaster(shardName string) (*topo.TabletInfo, error) }
Resolver is an interface that should be implemented by any workers that need to resolve the topology.
type RowDiffer ¶
type RowDiffer struct {
// contains filtered or unexported fields
}
RowDiffer will consume rows on both sides, and compare them. It assumes left and right are sorted by ascending primary key. it will record errors if extra rows exist on either side.
func NewRowDiffer ¶
func NewRowDiffer(left, right *QueryResultReader, tableDefinition *tabletmanagerdatapb.TableDefinition) (*RowDiffer, error)
NewRowDiffer returns a new RowDiffer
type RowReader ¶
type RowReader struct {
// contains filtered or unexported fields
}
RowReader returns individual rows from a QueryResultReader
func NewRowReader ¶
func NewRowReader(queryResultReader *QueryResultReader) *RowReader
NewRowReader returns a RowReader based on the QueryResultReader
type RowSplitter ¶
type RowSplitter struct { KeyspaceIdType topodatapb.KeyspaceIdType ValueIndex int KeyRanges []*topodatapb.KeyRange }
RowSplitter is a helper class to split rows into multiple subsets targeted to different shards.
func NewRowSplitter ¶
func NewRowSplitter(shardInfos []*topo.ShardInfo, keyspaceIdType topodatapb.KeyspaceIdType, valueIndex int) *RowSplitter
NewRowSplitter returns a new row splitter for the given shard distribution.
func (*RowSplitter) Send ¶
func (rs *RowSplitter) Send(fields []*querypb.Field, result [][][]sqltypes.Value, baseCmd string, insertChannels []chan string, abort <-chan struct{}) bool
Send will send the rows to the list of channels. Returns true if aborted.
func (*RowSplitter) StartSplit ¶
func (rs *RowSplitter) StartSplit() [][][]sqltypes.Value
StartSplit starts a new split. Split can then be called multiple times.
type RowSubsetDiffer ¶
type RowSubsetDiffer struct {
// contains filtered or unexported fields
}
RowSubsetDiffer will consume rows on both sides, and compare them. It assumes superset and subset are sorted by ascending primary key. It will record errors in DiffReport.extraRowsRight if extra rows exist on the subset side, and DiffReport.extraRowsLeft will always be zero.
func NewRowSubsetDiffer ¶
func NewRowSubsetDiffer(superset, subset *QueryResultReader, pkFieldCount int) (*RowSubsetDiffer, error)
NewRowSubsetDiffer returns a new RowSubsetDiffer
func (*RowSubsetDiffer) Go ¶
func (rd *RowSubsetDiffer) Go(log logutil.Logger) (dr DiffReport, err error)
Go runs the diff. If there is no error, it will drain both sides. If an error occurs, it will just return it and stop.
type SplitCloneWorker ¶
type SplitCloneWorker struct { StatusWorker // contains filtered or unexported fields }
SplitCloneWorker will clone the data within a keyspace from a source set of shards to a destination set of shards.
func (*SplitCloneWorker) GetDestinationMaster ¶
func (scw *SplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error)
GetDestinationMaster implements the Resolver interface
func (*SplitCloneWorker) ResolveDestinationMasters ¶
func (scw *SplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error
ResolveDestinationMasters implements the Resolver interface. It will attempt to resolve all shards and update scw.destinationShardsToTablets; if it is unable to do so, it will not modify scw.destinationShardsToTablets at all.
func (*SplitCloneWorker) Run ¶
func (scw *SplitCloneWorker) Run(ctx context.Context) error
Run implements the Worker interface
func (*SplitCloneWorker) StatusAsHTML ¶
func (scw *SplitCloneWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*SplitCloneWorker) StatusAsText ¶
func (scw *SplitCloneWorker) StatusAsText() string
StatusAsText implements the Worker interface
type SplitDiffWorker ¶
type SplitDiffWorker struct { StatusWorker // contains filtered or unexported fields }
SplitDiffWorker executes a diff between a destination shard and its source shards in a shard split case.
func (*SplitDiffWorker) Run ¶
func (sdw *SplitDiffWorker) Run(ctx context.Context) error
Run is mostly a wrapper to run the cleanup at the end.
func (*SplitDiffWorker) StatusAsHTML ¶
func (sdw *SplitDiffWorker) StatusAsHTML() template.HTML
StatusAsHTML is part of the Worker interface
func (*SplitDiffWorker) StatusAsText ¶
func (sdw *SplitDiffWorker) StatusAsText() string
StatusAsText is part of the Worker interface
type StatusWorker ¶
type StatusWorker struct { // Mu is protecting the state variable, and can also be used // by implementations to protect their own variables. Mu *sync.Mutex // State contains the worker's current state, and should only // be accessed under Mu. State StatusWorkerState }
StatusWorker is the base type for a worker which keeps a status. The status is protected by a mutex. Any other internal variable can also be protected by that mutex. StatusWorker also provides default implementations for StatusAsHTML and StatusAsText to make it easier on workers if they don't need to export more.
func NewStatusWorker ¶
func NewStatusWorker() StatusWorker
NewStatusWorker returns a StatusWorker in state WorkerStateNotSarted
func (*StatusWorker) SetState ¶
func (worker *StatusWorker) SetState(state StatusWorkerState)
SetState is a convenience function for workers
func (*StatusWorker) StatusAsHTML ¶
func (worker *StatusWorker) StatusAsHTML() template.HTML
StatusAsHTML is part of the Worker interface
func (*StatusWorker) StatusAsText ¶
func (worker *StatusWorker) StatusAsText() string
StatusAsText is part of the Worker interface
type StatusWorkerState ¶
type StatusWorkerState string
StatusWorkerState is the type for a StatusWorker's status
const ( WorkerStateNotSarted StatusWorkerState = "not started" WorkerStateDone StatusWorkerState = "done" WorkerStateError StatusWorkerState = "error" WorkerStateInit StatusWorkerState = "initializing" WorkerStateFindTargets StatusWorkerState = "finding target instances" WorkerStateSyncReplication StatusWorkerState = "synchronizing replication" WorkerStateCopy StatusWorkerState = "copying the data" WorkerStateDiff StatusWorkerState = "running the diff" WorkerStateCleanUp StatusWorkerState = "cleaning up" )
func (StatusWorkerState) String ¶
func (state StatusWorkerState) String() string
type VerticalSplitCloneWorker ¶
type VerticalSplitCloneWorker struct { StatusWorker // contains filtered or unexported fields }
VerticalSplitCloneWorker will clone the data from a source keyspace/shard to a destination keyspace/shard.
func (*VerticalSplitCloneWorker) GetDestinationMaster ¶
func (vscw *VerticalSplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error)
GetDestinationMaster implements the Resolver interface
func (*VerticalSplitCloneWorker) ResolveDestinationMasters ¶
func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error
ResolveDestinationMasters implements the Resolver interface. It will attempt to resolve all shards and update vscw.destinationShardsToTablets; if it is unable to do so, it will not modify vscw.destinationShardsToTablets at all.
func (*VerticalSplitCloneWorker) Run ¶
func (vscw *VerticalSplitCloneWorker) Run(ctx context.Context) error
Run implements the Worker interface
func (*VerticalSplitCloneWorker) StatusAsHTML ¶
func (vscw *VerticalSplitCloneWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*VerticalSplitCloneWorker) StatusAsText ¶
func (vscw *VerticalSplitCloneWorker) StatusAsText() string
StatusAsText implements the Worker interface
type VerticalSplitDiffWorker ¶
type VerticalSplitDiffWorker struct { StatusWorker // contains filtered or unexported fields }
VerticalSplitDiffWorker executes a diff between a destination shard and its source shards in a shard split case.
func (*VerticalSplitDiffWorker) Run ¶
func (vsdw *VerticalSplitDiffWorker) Run(ctx context.Context) error
Run is mostly a wrapper to run the cleanup at the end.
func (*VerticalSplitDiffWorker) StatusAsHTML ¶
func (vsdw *VerticalSplitDiffWorker) StatusAsHTML() template.HTML
StatusAsHTML is part of the Worker interface.
func (*VerticalSplitDiffWorker) StatusAsText ¶
func (vsdw *VerticalSplitDiffWorker) StatusAsText() string
StatusAsText is part of the Worker interface.
type Worker ¶
type Worker interface { // StatusAsHTML returns the current worker status in HTML StatusAsHTML() template.HTML // StatusAsText returns the current worker status in plain text StatusAsText() string // Run is the main entry point for the worker. It will be // called in a go routine. When the passed in context is // cancelled, Run should exit as soon as possible. Run(context.Context) error }
Worker is the base interface for all long running workers.
func NewPanicWorker ¶
NewPanicWorker returns a new PanicWorker object.
func NewPingWorker ¶
NewPingWorker returns a new PingWorker object.
func NewSplitCloneWorker ¶
func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, strategyStr string, sourceReaderCount, destinationPackCount int, minTableSizeForSplit uint64, destinationWriterCount int) (Worker, error)
NewSplitCloneWorker returns a new SplitCloneWorker object.
func NewSplitDiffWorker ¶
func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string) Worker
NewSplitDiffWorker returns a new SplitDiffWorker object.
func NewVerticalSplitCloneWorker ¶
func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspace, destinationShard string, tables []string, strategyStr string, sourceReaderCount, destinationPackCount int, minTableSizeForSplit uint64, destinationWriterCount int) (Worker, error)
NewVerticalSplitCloneWorker returns a new VerticalSplitCloneWorker object.
Source Files ¶
- clone_utils.go
- command.go
- defaults.go
- diff_utils.go
- instance.go
- interactive.go
- panic.go
- panic_cmd.go
- ping.go
- ping_cmd.go
- row_splitter.go
- split_clone.go
- split_clone_cmd.go
- split_diff.go
- split_diff_cmd.go
- split_strategy.go
- status.go
- status_worker.go
- topo_utils.go
- vertical_split_clone.go
- vertical_split_clone_cmd.go
- vertical_split_diff.go
- vertical_split_diff_cmd.go
- worker.go
Directories ¶
Path | Synopsis |
---|---|
Package fakevtworkerclient contains a fake for the vtworkerclient interface.
|
Package fakevtworkerclient contains a fake for the vtworkerclient interface. |
Package grpcvtworkerclient contains the gRPC version of the vtworker client protocol.
|
Package grpcvtworkerclient contains the gRPC version of the vtworker client protocol. |
Package grpcvtworkerserver contains the gRPC implementation of the server side of the remote execution of vtworker commands.
|
Package grpcvtworkerserver contains the gRPC implementation of the server side of the remote execution of vtworker commands. |
Package vtworkerclient contains the generic client side of the remote vtworker protocol.
|
Package vtworkerclient contains the generic client side of the remote vtworker protocol. |
Package vtworkerclienttest contains the testsuite against which each RPC implementation of the vtworkerclient interface must be tested.
|
Package vtworkerclienttest contains the testsuite against which each RPC implementation of the vtworkerclient interface must be tested. |