Documentation ¶
Index ¶
- Constants
- Variables
- func DisableUpdateStreamService()
- func EnableUpdateStreamService(dbname string, mysqld mysqlctl.MysqlDaemon)
- func GetReplicationPosition() (myproto.ReplicationPosition, error)
- func IsUpdateStreamEnabled() bool
- func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange *pb.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
- func RegisterUpdateStreamService(mycnf *mysqlctl.Mycnf)
- func ServeUpdateStream(position string, sendReply func(reply *proto.StreamEvent) error) error
- func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
- type BinlogStreamer
- type EventStreamer
- type RegisterUpdateStreamServiceFunc
- type UpdateStream
- func (updateStream *UpdateStream) HandlePanic(err *error)
- func (updateStream *UpdateStream) ServeUpdateStream(position string, sendReply func(reply *proto.StreamEvent) error) (err error)
- func (updateStream *UpdateStream) StreamKeyRange(position string, keyspaceIdType key.KeyspaceIdType, keyRange *pb.KeyRange, ...) (err error)
- func (updateStream *UpdateStream) StreamTables(position string, tables []string, charset *mproto.Charset, ...) (err error)
Constants ¶
const ( DISABLED int64 = iota ENABLED )
Variables ¶
var ( // ErrClientEOF is returned by BinlogStreamer if the stream ended because the // consumer of the stream indicated it doesn't want any more events. ErrClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream") // ErrServerEOF is returned by BinlogStreamer if the stream ended because the // connection to the mysqld server was lost, or the stream was terminated by // mysqld. ErrServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld") )
var KEYSPACE_ID_COMMENT = []byte("/* EMD keyspace_id:")
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc
RegisterUpdateStreamServices is the list of all registration callbacks to invoke
var SPACE = []byte(" ")
var STREAM_COMMENT = []byte("/* _stream ")
Functions ¶
func DisableUpdateStreamService ¶
func DisableUpdateStreamService()
DisableUpdateStreamService disables the RPC service for UpdateStream
func EnableUpdateStreamService ¶
func EnableUpdateStreamService(dbname string, mysqld mysqlctl.MysqlDaemon)
EnableUpdateStreamService enables the RPC service for UpdateStream
func GetReplicationPosition ¶
func GetReplicationPosition() (myproto.ReplicationPosition, error)
GetReplicationPosition returns the current replication position of the service
func IsUpdateStreamEnabled ¶
func IsUpdateStreamEnabled() bool
IsUpdateStreamEnabled returns true if the RPC service is enabled
func KeyRangeFilterFunc ¶
func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange *pb.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
KeyRangeFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified keyrange. The resulting function can be passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, KeyRangeFilterFunc(sendTransaction))
func RegisterUpdateStreamService ¶
RegisterUpdateStreamService needs to be called to start listening to clients
func ServeUpdateStream ¶
func ServeUpdateStream(position string, sendReply func(reply *proto.StreamEvent) error) error
ServeUpdateStream sill serve one UpdateStream
func TablesFilterFunc ¶
func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
TablesFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified tables. The resulting function can be passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, TablesFilterFunc(sendTransaction))
Types ¶
type BinlogStreamer ¶
type BinlogStreamer struct {
// contains filtered or unexported fields
}
BinlogStreamer streams binlog events from MySQL by connecting as a slave. A BinlogStreamer should only be used once. To start another stream, call NewBinlogStreamer() again.
func NewBinlogStreamer ¶
func NewBinlogStreamer(dbname string, mysqld mysqlctl.MysqlDaemon, clientCharset *mproto.Charset, startPos myproto.ReplicationPosition, sendTransaction sendTransactionFunc) *BinlogStreamer
NewBinlogStreamer creates a BinlogStreamer.
dbname specifes the database to stream events for. mysqld is the local instance of mysqlctl.Mysqld. charset is the default character set on the BinlogPlayer side. startPos is the position to start streaming at. sendTransaction is called each time a transaction is committed or rolled back.
func (*BinlogStreamer) Stream ¶
func (bls *BinlogStreamer) Stream(ctx *sync2.ServiceContext) (err error)
Stream starts streaming binlog events using the settings from NewBinlogStreamer().
type EventStreamer ¶
type EventStreamer struct {
// contains filtered or unexported fields
}
EventStreamer is an adapter on top of a BinlogStreamer that convert the events into StreamEvent objects.
func NewEventStreamer ¶
func NewEventStreamer(dbname string, mysqld mysqlctl.MysqlDaemon, startPos myproto.ReplicationPosition, sendEvent sendEventFunc) *EventStreamer
NewEventStreamer returns a new EventStreamer on top of a BinlogStreamer
func (*EventStreamer) Stream ¶
func (evs *EventStreamer) Stream(ctx *sync2.ServiceContext) error
Stream starts streaming updates
type RegisterUpdateStreamServiceFunc ¶
type RegisterUpdateStreamServiceFunc func(proto.UpdateStream)
RegisterUpdateStreamServiceFunc is the type to use for delayed registration of RPC servers until we have all the objects
type UpdateStream ¶
type UpdateStream struct {
// contains filtered or unexported fields
}
UpdateStream is the real implementation of proto.UpdateStream
var UpdateStreamRpcService *UpdateStream
UpdateStream is the singleton that gets initialized during startup and that gets called by all RPC server implementations
func (*UpdateStream) HandlePanic ¶
func (updateStream *UpdateStream) HandlePanic(err *error)
HandlePanic is part of the proto.UpdateStream interface
func (*UpdateStream) ServeUpdateStream ¶
func (updateStream *UpdateStream) ServeUpdateStream(position string, sendReply func(reply *proto.StreamEvent) error) (err error)
ServeUpdateStream is part of the proto.UpdateStream interface
func (*UpdateStream) StreamKeyRange ¶
func (updateStream *UpdateStream) StreamKeyRange(position string, keyspaceIdType key.KeyspaceIdType, keyRange *pb.KeyRange, charset *mproto.Charset, sendReply func(reply *proto.BinlogTransaction) error) (err error)
StreamKeyRange is part of the proto.UpdateStream interface
func (*UpdateStream) StreamTables ¶
func (updateStream *UpdateStream) StreamTables(position string, tables []string, charset *mproto.Charset, sendReply func(reply *proto.BinlogTransaction) error) (err error)
StreamTables is part of the proto.UpdateStream interface
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package binlogplayer contains the code that plays a filtered replication stream on a client database.
|
Package binlogplayer contains the code that plays a filtered replication stream on a client database. |
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component.
|
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component. |