Documentation ¶
Index ¶
- Variables
- func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, ...) sendTransactionFunc
- func TablesFilterFunc(tables []string, callback func(*binlogdatapb.BinlogTransaction) error) sendTransactionFunc
- type EventStreamer
- type FullBinlogStatement
- type RegisterUpdateStreamServiceFunc
- type SlaveConnection
- func (sc *SlaveConnection) Close()
- func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan mysql.BinlogEvent, error)
- func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysql.Position, <-chan mysql.BinlogEvent, error)
- func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, startPos mysql.Position) (<-chan mysql.BinlogEvent, error)
- type StreamList
- type Streamer
- type UpdateStream
- type UpdateStreamControl
- type UpdateStreamControlMock
- type UpdateStreamImpl
- func (updateStream *UpdateStreamImpl) Disable()
- func (updateStream *UpdateStreamImpl) Enable()
- func (updateStream *UpdateStreamImpl) HandlePanic(err *error)
- func (updateStream *UpdateStreamImpl) IsEnabled() bool
- func (updateStream *UpdateStreamImpl) RegisterService()
- func (updateStream *UpdateStreamImpl) StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, ...) (err error)
- func (updateStream *UpdateStreamImpl) StreamTables(ctx context.Context, position string, tables []string, ...) (err error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrClientEOF is returned by Streamer if the stream ended because the // consumer of the stream indicated it doesn't want any more events. ErrClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream") // ErrServerEOF is returned by Streamer if the stream ended because the // connection to the mysqld server was lost, or the stream was terminated by // mysqld. ErrServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld") )
var ( // cannot find a suitable binlog to satisfy the request. ErrBinlogUnavailable = fmt.Errorf("cannot find relevant binlogs on this server") )
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc
RegisterUpdateStreamServices is the list of all registration callbacks to invoke
Functions ¶
func KeyRangeFilterFunc ¶
func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, callback func(*binlogdatapb.BinlogTransaction) error) sendTransactionFunc
KeyRangeFilterFunc returns a function that calls callback only if statements in the transaction match the specified keyrange. The resulting function can be passed into the Streamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, KeyRangeFilterFunc(keyrange, sendTransaction))
func TablesFilterFunc ¶
func TablesFilterFunc(tables []string, callback func(*binlogdatapb.BinlogTransaction) error) sendTransactionFunc
TablesFilterFunc returns a function that calls callback only if statements in the transaction match the specified tables. The resulting function can be passed into the Streamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, TablesFilterFunc(sendTransaction))
Types ¶
type EventStreamer ¶
type EventStreamer struct {
// contains filtered or unexported fields
}
EventStreamer is an adapter on top of a binlog Streamer that convert the events into StreamEvent objects.
func NewEventStreamer ¶
func NewEventStreamer(cp *mysql.ConnParams, se *schema.Engine, startPos mysql.Position, timestamp int64, sendEvent sendEventFunc) *EventStreamer
NewEventStreamer returns a new EventStreamer on top of a Streamer
type FullBinlogStatement ¶
type FullBinlogStatement struct { Statement *binlogdatapb.BinlogTransaction_Statement Table string KeyspaceID []byte PKNames []*querypb.Field PKValues []sqltypes.Value }
FullBinlogStatement has all the information we can gather for an event. Some fields are only set if asked for, and if RBR is used. Otherwise we'll revert back to using the SQL comments, for SBR.
type RegisterUpdateStreamServiceFunc ¶
type RegisterUpdateStreamServiceFunc func(UpdateStream)
RegisterUpdateStreamServiceFunc is the type to use for delayed registration of RPC servers until we have all the objects
type SlaveConnection ¶
SlaveConnection represents a connection to mysqld that pretends to be a slave connecting for replication. Each such connection must identify itself to mysqld with a server ID that is unique both among other SlaveConnections and among actual slaves in the topology.
func NewSlaveConnection ¶
func NewSlaveConnection(cp *mysql.ConnParams) (*SlaveConnection, error)
NewSlaveConnection creates a new slave connection to the mysqld instance. It uses a pools.IDPool to ensure that the server IDs used to connect are unique within this process. This is done with the assumptions that:
- No other processes are making fake slave connections to our mysqld.
- No real slave servers will have IDs in the range 1-N where N is the peak number of concurrent fake slave connections we will ever make.
func (*SlaveConnection) Close ¶
func (sc *SlaveConnection) Close()
Close closes the slave connection, which also signals an ongoing dump started with StartBinlogDump() to stop and close its BinlogEvent channel. The ID for the slave connection is recycled back into the pool.
func (*SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp ¶
func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan mysql.BinlogEvent, error)
StartBinlogDumpFromBinlogBeforeTimestamp requests a replication binlog dump from the master mysqld starting with a file that has timestamps smaller than the provided timestamp, and then sends binlog events to the provided channel.
The startup phase will list all the binary logs, and find the one that has events starting strictly before the provided timestamp. It will then start from there, and stream all events. It is the responsability of the calling site to filter the events more.
MySQL 5.6+ note: we need to do it that way because of the way the GTIDSet works. In the previous two streaming functions, we pass in the full GTIDSet (that has the list of all transactions seen in the replication stream). In this case, we don't know it, all we have is the binlog file names. We depend on parsing the first PREVIOUS_GTIDS_EVENT event in the logs to get it. So we need the caller to parse that event, and it can't be skipped because its timestamp is lower. Then, for each subsequent event, the caller also needs to add the event GTID to its GTIDSet. Otherwise it won't be correct ever. So the caller really needs to build up its GTIDSet along the entire file, not just for events whose timestamp is in a given range.
The stream will continue in the background, waiting for new events if necessary, until the connection is closed, either by the master or by canceling the context.
Note the context is valid and used until eventChan is closed.
func (*SlaveConnection) StartBinlogDumpFromCurrent ¶
func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysql.Position, <-chan mysql.BinlogEvent, error)
StartBinlogDumpFromCurrent requests a replication binlog dump from the current position.
func (*SlaveConnection) StartBinlogDumpFromPosition ¶
func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, startPos mysql.Position) (<-chan mysql.BinlogEvent, error)
StartBinlogDumpFromPosition requests a replication binlog dump from the master mysqld at the given Position and then sends binlog events to the provided channel. The stream will continue in the background, waiting for new events if necessary, until the connection is closed, either by the master or by canceling the context.
Note the context is valid and used until eventChan is closed.
type StreamList ¶
StreamList is a map of context.CancelFunc to mass-interrupt ongoing calls.
func (*StreamList) Add ¶
func (sl *StreamList) Add(c context.CancelFunc) int
Add adds a CancelFunc to the map.
func (*StreamList) Delete ¶
func (sl *StreamList) Delete(i int)
Delete removes a CancelFunc from the list.
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer streams binlog events from MySQL by connecting as a slave. A Streamer should only be used once. To start another stream, call NewStreamer() again.
func NewStreamer ¶
func NewStreamer(cp *mysql.ConnParams, se *schema.Engine, clientCharset *binlogdatapb.Charset, startPos mysql.Position, timestamp int64, sendTransaction sendTransactionFunc) *Streamer
NewStreamer creates a binlog Streamer.
dbname specifes the database to stream events for. mysqld is the local instance of mysqlctl.Mysqld. charset is the default character set on the BinlogPlayer side. startPos is the position to start streaming at. Incompatible with timestamp. timestamp is the timestamp to start streaming at. Incompatible with startPos. sendTransaction is called each time a transaction is committed or rolled back.
type UpdateStream ¶
type UpdateStream interface { // StreamKeyRange streams events related to a KeyRange only StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) error // StreamTables streams events related to a set of Tables only StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) error // HandlePanic should be called in a defer, // first thing in the RPC implementation. HandlePanic(*error) }
UpdateStream is the interface for the binlog server
type UpdateStreamControl ¶
type UpdateStreamControl interface { // Enable will allow any new RPC calls Enable() // Disable will interrupt all current calls, and disallow any new call Disable() // IsEnabled returns true iff the service is enabled IsEnabled() bool }
UpdateStreamControl is the interface an UpdateStream service implements to bring it up or down.
type UpdateStreamControlMock ¶
UpdateStreamControlMock is an implementation of UpdateStreamControl to be used in tests
func NewUpdateStreamControlMock ¶
func NewUpdateStreamControlMock() *UpdateStreamControlMock
NewUpdateStreamControlMock creates a new UpdateStreamControlMock
func (*UpdateStreamControlMock) Disable ¶
func (m *UpdateStreamControlMock) Disable()
Disable is part of UpdateStreamControl
func (*UpdateStreamControlMock) Enable ¶
func (m *UpdateStreamControlMock) Enable()
Enable is part of UpdateStreamControl
func (*UpdateStreamControlMock) IsEnabled ¶
func (m *UpdateStreamControlMock) IsEnabled() bool
IsEnabled is part of UpdateStreamControl
type UpdateStreamImpl ¶
type UpdateStreamImpl struct {
// contains filtered or unexported fields
}
UpdateStreamImpl is the real implementation of UpdateStream and UpdateStreamControl
func NewUpdateStream ¶
func NewUpdateStream(ts *topo.Server, keyspace string, cell string, cp *mysql.ConnParams, se *schema.Engine) *UpdateStreamImpl
NewUpdateStream returns a new UpdateStreamImpl object
func (*UpdateStreamImpl) Disable ¶
func (updateStream *UpdateStreamImpl) Disable()
Disable will disallow any connection to the service
func (*UpdateStreamImpl) Enable ¶
func (updateStream *UpdateStreamImpl) Enable()
Enable will allow connections to the service
func (*UpdateStreamImpl) HandlePanic ¶
func (updateStream *UpdateStreamImpl) HandlePanic(err *error)
HandlePanic is part of the UpdateStream interface
func (*UpdateStreamImpl) IsEnabled ¶
func (updateStream *UpdateStreamImpl) IsEnabled() bool
IsEnabled returns true if UpdateStreamImpl is enabled
func (*UpdateStreamImpl) RegisterService ¶
func (updateStream *UpdateStreamImpl) RegisterService()
RegisterService needs to be called to publish stats, and to start listening to clients. Only once instance can call this in a process.
func (*UpdateStreamImpl) StreamKeyRange ¶
func (updateStream *UpdateStreamImpl) StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) (err error)
StreamKeyRange is part of the UpdateStream interface
func (*UpdateStreamImpl) StreamTables ¶
func (updateStream *UpdateStreamImpl) StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset, callback func(trans *binlogdatapb.BinlogTransaction) error) (err error)
StreamTables is part of the UpdateStream interface
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package binlogplayer contains the code that plays a filtered replication stream on a client database.
|
Package binlogplayer contains the code that plays a filtered replication stream on a client database. |
Package eventtoken includes utility methods for event token handling.
|
Package eventtoken includes utility methods for event token handling. |
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component.
|
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component. |