Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterExpiredLogStore(s IExpiredLogStore) error
- type CommitLog
- type IExpiredLogStore
- type ILogStore
- type Log
- func (l *Log) Decode(r io.Reader) error
- func (l *Log) DecodeAt(r io.ReaderAt, pos int64) error
- func (l *Log) DecodeHead(r io.Reader) (uint32, error)
- func (l *Log) DecodeHeadAt(r io.ReaderAt, pos int64) (uint32, error)
- func (l *Log) Encode(w io.Writer) error
- func (l *Log) HeadSize() int
- func (l *Log) Marshal() ([]byte, error)
- func (l *Log) Size() int
- func (l *Log) Unmarshal(b []byte) error
- type LogStoreName
- type NewLogEventHandler
- type OpenkvLogStore
- func (s *OpenkvLogStore) Clear() error
- func (s *OpenkvLogStore) Close() error
- func (s *OpenkvLogStore) FirstID() (uint64, error)
- func (s *OpenkvLogStore) GetLog(id uint64, log *Log) error
- func (s *OpenkvLogStore) LastID() (uint64, error)
- func (s *OpenkvLogStore) Name() LogStoreName
- func (s *OpenkvLogStore) Open() error
- func (s *OpenkvLogStore) PurgeExpired(n int64) error
- func (s *OpenkvLogStore) Reset()
- func (s *OpenkvLogStore) StoreLog(log *Log) error
- func (s *OpenkvLogStore) Sync() error
- type ReplicaSlave
- type Replication
- func (r *Replication) AddNewLogEventHandler(h NewLogEventHandler)
- func (r *Replication) Close() error
- func (r *Replication) Commit(ctx context.Context, wb *openkv.WriteBatch) (err error)
- func (r *Replication) CommitIDBehind() (bool, error)
- func (r *Replication) Dump(ctx context.Context, writer io.Writer) error
- func (r *Replication) DumpFile(ctx context.Context, path string) error
- func (r *Replication) FirstLogID() (uint64, error)
- func (r *Replication) LastCommitID() (uint64, error)
- func (r *Replication) LastLogID() (uint64, error)
- func (r *Replication) LoadDump(ctx context.Context, read io.Reader) (h *storager.SnapshotHead, err error)
- func (r *Replication) LoadDumpFile(ctx context.Context, path string) (*storager.SnapshotHead, error)
- func (r *Replication) LogStore(data []byte) (*Log, error)
- func (r *Replication) NextNeedCommitLog(log *Log) error
- func (r *Replication) OnReplayLogToCommit()
- func (r *Replication) PubNewLogNotify()
- func (r *Replication) ReadLogsTo(startLogID uint64, w io.Writer) (n int, nextLogID uint64, err error)
- func (r *Replication) ReadLogsToTimeout(startLogID uint64, w io.Writer, timeout time.Duration) (n int, nextLogID uint64, err error)
- func (r *Replication) SetStorager(store driver.IStorager)
- func (r *Replication) Start(ctx context.Context) (err error)
- func (r *Replication) Stat() (s *Stat, err error)
- func (r *Replication) StoreLog(log *Log) error
- func (r *Replication) StoreLogsFromData(ctx context.Context, data []byte) error
- func (r *Replication) StoreLogsFromReader(ctx context.Context, rb io.Reader) error
- func (r *Replication) UpdateCommitID(id uint64) error
- func (r *Replication) WaitNewLog() <-chan struct{}
- func (r *Replication) WaitReplication() error
- type RespCmdConn
- type RespCmdService
- func (s *RespCmdService) Close() (err error)
- func (s *RespCmdService) InitRespConn(ctx context.Context, dbIdx int) driver.IRespConn
- func (s *RespCmdService) Name() driver.RespServiceName
- func (s *RespCmdService) OnAccept(conn redcon.Conn) bool
- func (s *RespCmdService) OnClosed(conn redcon.Conn, err error)
- func (s *RespCmdService) RegisterLogStore() error
- func (s *RespCmdService) SetStorager(store driver.IStorager)
- func (s *RespCmdService) Start(ctx context.Context) (err error)
- type RplStats
- type SnapshotStore
- type SrvInfo
- type Stat
Constants ¶
const ( InvalidLogID uint64 = 0 MaxReplLogSize = 1 * 1024 * 1024 DefaultSlavePriority = 100 )
const ( // slave needs to connect to its master RplConnectState int32 = iota + 1 // slave-master connection is in progress RplConnectingState // perform the synchronization RplSyncState // slave is online RplConnectedState )
const LogHeadSize = 17
const (
MaxSlotNum = 1024
)
Variables ¶
var ( ErrLogNotFound = errors.New("log not found") ErrLogMissed = errors.New("log is pured in server") ErrStoreLogID = errors.New("log id is less") ErrNoBehindLog = errors.New("no behind commit log") ErrCommitIDBehind = errors.New("commit id is behind last log id") ErrWriteInROnly = errors.New("write not support in readonly mode") ErrRplInRDWR = errors.New("replication not support in read write mode") ErrRplNotSupport = errors.New("replication not support") )
Functions ¶
func RegisterExpiredLogStore ¶
func RegisterExpiredLogStore(s IExpiredLogStore) error
Types ¶
type CommitLog ¶
type CommitLog struct {
// contains filtered or unexported fields
}
CommitLog save current commited logId file
func (*CommitLog) InitCommitLog ¶
func (m *CommitLog) InitCommitLog(cfg *config.ReplicationConfig) (err error)
InitCommitLog open commit.log file to init latest commited logId
type IExpiredLogStore ¶
type IExpiredLogStore interface { ILogStore // Delete logs before n seconds PurgeExpired(n int64) error }
func GetExpiredLogStore ¶
func GetExpiredLogStore(name LogStoreName) (IExpiredLogStore, error)
type ILogStore ¶
type ILogStore interface { Name() LogStoreName Open() error // FirstID get first logId FirstID() (uint64, error) // LastID get last logId LastID() (uint64, error) // GetLog get Log obj by logId GetLog(logId uint64, log *Log) error // StoreLog if log id is less than current last id, return error StoreLog(log *Log) error // Sync flush(fsync/msync/sync syscall) in-memory data to stable storage disk file Sync() error // Clear all logs Clear() error // Close Close() error }
type Log ¶
func (*Log) DecodeHead ¶
DecodeHead decode log head meta info (log len)
type LogStoreName ¶
type LogStoreName string
func ListExpiredLogStore ¶
func ListExpiredLogStore() []LogStoreName
type NewLogEventHandler ¶
type NewLogEventHandler func(rl *Log)
NewLogEventHandler is the handler to handle new log event.
type OpenkvLogStore ¶
func NewOpenkvLogStore ¶
func NewOpenkvLogStore(cfg *config.OpenkvOptions) *OpenkvLogStore
func (*OpenkvLogStore) Clear ¶
func (s *OpenkvLogStore) Clear() error
func (*OpenkvLogStore) Close ¶
func (s *OpenkvLogStore) Close() error
func (*OpenkvLogStore) FirstID ¶
func (s *OpenkvLogStore) FirstID() (uint64, error)
func (*OpenkvLogStore) LastID ¶
func (s *OpenkvLogStore) LastID() (uint64, error)
func (*OpenkvLogStore) Name ¶
func (s *OpenkvLogStore) Name() LogStoreName
func (*OpenkvLogStore) Open ¶
func (s *OpenkvLogStore) Open() error
func (*OpenkvLogStore) PurgeExpired ¶
func (s *OpenkvLogStore) PurgeExpired(n int64) error
func (*OpenkvLogStore) Reset ¶
func (s *OpenkvLogStore) Reset()
func (*OpenkvLogStore) StoreLog ¶
func (s *OpenkvLogStore) StoreLog(log *Log) error
func (*OpenkvLogStore) Sync ¶
func (s *OpenkvLogStore) Sync() error
type ReplicaSlave ¶
func NewReplicaSlave ¶
func NewReplicaSlave(srv *RespCmdService) *ReplicaSlave
func (*ReplicaSlave) Close ¶
func (m *ReplicaSlave) Close()
type Replication ¶
type Replication struct { // replica w/r op lock sync.Mutex // contains filtered or unexported fields }
func NewReplication ¶
func NewReplication(cfg *config.ReplicationConfig) *Replication
func (*Replication) AddNewLogEventHandler ¶
func (r *Replication) AddNewLogEventHandler(h NewLogEventHandler)
AddNewLogEventHandler adds the handler for the new log event
func (*Replication) Close ¶
func (r *Replication) Close() error
func (*Replication) Commit ¶
func (r *Replication) Commit(ctx context.Context, wb *openkv.WriteBatch) (err error)
Commit with WriteBatch for ICommitter Impl, when data batch atomic commit
func (*Replication) CommitIDBehind ¶
func (r *Replication) CommitIDBehind() (bool, error)
CommitIDBehind check commitId behind last logId
func (*Replication) DumpFile ¶
func (r *Replication) DumpFile(ctx context.Context, path string) error
DumpFile dumps data to the file
func (*Replication) FirstLogID ¶
func (r *Replication) FirstLogID() (uint64, error)
func (*Replication) LastCommitID ¶
func (r *Replication) LastCommitID() (uint64, error)
func (*Replication) LastLogID ¶
func (r *Replication) LastLogID() (uint64, error)
func (*Replication) LoadDump ¶
func (r *Replication) LoadDump(ctx context.Context, read io.Reader) (h *storager.SnapshotHead, err error)
LoadDump clears all data and loads dump file to db
func (*Replication) LoadDumpFile ¶
func (r *Replication) LoadDumpFile(ctx context.Context, path string) (*storager.SnapshotHead, error)
LoadDumpFile clears all data and loads dump file to db
func (*Replication) LogStore ¶
func (r *Replication) LogStore(data []byte) (*Log, error)
Log store log and notify wait
func (*Replication) NextNeedCommitLog ¶
func (r *Replication) NextNeedCommitLog(log *Log) error
NextNeedCommitLog get next commitId log from log store,(current commitId in commited.log)
func (*Replication) OnReplayLogToCommit ¶
func (r *Replication) OnReplayLogToCommit()
func (*Replication) PubNewLogNotify ¶
func (r *Replication) PubNewLogNotify()
PubNewLogNotify pub new log has saved notify
func (*Replication) ReadLogsTo ¶
func (r *Replication) ReadLogsTo(startLogID uint64, w io.Writer) (n int, nextLogID uint64, err error)
ReadLogsTo reads [startLogID, lastLogID] logs and write to the Writer. return total logs size n, n must <= MaxReplLogSize nextLogId next logId to sync if startLogID < logStore firtId, return ErrLogMissed, slave receive ErrLogMissed need full sync
func (*Replication) ReadLogsToTimeout ¶
func (r *Replication) ReadLogsToTimeout(startLogID uint64, w io.Writer, timeout time.Duration) (n int, nextLogID uint64, err error)
ReadLogsToTimeout tries to read events, if no events read, wait a new log has been stored to read until timeout or processor quit just for sync log to slave
func (*Replication) SetStorager ¶
func (r *Replication) SetStorager(store driver.IStorager)
func (*Replication) Stat ¶
func (r *Replication) Stat() (s *Stat, err error)
func (*Replication) StoreLog ¶
func (r *Replication) StoreLog(log *Log) error
func (*Replication) StoreLogsFromData ¶
func (r *Replication) StoreLogsFromData(ctx context.Context, data []byte) error
StoreLogsFromData stores logs from data.
func (*Replication) StoreLogsFromReader ¶
StoreLogsFromReader stores logs from the Reader
func (*Replication) UpdateCommitID ¶
func (r *Replication) UpdateCommitID(id uint64) error
UpdateCommitID update commitId(logId) to commited.log
func (*Replication) WaitNewLog ¶
func (r *Replication) WaitNewLog() <-chan struct{}
WaitNewLog return sub new log has saved notify ch
func (*Replication) WaitReplication ¶
func (r *Replication) WaitReplication() error
WaitReplication waits replication done when start init replica
type RespCmdConn ¶
type RespCmdConn struct { *standalone.RespCmdConn // contains filtered or unexported fields }
func (*RespCmdConn) FullSync ¶
func (c *RespCmdConn) FullSync(ctx context.Context, needNew bool) (err error)
FullSync
type RespCmdService ¶
type RespCmdService struct { // standalone RespCmdService *standalone.RespCmdService // contains filtered or unexported fields }
func New ¶
func New(opts *config.RespCmdServiceOptions, standaloneOpts *standaloneCfg.RespCmdServiceOptions) *RespCmdService
func (*RespCmdService) InitRespConn ¶
func (*RespCmdService) RegisterLogStore ¶
func (s *RespCmdService) RegisterLogStore() error
func (*RespCmdService) SetStorager ¶
func (s *RespCmdService) SetStorager(store driver.IStorager)
SetStorager
type RplStats ¶
type RplStats struct { PubLogNum atomic.Int64 PubLogAckNum atomic.Int64 PubLogTotalAckTimeMs atomic.Int64 MasterLastLogID atomic.Uint64 }
func (*RplStats) StaticsPubLogTotalAckTime ¶
func (m *RplStats) StaticsPubLogTotalAckTime(handle func())
type SnapshotStore ¶
func NewSnapshotStore ¶
func NewSnapshotStore(cfg *config.SnapshotConfig) *SnapshotStore
func (*SnapshotStore) Close ¶
func (s *SnapshotStore) Close()
func (*SnapshotStore) Create ¶
Create full sync snapshot for replica(master) data store --dump--> snapshot file
func (*SnapshotStore) OpenLatest ¶
func (s *SnapshotStore) OpenLatest() (*snapshot, time.Time, error)
type SrvInfo ¶
type SrvInfo struct { *standalone.SrvInfo // contains filtered or unexported fields }
func NewSrvInfo ¶
func NewSrvInfo(srv *RespCmdService) (srvInfo *SrvInfo)