Documentation ¶
Index ¶
- Constants
- Variables
- func DisableUpdateStreamService()
- func EnableUpdateStreamService(dbname string, mysqld *mysqlctl.Mysqld)
- func GetReplicationPosition() (myproto.ReplicationPosition, error)
- func IsUpdateStreamEnabled() bool
- func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange key.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
- func RegisterUpdateStreamService(mycnf *mysqlctl.Mycnf)
- func ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) error
- func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
- type BinlogStreamer
- type EventNode
- type EventStreamer
- type RegisterUpdateStreamServiceFunc
- type UpdateStream
- func (updateStream *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) (err error)
- func (updateStream *UpdateStream) StreamKeyRange(req *proto.KeyRangeRequest, ...) (err error)
- func (updateStream *UpdateStream) StreamTables(req *proto.TablesRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error)
Constants ¶
const ( DISABLED int64 = iota ENABLED )
Variables ¶
var ( ClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream") ServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld") )
var ( BINLOG_SET_TIMESTAMP = []byte("SET TIMESTAMP=") BINLOG_SET_TIMESTAMP_LEN = len(BINLOG_SET_TIMESTAMP) BINLOG_SET_INSERT = []byte("SET INSERT_ID=") BINLOG_SET_INSERT_LEN = len(BINLOG_SET_INSERT) STREAM_COMMENT_START = []byte("/* _stream ") )
var KEYSPACE_ID_COMMENT = []byte("/* EMD keyspace_id:")
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc
var SPACE = []byte(" ")
var STREAM_COMMENT = []byte("/* _stream ")
Functions ¶
func DisableUpdateStreamService ¶
func DisableUpdateStreamService()
func GetReplicationPosition ¶
func GetReplicationPosition() (myproto.ReplicationPosition, error)
func IsUpdateStreamEnabled ¶
func IsUpdateStreamEnabled() bool
func KeyRangeFilterFunc ¶
func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange key.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
KeyRangeFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified keyrange. The resulting function can be passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, KeyRangeFilterFunc(kit, keyrange, sendTransaction))
func RegisterUpdateStreamService ¶
RegisterUpdateStreamService needs to be called to start listening to clients
func ServeUpdateStream ¶
func ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) error
func TablesFilterFunc ¶
func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
TablesFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified tables. The resulting function can be passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, TablesFilterFunc(tables, sendTransaction))
Types ¶
type BinlogStreamer ¶
type BinlogStreamer struct {
// contains filtered or unexported fields
}
BinlogStreamer streams binlog events from MySQL by connecting as a slave. A BinlogStreamer should only be used once. To start another stream, call NewBinlogStreamer() again.
func NewBinlogStreamer ¶
func NewBinlogStreamer(dbname string, mysqld *mysqlctl.Mysqld, clientCharset *mproto.Charset, startPos myproto.ReplicationPosition, sendTransaction sendTransactionFunc) *BinlogStreamer
NewBinlogStreamer creates a BinlogStreamer.
dbname specifies the database to stream events for. mysqld is the local instance of mysqlctl.Mysqld. clientCharset is the default character set on the BinlogPlayer side. startPos is the position to start streaming at. sendTransaction is called each time a transaction is committed or rolled back.
func (*BinlogStreamer) Stream ¶
func (bls *BinlogStreamer) Stream(ctx *sync2.ServiceContext) (err error)
Stream starts streaming binlog events using the settings from NewBinlogStreamer().
type EventStreamer ¶
type EventStreamer struct {
// contains filtered or unexported fields
}
func NewEventStreamer ¶
func NewEventStreamer(dbname string, mysqld *mysqlctl.Mysqld, startPos myproto.ReplicationPosition, sendEvent sendEventFunc) *EventStreamer
func (*EventStreamer) Stream ¶
func (evs *EventStreamer) Stream(ctx *sync2.ServiceContext) error
type RegisterUpdateStreamServiceFunc ¶
type RegisterUpdateStreamServiceFunc func(*UpdateStream)
RegisterUpdateStreamServiceFunc is glue to delay registration of RPC servers until we have all the objects.
type UpdateStream ¶
type UpdateStream struct {
// contains filtered or unexported fields
}
var UpdateStreamRpcService *UpdateStream
UpdateStreamRpcService is the singleton that gets initialized during startup and that gets called by all RPC server implementations
func (*UpdateStream) ServeUpdateStream ¶
func (updateStream *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) (err error)
func (*UpdateStream) StreamKeyRange ¶
func (updateStream *UpdateStream) StreamKeyRange(req *proto.KeyRangeRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error)
func (*UpdateStream) StreamTables ¶
func (updateStream *UpdateStream) StreamTables(req *proto.TablesRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error)
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
binlogplayer | Package binlogplayer contains the code that plays a filtered replication stream on a client database. |