Documentation ¶
Overview ¶
Package worker contains the framework, utility methods and core functions for long-running actions. The 'vtworker' binary uses these.
Index ¶
- Constants
- Variables
- func AddCommand(groupName string, c Command)
- func CompareRows(fields []*querypb.Field, compareCount int, left, right []sqltypes.Value) (int, error)
- func FindHealthyRdonlyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, ...) (*topodatapb.TabletAlias, error)
- func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, ...) (*topodatapb.TabletAlias, error)
- func PrintAllCommands(logger logutil.Logger)
- func RowsEqual(left, right []sqltypes.Value) int
- type BaseQueryBuilder
- type BlockWorker
- type Command
- type DeletesQueryBuilder
- type DiffReport
- type DiffType
- type InsertsQueryBuilder
- type Instance
- func (wi *Instance) Cancel() bool
- func (wi *Instance) CreateWrangler(logger logutil.Logger) *wrangler.Wrangler
- func (wi *Instance) InitInteractiveMode()
- func (wi *Instance) InitStatusHandling()
- func (wi *Instance) InstallSignalHandlers()
- func (wi *Instance) Reset() error
- func (wi *Instance) RunCommand(ctx context.Context, args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error)
- func (wi *Instance) WaitForCommand(wrk Worker, done chan struct{}) error
- type LegacySplitCloneWorker
- type Pair
- type PairList
- type PanicWorker
- type PingWorker
- type QueryBuilder
- type QueryResultReader
- func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, ...) (*QueryResultReader, error)
- func TableScan(ctx context.Context, log logutil.Logger, ts *topo.Server, ...) (*QueryResultReader, error)
- func TableScanByKeyRange(ctx context.Context, log logutil.Logger, ts *topo.Server, ...) (*QueryResultReader, error)
- type RestartableResultReader
- type ResultMerger
- type ResultReader
- type RowAggregator
- type RowDiffer
- type RowDiffer2
- type RowReader
- type RowRouter
- type RowSplitter
- type SplitCloneWorker
- type SplitDiffWorker
- type StatusWorker
- type StatusWorkerState
- type TabletTracker
- type UpdatesQueryBuilder
- type VerticalSplitDiffWorker
- type Worker
- func NewBlockWorker(wr *wrangler.Wrangler) (Worker, error)
- func NewLegacySplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, ...) (Worker, error)
- func NewPanicWorker(wr *wrangler.Wrangler) (Worker, error)
- func NewPingWorker(wr *wrangler.Wrangler, message string) (Worker, error)
- func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, sourceUID uint32, ...) Worker
- func NewVerticalSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, ...) Worker
Constants ¶
const ResultSizeRows = 64
ResultSizeRows specifies how many rows should be merged together per returned Result. Higher values will improve the performance of the overall pipeline but increase the memory usage. The current value of 64 rows tries to aim at a total size of 4k bytes for the returned Result (i.e. we assume an average row size of 64 bytes).
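The sizing arithmetic above can be illustrated with a minimal sketch. The names `resultSizeRows`, `assumedAvgRowBytes` and `batchRows` are hypothetical and exist only for this example; rows are simplified to strings and this is not the package's own batching code.

```go
package main

import "fmt"

const (
	resultSizeRows     = 64 // mirrors the documented ResultSizeRows value
	assumedAvgRowBytes = 64 // average row size assumed by the documentation
)

// batchRows groups rows into batches of at most size rows each.
// The last batch may be smaller than size.
func batchRows(rows []string, size int) [][]string {
	var batches [][]string
	for len(rows) > 0 {
		n := size
		if len(rows) < n {
			n = len(rows)
		}
		batches = append(batches, rows[:n])
		rows = rows[n:]
	}
	return batches
}

func main() {
	// 64 rows * 64 bytes is the 4k target mentioned above.
	fmt.Println("target bytes per Result:", resultSizeRows*assumedAvgRowBytes)
	for _, b := range batchRows(make([]string, 150), resultSizeRows) {
		fmt.Println("rows in batch:", len(b))
	}
}
```

Larger batches mean fewer Results per table but more memory held per Result, which is the trade-off the constant's description refers to.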
Variables ¶
var DiffFoundTypes = []DiffType{DiffMissing, DiffNotEqual, DiffExtraneous}
DiffFoundTypes has the list of DiffType values which represent that a difference was found. The list is ordered by the values of the types.
var DiffTypes = []DiffType{DiffMissing, DiffNotEqual, DiffExtraneous, DiffEqual}
DiffTypes has the list of available DiffType values, ordered by their value.
var ErrStoppedRowReader = errors.New("RowReader won't advance to the next Result because StopAfterCurrentResult() was called")
ErrStoppedRowReader is returned by RowReader.Next() when StopAfterCurrentResult() was called and the RowReader has finished the current Result.
Functions ¶
func AddCommand ¶
AddCommand registers a command and makes it available.
func CompareRows ¶
func CompareRows(fields []*querypb.Field, compareCount int, left, right []sqltypes.Value) (int, error)
CompareRows returns:
    -1 if left is smaller than right
     0 if left and right are equal
    +1 if left is bigger than right
It compares only up to and including the first "compareCount" columns of each row. TODO: This can panic if types for left and right don't match.
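The comparison contract can be sketched with a simplified stand-in. The function `compareRows` below is hypothetical: it uses plain `int64` columns instead of `sqltypes.Value` and drops the `fields` parameter, so it only illustrates the -1/0/+1 result and the "first compareCount columns" rule, not the real type handling.

```go
package main

import "fmt"

// compareRows compares the first compareCount columns of two rows.
// It returns -1, 0 or +1 following the documented CompareRows contract.
// Assumption: columns are int64; the real function works on sqltypes.Value.
func compareRows(compareCount int, left, right []int64) (int, error) {
	if compareCount > len(left) || compareCount > len(right) {
		return 0, fmt.Errorf("compareCount %d exceeds row length (left=%d, right=%d)",
			compareCount, len(left), len(right))
	}
	for i := 0; i < compareCount; i++ {
		switch {
		case left[i] < right[i]:
			return -1, nil
		case left[i] > right[i]:
			return 1, nil
		}
	}
	return 0, nil
}

func main() {
	// Only the first two columns are compared, so the third is ignored.
	res, err := compareRows(2, []int64{1, 5, 100}, []int64{1, 5, 200})
	fmt.Println(res, err)
}
```

Passing the number of primary key columns as `compareCount` is one plausible use: two rows then compare as equal whenever they share a primary key, regardless of the remaining columns.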
func FindHealthyRdonlyTablet ¶
func FindHealthyRdonlyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int) (*topodatapb.TabletAlias, error)
FindHealthyRdonlyTablet returns a random healthy RDONLY tablet. Since we don't want to use them all, we require at least minHealthyRdonlyTablets servers to be healthy. May block up to -wait_for_healthy_rdonly_tablets_timeout.
func FindWorkerTablet ¶
func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int) (*topodatapb.TabletAlias, error)
FindWorkerTablet will:
- find a rdonly instance in the keyspace / shard
- mark it as worker
- tag it with our worker process
func PrintAllCommands ¶
PrintAllCommands prints a help text for all registered commands to the given Logger.
Types ¶
type BaseQueryBuilder ¶
type BaseQueryBuilder struct {
// contains filtered or unexported fields
}
BaseQueryBuilder partially implements the QueryBuilder interface. It can be used by other QueryBuilder implementations to avoid repeating code.
func (*BaseQueryBuilder) WriteHead ¶
func (b *BaseQueryBuilder) WriteHead(buffer *bytes.Buffer)
WriteHead implements the QueryBuilder interface.
func (*BaseQueryBuilder) WriteSeparator ¶
func (b *BaseQueryBuilder) WriteSeparator(buffer *bytes.Buffer)
WriteSeparator implements the QueryBuilder interface.
func (*BaseQueryBuilder) WriteTail ¶
func (b *BaseQueryBuilder) WriteTail(buffer *bytes.Buffer)
WriteTail implements the QueryBuilder interface.
type BlockWorker ¶
type BlockWorker struct {
	StatusWorker
	// contains filtered or unexported fields
}
BlockWorker will block indefinitely until its context is canceled.
func (*BlockWorker) Run ¶
func (bw *BlockWorker) Run(ctx context.Context) error
Run implements the Worker interface.
func (*BlockWorker) StatusAsHTML ¶
func (bw *BlockWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface.
func (*BlockWorker) StatusAsText ¶
func (bw *BlockWorker) StatusAsText() string
StatusAsText implements the Worker interface.
type Command ¶
type Command struct {
	Name        string
	Method      func(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error)
	Interactive func(ctx context.Context, wi *Instance, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error)
	Params      string
	Help        string // if help is empty, won't list the command
}
Command contains the detail of a command which can be run in vtworker. While "Method" is run from the command line or RPC, "Interactive" may contain special logic to parse a web form and return templated HTML output.
type DeletesQueryBuilder ¶
type DeletesQueryBuilder struct {
	BaseQueryBuilder
	// contains filtered or unexported fields
}
DeletesQueryBuilder implements the QueryBuilder interface for DELETE queries.
func NewDeletesQueryBuilder ¶
func NewDeletesQueryBuilder(dbName string, td *tabletmanagerdatapb.TableDefinition) *DeletesQueryBuilder
NewDeletesQueryBuilder creates a new DeletesQueryBuilder.
type DiffReport ¶
type DiffReport struct {
// contains filtered or unexported fields
}
DiffReport has the stats for a diff job
func (*DiffReport) ComputeQPS ¶
func (dr *DiffReport) ComputeQPS()
ComputeQPS fills in processingQPS
func (*DiffReport) HasDifferences ¶
func (dr *DiffReport) HasDifferences() bool
HasDifferences returns true if the diff job recorded any difference
func (*DiffReport) String ¶
func (dr *DiffReport) String() string
type DiffType ¶
type DiffType int
DiffType specifies why a specific row was found as different when comparing a left and right side.
const (
	// DiffMissing is returned when the row is missing on the right side.
	DiffMissing DiffType = iota
	// DiffNotEqual is returned when the row on the left and right side are
	// not equal.
	DiffNotEqual
	// DiffExtraneous is returned when the row exists on the right side, but not
	// on the left side.
	DiffExtraneous
	// DiffEqual is returned when the rows left and right are equal.
	DiffEqual
)
type InsertsQueryBuilder ¶
type InsertsQueryBuilder struct {
BaseQueryBuilder
}
InsertsQueryBuilder implements the QueryBuilder interface for INSERT queries.
func NewInsertsQueryBuilder ¶
func NewInsertsQueryBuilder(dbName string, td *tabletmanagerdatapb.TableDefinition) *InsertsQueryBuilder
NewInsertsQueryBuilder creates a new InsertsQueryBuilder.
type Instance ¶
type Instance struct {
// contains filtered or unexported fields
}
Instance encapsulates the execution state of vtworker.
func NewInstance ¶
NewInstance creates a new Instance.
func (*Instance) Cancel ¶
Cancel calls the cancel function of the current vtworker job. It returns true if a job was running and false otherwise. NOTE: Cancel does not reset the state. Use Reset() to do so.
func (*Instance) CreateWrangler ¶
CreateWrangler creates a new wrangler using the instance specific configuration.
func (*Instance) InitInteractiveMode ¶
func (wi *Instance) InitInteractiveMode()
InitInteractiveMode installs webserver handlers for each known command.
func (*Instance) InitStatusHandling ¶
func (wi *Instance) InitStatusHandling()
InitStatusHandling installs webserver handlers for global actions like /status, /reset and /cancel.
func (*Instance) InstallSignalHandlers ¶
func (wi *Instance) InstallSignalHandlers()
InstallSignalHandlers installs signal handlers which exit vtworker gracefully.
func (*Instance) Reset ¶
Reset resets the state of a finished worker. It returns an error if the worker is still running.
func (*Instance) RunCommand ¶
func (wi *Instance) RunCommand(ctx context.Context, args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error)
RunCommand executes the vtworker command specified by "args". Use WaitForCommand() to block on the returned done channel. If wr is nil, the default wrangler will be used. If you pass a wr wrangler, note that a MemoryLogger will be added to its current logger. The returned worker and done channel may be nil if no worker was started e.g. in case of a "Reset".
func (*Instance) WaitForCommand ¶
WaitForCommand blocks until "done" is closed. In the meantime, it logs the status of "wrk".
type LegacySplitCloneWorker ¶
type LegacySplitCloneWorker struct {
	StatusWorker
	// contains filtered or unexported fields
}
LegacySplitCloneWorker will clone the data within a keyspace from a source set of shards to a destination set of shards.
func (*LegacySplitCloneWorker) Run ¶
func (scw *LegacySplitCloneWorker) Run(ctx context.Context) error
Run implements the Worker interface
func (*LegacySplitCloneWorker) StatusAsHTML ¶
func (scw *LegacySplitCloneWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*LegacySplitCloneWorker) StatusAsText ¶
func (scw *LegacySplitCloneWorker) StatusAsText() string
StatusAsText implements the Worker interface
type PairList ¶
type PairList []Pair
PairList is a slice of Pairs that implements sort.Interface to sort by Value.
type PanicWorker ¶
type PanicWorker struct {
	StatusWorker
	// contains filtered or unexported fields
}
PanicWorker will run panic() when executed. For internal tests only.
func (*PanicWorker) Run ¶
func (pw *PanicWorker) Run(ctx context.Context) error
Run implements the Worker interface.
func (*PanicWorker) StatusAsHTML ¶
func (pw *PanicWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*PanicWorker) StatusAsText ¶
func (pw *PanicWorker) StatusAsText() string
StatusAsText implements the Worker interface.
type PingWorker ¶
type PingWorker struct {
	StatusWorker
	// contains filtered or unexported fields
}
PingWorker will log a message with level CONSOLE.
func (*PingWorker) Run ¶
func (pw *PingWorker) Run(ctx context.Context) error
Run implements the Worker interface.
func (*PingWorker) StatusAsHTML ¶
func (pw *PingWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*PingWorker) StatusAsText ¶
func (pw *PingWorker) StatusAsText() string
StatusAsText implements the Worker interface.
type QueryBuilder ¶
type QueryBuilder interface {
	// WriteHead writes the beginning of the query into the buffer.
	WriteHead(*bytes.Buffer)
	// WriteTail writes any required tailing string into the buffer.
	WriteTail(*bytes.Buffer)
	// Write the separator between two rows.
	WriteSeparator(*bytes.Buffer)
	// Write the row itself.
	WriteRow(*bytes.Buffer, []sqltypes.Value)
}
QueryBuilder defines for a given reconciliation type how we have to build the SQL query for one or more rows.
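As a rough illustration of how the four methods could fit together, here is a minimal sketch of an INSERT-style builder and a driver loop. Everything in it is an assumption made for the example: rows are plain strings instead of `sqltypes.Value`, values are not escaped, and `insertsBuilder` and `buildQuery` are hypothetical names that do not reflect the package's real implementations.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// queryBuilder mirrors the shape of the QueryBuilder interface, with rows
// simplified to []string for this sketch.
type queryBuilder interface {
	WriteHead(*bytes.Buffer)
	WriteTail(*bytes.Buffer)
	WriteSeparator(*bytes.Buffer)
	WriteRow(*bytes.Buffer, []string)
}

// insertsBuilder is a hypothetical builder for multi-row INSERT statements.
type insertsBuilder struct {
	head string
}

func (b *insertsBuilder) WriteHead(buf *bytes.Buffer)      { buf.WriteString(b.head) }
func (b *insertsBuilder) WriteTail(buf *bytes.Buffer)      {} // INSERT needs no tail
func (b *insertsBuilder) WriteSeparator(buf *bytes.Buffer) { buf.WriteString(", ") }
func (b *insertsBuilder) WriteRow(buf *bytes.Buffer, row []string) {
	// NOTE: values are written verbatim; a real builder must encode them safely.
	buf.WriteString("(" + strings.Join(row, ", ") + ")")
}

// buildQuery writes head, all rows divided by separators, then the tail.
func buildQuery(qb queryBuilder, rows [][]string) string {
	var buf bytes.Buffer
	qb.WriteHead(&buf)
	for i, row := range rows {
		if i > 0 {
			qb.WriteSeparator(&buf)
		}
		qb.WriteRow(&buf, row)
	}
	qb.WriteTail(&buf)
	return buf.String()
}

func main() {
	qb := &insertsBuilder{head: "INSERT INTO db.t (id, name) VALUES "}
	fmt.Println(buildQuery(qb, [][]string{{"1", "'a'"}, {"2", "'b'"}}))
}
```

Splitting the statement into head, row, separator and tail lets one driver loop serve different statement types, which is presumably why a shared base implementation of the first three is useful.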
type QueryResultReader ¶
type QueryResultReader struct {
// contains filtered or unexported fields
}
QueryResultReader will stream rows towards the output channel. TODO(mberlin): Delete this in favor of RestartableResultReader once we are confident that the new SplitClone code produces the same diff results as the old diff code.
func NewQueryResultReaderForTablet ¶
func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string) (*QueryResultReader, error)
NewQueryResultReaderForTablet creates a new QueryResultReader for the provided tablet / sql query
func TableScan ¶
func TableScan(ctx context.Context, log logutil.Logger, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, td *tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error)
TableScan returns a QueryResultReader that gets all the rows from a table, ordered by Primary Key. The returned columns are ordered with the Primary Key columns in front.
func TableScanByKeyRange ¶
func TableScanByKeyRange(ctx context.Context, log logutil.Logger, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, td *tabletmanagerdatapb.TableDefinition, keyRange *topodatapb.KeyRange, keyspaceSchema *vindexes.KeyspaceSchema, shardingColumnName string, shardingColumnType topodatapb.KeyspaceIdType) (*QueryResultReader, error)
TableScanByKeyRange returns a QueryResultReader that gets all the rows from a table that match the supplied KeyRange, ordered by Primary Key. The returned columns are ordered with the Primary Key columns in front. If keyspaceSchema is passed in, we go into v3 mode, and we ask for all source data, and filter here. Otherwise we stick with v2 mode, where we can ask the source tablet to do the filtering.
func (*QueryResultReader) Close ¶
func (qrr *QueryResultReader) Close(ctx context.Context) error
Close closes the connection to the tablet.
func (*QueryResultReader) Fields ¶
func (qrr *QueryResultReader) Fields() []*querypb.Field
Fields returns the field data. It implements ResultReader.
type RestartableResultReader ¶
type RestartableResultReader struct {
// contains filtered or unexported fields
}
RestartableResultReader will stream all rows within a chunk. If the streaming query gets interrupted, it can resume the stream after the last row which was read.
func NewRestartableResultReader ¶
func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp tabletProvider, td *tabletmanagerdatapb.TableDefinition, chunk chunk, allowMultipleRetries bool) (*RestartableResultReader, error)
NewRestartableResultReader creates a new RestartableResultReader for the provided tablet and chunk. It will automatically create the necessary query to read all rows within the chunk. NOTE: We assume that the Columns field in "td" was ordered by a preceding call to reorderColumnsPrimaryKeyFirst().
func (*RestartableResultReader) Close ¶
func (r *RestartableResultReader) Close(ctx context.Context)
Close closes the connection to the tablet.
func (*RestartableResultReader) Fields ¶
func (r *RestartableResultReader) Fields() []*querypb.Field
Fields returns the field data. It implements ResultReader.
type ResultMerger ¶
type ResultMerger struct {
// contains filtered or unexported fields
}
ResultMerger returns a sorted stream of multiple ResultReader input streams. The output stream will be sorted by ascending primary key order. It implements the ResultReader interface.
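One common way to produce a single sorted stream from several sorted inputs is a k-way merge backed by a min-heap. The sketch below shows that technique on plain integer keys; it is an assumption that this resembles the approach taken here, and `mergeSorted`, `item` and `minHeap` are hypothetical names for the example only.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is the next unread primary key of one input.
type item struct {
	pk     int
	source int // index of the input the key came from
	pos    int // position of the key within that input
}

// minHeap orders items by ascending primary key.
type minHeap []item

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].pk < h[j].pk }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// mergeSorted merges inputs, each sorted ascending, into one sorted slice.
func mergeSorted(inputs [][]int) []int {
	h := &minHeap{}
	for src, in := range inputs {
		if len(in) > 0 {
			heap.Push(h, item{pk: in[0], source: src, pos: 0})
		}
	}
	var out []int
	for h.Len() > 0 {
		it := heap.Pop(h).(item)
		out = append(out, it.pk)
		// Refill the heap from the input that just yielded a key.
		if next := it.pos + 1; next < len(inputs[it.source]) {
			heap.Push(h, item{pk: inputs[it.source][next], source: it.source, pos: next})
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSorted([][]int{{1, 4, 7}, {2, 5}, {3, 6, 8}}))
}
```

The heap holds at most one entry per input, so each output row costs a logarithmic number of comparisons in the number of inputs.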
func NewResultMerger ¶
func NewResultMerger(inputs []ResultReader, pkFieldCount int) (*ResultMerger, error)
NewResultMerger returns a new ResultMerger.
func (*ResultMerger) Fields ¶
func (rm *ResultMerger) Fields() []*querypb.Field
Fields returns the field information for the columns in the result. It is part of the ResultReader interface.
type ResultReader ¶
type ResultReader interface {
	// Fields returns the field information for the columns in the result.
	Fields() []*querypb.Field
	// Next is identical to sqltypes.ResultStream.Recv().
	// It returns the next result on the stream.
	// It will return io.EOF if the stream ended.
	Next() (*sqltypes.Result, error)
}
ResultReader is an advanced version of sqltypes.ResultStream. In addition to the streamed Result messages (which contain a set of rows), it will expose the Fields (columns information) of the result separately.
Note that some code in the worker package checks if instances of ResultReader are equal. In consequence, any ResultReader implementation must always use pointer receivers. This way, implementations are always referred by their pointer type and the equal comparison of ResultReader instances behaves as expected.
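The effect of the pointer receiver rule can be seen in a small sketch. The `reader` interface, `ptrReader` type and `indexOf` helper are hypothetical and stand in for ResultReader and its implementations: when the dynamic type stored in an interface is a pointer, `==` compares identity, so two distinct instances with identical contents are still different.

```go
package main

import "fmt"

// reader is a stand-in for the ResultReader interface.
type reader interface {
	Name() string
}

// ptrReader uses a pointer receiver, so only *ptrReader implements reader.
type ptrReader struct {
	name string
}

func (r *ptrReader) Name() string { return r.name }

// indexOf returns the position of target in readers, or -1.
// Interface equality compares the stored pointers, i.e. instance identity.
func indexOf(readers []reader, target reader) int {
	for i, r := range readers {
		if r == target {
			return i
		}
	}
	return -1
}

func main() {
	a := &ptrReader{name: "same"}
	b := &ptrReader{name: "same"} // same contents, different instance
	readers := []reader{a, b}
	fmt.Println(indexOf(readers, a), indexOf(readers, b))
}
```

Had the implementation been stored by value, the comparison would be made on field contents, so `a` and `b` would compare as equal, and a struct with non-comparable fields such as slices would panic at run time.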
type RowAggregator ¶
type RowAggregator struct {
// contains filtered or unexported fields
}
RowAggregator aggregates SQL reconciliation statements into one statement. Once a limit (maxRows or maxSize) is reached, the statement will be sent to the destination's insertChannel. RowAggregator is also aware of the type of statement (DiffType) and constructs the necessary SQL command based on that. Aggregating multiple statements is done to improve the overall performance. One RowAggregator instance is specific to one destination shard and diff type. Important: The SQL statement generation assumes that the fields of the provided row are in the same order as "td.Columns".
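The aggregate-then-flush behaviour can be sketched as follows. This is a simplified illustration under stated assumptions: only a row limit is modelled (no size limit), rows arrive as pre-rendered SQL tuples, the statement head is hard-coded, and `aggregator` and `aggregate` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// aggregator collects rows and emits one statement per maxRows rows.
type aggregator struct {
	maxRows int
	rows    []string
	out     chan string
}

// add buffers a row and flushes once the row limit is reached.
func (a *aggregator) add(row string) {
	a.rows = append(a.rows, row)
	if len(a.rows) >= a.maxRows {
		a.flush()
	}
}

// flush sends the buffered rows as one statement and empties the buffer.
func (a *aggregator) flush() {
	if len(a.rows) == 0 {
		return
	}
	a.out <- "INSERT INTO t (id) VALUES " + strings.Join(a.rows, ", ")
	a.rows = a.rows[:0]
}

// aggregate runs all rows through an aggregator and collects the statements.
func aggregate(rows []string, maxRows int) []string {
	out := make(chan string, len(rows)+1) // large enough to never block here
	a := &aggregator{maxRows: maxRows, out: out}
	for _, r := range rows {
		a.add(r)
	}
	a.flush() // send out the remainder, like a final Flush()
	close(out)
	var stmts []string
	for s := range out {
		stmts = append(stmts, s)
	}
	return stmts
}

func main() {
	for _, s := range aggregate([]string{"(1)", "(2)", "(3)"}, 2) {
		fmt.Println(s)
	}
}
```

The final explicit flush matters: without it, rows that never reach the limit would stay in the buffer and never be written.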
func NewRowAggregator ¶
func NewRowAggregator(ctx context.Context, maxRows, maxSize int, insertChannel chan string, dbName string, td *tabletmanagerdatapb.TableDefinition, diffType DiffType, statsCounters *stats.CountersWithSingleLabel) *RowAggregator
NewRowAggregator returns a RowAggregator. The index of the elements in statsCounters must match the elements in "DiffTypes", i.e. the first counter is for inserts, the second for updates and the third for deletes.
func (*RowAggregator) Add ¶
func (ra *RowAggregator) Add(row []sqltypes.Value) error
Add will add a new row which must be reconciled. If an error is returned, RowAggregator will be in an undefined state and must not be used any longer.
func (*RowAggregator) Flush ¶
func (ra *RowAggregator) Flush() error
Flush sends out the current aggregation buffer.
type RowDiffer ¶
type RowDiffer struct {
// contains filtered or unexported fields
}
RowDiffer will consume rows on both sides, and compare them. It assumes left and right are sorted by ascending primary key. It will record errors if extra rows exist on either side.
func NewRowDiffer ¶
func NewRowDiffer(left, right *QueryResultReader, tableDefinition *tabletmanagerdatapb.TableDefinition) (*RowDiffer, error)
NewRowDiffer returns a new RowDiffer
type RowDiffer2 ¶
type RowDiffer2 struct {
// contains filtered or unexported fields
}
RowDiffer2 will compare and reconcile two sides. It assumes that the left side is the source of truth and necessary reconciliations have to be applied to the right side. It also assumes left and right are sorted by ascending primary key.
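Because both sides are sorted by ascending primary key, a single merge-style pass is enough to classify every row. The sketch below shows that idea with a simplified `row` type and lowercase copies of the DiffType constants; `diffSorted` is a hypothetical helper that only counts differences and does not generate any reconciliation statements.

```go
package main

import "fmt"

type diffType int

// Lowercase copies of the documented DiffType values, in the same order.
const (
	diffMissing diffType = iota
	diffNotEqual
	diffExtraneous
	diffEqual
)

// row is a simplified row: one primary key column and one value column.
type row struct {
	pk    int
	value string
}

// diffSorted walks left and right, both sorted by ascending pk, and counts
// how many rows fall into each diff type. Left is the source of truth.
func diffSorted(left, right []row) map[diffType]int {
	counts := map[diffType]int{}
	i, j := 0, 0
	for i < len(left) && j < len(right) {
		switch {
		case left[i].pk < right[j].pk:
			counts[diffMissing]++ // row must be inserted on the right
			i++
		case left[i].pk > right[j].pk:
			counts[diffExtraneous]++ // row must be deleted on the right
			j++
		case left[i].value != right[j].value:
			counts[diffNotEqual]++ // row must be updated on the right
			i++
			j++
		default:
			counts[diffEqual]++
			i++
			j++
		}
	}
	// Whatever remains on one side has no counterpart on the other.
	counts[diffMissing] += len(left) - i
	counts[diffExtraneous] += len(right) - j
	return counts
}

func main() {
	left := []row{{1, "a"}, {2, "b"}, {3, "c"}, {5, "e"}}
	right := []row{{2, "b"}, {3, "x"}, {4, "d"}}
	counts := diffSorted(left, right)
	fmt.Println("missing:", counts[diffMissing], "not equal:", counts[diffNotEqual],
		"extraneous:", counts[diffExtraneous], "equal:", counts[diffEqual])
}
```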
func NewRowDiffer2 ¶
func NewRowDiffer2(ctx context.Context, left, right ResultReader, td *tabletmanagerdatapb.TableDefinition, tableStatusList *tableStatusList, tableIndex int, destinationShards []*topo.ShardInfo, keyResolver keyspaceIDResolver, insertChannels []chan string, abort <-chan struct{}, dbNames []string, writeQueryMaxRows, writeQueryMaxSize int, statsCounters []*stats.CountersWithSingleLabel) (*RowDiffer2, error)
NewRowDiffer2 returns a new RowDiffer2. We assume that the indexes of the slice parameters always correspond to the same shard, e.g. insertChannels[0] refers to destinationShards[0] and so on. The column list td.Columns must have all primary key columns first and then the non-primary-key columns. The columns in the rows returned by both ResultReader must have the same order as td.Columns.
func (*RowDiffer2) Diff ¶
func (rd *RowDiffer2) Diff() (DiffReport, error)
Diff runs the diff and reconcile. If an error occurs, it will return and stop.
type RowReader ¶
type RowReader struct {
// contains filtered or unexported fields
}
RowReader returns individual rows from a ResultReader.
func NewRowReader ¶
func NewRowReader(resultReader ResultReader) *RowReader
NewRowReader returns a RowReader based on the given ResultReader.
func (*RowReader) Next ¶
Next will return:
    (row, nil) for the next row
    (nil, nil) for EOF
    (nil, error) if an error occurred
func (*RowReader) StopAfterCurrentResult ¶
func (rr *RowReader) StopAfterCurrentResult()
StopAfterCurrentResult tells RowReader to keep returning rows in Next() until it has finished the current Result. Once there, Next() will always return the ErrStoppedRowReader error. This feature is necessary for an optimization where the underlying ResultReader is the last input in a merge and we want to switch from reading rows to reading Results.
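The interplay between Next() and StopAfterCurrentResult() can be sketched with an in-memory reader. The `rowReader` type below is hypothetical: Results are modelled as slices of rows held in memory instead of being fetched from a ResultReader, and `errStopped` stands in for ErrStoppedRowReader.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopped plays the role of ErrStoppedRowReader in this sketch.
var errStopped = errors.New("row reader stopped after current result")

// rowReader returns individual rows from a list of in-memory Results.
type rowReader struct {
	results          [][][]string // Results, each holding rows of column values
	resultIdx        int
	rowIdx           int
	stopAfterCurrent bool
}

// next returns (row, nil) for the next row, (nil, nil) at EOF, and
// (nil, errStopped) once a stop was requested and the current Result is done.
func (r *rowReader) next() ([]string, error) {
	for r.resultIdx < len(r.results) && r.rowIdx == len(r.results[r.resultIdx]) {
		if r.stopAfterCurrent {
			return nil, errStopped // do not advance to the next Result
		}
		r.resultIdx++
		r.rowIdx = 0
	}
	if r.resultIdx == len(r.results) {
		return nil, nil // EOF
	}
	row := r.results[r.resultIdx][r.rowIdx]
	r.rowIdx++
	return row, nil
}

func (r *rowReader) stopAfterCurrentResult() { r.stopAfterCurrent = true }

func main() {
	r := &rowReader{results: [][][]string{{{"1"}, {"2"}}, {{"3"}}}}
	row, _ := r.next()
	fmt.Println("row:", row)
	r.stopAfterCurrentResult()
	for {
		row, err := r.next()
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		if row == nil {
			fmt.Println("EOF")
			return
		}
		fmt.Println("row:", row)
	}
}
```

In this example the row from the second Result is never returned through next(): after the stop request the caller is expected to read the remaining Results directly from the underlying reader.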
type RowRouter ¶
type RowRouter struct {
// contains filtered or unexported fields
}
RowRouter determines which shard's key range contains a given keyspace ID.
func NewRowRouter ¶
NewRowRouter creates a RowRouter. We assume that the key ranges in shardInfo cover the full keyrange i.e. for any possible keyspaceID there is a shard we can route to.
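A lookup of this kind can be sketched with byte-wise comparisons. The sketch assumes key ranges with an inclusive start and an exclusive end, where an empty bound means "unbounded"; `keyRange`, `contains` and `route` are hypothetical names, and the real RowRouter may be implemented differently.

```go
package main

import (
	"bytes"
	"fmt"
)

// keyRange covers [start, end). An empty start or end is unbounded.
type keyRange struct {
	start []byte
	end   []byte
}

// contains reports whether the keyspace ID falls into the range.
func (kr keyRange) contains(id []byte) bool {
	return bytes.Compare(id, kr.start) >= 0 &&
		(len(kr.end) == 0 || bytes.Compare(id, kr.end) < 0)
}

// route returns the index of the first range containing id.
// It fails if the ranges do not cover the full key space.
func route(ranges []keyRange, id []byte) (int, error) {
	for i, kr := range ranges {
		if kr.contains(id) {
			return i, nil
		}
	}
	return -1, fmt.Errorf("no shard found for keyspace ID %x", id)
}

func main() {
	// Two shards splitting the key space at 0x80.
	ranges := []keyRange{
		{start: nil, end: []byte{0x80}},
		{start: []byte{0x80}, end: nil},
	}
	for _, id := range [][]byte{{0x40}, {0x80}, {0xff, 0xff}} {
		idx, err := route(ranges, id)
		fmt.Printf("%x -> shard index %d (err: %v)\n", id, idx, err)
	}
}
```

When the ranges cover the full key space, as NewRowRouter assumes, the error branch is never taken.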
type RowSplitter ¶
type RowSplitter struct {
	KeyResolver keyspaceIDResolver
	KeyRanges   []*topodatapb.KeyRange
}
RowSplitter is a helper class to split rows into multiple subsets targeted to different shards.
func NewRowSplitter ¶
func NewRowSplitter(shardInfos []*topo.ShardInfo, keyResolver keyspaceIDResolver) *RowSplitter
NewRowSplitter returns a new row splitter for the given shard distribution.
func (*RowSplitter) Send ¶
func (rs *RowSplitter) Send(fields []*querypb.Field, result [][][]sqltypes.Value, baseCmds []string, insertChannels []chan string, abort <-chan struct{}) bool
Send will send the rows to the list of channels. Returns true if aborted.
func (*RowSplitter) StartSplit ¶
func (rs *RowSplitter) StartSplit() [][][]sqltypes.Value
StartSplit starts a new split. Split can then be called multiple times.
type SplitCloneWorker ¶
type SplitCloneWorker struct {
	StatusWorker
	// contains filtered or unexported fields
}
SplitCloneWorker will clone the data within a keyspace from a source set of shards to a destination set of shards.
func (*SplitCloneWorker) FormattedOfflineSources ¶
func (scw *SplitCloneWorker) FormattedOfflineSources() string
FormattedOfflineSources returns a space separated list of tablets which are in use during the offline clone phase.
func (*SplitCloneWorker) Run ¶
func (scw *SplitCloneWorker) Run(ctx context.Context) error
Run implements the Worker interface
func (*SplitCloneWorker) StatsUpdate ¶
func (scw *SplitCloneWorker) StatsUpdate(ts *discovery.TabletStats)
StatsUpdate receives replication lag updates for each destination master and forwards them to the respective throttler instance. It also forwards any update to the TabletStatsCache to keep it up to date. It is part of the discovery.HealthCheckStatsListener interface.
func (*SplitCloneWorker) StatusAsHTML ¶
func (scw *SplitCloneWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*SplitCloneWorker) StatusAsText ¶
func (scw *SplitCloneWorker) StatusAsText() string
StatusAsText implements the Worker interface
type SplitDiffWorker ¶
type SplitDiffWorker struct {
	StatusWorker
	// contains filtered or unexported fields
}
SplitDiffWorker executes a diff between a destination shard and its source shards in a shard split case.
func (*SplitDiffWorker) Run ¶
func (sdw *SplitDiffWorker) Run(ctx context.Context) error
Run is mostly a wrapper to run the cleanup at the end.
func (*SplitDiffWorker) StatusAsHTML ¶
func (sdw *SplitDiffWorker) StatusAsHTML() template.HTML
StatusAsHTML is part of the Worker interface
func (*SplitDiffWorker) StatusAsText ¶
func (sdw *SplitDiffWorker) StatusAsText() string
StatusAsText is part of the Worker interface
type StatusWorker ¶
type StatusWorker struct {
// contains filtered or unexported fields
}
StatusWorker is the base type for a worker which keeps a status. The status is protected by a mutex. StatusWorker also provides default implementations for StatusAsHTML and StatusAsText to make it easier on workers if they don't need to export more.
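The embedding pattern described here can be sketched as follows. All names are hypothetical lowercase stand-ins (`statusWorker`, `workerState`, `pingWorker`), and the text format returned by `StatusAsText` is an assumption for the example, not the package's actual output.

```go
package main

import (
	"fmt"
	"sync"
)

type workerState string

const (
	stateNotStarted workerState = "not started"
	stateDone       workerState = "done"
)

// statusWorker keeps a state protected by a mutex and offers a default
// text status for workers that do not need anything more elaborate.
type statusWorker struct {
	mu    sync.Mutex
	state workerState
}

func (w *statusWorker) SetState(s workerState) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.state = s
}

func (w *statusWorker) State() workerState {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.state
}

// StatusAsText is the default implementation inherited through embedding.
func (w *statusWorker) StatusAsText() string {
	return "State: " + string(w.State())
}

// pingWorker embeds statusWorker and therefore gets State, SetState and
// StatusAsText without writing them again.
type pingWorker struct {
	statusWorker
	message string
}

func main() {
	pw := &pingWorker{statusWorker: statusWorker{state: stateNotStarted}, message: "hello"}
	fmt.Println(pw.StatusAsText())
	pw.SetState(stateDone)
	fmt.Println(pw.StatusAsText(), "/ message:", pw.message)
}
```

A worker that needs a richer status can define its own `StatusAsText` on the outer type, which then takes precedence over the embedded default.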
func NewStatusWorker ¶
func NewStatusWorker() StatusWorker
NewStatusWorker returns a StatusWorker in state WorkerStateNotStarted.
func (*StatusWorker) SetState ¶
func (w *StatusWorker) SetState(state StatusWorkerState)
SetState is a convenience function for workers.
func (*StatusWorker) State ¶
func (w *StatusWorker) State() StatusWorkerState
State is part of the Worker interface.
func (*StatusWorker) StatusAsHTML ¶
func (w *StatusWorker) StatusAsHTML() template.HTML
StatusAsHTML is part of the Worker interface.
func (*StatusWorker) StatusAsText ¶
func (w *StatusWorker) StatusAsText() string
StatusAsText is part of the Worker interface.
type StatusWorkerState ¶
type StatusWorkerState string
StatusWorkerState is the type for a StatusWorker's status
const (
	// WorkerStateNotStarted is the initial state.
	WorkerStateNotStarted StatusWorkerState = "not started"
	// WorkerStateDone is set when the worker successfully finished.
	WorkerStateDone StatusWorkerState = "done"
	// WorkerStateError is set when the worker failed.
	WorkerStateError StatusWorkerState = "error"
	// WorkerStateInit is set when the worker does initialize its state.
	WorkerStateInit StatusWorkerState = "initializing"
	// WorkerStateFindTargets is set when the worker searches healthy RDONLY tablets.
	WorkerStateFindTargets StatusWorkerState = "finding target instances"
	// WorkerStateSyncReplication is set when the worker ensures that source and
	// destination tablets are at the same GTID during the diff.
	WorkerStateSyncReplication StatusWorkerState = "synchronizing replication"
	// WorkerStateCloneOnline is set when the worker copies the data in the online phase.
	WorkerStateCloneOnline StatusWorkerState = "cloning the data (online)"
	// WorkerStateCloneOffline is set when the worker copies the data in the offline phase.
	WorkerStateCloneOffline StatusWorkerState = "cloning the data (offline)"
	// WorkerStateDiff is set when the worker compares the data.
	WorkerStateDiff StatusWorkerState = "running the diff"
	// WorkerStateDebugRunning is set when an internal command (e.g. Block or Ping) is currently running.
	WorkerStateDebugRunning StatusWorkerState = "running an internal debug command"
	// WorkerStateCleanUp is set when the worker reverses the initialization e.g.
	// the type of a taken out RDONLY tablet is changed back from "worker" to "spare".
	WorkerStateCleanUp StatusWorkerState = "cleaning up"
)
func (StatusWorkerState) String ¶
func (state StatusWorkerState) String() string
type TabletTracker ¶
type TabletTracker struct {
// contains filtered or unexported fields
}
TabletTracker tracks for each tablet alias how often it is currently in use for a streaming read query. By using this information, all streaming queries should be balanced across all available tablets.
func NewTabletTracker ¶
func NewTabletTracker() *TabletTracker
NewTabletTracker returns a new TabletTracker.
func (*TabletTracker) TabletsInUse ¶
func (t *TabletTracker) TabletsInUse() string
TabletsInUse returns a string of all tablet aliases currently in use. The tablets are separated by a space.
func (*TabletTracker) Track ¶
func (t *TabletTracker) Track(stats []discovery.TabletStats) *topodatapb.Tablet
Track will pick the least used tablet from "stats", increment its usage by 1 and return it. "stats" must not be empty.
func (*TabletTracker) Untrack ¶
func (t *TabletTracker) Untrack(alias *topodatapb.TabletAlias)
Untrack decrements the usage of "alias" by 1.
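The least-used selection can be sketched with a usage counter per alias. This is a simplified illustration: aliases are plain strings instead of `topodatapb.TabletAlias`, ties are broken by taking the first candidate (an assumption), and `tracker`, `track` and `untrack` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker counts how often each tablet alias is currently in use.
type tracker struct {
	mu    sync.Mutex
	usage map[string]int
}

func newTracker() *tracker {
	return &tracker{usage: make(map[string]int)}
}

// track picks the least used alias, increments its usage and returns it.
// aliases must not be empty. Ties go to the earliest candidate.
func (t *tracker) track(aliases []string) string {
	t.mu.Lock()
	defer t.mu.Unlock()
	best := aliases[0]
	for _, a := range aliases[1:] {
		if t.usage[a] < t.usage[best] {
			best = a
		}
	}
	t.usage[best]++
	return best
}

// untrack decrements the usage of alias and forgets it once unused.
func (t *tracker) untrack(alias string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.usage[alias] > 0 {
		t.usage[alias]--
	}
	if t.usage[alias] == 0 {
		delete(t.usage, alias)
	}
}

func main() {
	t := newTracker()
	tablets := []string{"cell-1", "cell-2"}
	fmt.Println(t.track(tablets), t.track(tablets), t.track(tablets))
	t.untrack("cell-1")
	fmt.Println(t.track(tablets))
}
```

Pairing every track call with an untrack once the streaming query ends keeps the counters accurate, which is what lets new queries spread evenly across tablets.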
type UpdatesQueryBuilder ¶
type UpdatesQueryBuilder struct {
	BaseQueryBuilder
	// contains filtered or unexported fields
}
UpdatesQueryBuilder implements the QueryBuilder interface for UPDATE queries.
func NewUpdatesQueryBuilder ¶
func NewUpdatesQueryBuilder(dbName string, td *tabletmanagerdatapb.TableDefinition) *UpdatesQueryBuilder
NewUpdatesQueryBuilder creates a new UpdatesQueryBuilder.
func (*UpdatesQueryBuilder) WriteRow ¶
func (b *UpdatesQueryBuilder) WriteRow(buffer *bytes.Buffer, row []sqltypes.Value)
WriteRow implements the QueryBuilder interface.
func (*UpdatesQueryBuilder) WriteSeparator ¶
func (b *UpdatesQueryBuilder) WriteSeparator(buffer *bytes.Buffer)
WriteSeparator implements the QueryBuilder interface and overrides the BaseQueryBuilder implementation.
type VerticalSplitDiffWorker ¶
type VerticalSplitDiffWorker struct {
	StatusWorker
	// contains filtered or unexported fields
}
VerticalSplitDiffWorker executes a diff between a destination shard and its source shards in a shard split case.
func (*VerticalSplitDiffWorker) Run ¶
func (vsdw *VerticalSplitDiffWorker) Run(ctx context.Context) error
Run is mostly a wrapper to run the cleanup at the end.
func (*VerticalSplitDiffWorker) StatusAsHTML ¶
func (vsdw *VerticalSplitDiffWorker) StatusAsHTML() template.HTML
StatusAsHTML is part of the Worker interface.
func (*VerticalSplitDiffWorker) StatusAsText ¶
func (vsdw *VerticalSplitDiffWorker) StatusAsText() string
StatusAsText is part of the Worker interface.
type Worker ¶
type Worker interface {
	// State returns the current state using the internal representation.
	State() StatusWorkerState
	// StatusAsHTML returns the current worker status in HTML.
	StatusAsHTML() template.HTML
	// StatusAsText returns the current worker status in plain text.
	StatusAsText() string
	// Run is the main entry point for the worker. It will be
	// called in a go routine. When the passed in context is canceled, Run()
	// should exit as soon as possible.
	Run(context.Context) error
}
Worker is the base interface for all long running workers.
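The cancellation contract of Run can be sketched with a worker that does nothing but wait. The `worker` interface below is reduced to Run only, and `blockingWorker` and `runAndCancel` are hypothetical names; returning the context's error on cancellation is an assumption of this sketch, not a documented behaviour of the real workers.

```go
package main

import (
	"context"
	"fmt"
)

// worker is reduced to the Run method for this sketch.
type worker interface {
	Run(ctx context.Context) error
}

// blockingWorker waits until its context is canceled.
type blockingWorker struct{}

// Run returns as soon as the context is done, reporting why it ended.
func (w *blockingWorker) Run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// runAndCancel starts the worker in a goroutine, cancels its context and
// waits for Run to return.
func runAndCancel(w worker) error {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- w.Run(ctx) }()
	cancel()
	return <-done
}

func main() {
	err := runAndCancel(&blockingWorker{})
	fmt.Println("worker returned:", err)
}
```

A worker doing real work would check `ctx.Done()` between units of work, for example in a `select` alongside its input channels, so that cancellation is noticed promptly.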
func NewBlockWorker ¶
NewBlockWorker returns a new BlockWorker object.
func NewLegacySplitCloneWorker ¶
func NewLegacySplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, strategyStr string, sourceReaderCount, destinationPackCount, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS int64) (Worker, error)
NewLegacySplitCloneWorker returns a new LegacySplitCloneWorker object.
func NewPanicWorker ¶
NewPanicWorker returns a new PanicWorker object.
func NewPingWorker ¶
NewPingWorker returns a new PingWorker object.
Source Files ¶
- block.go
- block_cmd.go
- chunk.go
- clone_utils.go
- command.go
- defaults.go
- diff_utils.go
- executor.go
- instance.go
- interactive.go
- key_resolver.go
- legacy_row_splitter.go
- legacy_split_clone.go
- legacy_split_clone_cmd.go
- panic.go
- panic_cmd.go
- ping.go
- ping_cmd.go
- restartable_result_reader.go
- result_merger.go
- result_reader.go
- row_aggregator.go
- row_differ.go
- split_clone.go
- split_clone_cmd.go
- split_diff.go
- split_diff_cmd.go
- split_strategy.go
- status.go
- status_worker.go
- table_status.go
- tablet_provider.go
- tablet_tracker.go
- topo_utils.go
- vertical_split_clone_cmd.go
- vertical_split_diff.go
- vertical_split_diff_cmd.go
- worker.go
Directories ¶
Path | Synopsis |
---|---|
fakevtworkerclient | Package fakevtworkerclient contains a fake for the vtworkerclient interface. |
grpcvtworkerclient | Package grpcvtworkerclient contains the gRPC version of the vtworker client protocol. |
grpcvtworkerserver | Package grpcvtworkerserver contains the gRPC implementation of the server side of the remote execution of vtworker commands. |
vtworkerclient | Package vtworkerclient contains the generic client side of the remote vtworker protocol. |
vtworkerclienttest | Package vtworkerclienttest contains the testsuite against which each RPC implementation of the vtworkerclient interface must be tested. |