Documentation ¶
Index ¶
- Constants
- Variables
- func AdjustPacketSize(size int) func()
- func GetBinlogRotationThreshold() int64
- func SetBinlogRotationThreshold(threshold int64)
- type ColExpr
- type Engine
- func (vse *Engine) Close()
- func (vse *Engine) InitDBConfig(keyspace, shard string)
- func (vse *Engine) IsOpen() bool
- func (vse *Engine) Open()
- func (vse *Engine) ServeHTTP(response http.ResponseWriter, request *http.Request)
- func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, ...) error
- func (vse *Engine) StreamResults(ctx context.Context, query string, ...) error
- func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltypes.Value, ...) error
- func (vse *Engine) StreamTables(ctx context.Context, send func(*binlogdatapb.VStreamTablesResponse) error) error
- type Filter
- type Opcode
- type PacketSizer
- type Plan
- type RowStreamer
- type RowStreamerMode
- type Table
- type TableStreamer
Constants ¶
const (
	// Equal is used to filter a comparable column on a specific value
	Equal = Opcode(iota)
	// VindexMatch is used for an in_keyrange() construct
	VindexMatch
	// LessThan is used to filter a comparable column if < specific value
	LessThan
	// LessThanEqual is used to filter a comparable column if <= specific value
	LessThanEqual
	// GreaterThan is used to filter a comparable column if > specific value
	GreaterThan
	// GreaterThanEqual is used to filter a comparable column if >= specific value
	GreaterThanEqual
	// NotEqual is used to filter a comparable column if != specific value
	NotEqual
	// IsNotNull is used to filter a column if it is not NULL
	IsNotNull
)
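To illustrate how the comparison opcodes could drive a row filter, here is a minimal sketch. It uses a local `Opcode` type that mirrors the constant order above, a hypothetical `matches` helper, and plain `int64` values in place of `sqltypes.Value`. `VindexMatch` is not modeled, and the handling of NULL in comparisons is an assumption, not the package's documented behavior.

```go
package main

import "fmt"

// Opcode is a local stand-in that mirrors the constant order listed above.
// It is not the vstreamer type.
type Opcode int

const (
	Equal = Opcode(iota)
	VindexMatch
	LessThan
	LessThanEqual
	GreaterThan
	GreaterThanEqual
	NotEqual
	IsNotNull
)

// matches is a hypothetical helper that reports whether a nullable integer
// column value passes a filter. A nil pointer stands for SQL NULL.
func matches(op Opcode, col *int64, want int64) bool {
	if op == IsNotNull {
		return col != nil
	}
	if col == nil {
		// Assumption: a comparison against NULL never matches.
		return false
	}
	switch op {
	case Equal:
		return *col == want
	case LessThan:
		return *col < want
	case LessThanEqual:
		return *col <= want
	case GreaterThan:
		return *col > want
	case GreaterThanEqual:
		return *col >= want
	case NotEqual:
		return *col != want
	}
	// VindexMatch and unknown opcodes are not modeled in this sketch.
	return false
}

func main() {
	v := int64(5)
	fmt.Println(matches(LessThan, &v, 10))
	fmt.Println(matches(IsNotNull, nil, 0))
}
```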
Variables ¶
var HeartbeatTime = 900 * time.Millisecond
HeartbeatTime is set to slightly below 1s, compared to idleTimeout set by VPlayer at slightly above 1s. This minimizes conflicts between the two timeouts.
Functions ¶
func AdjustPacketSize ¶ added in v0.11.0
func AdjustPacketSize(size int) func()
AdjustPacketSize temporarily adjusts the default packet sizes to the given value. Calling the returned cleanup function resets them to their original value. This function is only used for testing.
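The override-then-restore pattern described here can be sketched as follows. The `defaultPacketSize` variable, its initial value, and the lowercase `adjustPacketSize` function are hypothetical stand-ins for illustration. The real function adjusts package-internal settings that this page does not list.

```go
package main

import "fmt"

// defaultPacketSize is a hypothetical package-level setting. Its value is
// chosen for illustration only.
var defaultPacketSize = 250000

// adjustPacketSize overrides the setting and returns a cleanup function that
// restores the value seen at the time of the call.
func adjustPacketSize(size int) func() {
	original := defaultPacketSize
	defaultPacketSize = size
	return func() { defaultPacketSize = original }
}

func main() {
	cleanup := adjustPacketSize(10)
	fmt.Println(defaultPacketSize) // overridden value
	cleanup()
	fmt.Println(defaultPacketSize) // original value restored
}
```

In a test, the returned function is typically deferred right away, for example `defer adjustPacketSize(10)()`, so the override cannot leak into other tests.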
func GetBinlogRotationThreshold ¶ added in v0.15.0
func GetBinlogRotationThreshold() int64
GetBinlogRotationThreshold returns the current byte size at which a VStreamer will attempt to rotate the binary log before starting a GTID snapshot based stream (e.g. a ResultStreamer or RowStreamer).
func SetBinlogRotationThreshold ¶ added in v0.15.0
func SetBinlogRotationThreshold(threshold int64)
SetBinlogRotationThreshold sets the byte size at which a VStreamer will attempt to rotate the binary log before starting a GTID snapshot based stream (e.g. a ResultStreamer or RowStreamer).
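A getter and setter pair like this is often backed by an atomically accessed package variable. The sketch below assumes that design, and the names `rotationThreshold`, `getRotationThreshold`, `setRotationThreshold`, and `shouldRotate` are hypothetical. The initial value and the strict `>` comparison are also assumptions, since this page does not state them.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rotationThreshold is a hypothetical package-level setting, in bytes.
var rotationThreshold int64 = 64 * 1024 * 1024

// getRotationThreshold reads the threshold atomically.
func getRotationThreshold() int64 {
	return atomic.LoadInt64(&rotationThreshold)
}

// setRotationThreshold writes the threshold atomically.
func setRotationThreshold(threshold int64) {
	atomic.StoreInt64(&rotationThreshold, threshold)
}

// shouldRotate reports whether a binary log of the given size would be
// rotated before a snapshot-based stream starts.
// Assumption: rotation is attempted only when the size exceeds the threshold.
func shouldRotate(currentBinlogSize int64) bool {
	return currentBinlogSize > getRotationThreshold()
}

func main() {
	original := getRotationThreshold()
	defer setRotationThreshold(original) // restore the setting on exit

	setRotationThreshold(1024)
	fmt.Println(shouldRotate(512), shouldRotate(4096))
}
```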
Types ¶
type ColExpr ¶
type ColExpr struct {
	// ColNum specifies the source column value.
	ColNum int

	// Vindex and VindexColumns, if set, will be used to generate
	// a keyspace_id. If so, ColNum is ignored.
	// VindexColumns contains the column numbers of the table,
	// and not the column numbers of the stream to be sent.
	Vindex        vindexes.Vindex
	VindexColumns []int

	Field *querypb.Field

	FixedValue sqltypes.Value
}
ColExpr represents a column expression.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the engine for handling vreplication streaming requests.
func NewEngine ¶
func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrottler *throttle.Throttler, cell string) *Engine
NewEngine creates a new Engine. Initialization sequence is: NewEngine->InitDBConfig->Open. Open and Close can be called multiple times and are idempotent.
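The initialization sequence and the idempotent `Open`/`Close` contract can be sketched with a small stand-in type. The `engine` struct, its fields, and the `opens` counter are hypothetical and exist only to make the idempotency visible. The real `Engine` takes several dependencies in `NewEngine` that are omitted here.

```go
package main

import (
	"fmt"
	"sync"
)

// engine is a hypothetical stand-in, not the vstreamer Engine.
type engine struct {
	mu       sync.Mutex
	isOpen   bool
	keyspace string
	shard    string
	opens    int // counts how many times setup work actually ran
}

func newEngine() *engine { return &engine{} }

// InitDBConfig records the target parameters before the engine is opened.
func (e *engine) InitDBConfig(keyspace, shard string) {
	e.keyspace, e.shard = keyspace, shard
}

// Open is idempotent: calling it on an open engine does nothing.
func (e *engine) Open() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.isOpen {
		return
	}
	e.isOpen = true
	e.opens++
}

// Close is idempotent: calling it on a closed engine does nothing.
func (e *engine) Close() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if !e.isOpen {
		return
	}
	e.isOpen = false
}

// IsOpen reports the current state.
func (e *engine) IsOpen() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.isOpen
}

func main() {
	e := newEngine()
	e.InitDBConfig("commerce", "0")
	e.Open()
	e.Open() // second call has no effect
	fmt.Println(e.IsOpen(), e.opens)
	e.Close()
	e.Close() // second call has no effect
	fmt.Println(e.IsOpen())
}
```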
func (*Engine) InitDBConfig ¶
func (vse *Engine) InitDBConfig(keyspace, shard string)
InitDBConfig initializes the target parameters for the Engine.
func (*Engine) ServeHTTP ¶
func (vse *Engine) ServeHTTP(response http.ResponseWriter, request *http.Request)
ServeHTTP shows the current VSchema.
func (*Engine) Stream ¶
func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error
Stream starts a new stream. This streams events from the binary logs.
func (*Engine) StreamResults ¶
func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error
StreamResults streams the results of the query along with the GTID.
func (*Engine) StreamRows ¶
func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltypes.Value, send func(*binlogdatapb.VStreamRowsResponse) error) error
StreamRows streams rows. This streams the table data rows (so we can copy the table data snapshot).
func (*Engine) StreamTables ¶ added in v0.18.0
func (vse *Engine) StreamTables(ctx context.Context, send func(*binlogdatapb.VStreamTablesResponse) error) error
StreamTables streams all tables.
type Filter ¶
type Filter struct {
	Opcode Opcode
	ColNum int
	Value  sqltypes.Value

	// Parameters for VindexMatch.
	// Vindex, VindexColumns and KeyRange, if set, will be used
	// to filter the row.
	// VindexColumns contains the column numbers of the table,
	// and not the column numbers of the stream to be sent.
	Vindex        vindexes.Vindex
	VindexColumns []int
	KeyRange      *topodatapb.KeyRange
}
Filter contains opcodes for filtering.
type PacketSizer ¶ added in v0.11.0
type PacketSizer interface {
	ShouldSend(byteCount int) bool
	Record(byteCount int, duration time.Duration)
	Limit() int
}
PacketSizer is a controller that adjusts the size of the packets being sent by the vstreamer at runtime.
func DefaultPacketSizer ¶ added in v0.11.0
func DefaultPacketSizer() PacketSizer
DefaultPacketSizer creates a new PacketSizer using the default settings. If dynamic packet sizing is enabled, this will return a dynamicPacketSizer.
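As an illustration of how a caller might use this interface, the sketch below pairs a fixed-limit implementation with a batching loop. The `fixedSizer` type and the `batch` helper are hypothetical. The interface is redeclared locally so the example is self-contained, and the exact way the vstreamer consults `ShouldSend` and `Record` is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// PacketSizer is redeclared locally with the method set shown above.
type PacketSizer interface {
	ShouldSend(byteCount int) bool
	Record(byteCount int, duration time.Duration)
	Limit() int
}

// fixedSizer is a hypothetical implementation with a constant limit.
type fixedSizer struct{ limit int }

// ShouldSend asks for a flush once the accumulated bytes reach the limit.
func (s *fixedSizer) ShouldSend(byteCount int) bool { return byteCount >= s.limit }

// Record is a no-op here; a dynamic sizer could use it to tune its limit.
func (s *fixedSizer) Record(byteCount int, duration time.Duration) {}

// Limit returns the configured limit.
func (s *fixedSizer) Limit() int { return s.limit }

// batch groups row sizes into packets, flushing whenever the sizer says so.
func batch(sizer PacketSizer, rowSizes []int) [][]int {
	var packets [][]int
	var current []int
	byteCount := 0
	for _, n := range rowSizes {
		current = append(current, n)
		byteCount += n
		if sizer.ShouldSend(byteCount) {
			start := time.Now()
			packets = append(packets, current) // stands in for the send call
			sizer.Record(byteCount, time.Since(start))
			current, byteCount = nil, 0
		}
	}
	if len(current) > 0 {
		// Flush whatever remains at the end of the stream.
		packets = append(packets, current)
	}
	return packets
}

func main() {
	packets := batch(&fixedSizer{limit: 100}, []int{40, 70, 30, 30, 50, 10})
	fmt.Println(len(packets), packets)
}
```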
type Plan ¶
type Plan struct {
	Table *Table

	// ColExprs is the list of column expressions to be sent
	// in the stream.
	ColExprs []ColExpr

	// Filters is the list of filters to be applied to the columns
	// of the table.
	Filters []Filter
	// contains filtered or unexported fields
}
Plan represents the plan for a table.
type RowStreamer ¶
type RowStreamer interface {
	Stream() error
	Cancel()
}
RowStreamer exposes an externally usable interface to rowStreamer.
func NewRowStreamer ¶
func NewRowStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, query string, lastpk []sqltypes.Value, send func(*binlogdatapb.VStreamRowsResponse) error, vse *Engine, mode RowStreamerMode) RowStreamer
NewRowStreamer returns a RowStreamer.
type RowStreamerMode ¶ added in v0.18.0
type RowStreamerMode int32
const (
	RowStreamerModeSingleTable RowStreamerMode = iota
	RowStreamerModeAllTables
)
type Table ¶
Table contains the metadata for a table.
func (*Table) FindColumn ¶ added in v0.12.0
func (ta *Table) FindColumn(name sqlparser.IdentifierCI) int
FindColumn finds a column in the table. It returns the index if found. Otherwise, it returns -1.
type TableStreamer ¶ added in v0.18.0
type TableStreamer interface {
	Stream() error
	Cancel()
}
TableStreamer exposes an externally usable interface to tableStreamer.