Documentation ¶
Index ¶
- Constants
- Variables
- func ClientUnaryCallInterceptor(opts0 ...grpc.CallOption) grpc.UnaryClientInterceptor
- type Channel
- type Input
- type Offset
- type Output
- type RedisInput
- func (ri *RedisInput) Id() string
- func (ri *RedisInput) Run() (err error)
- func (ri *RedisInput) RunIds() []string
- func (ri *RedisInput) SetChannel(ch Channel)
- func (ri *RedisInput) SetOutput(output Output)
- func (ri *RedisInput) StateNotify(state SyncState) usync.WaitChannel
- func (ri *RedisInput) Stop() error
- type RedisOutput
- func (ro *RedisOutput) Close()
- func (ro *RedisOutput) NewRedisConn(ctx context.Context) (conn client.Redis, err error)
- func (ro *RedisOutput) Send(ctx context.Context, reader *store.Reader) error
- func (ro *RedisOutput) SendAof(ctx context.Context, reader *store.Reader) error
- func (ro *RedisOutput) SendRdb(ctx context.Context, reader *store.Reader) error
- func (ro *RedisOutput) SetRunId(ctx context.Context, id string) error
- func (ro *RedisOutput) StartPoint(ctx context.Context, runIds []string) (sp StartPoint, err error)
- type RedisOutputConfig
- type ReplicaFollower
- type ReplicaLeader
- type StartPoint
- type StoreChannel
- func (sc *StoreChannel) Close() error
- func (sc *StoreChannel) DelRunId(runId string) error
- func (sc *StoreChannel) GetOffsetRange(runId string) (int64, int64)
- func (sc *StoreChannel) GetRdb(runId string) (int64, int64)
- func (sc *StoreChannel) IsValidOffset(off Offset) bool
- func (sc *StoreChannel) NewAofWritter(r io.Reader, offset int64) (*store.AofWriter, error)
- func (sc *StoreChannel) NewRdbWriter(reader io.Reader, offset int64, size int64) (*store.RdbWriter, error)
- func (sc *StoreChannel) NewReader(offset Offset) (*store.Reader, error)
- func (sc *StoreChannel) RunId() string
- func (sc *StoreChannel) SetRunId(runId string) error
- func (sc *StoreChannel) StartPoint(ids []string) (StartPoint, error)
- type StorerConf
- type SyncFiniteStateMachine
- func (sm *SyncFiniteStateMachine) AddObserver(observer SyncStateObserver)
- func (sm *SyncFiniteStateMachine) Reset()
- func (sm *SyncFiniteStateMachine) SetState(state SyncState)
- func (sm *SyncFiniteStateMachine) State() SyncState
- func (sm *SyncFiniteStateMachine) StateNotify(state SyncState) usync.WaitChannel
- type SyncState
- type SyncStateObserver
- type Syncer
- type SyncerConfig
- type SyncerRole
- type SyncerState
Constants ¶
View Source
const (
	// state
	SyncerStateReadyRun SyncerState = iota
	SyncerStateRun SyncerState = iota
	SyncerStatePause SyncerState = iota
	SyncerStateStop SyncerState = iota

	// role
	SyncerRoleLeader SyncerRole = iota
	SyncerRoleFollower SyncerRole = iota
)
Variables ¶
View Source
var (
	// @TODO
	// only syncer errors
	//
	//
	// first level
	// break loop
	ErrBreak = errors.New("break")
	// syncer role is changed
	ErrRole = errors.New("role")
	// stop sync
	ErrStopSync = errors.New("stop sync")

	// quit process
	ErrQuit = fmt.Errorf("%w %s", ErrBreak, "quit")
	// restart command
	ErrRestart = fmt.Errorf("%w %s", ErrBreak, "restart")
	// restart all syncers
	ErrRedisTypologyChanged = fmt.Errorf("%w %s", ErrRestart, "redis typology is changed")

	// leadership
	ErrLeaderHandover = fmt.Errorf("%w %s", ErrRole, "hand over leadership")
	ErrLeaderTakeover = fmt.Errorf("%w %s", ErrRole, "take over leadership")

	// data
	ErrCorrupted = fmt.Errorf("%w %s", ErrBreak, "corrupted")
)
View Source
var ErrReplicaNoRunning = errors.New("replica leader is not running")
Functions ¶
func ClientUnaryCallInterceptor ¶
func ClientUnaryCallInterceptor(opts0 ...grpc.CallOption) grpc.UnaryClientInterceptor
Types ¶
type Channel ¶
type Channel interface {
	StartPoint([]string) (StartPoint, error)
	SetRunId(string) error
	DelRunId(string) error
	RunId() string
	IsValidOffset(Offset) bool
	GetOffsetRange(string) (int64, int64)
	GetRdb(string) (int64, int64)
	NewRdbWriter(io.Reader, int64, int64) (*store.RdbWriter, error)
	NewAofWritter(r io.Reader, offset int64) (*store.AofWriter, error)
	NewReader(Offset) (*store.Reader, error)
	Close() error
}
type RedisInput ¶
type RedisInput struct {
// contains filtered or unexported fields
}
func NewRedisInput ¶
func NewRedisInput(redisCfg config.RedisConfig) *RedisInput
func (*RedisInput) Id ¶
func (ri *RedisInput) Id() string
func (*RedisInput) Run ¶
func (ri *RedisInput) Run() (err error)
func (*RedisInput) RunIds ¶
func (ri *RedisInput) RunIds() []string
func (*RedisInput) SetChannel ¶
func (ri *RedisInput) SetChannel(ch Channel)
func (*RedisInput) SetOutput ¶
func (ri *RedisInput) SetOutput(output Output)
func (*RedisInput) StateNotify ¶
func (ri *RedisInput) StateNotify(state SyncState) usync.WaitChannel
type RedisOutput ¶
type RedisOutput struct {
// contains filtered or unexported fields
}
func NewRedisOutput ¶
func NewRedisOutput(cfg RedisOutputConfig) *RedisOutput
func (*RedisOutput) Close ¶
func (ro *RedisOutput) Close()
func (*RedisOutput) NewRedisConn ¶
func (*RedisOutput) SetRunId ¶
func (ro *RedisOutput) SetRunId(ctx context.Context, id string) error
func (*RedisOutput) StartPoint ¶
func (ro *RedisOutput) StartPoint(ctx context.Context, runIds []string) (sp StartPoint, err error)
type RedisOutputConfig ¶
type RedisOutputConfig struct {
	InputName string
	CheckpointName string
	RunId string
	CanTransaction bool
	Redis config.RedisConfig
	EnableResumeFromBreakPoint bool
	ReplaceHashTag bool `yaml:"replaceHashTag"`
	KeyExists string `yaml:"keyExists"` // replace|ignore|error
	KeyExistsLog bool `yaml:"keyExistsLog"`
	FunctionExists string `yaml:"functionExists"`
	MaxProtoBulkLen int `yaml:"maxProtoBulkLen"` // proto-max-bulk-len, default value of redis is 512MiB
	TargetDb int `yaml:"-"`
	TargetDbMap map[int]int `yaml:"targetDbMap"`
	BatchCmdCount uint `yaml:"batchCmdCount"`
	BatchTicker time.Duration `yaml:"batchTicker"`
	BatchBufferSize uint64 `yaml:"batchBufferSize"`
	KeepaliveTicker time.Duration `yaml:"keepaliveTicker"`
	ReplayRdbParallel int `yaml:"replayRdbParallel"`
	ReplayRdbEnableRestore bool `yaml:"replayRdbEnableRestore" default:"true"`
	UpdateCheckpointTicker time.Duration `yaml:"updateCheckpointTicker"`
	Stats config.OutputStats `yaml:"stats"`
	Filter config.FilterConfig
	SyncDelayTestKey string
}
type ReplicaFollower ¶
type ReplicaFollower struct {
// contains filtered or unexported fields
}
func NewReplicaFollower ¶
func (*ReplicaFollower) Run ¶
func (rf *ReplicaFollower) Run() error
func (*ReplicaFollower) Stop ¶
func (rf *ReplicaFollower) Stop()
type ReplicaLeader ¶
type ReplicaLeader struct {
// contains filtered or unexported fields
}
func NewReplicaLeader ¶
func NewReplicaLeader(input Input, channel Channel) *ReplicaLeader
func (*ReplicaLeader) Handle ¶
func (rl *ReplicaLeader) Handle(wait usync.WaitCloser, req *pb.SyncRequest, stream pb.ApiService_SyncServer) error
func (*ReplicaLeader) Start ¶
func (rl *ReplicaLeader) Start()
func (*ReplicaLeader) Stop ¶
func (rl *ReplicaLeader) Stop()
type StartPoint ¶
func (*StartPoint) Initialize ¶
func (sp *StartPoint) Initialize()
func (*StartPoint) IsInitial ¶
func (sp *StartPoint) IsInitial() bool
func (*StartPoint) IsValid ¶
func (sp *StartPoint) IsValid() bool
func (*StartPoint) SetOffset ¶
func (sp *StartPoint) SetOffset(off Offset)
func (*StartPoint) ToOffset ¶
func (sp *StartPoint) ToOffset() Offset
type StoreChannel ¶
type StoreChannel struct {
	StorerCfg StorerConf
	// contains filtered or unexported fields
}
func NewStoreChannel ¶
func NewStoreChannel(cfg StorerConf) *StoreChannel
func (*StoreChannel) Close ¶
func (sc *StoreChannel) Close() error
func (*StoreChannel) DelRunId ¶
func (sc *StoreChannel) DelRunId(runId string) error
func (*StoreChannel) GetOffsetRange ¶
func (sc *StoreChannel) GetOffsetRange(runId string) (int64, int64)
func (*StoreChannel) IsValidOffset ¶
func (sc *StoreChannel) IsValidOffset(off Offset) bool
func (*StoreChannel) NewAofWritter ¶
func (sc *StoreChannel) NewAofWritter(r io.Reader, offset int64) (*store.AofWriter, error)
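func (*StoreChannel) NewRdbWriter ¶
func (sc *StoreChannel) NewRdbWriter(reader io.Reader, offset int64, size int64) (*store.RdbWriter, error)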
func (*StoreChannel) NewReader ¶
func (sc *StoreChannel) NewReader(offset Offset) (*store.Reader, error)
func (*StoreChannel) RunId ¶
func (sc *StoreChannel) RunId() string
func (*StoreChannel) SetRunId ¶
func (sc *StoreChannel) SetRunId(runId string) error
func (*StoreChannel) StartPoint ¶
func (sc *StoreChannel) StartPoint(ids []string) (StartPoint, error)
type StorerConf ¶
type SyncFiniteStateMachine ¶
type SyncFiniteStateMachine struct {
// contains filtered or unexported fields
}
func NewSyncFiniteStateMachine ¶
func NewSyncFiniteStateMachine() *SyncFiniteStateMachine
func (*SyncFiniteStateMachine) AddObserver ¶
func (sm *SyncFiniteStateMachine) AddObserver(observer SyncStateObserver)
synchronous
func (*SyncFiniteStateMachine) Reset ¶
func (sm *SyncFiniteStateMachine) Reset()
func (*SyncFiniteStateMachine) SetState ¶
func (sm *SyncFiniteStateMachine) SetState(state SyncState)
func (*SyncFiniteStateMachine) State ¶
func (sm *SyncFiniteStateMachine) State() SyncState
func (*SyncFiniteStateMachine) StateNotify ¶
func (sm *SyncFiniteStateMachine) StateNotify(state SyncState) usync.WaitChannel
type SyncStateObserver ¶
type SyncStateObserver interface {
Update(SyncState)
}
type Syncer ¶
type Syncer interface {
	RunLeader() error
	RunFollower(leader *cluster.RoleInfo) error
	Stop()
	ServiceReplica(req *pb.SyncRequest, stream pb.ApiService_SyncServer) error
	RunIds() []string
	IsLeader() bool
	Pause()
	DelRunId()
	Resume()
	State() SyncerState
	Role() SyncerRole
	TransactionMode() bool
}
func NewSyncer ¶
func NewSyncer(cfg SyncerConfig) Syncer
type SyncerConfig ¶
type SyncerConfig struct {
	Id int
	Input config.RedisConfig
	Output config.RedisConfig
	Channel config.ChannelConfig
	CanTransaction bool
}
type SyncerRole ¶
type SyncerRole int
func (SyncerRole) String ¶
func (sr SyncerRole) String() string
type SyncerState ¶
type SyncerState int
func (SyncerState) String ¶
func (ss SyncerState) String() string
Click to show internal directories.
Click to hide internal directories.