syncer

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2024 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Syncer lifecycle states. They take the values 0 through 3 in
	// declaration order.
	SyncerStateReadyRun SyncerState = iota
	SyncerStateRun
	SyncerStatePause
	SyncerStateStop

	// Syncer roles. iota keeps counting across the whole const block, so
	// SyncerRoleLeader is 4 and SyncerRoleFollower is 5 (not 0 and 1).
	SyncerRoleLeader SyncerRole = iota
	SyncerRoleFollower
)

Variables

View Source
var (
	// TODO (carried over from the original author): only syncer errors.

	// ErrBreak is the first-level sentinel meaning "break the loop". Every
	// error below that wraps it satisfies errors.Is(err, ErrBreak).
	ErrBreak = errors.New("break")
	// ErrQuit wraps ErrBreak: quit the process.
	ErrQuit = fmt.Errorf("%w %s", ErrBreak, "quit")
	// ErrRestart wraps ErrBreak: restart command.
	ErrRestart = fmt.Errorf("%w %s", ErrBreak, "restart")
	// ErrRedisTypologyChanged wraps ErrRestart (and therefore ErrBreak):
	// restart all syncers.
	ErrRedisTypologyChanged = fmt.Errorf("%w %s", ErrRestart, "redis typology is changed")
	// ErrCorrupted wraps ErrBreak: data is corrupted.
	ErrCorrupted = fmt.Errorf("%w %s", ErrBreak, "corrupted")

	// ErrRole is the sentinel meaning the syncer role has changed.
	ErrRole = errors.New("role")
	// ErrLeaderHandover wraps ErrRole: leadership is handed over.
	ErrLeaderHandover = fmt.Errorf("%w %s", ErrRole, "hand over leadership")
	// ErrLeaderTakeover wraps ErrRole: leadership is taken over.
	ErrLeaderTakeover = fmt.Errorf("%w %s", ErrRole, "take over leadership")

	// ErrStopSync is a standalone sentinel: stop sync.
	ErrStopSync = errors.New("stop sync")
)
View Source
// ErrReplicaNoRunning is a sentinel error whose message is
// "replica leader is not running".
// NOTE(review): presumably returned when a request reaches a ReplicaLeader
// that has not been started or was stopped — confirm against callers.
var ErrReplicaNoRunning = errors.New("replica leader is not running")

Functions

func ClientUnaryCallInterceptor

func ClientUnaryCallInterceptor(opts0 ...grpc.CallOption) grpc.UnaryClientInterceptor

Types

type Channel

// Channel is the interface handed to an Input (Input.SetChannel) and to the
// replica types (NewReplicaLeader, NewReplicaFollower). StoreChannel in this
// package declares the same method set.
type Channel interface {
	// StartPoint takes a list of strings (named ids in StoreChannel.StartPoint,
	// presumably run IDs) and returns a StartPoint or an error.
	StartPoint([]string) (StartPoint, error)
	// SetRunId takes a run ID (per the StoreChannel parameter name).
	SetRunId(string) error
	// DelRunId takes a run ID (per the StoreChannel parameter name).
	DelRunId(string) error
	// RunId returns a run ID string.
	// NOTE(review): presumably the one last passed to SetRunId — confirm.
	RunId() string
	// IsValidOffset reports whether the given Offset is valid for this channel.
	IsValidOffset(Offset) bool
	// GetOffsetRange takes a run ID and returns two int64 values.
	// NOTE(review): presumably the lowest and highest available offsets — confirm.
	GetOffsetRange(string) (int64, int64)
	// GetRdb takes a run ID and returns two int64 values.
	// NOTE(review): presumably the RDB offset and size — confirm.
	GetRdb(string) (int64, int64)
	// NewRdbWriter builds a *store.RdbWriter from a reader and two int64
	// values (named offset and size in StoreChannel.NewRdbWriter).
	NewRdbWriter(io.Reader, int64, int64) (*store.RdbWriter, error)
	// NewAofWritter builds a *store.AofWriter from a reader and an offset.
	// NOTE(review): "Writter" is a misspelling of "Writer"; the name is part
	// of the interface, so it is kept as is.
	NewAofWritter(r io.Reader, offset int64) (*store.AofWriter, error)
	// NewReader builds a *store.Reader for the given Offset.
	NewReader(Offset) (*store.Reader, error)
	// Close closes the channel and returns any error.
	Close() error
}

type Input

// Input is the interface accepted by NewReplicaLeader. RedisInput in this
// package declares the same method set.
type Input interface {
	// Id returns the input's identifier string.
	Id() string
	// Run runs the input and returns an error.
	// NOTE(review): presumably blocks until the input stops — confirm.
	Run() error
	// Stop stops the input and returns any error.
	Stop() error
	// SetOutput sets the single Output for this input.
	SetOutput(output Output) // @TODO multi outputs
	// SetChannel sets the Channel for this input.
	SetChannel(ch Channel)
	// StateNotify returns a usync.WaitChannel for the given SyncState.
	// NOTE(review): presumably signalled when that state is reached — confirm.
	StateNotify(SyncState) usync.WaitChannel
	// RunIds returns a list of run ID strings.
	RunIds() []string
}

type Offset

// Offset pairs a run ID with an int64 offset. It is the argument type of
// Channel.IsValidOffset and Channel.NewReader, and converts to and from
// StartPoint via StartPoint.ToOffset and StartPoint.SetOffset.
type Offset struct {
	RunId  string
	Offset int64
}

type Output

// Output is the interface an Input is given through Input.SetOutput.
// RedisOutput in this package declares the same method set.
type Output interface {
	// StartPoint takes a list of run IDs and returns a StartPoint or an error.
	StartPoint(ctx context.Context, runIds []string) (StartPoint, error)
	// Send takes a *store.Reader and returns an error.
	// NOTE(review): presumably consumes the reader until it ends or ctx is
	// cancelled — confirm.
	Send(ctx context.Context, reader *store.Reader) error
	// SetRunId takes a run ID and returns an error.
	SetRunId(ctx context.Context, runId string) error
	// Close closes the output; it returns nothing.
	Close()
}

type RedisInput

type RedisInput struct {
	// contains filtered or unexported fields
}

func NewRedisInput

func NewRedisInput(redisCfg config.RedisConfig) *RedisInput

func (*RedisInput) Id

func (ri *RedisInput) Id() string

func (*RedisInput) Run

func (ri *RedisInput) Run() (err error)

func (*RedisInput) RunIds

func (ri *RedisInput) RunIds() []string

func (*RedisInput) SetChannel

func (ri *RedisInput) SetChannel(ch Channel)

func (*RedisInput) SetOutput

func (ri *RedisInput) SetOutput(output Output)

func (*RedisInput) StateNotify

func (ri *RedisInput) StateNotify(state SyncState) usync.WaitChannel

func (*RedisInput) Stop

func (ri *RedisInput) Stop() error

@TODO call stop

type RedisOutput

type RedisOutput struct {
	// contains filtered or unexported fields
}

func NewRedisOutput

func NewRedisOutput(cfg RedisOutputConfig) *RedisOutput

func (*RedisOutput) Close

func (ro *RedisOutput) Close()

func (*RedisOutput) NewRedisConn

func (ro *RedisOutput) NewRedisConn(ctx context.Context) (conn client.Redis, err error)

func (*RedisOutput) Send

func (ro *RedisOutput) Send(ctx context.Context, reader *store.Reader) error

func (*RedisOutput) SendAof

func (ro *RedisOutput) SendAof(ctx context.Context, reader *store.Reader) error

func (*RedisOutput) SendRdb

func (ro *RedisOutput) SendRdb(ctx context.Context, reader *store.Reader) error

func (*RedisOutput) SetRunId

func (ro *RedisOutput) SetRunId(ctx context.Context, id string) error

func (*RedisOutput) StartPoint

func (ro *RedisOutput) StartPoint(ctx context.Context, runIds []string) (sp StartPoint, err error)

type RedisOutputConfig

// RedisOutputConfig is the configuration passed to NewRedisOutput.
//
// Only the middle group of fields carries yaml tags.
// NOTE(review): presumably that group is loaded from the YAML configuration
// while the untagged groups are filled in by code — confirm against callers.
type RedisOutputConfig struct {
	// Untagged fields (no yaml tags).
	InputName                  string
	CheckpointName             string
	RunId                      string
	CanTransaction             bool
	Redis                      config.RedisConfig
	EnableResumeFromBreakPoint bool

	// Fields with yaml tags. TargetDb is tagged "-".
	// NOTE(review): "-" presumably excludes TargetDb from YAML decoding, and
	// the `default:"true"` tag on ReplayRdbEnableRestore is presumably read by
	// a defaults library — confirm which libraries process these tags.
	ReplaceHashTag         bool               `yaml:"replaceHashTag"`
	KeyExists              string             `yaml:"keyExists"` // replace|ignore|error
	KeyExistsLog           bool               `yaml:"keyExistsLog"`
	FunctionExists         string             `yaml:"functionExists"`
	MaxProtoBulkLen        int                `yaml:"maxProtoBulkLen"` // proto-max-bulk-len, default value of redis is 512MiB
	TargetDb               int                `yaml:"-"`
	TargetDbMap            map[int]int        `yaml:"targetDbMap"`
	BatchCmdCount          uint               `yaml:"batchCmdCount"`
	BatchTicker            time.Duration      `yaml:"batchTicker"`
	BatchBufferSize        uint64             `yaml:"batchBufferSize"`
	KeepaliveTicker        time.Duration      `yaml:"keepaliveTicker"`
	ReplayRdbParallel      int                `yaml:"replayRdbParallel"`
	ReplayRdbEnableRestore bool               `yaml:"replayRdbEnableRestore" default:"true"`
	UpdateCheckpointTicker time.Duration      `yaml:"updateCheckpointTicker"`
	Stats                  config.OutputStats `yaml:"stats"`

	// Untagged fields (no yaml tags).
	Filter           config.FilterConfig
	SyncDelayTestKey string
}

type ReplicaFollower

type ReplicaFollower struct {
	// contains filtered or unexported fields
}

func NewReplicaFollower

func NewReplicaFollower(id int, inputAddress string, channel Channel, leader *cluster.RoleInfo) *ReplicaFollower

func (*ReplicaFollower) Run

func (rf *ReplicaFollower) Run() error

func (*ReplicaFollower) Stop

func (rf *ReplicaFollower) Stop()

type ReplicaLeader

type ReplicaLeader struct {
	// contains filtered or unexported fields
}

func NewReplicaLeader

func NewReplicaLeader(input Input, channel Channel) *ReplicaLeader

func (*ReplicaLeader) Handle

func (rl *ReplicaLeader) Handle(wait usync.WaitCloser, req *pb.SyncRequest, stream pb.ApiService_SyncServer) error

func (*ReplicaLeader) Start

func (rl *ReplicaLeader) Start()

func (*ReplicaLeader) Stop

func (rl *ReplicaLeader) Stop()

type StartPoint

// StartPoint holds a database ID, a run ID and an int64 offset. It is the
// value returned by Channel.StartPoint and Output.StartPoint, and it converts
// to and from Offset via ToOffset and SetOffset.
type StartPoint struct {
	DbId   int
	RunId  string
	Offset int64
}

func (*StartPoint) Initialize

func (sp *StartPoint) Initialize()

func (*StartPoint) IsInitial

func (sp *StartPoint) IsInitial() bool

func (*StartPoint) IsValid

func (sp *StartPoint) IsValid() bool

func (*StartPoint) SetOffset

func (sp *StartPoint) SetOffset(off Offset)

func (*StartPoint) ToOffset

func (sp *StartPoint) ToOffset() Offset

type StoreChannel

type StoreChannel struct {
	StorerCfg StorerConf
	// contains filtered or unexported fields
}

func NewStoreChannel

func NewStoreChannel(cfg StorerConf) *StoreChannel

func (*StoreChannel) Close

func (sc *StoreChannel) Close() error

func (*StoreChannel) DelRunId

func (sc *StoreChannel) DelRunId(runId string) error

func (*StoreChannel) GetOffsetRange

func (sc *StoreChannel) GetOffsetRange(runId string) (int64, int64)

func (*StoreChannel) GetRdb

func (sc *StoreChannel) GetRdb(runId string) (int64, int64)

func (*StoreChannel) IsValidOffset

func (sc *StoreChannel) IsValidOffset(off Offset) bool

func (*StoreChannel) NewAofWritter

func (sc *StoreChannel) NewAofWritter(r io.Reader, offset int64) (*store.AofWriter, error)

func (*StoreChannel) NewRdbWriter

func (sc *StoreChannel) NewRdbWriter(reader io.Reader, offset int64, size int64) (*store.RdbWriter, error)

func (*StoreChannel) NewReader

func (sc *StoreChannel) NewReader(offset Offset) (*store.Reader, error)

func (*StoreChannel) RunId

func (sc *StoreChannel) RunId() string

func (*StoreChannel) SetRunId

func (sc *StoreChannel) SetRunId(runId string) error

func (*StoreChannel) StartPoint

func (sc *StoreChannel) StartPoint(ids []string) (StartPoint, error)

type StorerConf

type StorerConf struct {
	InputId string
	Dir     string
	MaxSize int64
	LogSize int64
	// contains filtered or unexported fields
}

type SyncFiniteStateMachine

type SyncFiniteStateMachine struct {
	// contains filtered or unexported fields
}

func NewSyncFiniteStateMachine

func NewSyncFiniteStateMachine() *SyncFiniteStateMachine

func (*SyncFiniteStateMachine) AddObserver

func (sm *SyncFiniteStateMachine) AddObserver(observer SyncStateObserver)

synchronous

func (*SyncFiniteStateMachine) Reset

func (sm *SyncFiniteStateMachine) Reset()

func (*SyncFiniteStateMachine) SetState

func (sm *SyncFiniteStateMachine) SetState(state SyncState)

func (*SyncFiniteStateMachine) State

func (sm *SyncFiniteStateMachine) State() SyncState

func (*SyncFiniteStateMachine) StateNotify

func (sm *SyncFiniteStateMachine) StateNotify(state SyncState) usync.WaitChannel

type SyncState

// SyncState is an integer enumeration of sync states. It is the argument
// type of SyncStateObserver.Update and of the StateNotify methods.
type SyncState int

// Sync states, numbered 0 through 5 in declaration order.
const (
	SyncStateStarted SyncState = iota
	SyncStateFullInit
	SyncStateFullSyncing
	SyncStateFullSynced
	SyncStateIncrSyncing
	SyncStateIncrSynced
)

type SyncStateObserver

// SyncStateObserver is the interface accepted by
// SyncFiniteStateMachine.AddObserver.
type SyncStateObserver interface {
	// Update receives a SyncState.
	// NOTE(review): presumably called by the state machine on each state
	// change — confirm against SyncFiniteStateMachine.SetState.
	Update(SyncState)
}

type Syncer

// Syncer is the interface returned by NewSyncer. Its methods cover running
// as leader or follower, serving replica sync requests, pause/resume/stop
// control, and reporting state and role.
type Syncer interface {
	// RunLeader runs the syncer in the leader role and returns an error.
	RunLeader() error
	// RunFollower runs the syncer as a follower of the given leader.
	RunFollower(leader *cluster.RoleInfo) error
	// Stop stops the syncer.
	Stop()
	// ServiceReplica handles a sync request over the given server stream.
	ServiceReplica(req *pb.SyncRequest, stream pb.ApiService_SyncServer) error
	// RunIds returns a list of run ID strings.
	RunIds() []string
	// IsLeader reports whether the syncer is the leader.
	IsLeader() bool
	// Pause pauses the syncer.
	Pause()
	// DelRunId takes no arguments and returns nothing.
	// NOTE(review): presumably deletes the stored run ID — confirm.
	DelRunId()
	// Resume resumes the syncer.
	Resume()
	// State returns the current SyncerState.
	State() SyncerState
	// Role returns the current SyncerRole.
	Role() SyncerRole
	// TransactionMode returns a bool.
	// NOTE(review): presumably mirrors SyncerConfig.CanTransaction — confirm.
	TransactionMode() bool
}

func NewSyncer

func NewSyncer(cfg SyncerConfig) Syncer

type SyncerConfig

// SyncerConfig is the configuration passed to NewSyncer: an integer ID,
// Redis configurations for the input and output sides, a channel
// configuration, and a transaction flag.
type SyncerConfig struct {
	Id             int
	Input          config.RedisConfig
	Output         config.RedisConfig
	Channel        config.ChannelConfig
	CanTransaction bool
}

type SyncerRole

// SyncerRole is an integer enumeration of syncer roles; its declared values
// are SyncerRoleLeader and SyncerRoleFollower. It has a String method.
type SyncerRole int

func (SyncerRole) String

func (sr SyncerRole) String() string

type SyncerState

// SyncerState is an integer enumeration of syncer states; its declared values
// are SyncerStateReadyRun, SyncerStateRun, SyncerStatePause and
// SyncerStateStop. It has a String method.
type SyncerState int

func (SyncerState) String

func (ss SyncerState) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL