Documentation ¶
Index ¶
- Constants
- Variables
- type BinlogProgress
- type BinlogServer
- func (s *BinlogServer) CheckGtidSet(flavor string, slaveExecutedGtidSet gomysql.GTIDSet) error
- func (s *BinlogServer) DumpBinlogAt(ctx context.Context, startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet, ...) error
- func (s *BinlogServer) GetFde(preGtidEventIndex uint64) ([]byte, error)
- func (s *BinlogServer) GetGtidSet(flavor string, key string) (gomysql.GTIDSet, error)
- func (s *BinlogServer) GetMasterInfo() (*mysql.MasterInfo, error)
- func (s *BinlogServer) GetMySQLDumpAt(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error)
- func (s *BinlogServer) GetNextBinlogFile(startRaftIndex uint64) (string, error)
- func (s *BinlogServer) GetSlaves() map[string]*mysql.Slave
- func (s *BinlogServer) LastBinlogFile() string
- func (s *BinlogServer) LastFilePosition() uint32
- func (s *BinlogServer) RegisterSlave(slave *mysql.Slave) error
- func (s *BinlogServer) Start()
- func (s *BinlogServer) Stop()
- func (s *BinlogServer) UnregisterSlave(uuid string)
- type KingbusInfo
- type KingbusServer
- func (s *KingbusServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
- func (s *KingbusServer) AppliedIndex() uint64
- func (s *KingbusServer) CommittedIndex() uint64
- func (s *KingbusServer) CurrentGtidStr() string
- func (s *KingbusServer) ExecutedGtidSetStr() string
- func (s *KingbusServer) GetIP() string
- func (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{}
- func (s *KingbusServer) ID() types.ID
- func (s *KingbusServer) IsBinlogServerStarted() bool
- func (s *KingbusServer) IsIDRemoved(id uint64) bool
- func (s *KingbusServer) IsLeader() bool
- func (s *KingbusServer) IsSyncerStarted() bool
- func (s *KingbusServer) LastBinlogFile() string
- func (s *KingbusServer) LastFilePosition() uint32
- func (s *KingbusServer) Leader() types.ID
- func (s *KingbusServer) Process(ctx context.Context, m raftpb.Message) error
- func (s *KingbusServer) Propose(data []byte) error
- func (s *KingbusServer) ProposeWithRetry(ctx context.Context, data []byte)
- func (s *KingbusServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
- func (s *KingbusServer) ReportSnapshot(id uint64, status etcdraft.SnapshotStatus)
- func (s *KingbusServer) ReportUnreachable(id uint64)
- func (s *KingbusServer) Run()
- func (s *KingbusServer) StartProposeBinlog(ctx context.Context)
- func (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error
- func (s *KingbusServer) Stop()
- func (s *KingbusServer) StopServer(svrType config.SubServerType)
- func (s *KingbusServer) SyncAdminURL()
- func (s *KingbusServer) Term() uint64
- func (s *KingbusServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
- type PrometheusServer
- type Syncer
- func (s *Syncer) BinlogEventC() chan *storagepb.BinlogEvent
- func (s *Syncer) DataC() chan []byte
- func (s *Syncer) Execute(cmd string, args ...interface{}) (rr *gomysql.Result, err error)
- func (s *Syncer) GetMasterInfo() error
- func (s *Syncer) Start(gset gomysql.GTIDSet) error
- func (s *Syncer) Stop()
Constants ¶
const (
	// HealthInterval is the minimum time the cluster should be healthy
	// before accepting add member requests.
	HealthInterval = 5 * time.Second
	//DialTimeout is the timeout of dial
	DialTimeout = 5 * time.Second
	//ProposeMaxRetryCount is the max count of propose retry times
	ProposeMaxRetryCount = 1000
)
Variables ¶
var (
	//ErrStopped return for kingbus server stopped
	ErrStopped = errors.New("kingbus: server stopped")
	//ErrStarted return for kingbus server started
	ErrStarted = errors.New("kingbus: server already started")
	//ErrCanceled return for request be cancelled
	ErrCanceled = errors.New("kingbus: request cancelled")
	//ErrTimeout return for request timed out
	ErrTimeout = errors.New("kingbus: request timed out")
	//ErrTimeoutDueToLeaderFail return for request timed out,possibly due to previous leader failure
	ErrTimeoutDueToLeaderFail = errors.New("kingbus: request timed out, possibly due to previous leader failure")
	//ErrTimeoutDueToConnectionLost return for request timed out,possibly due to connection lost
	ErrTimeoutDueToConnectionLost = errors.New("kingbus: request timed out, possibly due to connection lost")
	//ErrNotEnoughStartedMembers return for re-configuration failed due to not enough started members
	ErrNotEnoughStartedMembers = errors.New("kingbus: re-configuration failed due to not enough started members")
	//ErrRequestTooLarge return for request is too large
	ErrRequestTooLarge = errors.New("kingbus: request is too large")
	//ErrNoSpace return for no space
	ErrNoSpace = errors.New("kingbus: no space")
	//ErrTooManyRequests return for too many requests
	ErrTooManyRequests = errors.New("kingbus: too many requests")
	//ErrUnhealthy return for unhealthy cluster
	ErrUnhealthy = errors.New("kingbus: unhealthy cluster")
	//ErrKeyNotFound return for key not found
	ErrKeyNotFound = errors.New("kingbus: key not found")
	//ErrCorrupt return for corrupt cluster
	ErrCorrupt = errors.New("kingbus: corrupt cluster")
	//ErrUnsupport return for apply unsupport event type
	ErrUnsupport = errors.New("apply unsupport event type")
	//ErrArgs return for args are not available
	ErrArgs = errors.New("binlog_server:args are not available")
	//ErrNotContain return for slave gtidset not contain in master
	ErrNotContain = errors.New("binlog_server:slave gtidset not contain in master")
	//ErrUUIDIsNull return for the uuid of server is null
	ErrUUIDIsNull = errors.New("binlog_server:the uuid of server is null")
)
Functions ¶
This section is empty.
Types ¶
type BinlogProgress ¶
type BinlogProgress struct {
// contains filtered or unexported fields
}
BinlogProgress is the progress of receiving binlog
func (*BinlogProgress) CurrentGtidStr ¶
func (s *BinlogProgress) CurrentGtidStr() string
CurrentGtidStr get the current gtid
func (*BinlogProgress) ExecutedGtidSetClone ¶
func (s *BinlogProgress) ExecutedGtidSetClone() gomysql.GTIDSet
ExecutedGtidSetClone gets a clone (deep copy) of the executed gtid set
func (*BinlogProgress) ExecutedGtidSetStr ¶
func (s *BinlogProgress) ExecutedGtidSetStr() string
ExecutedGtidSetStr get the executed gtid set
func (*BinlogProgress) LastBinlogFile ¶
func (s *BinlogProgress) LastBinlogFile() string
LastBinlogFile get the last file of binlog event
func (*BinlogProgress) LastFilePosition ¶
func (s *BinlogProgress) LastFilePosition() uint32
LastFilePosition get the last file position of binlog event
type BinlogServer ¶
type BinlogServer struct {
// contains filtered or unexported fields
}
BinlogServer is a binlog server that sends binlog events to slaves. The generic process:
1. authentication: SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'; SET @master_binlog_checksum='NONE'; SET @master_heartbeat_period=%d
2. COM_REGISTER_SLAVE
3. semi-sync: SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'; SET @rpl_semi_sync_slave = 1
4. COM_BINLOG_DUMP_GTID
func NewBinlogServer ¶
func NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error)
NewBinlogServer create a binlog server
func (*BinlogServer) CheckGtidSet ¶
func (s *BinlogServer) CheckGtidSet(flavor string, slaveExecutedGtidSet gomysql.GTIDSet) error
CheckGtidSet checks whether the slave executed gtid set is legal
func (*BinlogServer) DumpBinlogAt ¶
func (s *BinlogServer) DumpBinlogAt(ctx context.Context, startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet, eventC chan<- *storagepb.BinlogEvent, errorC chan<- error) error
DumpBinlogAt implements dump binlog event by slave executed gtid set
func (*BinlogServer) GetFde ¶
func (s *BinlogServer) GetFde(preGtidEventIndex uint64) ([]byte, error)
GetFde get the FORMAT_DESCRIPTION_EVENT by previous gtids log event raft index
func (*BinlogServer) GetGtidSet ¶
func (s *BinlogServer) GetGtidSet(flavor string, key string) (gomysql.GTIDSet, error)
GetGtidSet gets the gtid set
func (*BinlogServer) GetMasterInfo ¶
func (s *BinlogServer) GetMasterInfo() (*mysql.MasterInfo, error)
GetMasterInfo get the master information connected by syncer
func (*BinlogServer) GetMySQLDumpAt ¶
func (s *BinlogServer) GetMySQLDumpAt(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error)
GetMySQLDumpAt returns the raft index; binlog events are dumped from this position. TODO: if gtids is an empty set, and previous_gtids is also empty, ok!
func (*BinlogServer) GetNextBinlogFile ¶
func (s *BinlogServer) GetNextBinlogFile(startRaftIndex uint64) (string, error)
GetNextBinlogFile get next binlog file by raft index
func (*BinlogServer) GetSlaves ¶
func (s *BinlogServer) GetSlaves() map[string]*mysql.Slave
GetSlaves get all slaves connected the binlog server
func (*BinlogServer) LastBinlogFile ¶
func (s *BinlogServer) LastBinlogFile() string
LastBinlogFile return the last binlog file
func (*BinlogServer) LastFilePosition ¶
func (s *BinlogServer) LastFilePosition() uint32
LastFilePosition return the last binlog file position
func (*BinlogServer) RegisterSlave ¶
func (s *BinlogServer) RegisterSlave(slave *mysql.Slave) error
RegisterSlave implements register slave into binlog server
func (*BinlogServer) UnregisterSlave ¶
func (s *BinlogServer) UnregisterSlave(uuid string)
UnregisterSlave unregister slave by uuid
type KingbusInfo ¶
type KingbusInfo interface {
	AppliedIndex() uint64
	LastBinlogFile() string
	LastFilePosition() uint32
	ExecutedGtidSetStr() string
}
KingbusInfo get the kingbus server information
type KingbusServer ¶
type KingbusServer struct {
	Cfg *config.KingbusServerConfig
	//lock for leadElectedTime
	Mu sync.RWMutex
	// contains filtered or unexported fields
}
KingbusServer is an instance that runs all sub servers
func NewKingbusServer ¶
func NewKingbusServer(cfg *config.KingbusServerConfig) (*KingbusServer, error)
NewKingbusServer create a kingbus server
func (*KingbusServer) AddMember ¶
func (s *KingbusServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
AddMember adds a member into the raft cluster; only executed on the leader node
func (*KingbusServer) AppliedIndex ¶
func (s *KingbusServer) AppliedIndex() uint64
AppliedIndex get applied index
func (*KingbusServer) CommittedIndex ¶
func (s *KingbusServer) CommittedIndex() uint64
CommittedIndex get committed index
func (*KingbusServer) CurrentGtidStr ¶
func (s *KingbusServer) CurrentGtidStr() string
CurrentGtidStr return current gtid
func (*KingbusServer) ExecutedGtidSetStr ¶
func (s *KingbusServer) ExecutedGtidSetStr() string
ExecutedGtidSetStr return executed gtid
func (*KingbusServer) GetServerStatus ¶
func (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{}
GetServerStatus get the sub server status
func (*KingbusServer) IsBinlogServerStarted ¶
func (s *KingbusServer) IsBinlogServerStarted() bool
IsBinlogServerStarted return if binlog server started
func (*KingbusServer) IsIDRemoved ¶
func (s *KingbusServer) IsIDRemoved(id uint64) bool
IsIDRemoved return if the kingbus has been removed
func (*KingbusServer) IsLeader ¶
func (s *KingbusServer) IsLeader() bool
IsLeader returns whether the node is the leader
func (*KingbusServer) IsSyncerStarted ¶
func (s *KingbusServer) IsSyncerStarted() bool
IsSyncerStarted return if syncer started
func (*KingbusServer) LastBinlogFile ¶
func (s *KingbusServer) LastBinlogFile() string
LastBinlogFile return last binlog file
func (*KingbusServer) LastFilePosition ¶
func (s *KingbusServer) LastFilePosition() uint32
LastFilePosition return last binlog file position
func (*KingbusServer) Leader ¶
func (s *KingbusServer) Leader() types.ID
Leader get raft cluster leader
func (*KingbusServer) Process ¶
func (s *KingbusServer) Process(ctx context.Context, m raftpb.Message) error
Process takes a raft message and applies it to the server's raft state machine, respecting any timeout of the given context.
func (*KingbusServer) Propose ¶
func (s *KingbusServer) Propose(data []byte) error
Propose proposes data; only executed on the leader node, follower nodes are forbidden
func (*KingbusServer) ProposeWithRetry ¶
func (s *KingbusServer) ProposeWithRetry(ctx context.Context, data []byte)
ProposeWithRetry implements propose data with retry
func (*KingbusServer) RemoveMember ¶
func (s *KingbusServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
RemoveMember removes a member from the raft cluster; only executed on the leader node
func (*KingbusServer) ReportSnapshot ¶
func (s *KingbusServer) ReportSnapshot(id uint64, status etcdraft.SnapshotStatus)
ReportSnapshot reports snapshot sent status to the raft state machine, and clears the used snapshot from the snapshot store.
func (*KingbusServer) ReportUnreachable ¶
func (s *KingbusServer) ReportUnreachable(id uint64)
ReportUnreachable report unreachable
func (*KingbusServer) StartProposeBinlog ¶
func (s *KingbusServer) StartProposeBinlog(ctx context.Context)
StartProposeBinlog start propose binlog event
func (*KingbusServer) StartServer ¶
func (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error
StartServer start sub servers:syncer server or binlog master server
func (*KingbusServer) StopServer ¶
func (s *KingbusServer) StopServer(svrType config.SubServerType)
StopServer stop sub server
func (*KingbusServer) SyncAdminURL ¶
func (s *KingbusServer) SyncAdminURL()
SyncAdminURL syncs the admin url across the raft cluster. Since data is only proposed on the leader node, the followers should wait until the leader's admin url is applied, then send http api requests to the leader's admin url.
func (*KingbusServer) UpdateMember ¶
func (s *KingbusServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
UpdateMember updates a member in the raft cluster; only executed on the leader node
type PrometheusServer ¶
type PrometheusServer struct {
// contains filtered or unexported fields
}
PrometheusServer provides a container with config parameters for the Prometheus Exporter
func NewPrometheusServer ¶
func NewPrometheusServer(addr string, r metrics.Registry, promRegistry prometheus.Registerer, FlushInterval time.Duration) *PrometheusServer
NewPrometheusServer returns a Provider that produces Prometheus metrics. Namespace and subsystem are applied to all produced metrics.
type Syncer ¶
type Syncer struct {
// contains filtered or unexported fields
}
Syncer is a mock mysql slave that receives binlog from the master and proposes events into the raft cluster
func (*Syncer) BinlogEventC ¶
func (s *Syncer) BinlogEventC() chan *storagepb.BinlogEvent
BinlogEventC get binlogEvent channel