Documentation ¶
Index ¶
- func BatchTableWrites(ctx context.Context, diffs chan Diff, batches chan Batch) error
- func BatchWrites(ctx context.Context, diffs chan Diff, batches chan Batch) error
- func CloseConnections(conns []*sql.Conn)
- func IndefiniteExponentialBackOff() *backoff.ExponentialBackOff
- func OpenConnections(ctx context.Context, db *sql.DB, count int) ([]*sql.Conn, error)
- func OpenSyncedConnections(ctx context.Context, source *sql.DB, count int) ([]*sql.Conn, error)
- func PKSetString(t Transaction) string
- func RestartLoop(ctx context.Context, b backoff.BackOff, loop func(b backoff.BackOff) error) func() error
- func Retry(ctx context.Context, options RetryOptions, f func(context.Context) error) error
- func RowsEqual(sourceRow *Row, targetRow *Row) (bool, error)
- type Batch
- type Checksum
- type Chunk
- type ChunkSnapshot
- type Clone
- type Config
- type DBConfig
- func (c DBConfig) BinlogSyncerConfig(serverID uint32) (replication.BinlogSyncerConfig, error)
- func (c DBConfig) DB() (*sql.DB, error)
- func (c DBConfig) IsSharded() (bool, error)
- func (c DBConfig) ReaderDB() (*sql.DB, error)
- func (c DBConfig) Schema() (string, error)
- func (c DBConfig) ShardingKeyrange() ([]*topodata.KeyRange, error)
- func (c DBConfig) String() string
- func (c DBConfig) VitessTarget() (*query.Target, error)
- type DBReader
- type DBWriter
- type DataSourceType
- type DatabaseContainer
- type Diff
- type Heartbeat
- type IdStreamer
- type Mutation
- type MutationType
- type PeekingIdStreamer
- type Ping
- type Position
- type Reader
- type ReaderConfig
- type Replicate
- type Replicator
- type RetryOptions
- type Row
- type RowStream
- type Snapshotter
- type SourceTargetConfig
- type Table
- type TableConfig
- type Transaction
- type TransactionStream
- type TransactionWriter
- type Writer
- type WriterConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BatchTableWrites ¶
BatchTableWrites consumes diffs for a single table and batches them up into batches by type
func BatchWrites ¶
BatchWrites consumes diffs and batches them up into batches by type and table
func CloseConnections ¶
func IndefiniteExponentialBackOff ¶
func IndefiniteExponentialBackOff() *backoff.ExponentialBackOff
func OpenConnections ¶
OpenConnections opens count connections
func OpenSyncedConnections ¶
OpenSyncedConnections opens count connections that have a synchronized view of the database
func PKSetString ¶
func PKSetString(t Transaction) string
func RestartLoop ¶
Types ¶
type Batch ¶
type Batch struct { Type MutationType Table *Table Rows []*Row }
func BatchTableWritesSync ¶
BatchTableWritesSync synchronously batches up diffs into batches by type
type Checksum ¶
type Checksum struct {
ReaderConfig
}
type Chunk ¶
type Chunk struct { Table *Table // Seq is the sequence number of chunks for this table Seq int64 // Start is the first id of the chunk inclusive. If nil, chunk starts at -inf. Start []interface{} // End is the first id of the next chunk (i.e. the last id of this chunk exclusively). If nil, chunk ends at +inf. End []interface{} // exclusive // First chunk of a table First bool // Last chunk of a table Last bool // Size is the expected number of rows in the chunk Size int }
Chunk is a chunk of rows covering the half-open key range [start, end): inclusive of Start, exclusive of End.
func (*Chunk) ContainsKeys ¶
func (*Chunk) ContainsRow ¶
type ChunkSnapshot ¶
ChunkSnapshot is a mutable struct representing the current reconciliation state of a chunk. It is used single-threaded, by the replication thread only.
type Clone ¶
type Clone struct {
WriterConfig
}
type Config ¶
type Config struct {
Tables map[string]TableConfig `toml:"table"`
}
type DBConfig ¶
type DBConfig struct { Type DataSourceType `help:"Datasource name" enum:"mysql,vitess" optional:"" default:"mysql"` Host string `help:"Hostname" optional:""` EgressSocket string `help:"Use an egress socket when connecting to Vitess, for example '@egress.sock'" optional:""` Username string `help:"User" optional:""` Password string `help:"Password" optional:""` Database string `help:"Database or Vitess shard with format <keyspace>/<shard>" optional:""` MiskDatasource string `help:"Misk formatted config yaml file" optional:"" path:""` MiskReader bool `help:"Use the reader endpoint from Misk (defaults to writer)" optional:"" default:"false"` GrpcCustomHeader []string `help:"Custom GRPC headers separated by ="` CA string `help:"CA root file, if this is specified then TLS will be enabled (PEM encoded)"` Cert string `help:"Certificate file for client side authentication (PEM encoded)"` Key string `help:"Key file for client side authentication (PEM encoded)"` }
func (DBConfig) BinlogSyncerConfig ¶
func (c DBConfig) BinlogSyncerConfig(serverID uint32) (replication.BinlogSyncerConfig, error)
func (DBConfig) ShardingKeyrange ¶
type DBReader ¶
type DBReader interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
DBReader is an interface that can be implemented by sql.Conn, sql.Tx, or sql.DB, so that we can easily change the synchronization method.
type DataSourceType ¶
type DataSourceType string
const ( Vitess DataSourceType = "vitess" MySQL DataSourceType = "mysql" )
type DatabaseContainer ¶
type DatabaseContainer struct {
// contains filtered or unexported fields
}
func (*DatabaseContainer) Close ¶
func (c *DatabaseContainer) Close() error
func (*DatabaseContainer) Config ¶
func (c *DatabaseContainer) Config() DBConfig
type Diff ¶
type Diff struct { Type MutationType // Row is the row to update to or insert or delete Row *Row // Target is in case of the Update MutationType also set so that it can be compared Target *Row }
type Heartbeat ¶
type Heartbeat struct {
// contains filtered or unexported fields
}
Heartbeat receives transactions and requests to snapshot and writes transactions and strongly consistent chunk snapshots
func NewHeartbeat ¶
type IdStreamer ¶
type Mutation ¶
type Mutation struct { Type MutationType Table *Table Rows [][]interface{} // Chunk is only sent with a Repair mutation type Chunk Chunk }
type MutationType ¶
type MutationType byte
const ( Insert MutationType = iota Update Delete // Repair is a mutation which sends a full chunk which is then diffed against the target and the diffs are applied Repair )
func (MutationType) String ¶
func (m MutationType) String() string
type PeekingIdStreamer ¶
type Ping ¶
type Ping struct { SourceTargetConfig Table string `help:"If set select a row from this table" optional:""` }
type ReaderConfig ¶
type ReaderConfig struct { SourceTargetConfig ChunkSize int `help:"Default size of the chunks to diff (can also be overridden per table)" default:"5000"` ShuffleChunks bool `` /* 161-byte string literal not displayed */ TableParallelism int64 `help:"Number of tables to process concurrently" default:"10"` ReaderCount int `help:"Number of reader connections" default:"20"` ReaderParallelism int64 `help:"Number of reader goroutines" default:"200"` ReadTimeout time.Duration `help:"Timeout for faster reads like diffing a single chunk" default:"30s"` ReadRetries uint64 `help:"How many times to retry reading a single chunk (with backoff)" default:"10"` UseCRC32Checksum bool `` /* 126-byte string literal not displayed */ UseConcurrencyLimits bool `help:"Use concurrency limits to automatically find the throughput of the underlying databases" default:"false"` ConfigFile string `help:"TOML formatted config file" short:"f" optional:"" type:"path"` // WriteBatchSize doesn't belong to ReaderConfig but we put that in the TableConfig when we load the table which is // code reused by both checksum and clone so it's easier to put this here for now WriteBatchSize int `help:"Default size of the write batch per transaction (can also be overridden per table)" default:"100"` FailedChunkRetryCount int `help:"Retry a chunk if it fails the checksum, this can be used to checksum across a replica with a master" default:"0"` Config Config `kong:"-"` }
ReaderConfig controls the read side; it is shared between Clone and Checksum.
func (*ReaderConfig) LoadConfig ¶
func (c *ReaderConfig) LoadConfig() error
LoadConfig loads the ConfigFile if specified
type Replicate ¶
type Replicate struct { WriterConfig TaskName string `` /* 163-byte string literal not displayed */ ServerID uint32 `help:"Unique identifier of this server, defaults to a hash of the TaskName" optional:""` // TODO should this just be ReadParallelism ChunkParallelism int `help:"Number of chunks to snapshot concurrently" default:"10"` CheckpointTable string `` /* 142-byte string literal not displayed */ WatermarkTable string `` /* 127-byte string literal not displayed */ HeartbeatTable string `` /* 163-byte string literal not displayed */ HeartbeatFrequency time.Duration `` /* 173-byte string literal not displayed */ CreateTables bool `help:"Create the required tables if they do not exist" default:"true"` ChunkBufferSize int `help:"Size of internal queues" default:"100"` ReconnectTimeout time.Duration `help:"How long to try to reconnect after a replication failure (set to 0 to retry forever)" default:"5m"` DoSnapshot bool `` /* 127-byte string literal not displayed */ DoSnapshotDelay time.Duration `help:"How long to wait until running a snapshot" default:"60s"` ReplicationParallelism int64 `help:"Many transactions to apply in parallel during replication" default:"1"` ParallelTransactionBatchMaxSize int `help:"How large batch of transactions to parallelize" default:"100"` ParallelTransactionBatchTimeout time.Duration `help:"How long to wait for a batch of transactions to fill up before executing them anyway" default:"5s"` }
func (*Replicate) ReconnectBackoff ¶
func (cmd *Replicate) ReconnectBackoff() backoff.BackOff
type Replicator ¶
type Replicator struct {
// contains filtered or unexported fields
}
Replicator replicates from source to target
func NewReplicator ¶
func NewReplicator(config Replicate) (*Replicator, error)
type RetryOptions ¶
type Row ¶
type Row struct { Table *Table Data []interface{} }
func (*Row) AppendKeyValues ¶
func (r *Row) AppendKeyValues(values []interface{}) []interface{}
func (*Row) PkAfterOrEqual ¶
PkAfterOrEqual returns true if the PK of the row is higher than or equal to the PK of the receiver row.
type RowStream ¶
type Snapshotter ¶
type Snapshotter struct {
// contains filtered or unexported fields
}
Snapshotter receives transactions and requests to snapshot and writes transactions and strongly consistent chunk snapshots
func NewSnapshotter ¶
func NewSnapshotter(config Replicate) (*Snapshotter, error)
func (*Snapshotter) Run ¶
func (s *Snapshotter) Run(ctx context.Context, b backoff.BackOff, source chan Transaction, sink chan Transaction) error
type SourceTargetConfig ¶
type Table ¶
type Table struct { Name string // KeyColumns are the columns the table is chunked by, by default the primary key columns KeyColumns []string // KeyColumnList is KeyColumns quoted and comma separated KeyColumnList string // KeyColumnIndexes holds the index of each of the KeyColumns KeyColumnIndexes []int Config TableConfig Columns []string ColumnsQuoted []string CRC32Columns []string // ColumnList is a comma separated list of quoted strings ColumnList string EstimatedRows int64 // MysqlTable is the go-mysql schema, we should probably start using this one as much as possible MysqlTable *mysqlschema.Table }
func LoadTable ¶
func LoadTable(ctx context.Context, config ReaderConfig, databaseType DataSourceType, conn *sql.DB, schema, tableName string, tableConfig TableConfig) (*Table, error)
func LoadTables ¶
func LoadTables(ctx context.Context, config ReaderConfig) ([]*Table, error)
type TableConfig ¶
type TableConfig struct { IgnoreColumns []string `toml:"ignore_columns" help:"Ignore columns in table"` TargetWhere string `toml:"target_where" help:"Extra where clause that is added on the target"` TargetHint string `toml:"target_hint" help:"Hint placed after the SELECT on target reads"` SourceWhere string `toml:"source_where" help:"Extra where clause that is added on the source"` SourceHint string `toml:"source_hint" help:"Hint placed after the SELECT on target reads"` ChunkSize int `toml:"chunk_size" help:"Global chunk size if chunk size not specified on the table"` WriteBatchSize int `toml:"write_batch_size" help:"Global chunk size if chunk size not specified on the table"` WriteTimout duration `toml:"write_timeout" help:"Global chunk size if chunk size not specified on the table"` KeyColumns []string `toml:"keys" help:"Use these columns as a unique key for this table, defaults to primary key columns"` }
type Transaction ¶
type TransactionStream ¶
type TransactionStream struct {
// contains filtered or unexported fields
}
TransactionStream consumes binlog events and emits full transactions
func NewTransactionStreamer ¶
func NewTransactionStreamer(config Replicate) (*TransactionStream, error)
func (*TransactionStream) Run ¶
func (s *TransactionStream) Run(ctx context.Context, b backoff.BackOff, output chan Transaction) error
type TransactionWriter ¶
type TransactionWriter struct {
// contains filtered or unexported fields
}
TransactionWriter receives transactions and requests to snapshot and writes transactions and strongly consistent chunk snapshots
func NewTransactionWriter ¶
func NewTransactionWriter(config Replicate) (*TransactionWriter, error)
func (*TransactionWriter) Run ¶
func (w *TransactionWriter) Run(ctx context.Context, b backoff.BackOff, transactions chan Transaction) error
type WriterConfig ¶
type WriterConfig struct { ReaderConfig WriteBatchStatementSize int `help:"Size of the write batch per statement" default:"100"` WriterParallelism int64 `help:"Number of writer goroutines" default:"200"` WriterCount int `help:"Number of writer connections" default:"10"` WriteRetries uint64 `help:"Number of retries" default:"5"` WriteTimeout time.Duration `help:"Timeout for each write" default:"30s"` NoDiff bool `help:"Clone without diffing using INSERT IGNORE can be faster as a first pass" default:"false"` }