Documentation ¶
Overview ¶
Package binlogplayer contains the code that plays a filtered replication stream on a client database. It usually runs inside the destination master vttablet process.
Index ¶
- Variables
- func CreateBlpCheckpoint() []string
- func PopulateBlpCheckpoint(index uint32, groupId, timeUpdated int64, flags string) string
- func QueryBlpCheckpoint(index uint32) string
- func ReadStartPosition(dbClient VtClient, uid uint32) (*proto.BlpPosition, string, error)
- func RegisterBinlogPlayerClientFactory(name string, factory BinlogPlayerClientFactory)
- type BinlogPlayer
- type BinlogPlayerClient
- type BinlogPlayerClientFactory
- type BinlogPlayerResponse
- type BinlogPlayerStats
- type DBClient
- type DummyVtClient
- func (dc DummyVtClient) Begin() error
- func (dc DummyVtClient) Close()
- func (dc DummyVtClient) Commit() error
- func (dc DummyVtClient) Connect() error
- func (dc DummyVtClient) ExecuteFetch(query string, maxrows int, wantfields bool) (qr *mproto.QueryResult, err error)
- func (dc DummyVtClient) Rollback() error
- type VtClient
Constants ¶
This section is empty.
Variables ¶
var ( // we will log anything that's higher than that SLOW_QUERY_THRESHOLD = time.Duration(100 * time.Millisecond) // keys for the stats map BLPL_QUERY = "Query" BLPL_TRANSACTION = "Transaction" // flags for the blp_checkpoint table. The database entry is just // a join(",") of these flags. BLP_FLAG_DONT_START = "DontStart" )
Functions ¶
func CreateBlpCheckpoint ¶
func CreateBlpCheckpoint() []string
CreateBlpCheckpoint returns the statements required to create the _vt.blp_checkpoint table
func PopulateBlpCheckpoint ¶
PopulateBlpCheckpoint returns a statement to populate the first value into the _vt.blp_checkpoint table
func QueryBlpCheckpoint ¶
QueryBlpCheckpoint returns a statement to query the group_id and flags for a given shard from the _vt.blp_checkpoint table
func ReadStartPosition ¶
ReadStartPosition will return the current start position and the flags for the provided binlog player.
func RegisterBinlogPlayerClientFactory ¶
func RegisterBinlogPlayerClientFactory(name string, factory BinlogPlayerClientFactory)
Types ¶
type BinlogPlayer ¶
type BinlogPlayer struct {
// contains filtered or unexported fields
}
BinlogPlayer is handling reading a stream of updates from BinlogServer
func NewBinlogPlayerKeyRange ¶
func NewBinlogPlayerKeyRange(dbClient VtClient, addr string, keyspaceIdType key.KeyspaceIdType, keyRange key.KeyRange, startPosition *proto.BlpPosition, stopAtGroupId int64, blplStats *BinlogPlayerStats) *BinlogPlayer
NewBinlogPlayerKeyRange returns a new BinlogPlayer pointing at the server replicating the provided keyrange, starting at the startPosition.GroupId, and updating _vt.blp_checkpoint with uid=startPosition.Uid. If stopAtGroupId != 0, it will stop when reaching that GroupId.
func NewBinlogPlayerTables ¶
func NewBinlogPlayerTables(dbClient VtClient, addr string, tables []string, startPosition *proto.BlpPosition, stopAtGroupId int64, blplStats *BinlogPlayerStats) *BinlogPlayer
NewBinlogPlayerTables returns a new BinlogPlayer pointing at the server replicating the provided tables, starting at the startPosition.GroupId, and updating _vt.blp_checkpoint with uid=startPosition.Uid. If stopAtGroupId != 0, it will stop when reaching that GroupId.
func (*BinlogPlayer) ApplyBinlogEvents ¶
func (blp *BinlogPlayer) ApplyBinlogEvents(interrupted chan struct{}) error
ApplyBinlogEvents makes a gob rpc request to BinlogServer and processes the events. It will return nil if 'interrupted' was closed, or if we reached the stopping point. It will return io.EOF if the server stops sending us updates. It may return any other error it encounters.
type BinlogPlayerClient ¶
type BinlogPlayerClient interface { // Dial a server Dial(addr string, connTimeout time.Duration) error // Close the connection Close() // Ask the server to stream binlog updates ServeUpdateStream(*proto.UpdateStreamRequest, chan *proto.StreamEvent) BinlogPlayerResponse // Ask the server to stream updates related to the provided tables StreamTables(*proto.TablesRequest, chan *proto.BinlogTransaction) BinlogPlayerResponse // Ask the server to stream updates related to thee provided keyrange StreamKeyRange(*proto.KeyRangeRequest, chan *proto.BinlogTransaction) BinlogPlayerResponse }
BinlogPlayerClient is the interface all clients must satisfy
type BinlogPlayerClientFactory ¶
type BinlogPlayerClientFactory func() BinlogPlayerClient
type BinlogPlayerResponse ¶
type BinlogPlayerResponse interface {
Error() error
}
BinlogPlayerResponse is the return value for streaming events
type BinlogPlayerStats ¶
type BinlogPlayerStats struct { // Stats about the player, keys used are BLPL_QUERY and BLPL_TRANSACTION Timings *stats.Timings Rates *stats.Rates // Last saved status LastGroupId sync2.AtomicInt64 SecondsBehindMaster sync2.AtomicInt64 }
BinlogPlayerStats is the internal stats of a player. It is a different structure that is passed in so stats can be collected over the life of multiple individual players.
func NewBinlogPlayerStats ¶
func NewBinlogPlayerStats() *BinlogPlayerStats
NewBinlogPlayerStats creates a new BinlogPlayerStats structure
type DBClient ¶
type DBClient struct {
// contains filtered or unexported fields
}
DBClient is a real VtClient backed by a mysql connection
func NewDbClient ¶
func NewDbClient(params *mysql.ConnectionParams) *DBClient
func (*DBClient) ExecuteFetch ¶
type DummyVtClient ¶
type DummyVtClient struct {
// contains filtered or unexported fields
}
DummyVtClient is a VtClient that writes to a writer instead of executing anything
func NewDummyVtClient ¶
func NewDummyVtClient() *DummyVtClient
func (DummyVtClient) Begin ¶
func (dc DummyVtClient) Begin() error
func (DummyVtClient) Close ¶
func (dc DummyVtClient) Close()
func (DummyVtClient) Commit ¶
func (dc DummyVtClient) Commit() error
func (DummyVtClient) Connect ¶
func (dc DummyVtClient) Connect() error
func (DummyVtClient) ExecuteFetch ¶
func (dc DummyVtClient) ExecuteFetch(query string, maxrows int, wantfields bool) (qr *mproto.QueryResult, err error)
func (DummyVtClient) Rollback ¶
func (dc DummyVtClient) Rollback() error