binlog

package
v2.0.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2016 License: BSD-3-Clause Imports: 19 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrClientEOF is returned by Streamer if the stream ended because the
	// consumer of the stream indicated it doesn't want any more events.
	ErrClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream")
	// ErrServerEOF is returned by Streamer if the stream ended because the
	// connection to the mysqld server was lost, or the stream was terminated by
	// mysqld.
	ErrServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld")
)
View Source
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc

RegisterUpdateStreamServices is the list of all registration callbacks to invoke

Functions

func KeyRangeFilterFunc

func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc

KeyRangeFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified keyrange. The resulting function can be passed into the Streamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, KeyRangeFilterFunc(keyrange, sendTransaction))

func TablesFilterFunc

func TablesFilterFunc(tables []string, sendReply sendTransactionFunc) sendTransactionFunc

TablesFilterFunc returns a function that calls sendReply only if statements in the transaction match the specified tables. The resulting function can be passed into the Streamer: bls.Stream(file, pos, sendTransaction) -> bls.Stream(file, pos, TablesFilterFunc(sendTransaction))

Types

type EventStreamer

type EventStreamer struct {
	// contains filtered or unexported fields
}

EventStreamer is an adapter on top of a binlog Streamer that convert the events into StreamEvent objects.

func NewEventStreamer

func NewEventStreamer(dbname string, mysqld mysqlctl.MysqlDaemon, startPos replication.Position, sendEvent sendEventFunc) *EventStreamer

NewEventStreamer returns a new EventStreamer on top of a Streamer

func (*EventStreamer) Stream

func (evs *EventStreamer) Stream(ctx *sync2.ServiceContext) error

Stream starts streaming updates

type RegisterUpdateStreamServiceFunc

type RegisterUpdateStreamServiceFunc func(UpdateStream)

RegisterUpdateStreamServiceFunc is the type to use for delayed registration of RPC servers until we have all the objects

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer streams binlog events from MySQL by connecting as a slave. A Streamer should only be used once. To start another stream, call NewStreamer() again.

func NewStreamer

func NewStreamer(dbname string, mysqld mysqlctl.MysqlDaemon, clientCharset *binlogdatapb.Charset, startPos replication.Position, sendTransaction sendTransactionFunc) *Streamer

NewStreamer creates a binlog Streamer.

dbname specifes the database to stream events for. mysqld is the local instance of mysqlctl.Mysqld. charset is the default character set on the BinlogPlayer side. startPos is the position to start streaming at. sendTransaction is called each time a transaction is committed or rolled back.

func (*Streamer) Stream

func (bls *Streamer) Stream(ctx *sync2.ServiceContext) (err error)

Stream starts streaming binlog events using the settings from NewStreamer().

type UpdateStream

type UpdateStream interface {
	// ServeUpdateStream serves the query and streams the result
	// for the full update stream
	ServeUpdateStream(position string, sendReply func(reply *binlogdatapb.StreamEvent) error) error

	// StreamKeyRange streams events related to a KeyRange only
	StreamKeyRange(position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) error

	// StreamTables streams events related to a set of Tables only
	StreamTables(position string, tables []string, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) error

	// HandlePanic should be called in a defer,
	// first thing in the RPC implementation.
	HandlePanic(*error)
}

UpdateStream is the interface for the binlog server

type UpdateStreamControl

type UpdateStreamControl interface {
	// Enable will allow any new RPC calls
	Enable()

	// Disable will interrupt all current calls, and disallow any new call
	Disable()

	// IsEnabled returns true iff the service is enabled
	IsEnabled() bool
}

UpdateStreamControl is the interface an UpdateStream service implements to bring it up or down.

type UpdateStreamControlMock

type UpdateStreamControlMock struct {
	sync.Mutex
	// contains filtered or unexported fields
}

UpdateStreamControlMock is an implementation of UpdateStreamControl to be used in tests

func NewUpdateStreamControlMock

func NewUpdateStreamControlMock() *UpdateStreamControlMock

NewUpdateStreamControlMock creates a new UpdateStreamControlMock

func (*UpdateStreamControlMock) Disable

func (m *UpdateStreamControlMock) Disable()

Disable is part of UpdateStreamControl

func (*UpdateStreamControlMock) Enable

func (m *UpdateStreamControlMock) Enable()

Enable is part of UpdateStreamControl

func (*UpdateStreamControlMock) IsEnabled

func (m *UpdateStreamControlMock) IsEnabled() bool

IsEnabled is part of UpdateStreamControl

type UpdateStreamImpl

type UpdateStreamImpl struct {
	// contains filtered or unexported fields
}

UpdateStreamImpl is the real implementation of UpdateStream and UpdateStreamControl

func NewUpdateStream

func NewUpdateStream(mysqld mysqlctl.MysqlDaemon, dbname string) *UpdateStreamImpl

NewUpdateStream returns a new UpdateStreamImpl object

func (*UpdateStreamImpl) Disable

func (updateStream *UpdateStreamImpl) Disable()

Disable will disallow any connection to the service

func (*UpdateStreamImpl) Enable

func (updateStream *UpdateStreamImpl) Enable()

Enable will allow connections to the service

func (*UpdateStreamImpl) HandlePanic

func (updateStream *UpdateStreamImpl) HandlePanic(err *error)

HandlePanic is part of the UpdateStream interface

func (*UpdateStreamImpl) IsEnabled

func (updateStream *UpdateStreamImpl) IsEnabled() bool

IsEnabled returns true if UpdateStreamImpl is enabled

func (*UpdateStreamImpl) RegisterService

func (updateStream *UpdateStreamImpl) RegisterService()

RegisterService needs to be called to publish stats, and to start listening to clients. Only once instance can call this in a process.

func (*UpdateStreamImpl) ServeUpdateStream

func (updateStream *UpdateStreamImpl) ServeUpdateStream(position string, sendReply func(reply *binlogdatapb.StreamEvent) error) (err error)

ServeUpdateStream is part of the UpdateStream interface

func (*UpdateStreamImpl) StreamKeyRange

func (updateStream *UpdateStreamImpl) StreamKeyRange(position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) (err error)

StreamKeyRange is part of the UpdateStream interface

func (*UpdateStreamImpl) StreamTables

func (updateStream *UpdateStreamImpl) StreamTables(position string, tables []string, charset *binlogdatapb.Charset, sendReply func(reply *binlogdatapb.BinlogTransaction) error) (err error)

StreamTables is part of the UpdateStream interface

Directories

Path Synopsis
Package binlogplayer contains the code that plays a filtered replication stream on a client database.
Package binlogplayer contains the code that plays a filtered replication stream on a client database.
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component.
Package grpcbinlogstreamer contains the gRPC implementation of the binlog streamer server component.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL