Documentation ¶
Index ¶
- Constants
- Variables
- func DisableUpdateStreamService()
- func EnableUpdateStreamService(dbcfgs *dbconfigs.DBConfigs)
- func GetReplicationPosition() (int64, error)
- func IsUpdateStreamEnabled() bool
- func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange key.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
- func RegisterUpdateStreamService(mycnf *mysqlctl.Mycnf)
- func ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) error
- func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
- type BinlogStreamer
- type EventStreamer
- type MysqlBinlog
- type RegisterUpdateStreamServiceFunc
- type UpdateStream
- func (updateStream *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) (err error)
- func (updateStream *UpdateStream) StreamKeyRange(req *proto.KeyRangeRequest, ...) (err error)
- func (updateStream *UpdateStream) StreamTables(req *proto.TablesRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error)
Constants ¶
const ( DISABLED int64 = iota ENABLED )
Variables ¶
var ( // Misc vars. HASH_COMMENT = []byte("#") SLASH_COMMENT = []byte("/*") DELIM_STMT = []byte("DELIMITER") DEFAULT_DELIM = []byte(";") )
var ( BINLOG_SET_TIMESTAMP = []byte("SET TIMESTAMP=") BINLOG_SET_INSERT = []byte("SET INSERT_ID=") STREAM_COMMENT_START = []byte("/* _stream ") )
var KEYSPACE_ID_COMMENT = []byte("/* EMD keyspace_id:")
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc
var SPACE = []byte(" ")
var STREAM_COMMENT = []byte("/* _stream ")
Functions ¶
func DisableUpdateStreamService ¶
func DisableUpdateStreamService()
func GetReplicationPosition ¶
func IsUpdateStreamEnabled ¶
func IsUpdateStreamEnabled() bool
func KeyRangeFilterFunc ¶
func KeyRangeFilterFunc(kit key.KeyspaceIdType, keyrange key.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc
KeyRangeFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified keyrange. The resulting function can be passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, KeyRangeFilterFunc(kit, keyrange, sendTransaction))
func RegisterUpdateStreamService ¶
RegisterUpdateStreamService needs to be called to start listening to clients
func ServeUpdateStream ¶
func ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) error
func TablesFilterFunc ¶
func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc
TablesFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified tables. The resulting function can be passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, TablesFilterFunc(tables, sendTransaction))
Types ¶
type BinlogStreamer ¶
type BinlogStreamer struct {
// contains filtered or unexported fields
}
BinlogStreamer streams binlog events grouped by transactions.
func NewBinlogStreamer ¶
func NewBinlogStreamer(dbname, binlogPrefix string) *BinlogStreamer
NewBinlogStreamer creates a BinlogStreamer. dbname specifies the db to stream events for, and binlogPrefix is as defined by the mycnf variable.
func (*BinlogStreamer) Stop ¶
func (bls *BinlogStreamer) Stop()
Stop stops the currently executing Stream if there is one.
type EventStreamer ¶
type EventStreamer struct {
// contains filtered or unexported fields
}
func NewEventStreamer ¶
func NewEventStreamer(dbname, binlogPrefix string) *EventStreamer
func (*EventStreamer) Stop ¶
func (evs *EventStreamer) Stop()
type MysqlBinlog ¶
type MysqlBinlog struct {
// contains filtered or unexported fields
}
func (*MysqlBinlog) Kill ¶
func (mbl *MysqlBinlog) Kill()
Kill terminates the current mysqlbinlog process.
func (*MysqlBinlog) Launch ¶
func (mbl *MysqlBinlog) Launch(dbname, filename string, pos int64) (stdout io.ReadCloser, err error)
Launch launches mysqlbinlog and returns a ReadCloser into which its output will be piped. The stderr will be redirected to the log.
func (*MysqlBinlog) Wait ¶
func (mbl *MysqlBinlog) Wait() error
Wait waits for the mysqlbinlog process to terminate and returns an error if there was any.
type RegisterUpdateStreamServiceFunc ¶
type RegisterUpdateStreamServiceFunc func(*UpdateStream)
Glue to delay registration of RPC servers until we have all the objects
type UpdateStream ¶
type UpdateStream struct {
// contains filtered or unexported fields
}
var UpdateStreamRpcService *UpdateStream
UpdateStreamRpcService is the singleton that gets initialized during startup and that gets called by all RPC server implementations
func (*UpdateStream) ServeUpdateStream ¶
func (updateStream *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) (err error)
func (*UpdateStream) StreamKeyRange ¶
func (updateStream *UpdateStream) StreamKeyRange(req *proto.KeyRangeRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error)
func (*UpdateStream) StreamTables ¶
func (updateStream *UpdateStream) StreamTables(req *proto.TablesRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error)