Documentation ¶
Overview ¶
Package binlogplayer contains the code that plays a filtered replication stream on a client database. It usually runs inside the destination master vttablet process.
Index ¶
- Variables
- func CreateBlpCheckpoint() []string
- func PopulateBlpCheckpoint(index uint32, position string, maxTPS int64, maxReplicationLag int64, ...) string
- func QueryBlpCheckpoint(index uint32) string
- func QueryBlpThrottlerSettings(index uint32) string
- func ReadStartPosition(dbClient VtClient, uid uint32) (string, string, error)
- func RegisterClientFactory(name string, factory ClientFactory)
- type BinlogPlayer
- type BinlogTransactionStream
- type Client
- type ClientFactory
- type DBClient
- type Stats
- type VtClient
- type VtClientMock
- func (dc *VtClientMock) AddResult(result *sqltypes.Result)
- func (dc *VtClientMock) Begin() error
- func (dc *VtClientMock) Close()
- func (dc *VtClientMock) Commit() error
- func (dc *VtClientMock) Connect() error
- func (dc *VtClientMock) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
- func (dc *VtClientMock) Rollback() error
Constants ¶
This section is empty.
Variables ¶
var (
	// BinlogPlayerConnTimeout is the flag for binlog player connection
	// timeout. It is public so the discovery module can also use it.
	BinlogPlayerConnTimeout = flag.Duration("binlog_player_conn_timeout", 5*time.Second, "binlog player connection timeout")

	// SlowQueryThreshold causes anything that takes longer than it to be logged.
	SlowQueryThreshold = time.Duration(100 * time.Millisecond)

	// BlplQuery is the key for the stats map.
	BlplQuery = "Query"

	// BlplTransaction is the key for the stats map.
	BlplTransaction = "Transaction"

	// BlpFlagDontStart means don't start a BinlogPlayer
	BlpFlagDontStart = "DontStart"
)
Functions ¶
func CreateBlpCheckpoint ¶
func CreateBlpCheckpoint() []string
CreateBlpCheckpoint returns the statements required to create the _vt.blp_checkpoint table
func PopulateBlpCheckpoint ¶
func PopulateBlpCheckpoint(index uint32, position string, maxTPS int64, maxReplicationLag int64, timeUpdated int64, flags string) string
PopulateBlpCheckpoint returns a statement to populate the first value into the _vt.blp_checkpoint table.
func QueryBlpCheckpoint ¶
func QueryBlpCheckpoint(index uint32) string
QueryBlpCheckpoint returns a statement to query the gtid and flags for a given shard from the _vt.blp_checkpoint table.
func QueryBlpThrottlerSettings ¶
func QueryBlpThrottlerSettings(index uint32) string
QueryBlpThrottlerSettings returns a statement to query the throttler settings (used by filtered replication) for a given shard from the _vt.blp_checkpoint table.
func ReadStartPosition ¶
func ReadStartPosition(dbClient VtClient, uid uint32) (string, string, error)
ReadStartPosition returns the current start position and the flags for the provided binlog player.
func RegisterClientFactory ¶
func RegisterClientFactory(name string, factory ClientFactory)
RegisterClientFactory adds a new factory. Call during init().
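The registration pattern described above can be sketched as a name-keyed map that is filled from init(). Everything here is a stand-in: the one-method Client interface, registerClientFactory, newClient and noopClient are hypothetical and only illustrate the idea. The real package's behavior on duplicate names is not documented here, so returning an error is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Client is a hypothetical, reduced stand-in for the package's Client
// interface so that this sketch compiles without Vitess.
type Client interface {
	Close()
}

// ClientFactory mirrors the documented shape: a function returning a Client.
type ClientFactory func() Client

var (
	mu        sync.Mutex
	factories = make(map[string]ClientFactory)
)

// registerClientFactory is an assumed implementation of a registry.
// Rejecting duplicate names with an error is a choice made for this sketch.
func registerClientFactory(name string, factory ClientFactory) error {
	mu.Lock()
	defer mu.Unlock()
	if _, ok := factories[name]; ok {
		return fmt.Errorf("client factory %q already registered", name)
	}
	factories[name] = factory
	return nil
}

// newClient looks up a factory by name and builds a Client from it.
func newClient(name string) (Client, error) {
	mu.Lock()
	factory, ok := factories[name]
	mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("no client factory registered as %q", name)
	}
	return factory(), nil
}

// noopClient is a placeholder implementation used only for the example.
type noopClient struct{}

func (noopClient) Close() {}

// Registration happens in init(), before main starts.
func init() {
	if err := registerClientFactory("noop", func() Client { return noopClient{} }); err != nil {
		panic(err)
	}
}

func main() {
	c, err := newClient("noop")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer c.Close()
	fmt.Println("created a client from the registered factory")
}
```

Registering from init() means every factory is in place before any player is constructed, which is presumably why the documentation asks for it.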
Types ¶
type BinlogPlayer ¶
type BinlogPlayer struct {
// contains filtered or unexported fields
}
BinlogPlayer handles reading a stream of updates from BinlogServer.
func NewBinlogPlayerKeyRange ¶
func NewBinlogPlayerKeyRange(dbClient VtClient, tablet *topodatapb.Tablet, keyRange *topodatapb.KeyRange, uid uint32, startPosition string, stopPosition string, blplStats *Stats) (*BinlogPlayer, error)
NewBinlogPlayerKeyRange returns a new BinlogPlayer pointing at the server replicating the provided keyrange, starting at startPosition, and updating the _vt.blp_checkpoint row for the given uid. If a stopPosition is provided, it will stop when reaching that position.
func NewBinlogPlayerTables ¶
func NewBinlogPlayerTables(dbClient VtClient, tablet *topodatapb.Tablet, tables []string, uid uint32, startPosition string, stopPosition string, blplStats *Stats) (*BinlogPlayer, error)
NewBinlogPlayerTables returns a new BinlogPlayer pointing at the server replicating the provided tables, starting at startPosition, and updating the _vt.blp_checkpoint row for the given uid. If a stopPosition is provided, it will stop when reaching that position.
func (*BinlogPlayer) ApplyBinlogEvents ¶
func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error
ApplyBinlogEvents makes an RPC request to BinlogServer and processes the events. It will return nil if the provided context was canceled, or if we reached the stopping point. It will return io.EOF if the server stops sending us updates. It may return any other error it encounters.
type BinlogTransactionStream ¶
type BinlogTransactionStream interface {
	// Recv returns the next BinlogTransaction, or an error if the RPC was
	// interrupted.
	Recv() (*binlogdatapb.BinlogTransaction, error)
}
BinlogTransactionStream is the interface of the object returned by StreamTables and StreamKeyRange
type Client ¶
type Client interface {
	// Dial a server
	Dial(tablet *topodatapb.Tablet, connTimeout time.Duration) error

	// Close the connection
	Close()

	// Ask the server to stream updates related to the provided tables.
	// Should return context.Canceled if the context is canceled.
	StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset) (BinlogTransactionStream, error)

	// Ask the server to stream updates related to the provided keyrange.
	// Should return context.Canceled if the context is canceled.
	StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset) (BinlogTransactionStream, error)
}
Client is the interface all clients must satisfy
type ClientFactory ¶
type ClientFactory func() Client
ClientFactory is the factory method to create a Client
type DBClient ¶
type DBClient struct {
// contains filtered or unexported fields
}
DBClient is a real VtClient backed by a mysql connection
func NewDbClient ¶
func NewDbClient(params *sqldb.ConnParams) *DBClient
NewDbClient creates a DBClient instance
func (*DBClient) ExecuteFetch ¶
ExecuteFetch sends the query to the db server and fetches the result.
type Stats ¶
type Stats struct {
	// Stats about the player, keys used are BlplQuery and BlplTransaction
	Timings *stats.Timings
	Rates   *stats.Rates

	SecondsBehindMaster sync2.AtomicInt64
	// contains filtered or unexported fields
}
Stats is the internal stats of a player. It is a separate structure that is passed in so that stats can be collected over the lifetime of multiple individual players.
func (*Stats) GetLastPosition ¶
func (bps *Stats) GetLastPosition() replication.Position
GetLastPosition gets the last replication position.
func (*Stats) SetLastPosition ¶
func (bps *Stats) SetLastPosition(pos replication.Position)
SetLastPosition sets the last replication position.
type VtClient ¶
type VtClient interface {
	Connect() error
	Begin() error
	Commit() error
	Rollback() error
	Close()
	ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
}
VtClient is a high level interface to the database.
type VtClientMock ¶
type VtClientMock struct {
	Stdout        []string
	CommitChannel chan []string
	// contains filtered or unexported fields
}
VtClientMock is a VtClient that writes to a writer instead of executing anything. It allows query results to be mocked out. See AddResult().
func NewVtClientMock ¶
func NewVtClientMock() *VtClientMock
NewVtClientMock returns a new VtClientMock
func (*VtClientMock) AddResult ¶
func (dc *VtClientMock) AddResult(result *sqltypes.Result)
AddResult appends a mocked query result to the end of the list. Each result is returned to a client exactly once, when its turn in the list comes up.
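The queue behavior can be illustrated with a small self-contained mock. This is a sketch, not the package's code: result and mockClient are hypothetical stand-ins, and returning an empty result once the queue is exhausted is an assumption of this example.

```go
package main

import "fmt"

// result is a hypothetical stand-in for *sqltypes.Result.
type result struct {
	Rows [][]string
}

// mockClient records queries instead of executing them and hands out
// queued results in first-in, first-out order.
type mockClient struct {
	executed []string
	results  []*result
	next     int
}

// AddResult appends a mocked result to the end of the queue.
func (m *mockClient) AddResult(r *result) {
	m.results = append(m.results, r)
}

// ExecuteFetch records the query and returns the next queued result.
// Each queued result is returned exactly once. When the queue is empty,
// an empty result is returned (assumed behavior for this sketch).
func (m *mockClient) ExecuteFetch(query string, maxrows int) (*result, error) {
	m.executed = append(m.executed, query)
	if m.next < len(m.results) {
		r := m.results[m.next]
		m.next++
		return r, nil
	}
	return &result{}, nil
}

func main() {
	m := &mockClient{}
	m.AddResult(&result{Rows: [][]string{{"first"}}})
	m.AddResult(&result{Rows: [][]string{{"second"}}})

	for _, q := range []string{"select 1", "select 2", "select 3"} {
		r, _ := m.ExecuteFetch(q, 10)
		fmt.Println(q, "returned", len(r.Rows), "row(s)")
	}
}
```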
func (*VtClientMock) Begin ¶
func (dc *VtClientMock) Begin() error
Begin is part of the VtClient interface
func (*VtClientMock) Close ¶
func (dc *VtClientMock) Close()
Close is part of the VtClient interface
func (*VtClientMock) Commit ¶
func (dc *VtClientMock) Commit() error
Commit is part of the VtClient interface
func (*VtClientMock) Connect ¶
func (dc *VtClientMock) Connect() error
Connect is part of the VtClient interface
func (*VtClientMock) ExecuteFetch ¶
func (dc *VtClientMock) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
ExecuteFetch is part of the VtClient interface
func (*VtClientMock) Rollback ¶
func (dc *VtClientMock) Rollback() error
Rollback is part of the VtClient interface