replica

Version: v0.0.0-...-4b1c1a3
Published: Jul 30, 2023 License: MIT Imports: 28 Imported by: 0

README

M/S replica

A Master/Slave replication mode like the one in Redis/MySQL, kept to a simple classic implementation (no learner role, no linearizable reads). The slave pulls bulk RESP replies from the master that contain either the binlog entries [startLogID, LastLogID] or a snapshot file.

  1. Start with the replicaof option, or execute the replicaof command:

    1. When the service starts, the latest committed log ID is loaded from the commit log into commitID.
    2. The slave sends the replicaof command to start connecting to the master; the master starts a replication goroutine for the master/slave pair and creates the connection.
    3. The slave sends replconf to the master to register itself as a slave.
    4. If restart is set, the slave sends the fullsync command to the master, and the master performs a full sync to the slave from a compressed snapshot file (see item 3 for details).
    5. The slave then starts the sync loop, sending sync/psync commands with its current latest log ID (syncId).
    6. If the slave's log ID is less than the master's FirstLogID, the master tells the slave that the log has been purged, and the slave must do a full sync (see item 3 for details).
    7. Otherwise, the master sends [lastLogID + binlog] from its log store to the slave, until the slave acks [lastLogID] and the sync is complete.
  2. On the master, a RESP write command is committed as follows: save it to the log, wait for a quorum of slaves to ack (that is, they have pulled the binlog successfully), atomically commit the WriteBatch to the data kvstore, and save commitID (the log ID) to the commit log. If the log is saved but the WriteBatch commit or the commitID update fails, writes stay locked until the replication goroutine (scheduled by the Go runtime) replays this log correctly. Log replay works like this:

    1. Get the next log to commit from the log store (the current commitID is stored in the commit log file).
    2. Reset the WriteBatch, and decompress and decode the log's Data from the log store.
    3. Replay the log Data ([]byte) as a new batch into the WriteBatch for commit.
    4. Under the lock, commit the WriteBatch and update commitID (the log ID) in the commit log file.
  3. Full sync: the slave sends the fullsync command to the master.

    1. If no snapshot file exists, or the snapshot file has expired, the master creates a new snapshot (one goroutine per connection).
    2. Otherwise, the latest snapshot file is used (a ticker job is started to purge expired snapshots).
    3. To create a snapshot, the snapshot store is locked and writes to the data kvstore (FSM) are locked while the snapshot is generated. The snapshot is then iterated and saved to the snapshot file (format: [len(compress key) | compress key | len(compress value) | compress value ...]).
    4. The master reads the snapshot file and sends it to the slave as a RESP bulk reply ([]byte).
    5. The slave receives the bulk RESP reply and saves it to a dump file.
    6. Then, with writes locked, the slave clears all data and loads the dump file, writing (put) each entry into the data kvstore (FSM).

Keywords:

  • LogID: a monotonically increasing integer for a log
  • FirstLogID: the oldest log id for a server, all the logs before this id have been purged.
  • LastLogID: the newest log id for a server.
  • CommitID: the last log that has been committed and executed. If LastLogID is 10 and CommitID is 5, the server needs to commit logs 6 through 10 to catch up to the latest state.
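The relationships between these IDs come down to two comparisons. The sketch below reproduces the CommitID 5 / LastLogID 10 example and the full-sync rule from the replication steps. The helper names pendingCommits and needFullSync are hypothetical and are not part of the package API.

```go
package main

import "fmt"

// pendingCommits returns the logs commitID+1 .. lastLogID (inclusive) that
// are stored but not yet committed; ok is false when none are pending.
func pendingCommits(commitID, lastLogID uint64) (from, to uint64, ok bool) {
	if commitID >= lastLogID {
		return 0, 0, false
	}
	return commitID + 1, lastLogID, true
}

// needFullSync reports whether a slave syncing from syncID must do a full
// sync because the master has already purged every log before firstLogID.
func needFullSync(syncID, firstLogID uint64) bool {
	return syncID < firstLogID
}

func main() {
	from, to, ok := pendingCommits(5, 10)
	fmt.Println(from, to, ok)       // 6 10 true
	fmt.Println(needFullSync(3, 7)) // true
}
```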

Slave replica connection states are the same as those reported by the Redis ROLE command: https://redis.io/commands/role/

The state of the replication from the point of view of the master, that can be connect (the instance needs to connect to its master), connecting (the master-replica connection is in progress), sync (the master and replica are trying to perform the synchronization), connected (the replica is online).

  • RplConnectState: slave needs to connect to its master
  • RplConnectingState: slave-master connection is in progress
  • RplSyncState: perform the synchronization
  • RplConnectedState: slave is online

Files:

  • commit.log: records the current committed log ID (that log has already been saved to the log store)
  • log store file: the replica's log store (an ILogStore implementation, e.g. a WAL)
  • snapshot file: replica snapshot file for fullsync, format [len(compress key) | compress key | len(compress value) | compress value ...]

Notice (to keep HA, a failover mechanism is needed in which a majority selects the master):

  • If the master goes down, an HA failover service must select a new master; the slaves then run slaveof/replicaof against the new master to sync logs.
  • For automatic HA failover among the store nodes, a consensus protocol such as Raft or Paxos is needed to elect a new leader, which then syncs logs to its followers. Alternatively, as in Redis Cluster, a majority of the other masters use a gossip protocol to select a master and exchange metadata with the other masters.
  • Once a new master has been selected (in a term/epoch), notify the proxy (e.g. Codis) so that it promotes the new master.

So before doing anything, think about the alternative (failover). Don't put all your eggs in one basket.

Features

  1. Support Redis Sentinel for M/S replica HA failover:
    1. Add pub/sub for the __sentinel__:hello channel.
    2. Support the INFO command, with a Replication section.
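Redis Sentinel discovers other sentinels through messages published on __sentinel__:hello, so a server that supports Sentinel has to relay them through its pub/sub. The hello payload is eight comma-separated fields: sentinel ip, sentinel port, sentinel run id, current epoch, then master name, master ip, master port and master config epoch. The parser below is an illustrative sketch, and helloMsg and parseHello are hypothetical names that are not part of this package. The sample values in main are made up, apart from the run id, which is taken from the log below.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// helloMsg holds the fields of one __sentinel__:hello message.
type helloMsg struct {
	SentinelIP    string
	SentinelPort  int
	SentinelRunID string
	CurrentEpoch  uint64
	MasterName    string
	MasterIP      string
	MasterPort    int
	MasterEpoch   uint64
}

func parseHello(s string) (helloMsg, error) {
	f := strings.Split(s, ",")
	if len(f) != 8 {
		return helloMsg{}, fmt.Errorf("hello: want 8 fields, got %d", len(f))
	}
	var m helloMsg
	var err error
	m.SentinelIP, m.SentinelRunID, m.MasterName, m.MasterIP = f[0], f[2], f[4], f[5]
	if m.SentinelPort, err = strconv.Atoi(f[1]); err != nil {
		return helloMsg{}, err
	}
	if m.CurrentEpoch, err = strconv.ParseUint(f[3], 10, 64); err != nil {
		return helloMsg{}, err
	}
	if m.MasterPort, err = strconv.Atoi(f[6]); err != nil {
		return helloMsg{}, err
	}
	if m.MasterEpoch, err = strconv.ParseUint(f[7], 10, 64); err != nil {
		return helloMsg{}, err
	}
	return m, nil
}

func main() {
	m, err := parseHello("127.0.0.1,26379,ace6225b2a8faddaf6ad599a8db8b504e0ec2b9d,9,mymaster,127.0.0.1,6667,9")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s master at %s:%d (epoch %d)\n", m.MasterName, m.MasterIP, m.MasterPort, m.MasterEpoch)
}
```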

redis-sentinel log:

40717:X 22 Jul 2023 18:48:15.683 # Sentinel ID is ace6225b2a8faddaf6ad599a8db8b504e0ec2b9d
40717:X 22 Jul 2023 18:48:15.683 # +monitor master mymaster 127.0.0.1 6666 quorum 1
40717:X 22 Jul 2023 18:48:15.684 * +slave slave 127.0.0.1:6667 127.0.0.1 6667 @ mymaster 127.0.0.1 6666


40717:X 22 Jul 2023 19:03:34.694 # +sdown master mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:34.694 # +odown master mymaster 127.0.0.1 6666 #quorum 1/1
40717:X 22 Jul 2023 19:03:34.697 # +new-epoch 9
40717:X 22 Jul 2023 19:03:34.697 # +try-failover master mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:34.702 # +vote-for-leader ace6225b2a8faddaf6ad599a8db8b504e0ec2b9d 9
40717:X 22 Jul 2023 19:03:34.702 # +elected-leader master mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:34.702 # +failover-state-select-slave master mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:34.761 # +selected-slave slave 127.0.0.1:6667 127.0.0.1 6667 @ mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:34.761 * +failover-state-send-slaveof-noone slave 127.0.0.1:6667 127.0.0.1 6667 @ mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:34.852 * +failover-state-wait-promotion slave 127.0.0.1:6667 127.0.0.1 6667 @ mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:35.712 # +promoted-slave slave 127.0.0.1:6667 127.0.0.1 6667 @ mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:35.712 # +failover-state-reconf-slaves master mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:35.779 # +failover-end master mymaster 127.0.0.1 6666
40717:X 22 Jul 2023 19:03:35.779 # +switch-master mymaster 127.0.0.1 6666 127.0.0.1 6667
40717:X 22 Jul 2023 19:03:35.780 * +slave slave 127.0.0.1:6666 127.0.0.1 6666 @ mymaster 127.0.0.1 6667
40717:X 22 Jul 2023 19:03:38.795 # +sdown slave 127.0.0.1:6666 127.0.0.1 6666 @ mymaster 127.0.0.1 6667


40717:X 22 Jul 2023 19:28:23.578 # -sdown slave 127.0.0.1:6666 127.0.0.1 6666 @ mymaster 127.0.0.1 6667
40717:X 22 Jul 2023 19:28:33.535 * +convert-to-slave slave 127.0.0.1:6666 127.0.0.1 6666 @ mymaster 127.0.0.1 6667

  1. 127.0.0.1 6666 is the master, and 127.0.0.1 6667 is a replica of it (replicaof).
  2. 127.0.0.1 6666 goes down and its state moves from sdown to odown. The sentinels then vote for a leader, which performs the failover by selecting a new master: it sends replicaof no one to 127.0.0.1 6667, which is promoted to the master role, and the old odown master is reconfigured as a slave.
  3. When 127.0.0.1 6666 comes back up, it is converted to a slave that replicates 127.0.0.1 6667.
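After a failover, a client or proxy should react to the +switch-master event, whose payload is <master name> <old ip> <old port> <new ip> <new port>. The sketch below pulls that event out of a sentinel log line like the one above. switchMaster and parseSwitchMaster are hypothetical names.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// switchMaster describes one +switch-master event.
type switchMaster struct {
	Name, OldAddr, NewAddr string
}

// parseSwitchMaster finds "+switch-master <name> <old ip> <old port> <new ip>
// <new port>" in a sentinel log line.
func parseSwitchMaster(line string) (switchMaster, bool) {
	f := strings.Fields(line)
	for i, tok := range f {
		if tok == "+switch-master" && len(f) >= i+6 {
			return switchMaster{
				Name:    f[i+1],
				OldAddr: net.JoinHostPort(f[i+2], f[i+3]),
				NewAddr: net.JoinHostPort(f[i+4], f[i+5]),
			}, true
		}
	}
	return switchMaster{}, false
}

func main() {
	line := "40717:X 22 Jul 2023 19:03:35.779 # +switch-master mymaster 127.0.0.1 6666 127.0.0.1 6667"
	if ev, ok := parseSwitchMaster(line); ok {
		fmt.Printf("%s: %s -> %s\n", ev.Name, ev.OldAddr, ev.NewAddr) // mymaster: 127.0.0.1:6666 -> 127.0.0.1:6667
	}
}
```

net.JoinHostPort is used instead of plain concatenation so that IPv6 addresses get brackets.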

Use INFO replication to check the replication state.


Documentation


Constants

const (
	InvalidLogID uint64 = 0

	MaxReplLogSize = 1 * 1024 * 1024

	DefaultSlavePriority = 100
)
const (
	// slave needs to connect to its master
	RplConnectState int32 = iota + 1
	// slave-master connection is in progress
	RplConnectingState
	// perform the synchronization
	RplSyncState
	// slave is online
	RplConnectedState
)
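The states start at iota + 1, so the zero value of an int32 is never a valid state and an uninitialized state is easy to detect. The sketch below redeclares the constants so that it compiles on its own and maps them to the names used by the Redis ROLE command. It also encodes the happy-path order connect -> connecting -> sync -> connected. nextState and its reset-on-unknown behavior are assumptions for illustration and are not taken from the package.

```go
package main

import "fmt"

// Redeclared with the package's values so this sketch compiles on its own.
const (
	RplConnectState int32 = iota + 1
	RplConnectingState
	RplSyncState
	RplConnectedState
)

// rplStateName returns the state name used by the Redis ROLE command.
func rplStateName(s int32) string {
	switch s {
	case RplConnectState:
		return "connect"
	case RplConnectingState:
		return "connecting"
	case RplSyncState:
		return "sync"
	case RplConnectedState:
		return "connected"
	default:
		return "unknown"
	}
}

// nextState is a hypothetical happy-path transition; an unknown state
// (including the zero value) restarts at RplConnectState.
func nextState(s int32) int32 {
	switch {
	case s >= RplConnectState && s < RplConnectedState:
		return s + 1
	case s == RplConnectedState:
		return s
	default:
		return RplConnectState
	}
}

func main() {
	for s := int32(0); s != RplConnectedState; {
		s = nextState(s)
		fmt.Println(rplStateName(s)) // connect, connecting, sync, connected
	}
}
```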
const LogHeadSize = 17
const (
	MaxSlotNum = 1024
)

Variables

var (
	ErrLogNotFound    = errors.New("log not found")
	ErrLogMissed      = errors.New("log is pured in server")
	ErrStoreLogID     = errors.New("log id is less")
	ErrNoBehindLog    = errors.New("no behind commit log")
	ErrCommitIDBehind = errors.New("commit id is behind last log id")

	ErrWriteInROnly  = errors.New("write not support in readonly mode")
	ErrRplInRDWR     = errors.New("replication not support in read write mode")
	ErrRplNotSupport = errors.New("replication not support")
)

Functions

func RegisterExpiredLogStore

func RegisterExpiredLogStore(s IExpiredLogStore) error
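RegisterExpiredLogStore, GetExpiredLogStore and ListExpiredLogStore suggest a registry of log store implementations keyed by name. Below is a minimal sketch of that pattern. The trimmed interface, the lowercase function names register/get/list, and the error on a duplicate name are all assumptions, not the package's confirmed behavior.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type LogStoreName string

// expiredLogStore is trimmed to what the registry needs.
type expiredLogStore interface {
	Name() LogStoreName
	PurgeExpired(n int64) error
}

var (
	mu       sync.RWMutex
	registry = map[LogStoreName]expiredLogStore{}
)

// register adds s under its name; a second store with the same name is rejected.
func register(s expiredLogStore) error {
	mu.Lock()
	defer mu.Unlock()
	if _, ok := registry[s.Name()]; ok {
		return fmt.Errorf("log store %q already registered", s.Name())
	}
	registry[s.Name()] = s
	return nil
}

func get(name LogStoreName) (expiredLogStore, error) {
	mu.RLock()
	defer mu.RUnlock()
	s, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("log store %q not registered", name)
	}
	return s, nil
}

// list returns the registered names in sorted order.
func list() []LogStoreName {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]LogStoreName, 0, len(registry))
	for n := range registry {
		names = append(names, n)
	}
	sort.Slice(names, func(i, j int) bool { return names[i] < names[j] })
	return names
}

// nopStore is a placeholder implementation for the example.
type nopStore struct{ name LogStoreName }

func (s nopStore) Name() LogStoreName       { return s.name }
func (s nopStore) PurgeExpired(int64) error { return nil }

func main() {
	fmt.Println(register(nopStore{"openkv"}))        // <nil>
	fmt.Println(register(nopStore{"openkv"}) != nil) // true
	fmt.Println(list())                              // [openkv]
}
```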

Types

type CommitLog

type CommitLog struct {
	// contains filtered or unexported fields
}

CommitLog saves the current committed log ID to a file.

func (*CommitLog) Close

func (m *CommitLog) Close() error

func (*CommitLog) InitCommitLog

func (m *CommitLog) InitCommitLog(cfg *config.ReplicationConfig) (err error)

InitCommitLog opens the commit.log file to initialize the latest committed log ID.

func (*CommitLog) UpdateCommitID

func (m *CommitLog) UpdateCommitID(id uint64, force bool) error

UpdateCommitID writes the current committed log ID to the commit.log file.

type IExpiredLogStore

type IExpiredLogStore interface {
	ILogStore
	// Delete logs before n seconds
	PurgeExpired(n int64) error
}

func GetExpiredLogStore

func GetExpiredLogStore(name LogStoreName) (IExpiredLogStore, error)

type ILogStore

type ILogStore interface {
	Name() LogStoreName
	Open() error
	// FirstID get first logId
	FirstID() (uint64, error)
	// LastID get last logId
	LastID() (uint64, error)

	// GetLog get Log obj by logId
	GetLog(logId uint64, log *Log) error
	// StoreLog if log id is less than current last id, return error
	StoreLog(log *Log) error

	// Sync flush(fsync/msync/sync syscall) in-memory data to stable storage disk file
	Sync() error
	// Clear all logs
	Clear() error
	// Close
	Close() error
}

type Log

type Log struct {
	ID          uint64
	CreateTime  uint32
	Compression uint8

	Data []byte
}

func (*Log) Decode

func (l *Log) Decode(r io.Reader) error

Decode decodes a binlog entry using the log head meta info (log length).

func (*Log) DecodeAt

func (l *Log) DecodeAt(r io.ReaderAt, pos int64) error

func (*Log) DecodeHead

func (l *Log) DecodeHead(r io.Reader) (uint32, error)

DecodeHead decodes the log head meta info (log length).

func (*Log) DecodeHeadAt

func (l *Log) DecodeHeadAt(r io.ReaderAt, pos int64) (uint32, error)

func (*Log) Encode

func (l *Log) Encode(w io.Writer) error

Encode encodes a binlog entry.

func (*Log) HeadSize

func (l *Log) HeadSize() int

func (*Log) Marshal

func (l *Log) Marshal() ([]byte, error)

func (*Log) Size

func (l *Log) Size() int

func (*Log) Unmarshal

func (l *Log) Unmarshal(b []byte) error

type LogStoreName

type LogStoreName string

func ListExpiredLogStore

func ListExpiredLogStore() []LogStoreName

type NewLogEventHandler

type NewLogEventHandler func(rl *Log)

NewLogEventHandler is the handler to handle new log event.

type OpenkvLogStore

type OpenkvLogStore struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewOpenkvLogStore

func NewOpenkvLogStore(cfg *config.OpenkvOptions) *OpenkvLogStore

func (*OpenkvLogStore) Clear

func (s *OpenkvLogStore) Clear() error

func (*OpenkvLogStore) Close

func (s *OpenkvLogStore) Close() error

func (*OpenkvLogStore) FirstID

func (s *OpenkvLogStore) FirstID() (uint64, error)

func (*OpenkvLogStore) GetLog

func (s *OpenkvLogStore) GetLog(id uint64, log *Log) error

func (*OpenkvLogStore) LastID

func (s *OpenkvLogStore) LastID() (uint64, error)

func (*OpenkvLogStore) Name

func (s *OpenkvLogStore) Name() LogStoreName

func (*OpenkvLogStore) Open

func (s *OpenkvLogStore) Open() error

func (*OpenkvLogStore) PurgeExpired

func (s *OpenkvLogStore) PurgeExpired(n int64) error

func (*OpenkvLogStore) Reset

func (s *OpenkvLogStore) Reset()

func (*OpenkvLogStore) StoreLog

func (s *OpenkvLogStore) StoreLog(log *Log) error

func (*OpenkvLogStore) Sync

func (s *OpenkvLogStore) Sync() error

type ReplicaSlave

type ReplicaSlave struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewReplicaSlave

func NewReplicaSlave(srv *RespCmdService) *ReplicaSlave

func (*ReplicaSlave) Close

func (m *ReplicaSlave) Close()

type Replication

type Replication struct {
	// replica w/r op lock
	sync.Mutex
	// contains filtered or unexported fields
}

func NewReplication

func NewReplication(cfg *config.ReplicationConfig) *Replication

func (*Replication) AddNewLogEventHandler

func (r *Replication) AddNewLogEventHandler(h NewLogEventHandler)

AddNewLogEventHandler adds the handler for the new log event
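Here is a sketch of how registered NewLogEventHandler callbacks might be invoked when a log is stored. The notifier type and its fire method are hypothetical. Handlers run synchronously in registration order. The handler slice is copied under the read lock, so a handler can register another handler without deadlocking.

```go
package main

import (
	"fmt"
	"sync"
)

type Log struct {
	ID   uint64
	Data []byte
}

type NewLogEventHandler func(rl *Log)

// notifier is a hypothetical stand-in for the handler list kept by Replication.
type notifier struct {
	mu       sync.RWMutex
	handlers []NewLogEventHandler
}

func (n *notifier) AddNewLogEventHandler(h NewLogEventHandler) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.handlers = append(n.handlers, h)
}

// fire calls every handler synchronously, in registration order.
func (n *notifier) fire(l *Log) {
	n.mu.RLock()
	hs := append([]NewLogEventHandler(nil), n.handlers...)
	n.mu.RUnlock()
	for _, h := range hs {
		h(l)
	}
}

func main() {
	var n notifier
	n.AddNewLogEventHandler(func(l *Log) { fmt.Println("new log", l.ID) })
	n.fire(&Log{ID: 1}) // new log 1
}
```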

func (*Replication) Close

func (r *Replication) Close() error

func (*Replication) Commit

func (r *Replication) Commit(ctx context.Context, wb *openkv.WriteBatch) (err error)

Commit implements ICommitter: it is called with the WriteBatch when a data batch is atomically committed.

func (*Replication) CommitIDBehind

func (r *Replication) CommitIDBehind() (bool, error)

CommitIDBehind reports whether the commit ID is behind the last log ID.

func (*Replication) Dump

func (r *Replication) Dump(ctx context.Context, writer io.Writer) error

Dump dumps data to the Writer.

func (*Replication) DumpFile

func (r *Replication) DumpFile(ctx context.Context, path string) error

DumpFile dumps data to the file

func (*Replication) FirstLogID

func (r *Replication) FirstLogID() (uint64, error)

func (*Replication) LastCommitID

func (r *Replication) LastCommitID() (uint64, error)

func (*Replication) LastLogID

func (r *Replication) LastLogID() (uint64, error)

func (*Replication) LoadDump

func (r *Replication) LoadDump(ctx context.Context, read io.Reader) (h *storager.SnapshotHead, err error)

LoadDump clears all data and loads the dump into the db.

func (*Replication) LoadDumpFile

func (r *Replication) LoadDumpFile(ctx context.Context, path string) (*storager.SnapshotHead, error)

LoadDumpFile clears all data and loads the dump file into the db.

func (*Replication) LogStore

func (r *Replication) LogStore(data []byte) (*Log, error)

LogStore stores the log and notifies waiters.

func (*Replication) NextNeedCommitLog

func (r *Replication) NextNeedCommitLog(log *Log) error

NextNeedCommitLog gets the next log to commit from the log store (the current commit ID is stored in commited.log).

func (*Replication) OnReplayLogToCommit

func (r *Replication) OnReplayLogToCommit()

func (*Replication) PubNewLogNotify

func (r *Replication) PubNewLogNotify()

PubNewLogNotify publishes a notification that a new log has been saved.

func (*Replication) ReadLogsTo

func (r *Replication) ReadLogsTo(startLogID uint64, w io.Writer) (n int, nextLogID uint64, err error)

ReadLogsTo reads the logs in [startLogID, lastLogID] and writes them to the Writer. It returns the total log size n (n <= MaxReplLogSize) and nextLogID, the next log ID to sync. If startLogID is less than the log store's first ID, it returns ErrLogMissed; a slave that receives ErrLogMissed needs a full sync.

func (*Replication) ReadLogsToTimeout

func (r *Replication) ReadLogsToTimeout(startLogID uint64, w io.Writer, timeout time.Duration) (n int, nextLogID uint64, err error)

ReadLogsToTimeout tries to read logs. If none are read, it waits for a new log to be stored, until the timeout expires or the processor quits. It is used only to sync logs to slaves.

func (*Replication) SetStorager

func (r *Replication) SetStorager(store driver.IStorager)

func (*Replication) Start

func (r *Replication) Start(ctx context.Context) (err error)

func (*Replication) Stat

func (r *Replication) Stat() (s *Stat, err error)

func (*Replication) StoreLog

func (r *Replication) StoreLog(log *Log) error

func (*Replication) StoreLogsFromData

func (r *Replication) StoreLogsFromData(ctx context.Context, data []byte) error

StoreLogsFromData stores logs from data.

func (*Replication) StoreLogsFromReader

func (r *Replication) StoreLogsFromReader(ctx context.Context, rb io.Reader) error

StoreLogsFromReader stores logs from the Reader

func (*Replication) UpdateCommitID

func (r *Replication) UpdateCommitID(id uint64) error

UpdateCommitID writes the commit ID (log ID) to commited.log.

func (*Replication) WaitNewLog

func (r *Replication) WaitNewLog() <-chan struct{}

WaitNewLog returns a channel that is notified when a new log has been saved.

func (*Replication) WaitReplication

func (r *Replication) WaitReplication() error

WaitReplication waits until replication is done when the replica is initialized at startup.

type RespCmdConn

type RespCmdConn struct {
	*standalone.RespCmdConn
	// contains filtered or unexported fields
}

func (*RespCmdConn) FullSync

func (c *RespCmdConn) FullSync(ctx context.Context, needNew bool) (err error)

FullSync

func (*RespCmdConn) Replicaof

func (c *RespCmdConn) Replicaof(ctx context.Context, masterAddr string, restart bool, readonly bool) (err error)

Replicaof

func (*RespCmdConn) Sync

func (c *RespCmdConn) Sync(syncLogID uint64) (buf []byte, err error)

Sync
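RespCmdConn.Sync sends the sync command over RESP. On the wire, a client command is a RESP array of bulk strings, and the sketch below encodes one. Sending the log ID as a single SYNC argument is an assumption based on the README's description, not the package's confirmed wire format. encodeCommand is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// encodeCommand serializes args as a RESP array of bulk strings:
// "*<count>\r\n" followed by "$<byte length>\r\n<arg>\r\n" per argument.
func encodeCommand(args ...string) string {
	var b strings.Builder
	b.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		b.WriteString("$" + strconv.Itoa(len(a)) + "\r\n")
		b.WriteString(a)
		b.WriteString("\r\n")
	}
	return b.String()
}

func main() {
	syncLogID := uint64(10)
	fmt.Printf("%q\n", encodeCommand("SYNC", strconv.FormatUint(syncLogID, 10)))
	// "*2\r\n$4\r\nSYNC\r\n$2\r\n10\r\n"
}
```

Bulk strings carry an explicit byte length, so arguments such as binary log data need no escaping.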

type RespCmdService

type RespCmdService struct {

	// standalone RespCmdService
	*standalone.RespCmdService
	// contains filtered or unexported fields
}

func (*RespCmdService) Close

func (s *RespCmdService) Close() (err error)

Close closes the RESP service.

func (*RespCmdService) InitRespConn

func (s *RespCmdService) InitRespConn(ctx context.Context, dbIdx int) driver.IRespConn

func (*RespCmdService) Name

Name

func (*RespCmdService) OnAccept

func (s *RespCmdService) OnAccept(conn redcon.Conn) bool

func (*RespCmdService) OnClosed

func (s *RespCmdService) OnClosed(conn redcon.Conn, err error)

func (*RespCmdService) RegisterLogStore

func (s *RespCmdService) RegisterLogStore() error

func (*RespCmdService) SetStorager

func (s *RespCmdService) SetStorager(store driver.IStorager)

SetStorager

func (*RespCmdService) Start

func (s *RespCmdService) Start(ctx context.Context) (err error)

Start sets the onAccept and onClosed handlers, then starts the RESP command service.

type RplStats

type RplStats struct {
	PubLogNum            atomic.Int64
	PubLogAckNum         atomic.Int64
	PubLogTotalAckTimeMs atomic.Int64

	MasterLastLogID atomic.Uint64
}

func (*RplStats) StaticsPubLogTotalAckTime

func (m *RplStats) StaticsPubLogTotalAckTime(handle func())

type SnapshotStore

type SnapshotStore struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewSnapshotStore

func NewSnapshotStore(cfg *config.SnapshotConfig) *SnapshotStore

func (*SnapshotStore) Close

func (s *SnapshotStore) Close()

func (*SnapshotStore) Create

func (s *SnapshotStore) Create(ctx context.Context, d snapshotDumper) (*snapshot, time.Time, error)

Create creates a full-sync snapshot by dumping the replica (master) data store to a snapshot file.

func (*SnapshotStore) Open

func (s *SnapshotStore) Open(ctx context.Context) error

func (*SnapshotStore) OpenLatest

func (s *SnapshotStore) OpenLatest() (*snapshot, time.Time, error)

type SrvInfo

type SrvInfo struct {
	*standalone.SrvInfo
	// contains filtered or unexported fields
}

func NewSrvInfo

func NewSrvInfo(srv *RespCmdService) (srvInfo *SrvInfo)

func (*SrvInfo) DumpReplication

func (m *SrvInfo) DumpReplication(w io.Writer)

type Stat

type Stat struct {
	// FirstID from log store
	FirstID uint64
	// LastID from log store
	LastID uint64
	// CommitId from current commit log, init load to CommitLog.id
	CommitID uint64
}
