Documentation ¶
Index ¶
- Variables
- func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
- func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
- type EventStreamer
- type RegisterUpdateStreamServiceFunc
- type Streamer
- type UpdateStream
- type UpdateStreamControl
- type UpdateStreamControlMock
- type UpdateStreamImpl
- func (updateStream *UpdateStreamImpl) Disable()
- func (updateStream *UpdateStreamImpl) Enable()
- func (updateStream *UpdateStreamImpl) HandlePanic(err *error)
- func (updateStream *UpdateStreamImpl) IsEnabled() bool
- func (updateStream *UpdateStreamImpl) RegisterService()
- func (updateStream *UpdateStreamImpl) ServeUpdateStream(position string, sendReply func(reply *binlogdatapb.StreamEvent) error) (err error)
- func (updateStream *UpdateStreamImpl) StreamKeyRange(position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, ...) (err error)
- func (updateStream *UpdateStreamImpl) StreamTables(position string, tables []string, charset *binlogdatapb.Charset, ...) (err error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrClientEOF is returned by Streamer if the stream ended because the // consumer of the stream indicated it doesn't want any more events. ErrClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream") // ErrServerEOF is returned by Streamer if the stream ended because the // connection to the mysqld server was lost, or the stream was terminated by // mysqld. ErrServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld") )
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc
RegisterUpdateStreamServices is the list of all registration callbacks to invoke
Functions ¶
func KeyRangeFilterFunc ¶
func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
KeyRangeFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified keyrange. The resulting function can be passed into the Streamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, KeyRangeFilterFunc(keyrange, sendTransaction))
func TablesFilterFunc ¶
func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
TablesFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified tables. The resulting function can be passed into the Streamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, TablesFilterFunc(sendTransaction))
Types ¶
type EventStreamer ¶
type EventStreamer struct {
// contains filtered or unexported fields
}
EventStreamer is an adapter on top of a binlog Streamer that convert the events into StreamEvent objects.
func NewEventStreamer ¶
func NewEventStreamer(dbname string, mysqld mysqlctl.MysqlDaemon, startPos replication.Position, sendEvent sendEventFunc) *EventStreamer
NewEventStreamer returns a new EventStreamer on top of a Streamer
func (*EventStreamer) Stream ¶
func (evs *EventStreamer) Stream(ctx *sync2.ServiceContext) error
Stream starts streaming updates
type RegisterUpdateStreamServiceFunc ¶
type RegisterUpdateStreamServiceFunc func(UpdateStream)
RegisterUpdateStreamServiceFunc is the type to use for delayed registration of RPC servers until we have all the objects
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer streams binlog events from MySQL by connecting as a slave. A Streamer should only be used once. To start another stream, call NewStreamer() again.
func NewStreamer ¶
func NewStreamer(dbname string, mysqld mysqlctl.MysqlDaemon, clientCharset *binlogdatapb.Charset, startPos replication.Position, sendTransaction sendTransactionFunc) *Streamer
NewStreamer creates a binlog Streamer.
dbname specifes the database to stream events for. mysqld is the local instance of mysqlctl.Mysqld. charset is the default character set on the BinlogPlayer side. startPos is the position to start streaming at. sendTransaction is called each time a transaction is committed or rolled back.
type UpdateStream ¶
type UpdateStream interface { // ServeUpdateStream serves the query and streams the result // for the full update stream ServeUpdateStream(position string, sendReply func(reply *binlogdatapb.StreamEvent) error) error // StreamKeyRange streams events related to a KeyRange only StreamKeyRange(position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) error // StreamTables streams events related to a set of Tables only StreamTables(position string, tables []string, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) error // HandlePanic should be called in a defer, // first thing in the RPC implementation. HandlePanic(*error) }
UpdateStream is the interface for the binlog server
type UpdateStreamControl ¶
type UpdateStreamControl interface { // Enable will allow any new RPC calls Enable() // Disable will interrupt all current calls, and disallow any new call Disable() // IsEnabled returns true iff the service is enabled IsEnabled() bool }
UpdateStreamControl is the interface an UpdateStream service implements to bring it up or down.
type UpdateStreamControlMock ¶
UpdateStreamControlMock is an implementation of UpdateStreamControl to be used in tests
func NewUpdateStreamControlMock ¶
func NewUpdateStreamControlMock() *UpdateStreamControlMock
NewUpdateStreamControlMock creates a new UpdateStreamControlMock
func (*UpdateStreamControlMock) Disable ¶
func (m *UpdateStreamControlMock) Disable()
Disable is part of UpdateStreamControl
func (*UpdateStreamControlMock) Enable ¶
func (m *UpdateStreamControlMock) Enable()
Enable is part of UpdateStreamControl
func (*UpdateStreamControlMock) IsEnabled ¶
func (m *UpdateStreamControlMock) IsEnabled() bool
IsEnabled is part of UpdateStreamControl
type UpdateStreamImpl ¶
type UpdateStreamImpl struct {
// contains filtered or unexported fields
}
UpdateStreamImpl is the real implementation of UpdateStream and UpdateStreamControl
func NewUpdateStream ¶
func NewUpdateStream(mysqld mysqlctl.MysqlDaemon, dbname string) *UpdateStreamImpl
NewUpdateStream returns a new UpdateStreamImpl object
func (*UpdateStreamImpl) Disable ¶
func (updateStream *UpdateStreamImpl) Disable()
Disable will disallow any connection to the service
func (*UpdateStreamImpl) Enable ¶
func (updateStream *UpdateStreamImpl) Enable()
Enable will allow connections to the service
func (*UpdateStreamImpl) HandlePanic ¶
func (updateStream *UpdateStreamImpl) HandlePanic(err *error)
HandlePanic is part of the UpdateStream interface
func (*UpdateStreamImpl) IsEnabled ¶
func (updateStream *UpdateStreamImpl) IsEnabled() bool
IsEnabled returns true if UpdateStreamImpl is enabled
func (*UpdateStreamImpl) RegisterService ¶
func (updateStream *UpdateStreamImpl) RegisterService()
RegisterService needs to be called to publish stats, and to start listening to clients. Only once instance can call this in a process.
func (*UpdateStreamImpl) ServeUpdateStream ¶
func (updateStream *UpdateStreamImpl) ServeUpdateStream(position string, sendReply func(reply *binlogdatapb.StreamEvent) error) (err error)
ServeUpdateStream is part of the UpdateStream interface
func (*UpdateStreamImpl) StreamKeyRange ¶
func (updateStream *UpdateStreamImpl) StreamKeyRange(position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) (err error)
StreamKeyRange is part of the UpdateStream interface
func (*UpdateStreamImpl) StreamTables ¶
func (updateStream *UpdateStreamImpl) StreamTables(position string, tables []string, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) (err error)
StreamTables is part of the UpdateStream interface
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package binlogplayer contains the code that plays a filtered replication stream on a client database.
|
Package binlogplayer contains the code that plays a filtered replication stream on a client database. |
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component.
|
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component. |