Documentation ¶
Overview ¶
Package 'worker' contains the framework, utility methods and core functions for long-running actions. The 'vtworker' binary uses these.
Index ¶
- Constants
- func CompareRows(fields []mproto.Field, compareCount int, left, right []sqltypes.Value) (int, error)
- func RowsEqual(left, right []sqltypes.Value) int
- func SignalInterrupt()
- type DiffReport
- type QueryResultReader
- func NewQueryResultReaderForTablet(ts topo.Server, tabletAlias topo.TabletAlias, sql string) (*QueryResultReader, error)
- func TableScan(log logutil.Logger, ts topo.Server, tabletAlias topo.TabletAlias, ...) (*QueryResultReader, error)
- func TableScanByKeyRange(log logutil.Logger, ts topo.Server, tabletAlias topo.TabletAlias, ...) (*QueryResultReader, error)
- type RowDiffer
- type RowReader
- type RowSplitter
- type RowSubsetDiffer
- type SQLDiffWorker
- type SourceSpec
- type SplitCloneWorker
- type SplitDiffWorker
- type VerticalSplitCloneWorker
- type VerticalSplitDiffWorker
- type Worker
- func NewSQLDiffWorker(wr *wrangler.Wrangler, cell string, superset, subset SourceSpec) Worker
- func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, ...) (Worker, error)
- func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker
- func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspace, destinationShard string, ...) (Worker, error)
- func NewVerticalSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker
Constants ¶
const (
	// all the states for the worker
	SQLDiffNotSarted              sqlDiffWorkerState = "not started"
	SQLDiffDone                   sqlDiffWorkerState = "done"
	SQLDiffError                  sqlDiffWorkerState = "error"
	SQLDiffFindTargets            sqlDiffWorkerState = "finding target instances"
	SQLDiffSynchronizeReplication sqlDiffWorkerState = "synchronizing replication"
	SQLDiffRunning                sqlDiffWorkerState = "running the diff"
	SQLDiffCleanUp                sqlDiffWorkerState = "cleaning up"
)
Variables ¶
This section is empty.
Functions ¶
func CompareRows ¶
func CompareRows(fields []mproto.Field, compareCount int, left, right []sqltypes.Value) (int, error)
CompareRows returns: -1 if left is smaller than right, 0 if left and right are equal, +1 if left is bigger than right. TODO: This can panic if types for left and right don't match.
func RowsEqual ¶
func RowsEqual(left, right []sqltypes.Value) int
RowsEqual returns the index of the first differing field, or -1 if both rows are the same.
func SignalInterrupt ¶
func SignalInterrupt()
Types ¶
type DiffReport ¶
type DiffReport struct {
// contains filtered or unexported fields
}
DiffReport has the stats for a diff job
func (*DiffReport) ComputeQPS ¶
func (dr *DiffReport) ComputeQPS()
ComputeQPS fills in processingQPS
func (*DiffReport) HasDifferences ¶
func (dr *DiffReport) HasDifferences() bool
HasDifferences returns true if the diff job recorded any difference
func (*DiffReport) String ¶
func (dr *DiffReport) String() string
type QueryResultReader ¶
type QueryResultReader struct {
	Output <-chan *mproto.QueryResult
	Fields []mproto.Field
	// contains filtered or unexported fields
}
QueryResultReader will stream rows towards the output channel.
func NewQueryResultReaderForTablet ¶
func NewQueryResultReaderForTablet(ts topo.Server, tabletAlias topo.TabletAlias, sql string) (*QueryResultReader, error)
NewQueryResultReaderForTablet creates a new QueryResultReader for the provided tablet / sql query
func TableScan ¶
func TableScan(log logutil.Logger, ts topo.Server, tabletAlias topo.TabletAlias, tableDefinition *myproto.TableDefinition) (*QueryResultReader, error)
TableScan returns a QueryResultReader that gets all the rows from a table, ordered by Primary Key. The returned columns are ordered with the Primary Key columns in front.
func TableScanByKeyRange ¶
func TableScanByKeyRange(log logutil.Logger, ts topo.Server, tabletAlias topo.TabletAlias, tableDefinition *myproto.TableDefinition, keyRange key.KeyRange, keyspaceIdType key.KeyspaceIdType) (*QueryResultReader, error)
TableScanByKeyRange returns a QueryResultReader that gets all the rows from a table that match the supplied KeyRange, ordered by Primary Key. The returned columns are ordered with the Primary Key columns in front.
func (*QueryResultReader) Close ¶
func (qrr *QueryResultReader) Close()
func (*QueryResultReader) Error ¶
func (qrr *QueryResultReader) Error() error
type RowDiffer ¶
type RowDiffer struct {
// contains filtered or unexported fields
}
RowDiffer will consume rows on both sides, and compare them. It assumes left and right are sorted by ascending primary key. It will record errors if extra rows exist on either side.
func NewRowDiffer ¶
func NewRowDiffer(left, right *QueryResultReader, tableDefinition *myproto.TableDefinition) (*RowDiffer, error)
NewRowDiffer returns a new RowDiffer
type RowReader ¶
type RowReader struct {
// contains filtered or unexported fields
}
RowReader returns individual rows from a QueryResultReader
func NewRowReader ¶
func NewRowReader(queryResultReader *QueryResultReader) *RowReader
NewRowReader returns a RowReader based on the QueryResultReader
type RowSplitter ¶
type RowSplitter struct {
	Type       key.KeyspaceIdType
	ValueIndex int
	KeyRanges  []key.KeyRange
}
RowSplitter is a helper class to split rows into multiple subsets targeted to different shards.
func NewRowSplitter ¶
func NewRowSplitter(shardInfos []*topo.ShardInfo, typ key.KeyspaceIdType, valueIndex int) *RowSplitter
NewRowSplitter returns a new row splitter for the given shard distribution.
func (*RowSplitter) Send ¶
func (rs *RowSplitter) Send(fields []mproto.Field, result [][][]sqltypes.Value, baseCmd string, insertChannels [][]chan string, abort chan struct{}) bool
Send will send the rows to the list of channels. Returns true if aborted.
func (*RowSplitter) StartSplit ¶
func (rs *RowSplitter) StartSplit() [][][]sqltypes.Value
StartSplit starts a new split. Split can then be called multiple times.
type RowSubsetDiffer ¶
type RowSubsetDiffer struct {
// contains filtered or unexported fields
}
RowSubsetDiffer will consume rows on both sides, and compare them. It assumes superset and subset are sorted by ascending primary key. It will record errors in DiffReport.extraRowsRight if extra rows exist on the subset side, and DiffReport.extraRowsLeft will always be zero.
func NewRowSubsetDiffer ¶
func NewRowSubsetDiffer(superset, subset *QueryResultReader, pkFieldCount int) (*RowSubsetDiffer, error)
NewRowSubsetDiffer returns a new RowSubsetDiffer
func (*RowSubsetDiffer) Go ¶
func (rd *RowSubsetDiffer) Go(log logutil.Logger) (dr DiffReport, err error)
Go runs the diff. If there is no error, it will drain both sides. If an error occurs, it will just return it and stop.
type SQLDiffWorker ¶
type SQLDiffWorker struct {
// contains filtered or unexported fields
}
SQLDiffWorker runs a sanity check in a system with a lookup database: any row in the subset spec needs to have a counterpart in the superset spec.
func (*SQLDiffWorker) CheckInterrupted ¶
func (worker *SQLDiffWorker) CheckInterrupted() bool
func (*SQLDiffWorker) Error ¶
func (worker *SQLDiffWorker) Error() error
func (*SQLDiffWorker) Run ¶
func (worker *SQLDiffWorker) Run()
Run is mostly a wrapper to run the cleanup at the end.
func (*SQLDiffWorker) StatusAsHTML ¶
func (worker *SQLDiffWorker) StatusAsHTML() template.HTML
func (*SQLDiffWorker) StatusAsText ¶
func (worker *SQLDiffWorker) StatusAsText() string
type SourceSpec ¶
type SourceSpec struct {
	Keyspace string
	Shard    string
	SQL      string
	// contains filtered or unexported fields
}
SourceSpec specifies a SQL query in some keyspace and shard.
type SplitCloneWorker ¶
type SplitCloneWorker struct {
// contains filtered or unexported fields
}
SplitCloneWorker will clone the data within a keyspace from a source set of shards to a destination set of shards.
func (*SplitCloneWorker) CheckInterrupted ¶
func (scw *SplitCloneWorker) CheckInterrupted() bool
func (*SplitCloneWorker) Error ¶
func (scw *SplitCloneWorker) Error() error
func (*SplitCloneWorker) Run ¶
func (scw *SplitCloneWorker) Run()
Run implements the Worker interface
func (*SplitCloneWorker) StatusAsHTML ¶
func (scw *SplitCloneWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*SplitCloneWorker) StatusAsText ¶
func (scw *SplitCloneWorker) StatusAsText() string
StatusAsText implements the Worker interface
type SplitDiffWorker ¶
type SplitDiffWorker struct {
// contains filtered or unexported fields
}
SplitDiffWorker executes a diff between a destination shard and its source shards in a shard split case.
func (*SplitDiffWorker) CheckInterrupted ¶
func (sdw *SplitDiffWorker) CheckInterrupted() bool
func (*SplitDiffWorker) Error ¶
func (sdw *SplitDiffWorker) Error() error
func (*SplitDiffWorker) Run ¶
func (sdw *SplitDiffWorker) Run()
Run is mostly a wrapper to run the cleanup at the end.
func (*SplitDiffWorker) StatusAsHTML ¶
func (sdw *SplitDiffWorker) StatusAsHTML() template.HTML
func (*SplitDiffWorker) StatusAsText ¶
func (sdw *SplitDiffWorker) StatusAsText() string
type VerticalSplitCloneWorker ¶
type VerticalSplitCloneWorker struct {
// contains filtered or unexported fields
}
VerticalSplitCloneWorker will clone the data from a source keyspace/shard to a destination keyspace/shard.
func (*VerticalSplitCloneWorker) CheckInterrupted ¶
func (vscw *VerticalSplitCloneWorker) CheckInterrupted() bool
func (*VerticalSplitCloneWorker) Error ¶
func (vscw *VerticalSplitCloneWorker) Error() error
func (*VerticalSplitCloneWorker) Run ¶
func (vscw *VerticalSplitCloneWorker) Run()
Run implements the Worker interface
func (*VerticalSplitCloneWorker) StatusAsHTML ¶
func (vscw *VerticalSplitCloneWorker) StatusAsHTML() template.HTML
StatusAsHTML implements the Worker interface
func (*VerticalSplitCloneWorker) StatusAsText ¶
func (vscw *VerticalSplitCloneWorker) StatusAsText() string
StatusAsText implements the Worker interface
type VerticalSplitDiffWorker ¶
type VerticalSplitDiffWorker struct {
// contains filtered or unexported fields
}
VerticalSplitDiffWorker executes a diff between a destination shard and its source shards in a vertical split case.
func (*VerticalSplitDiffWorker) CheckInterrupted ¶
func (vsdw *VerticalSplitDiffWorker) CheckInterrupted() bool
func (*VerticalSplitDiffWorker) Error ¶
func (vsdw *VerticalSplitDiffWorker) Error() error
func (*VerticalSplitDiffWorker) Run ¶
func (vsdw *VerticalSplitDiffWorker) Run()
Run is mostly a wrapper to run the cleanup at the end.
func (*VerticalSplitDiffWorker) StatusAsHTML ¶
func (vsdw *VerticalSplitDiffWorker) StatusAsHTML() template.HTML
func (*VerticalSplitDiffWorker) StatusAsText ¶
func (vsdw *VerticalSplitDiffWorker) StatusAsText() string
type Worker ¶
type Worker interface {
	// StatusAsHTML returns the current worker status in HTML
	StatusAsHTML() template.HTML

	// StatusAsText returns the current worker status in plain text
	StatusAsText() string

	// Run is the main entry point for the worker. It will be called
	// in a go routine.
	// When the SignalInterrupt() is called, Run should exit as soon as
	// possible.
	Run()

	// Error returns the error status of the job, if any.
	// It will only be called after Run() has completed.
	Error() error
}
Worker is the base interface for all long running workers.
func NewSQLDiffWorker ¶
func NewSQLDiffWorker(wr *wrangler.Wrangler, cell string, superset, subset SourceSpec) Worker
NewSQLDiffWorker returns a new SQLDiffWorker object.
func NewSplitCloneWorker ¶
func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, strategyStr string, sourceReaderCount, destinationPackCount int, minTableSizeForSplit uint64, destinationWriterCount int) (Worker, error)
NewSplitCloneWorker returns a new SplitCloneWorker object.
func NewSplitDiffWorker ¶
func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker
NewSplitDiffWorker returns a new SplitDiffWorker object.
func NewVerticalSplitCloneWorker ¶
func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspace, destinationShard string, tables []string, strategyStr string, sourceReaderCount, destinationPackCount int, minTableSizeForSplit uint64, destinationWriterCount int) (Worker, error)
NewVerticalSplitCloneWorker returns a new VerticalSplitCloneWorker object.