Documentation ¶
Index ¶
- Constants
- Variables
- type BeatRecordCache
- type Config
- type ConnectFuncContextKey
- type ConnectPeerFunc
- type HashFunction
- type LeaderHealthChecker
- func (c *LeaderHealthChecker) BatchQuery(ctx context.Context, request *plugin.BatchQueryRequest) (*plugin.BatchQueryResponse, error)
- func (c *LeaderHealthChecker) Check(request *plugin.CheckRequest) (*plugin.CheckResponse, error)
- func (c *LeaderHealthChecker) DebugHandlers() []model.DebugHandler
- func (c *LeaderHealthChecker) Delete(ctx context.Context, key string) error
- func (c *LeaderHealthChecker) Destroy() error
- func (c *LeaderHealthChecker) Initialize(entry *plugin.ConfigEntry) error
- func (c *LeaderHealthChecker) LeaderChangeTimeSec() int64
- func (c *LeaderHealthChecker) Name() string
- func (c *LeaderHealthChecker) OnEvent(ctx context.Context, i interface{}) error
- func (c *LeaderHealthChecker) PreProcess(ctx context.Context, value any) any
- func (c *LeaderHealthChecker) Query(ctx context.Context, request *plugin.QueryRequest) (*plugin.QueryResponse, error)
- func (c *LeaderHealthChecker) Report(ctx context.Context, request *plugin.ReportRequest) error
- func (c *LeaderHealthChecker) Suspend()
- func (c *LeaderHealthChecker) SuspendTimeSec() int64
- func (c *LeaderHealthChecker) Type() plugin.HealthCheckType
- type LocalBeatRecordCache
- func (lc *LocalBeatRecordCache) Clean()
- func (lc *LocalBeatRecordCache) Del(keys ...string) error
- func (lc *LocalBeatRecordCache) Get(keys ...string) (map[string]*ReadBeatRecord, error)
- func (lc *LocalBeatRecordCache) Ping() error
- func (lc *LocalBeatRecordCache) Put(records ...WriteBeatRecord) error
- func (lc *LocalBeatRecordCache) Snapshot() map[string]*ReadBeatRecord
- type LocalPeer
- type Peer
- type ReadBeatRecord
- type RecordDelter
- type RecordGetter
- type RecordSaver
- type RecordValue
- type RemoteBeatRecordCache
- func (lc *RemoteBeatRecordCache) Clean()
- func (rc *RemoteBeatRecordCache) Del(key ...string) error
- func (rc *RemoteBeatRecordCache) Get(keys ...string) (map[string]*ReadBeatRecord, error)
- func (rc *RemoteBeatRecordCache) Ping() error
- func (rc *RemoteBeatRecordCache) Put(records ...WriteBeatRecord) error
- func (lc *RemoteBeatRecordCache) Snapshot() map[string]*ReadBeatRecord
- type RemotePeer
- func (p *RemotePeer) Close() error
- func (p *RemotePeer) DelFunc(req *apiservice.DelHeartbeatsRequest) error
- func (p *RemotePeer) GetFunc(req *apiservice.GetHeartbeatsRequest) (*apiservice.GetHeartbeatsResponse, error)
- func (p *RemotePeer) Host() string
- func (p *RemotePeer) Initialize(conf Config)
- func (p *RemotePeer) IsAlive() bool
- func (p *RemotePeer) Ping() error
- func (p *RemotePeer) PutFunc(req *apiservice.HeartbeatsRequest) error
- func (p *RemotePeer) Serve(ctx context.Context, checker *LeaderHealthChecker, listenIP string, ...) error
- func (p *RemotePeer) Storage() BeatRecordCache
- type WriteBeatRecord
Constants ¶
const ( // PluginName plugin name PluginName = "heartbeatLeader" // Servers key to manage hb servers Servers = "servers" // Split separator to divide server and count Split = "|" )
Variables ¶
var ( ErrorRedirectOnlyOnce = errors.New("redirect request only once") ErrorLeaderNotInitialize = errors.New("leader checker uninitialize") )
var ( NewLocalPeerFunc = newLocalPeer NewRemotePeerFunc = newRemotePeer ConnectPeer = doConnect CreateBeatClientFunc = createBeatClient )
var ( // DefaultSoltNum default slot number of LocalBeatRecordCache DefaultSoltNum = int32(runtime.GOMAXPROCS(0) * 16) )
var (
ErrorLeaderNotAlive = errors.New("leader not alive")
)
Functions ¶
This section is empty.
Types ¶
type BeatRecordCache ¶
type BeatRecordCache interface { // Get get records Get(keys ...string) (map[string]*ReadBeatRecord, error) // Put put records Put(records ...WriteBeatRecord) error // Del del records Del(keys ...string) error // Clean . Clean() // Snapshot Snapshot() map[string]*ReadBeatRecord // Ping Ping() error }
BeatRecordCache Heartbeat data cache
type ConnectPeerFunc ¶ added in v1.18.0
type ConnectPeerFunc func(*RemotePeer) error
type HashFunction ¶
HashFunction hash function used to calculate where a record id needs to be located in the SegmentMap
type LeaderHealthChecker ¶
type LeaderHealthChecker struct {
// contains filtered or unexported fields
}
LeaderHealthChecker Leader~Follower 节点心跳健康检查 1. 监听 LeaderChangeEvent 事件, 2. LeaderHealthChecker 启动时先根据 store 层的 LeaderElection 选举能力选出一个 Leader 3. Leader 和 Follower 之间建立 gRPC 长连接 4. LeaderHealthChecker 在处理 Report/Query/Check/Delete 先判断自己是否为 Leader
- Leader 节点 a. 心跳数据的读写直接写本地 map 内存
- 非 Leader 节点 a. 心跳写请求通过 gRPC 长连接直接发给 Leader 节点 b. 心跳读请求通过 gRPC 长连接直接发给 Leader 节点,Leader 节点返回心跳时间戳信息
func (*LeaderHealthChecker) BatchQuery ¶ added in v1.18.0
func (c *LeaderHealthChecker) BatchQuery(ctx context.Context, request *plugin.BatchQueryRequest) (*plugin.BatchQueryResponse, error)
func (*LeaderHealthChecker) Check ¶
func (c *LeaderHealthChecker) Check(request *plugin.CheckRequest) (*plugin.CheckResponse, error)
Check process the instance check
func (*LeaderHealthChecker) DebugHandlers ¶
func (c *LeaderHealthChecker) DebugHandlers() []model.DebugHandler
func (*LeaderHealthChecker) Delete ¶
func (c *LeaderHealthChecker) Delete(ctx context.Context, key string) error
Delete delete record by key
func (*LeaderHealthChecker) Initialize ¶
func (c *LeaderHealthChecker) Initialize(entry *plugin.ConfigEntry) error
Initialize .
func (*LeaderHealthChecker) LeaderChangeTimeSec ¶
func (c *LeaderHealthChecker) LeaderChangeTimeSec() int64
func (*LeaderHealthChecker) OnEvent ¶
func (c *LeaderHealthChecker) OnEvent(ctx context.Context, i interface{}) error
OnEvent event trigger
func (*LeaderHealthChecker) PreProcess ¶
func (c *LeaderHealthChecker) PreProcess(ctx context.Context, value any) any
PreProcess do preprocess logic for event
func (*LeaderHealthChecker) Query ¶
func (c *LeaderHealthChecker) Query(ctx context.Context, request *plugin.QueryRequest) (*plugin.QueryResponse, error)
Query queries the heartbeat time
func (*LeaderHealthChecker) Report ¶
func (c *LeaderHealthChecker) Report(ctx context.Context, request *plugin.ReportRequest) error
Report process heartbeat info report
func (*LeaderHealthChecker) Suspend ¶
func (c *LeaderHealthChecker) Suspend()
Suspend checker for an entire expired interval
func (*LeaderHealthChecker) SuspendTimeSec ¶
func (c *LeaderHealthChecker) SuspendTimeSec() int64
SuspendTimeSec get suspend time in seconds
func (*LeaderHealthChecker) Type ¶
func (c *LeaderHealthChecker) Type() plugin.HealthCheckType
Type for health check plugin, only one same type plugin is allowed
type LocalBeatRecordCache ¶
type LocalBeatRecordCache struct {
// contains filtered or unexported fields
}
LocalBeatRecordCache
func (*LocalBeatRecordCache) Clean ¶
func (lc *LocalBeatRecordCache) Clean()
func (*LocalBeatRecordCache) Del ¶
func (lc *LocalBeatRecordCache) Del(keys ...string) error
func (*LocalBeatRecordCache) Get ¶
func (lc *LocalBeatRecordCache) Get(keys ...string) (map[string]*ReadBeatRecord, error)
func (*LocalBeatRecordCache) Ping ¶ added in v1.18.0
func (lc *LocalBeatRecordCache) Ping() error
func (*LocalBeatRecordCache) Put ¶
func (lc *LocalBeatRecordCache) Put(records ...WriteBeatRecord) error
func (*LocalBeatRecordCache) Snapshot ¶
func (lc *LocalBeatRecordCache) Snapshot() map[string]*ReadBeatRecord
type LocalPeer ¶
type LocalPeer struct { // Cache data storage Cache BeatRecordCache // contains filtered or unexported fields }
LocalPeer Heartbeat data storage node
func (*LocalPeer) Initialize ¶
func (*LocalPeer) Storage ¶ added in v1.17.1
func (p *LocalPeer) Storage() BeatRecordCache
type Peer ¶
type Peer interface { // Initialize . Initialize(conf Config) // Serve . Serve(ctx context.Context, checker *LeaderHealthChecker, listenIP string, listenPort uint32) error // Close . Close() error // Host . Host() string // Storage . Storage() BeatRecordCache // IsAlive . IsAlive() bool }
Peer peer
type ReadBeatRecord ¶
type ReadBeatRecord struct { Record RecordValue Exist bool }
ReadBeatRecord Heartbeat records read results
type RecordDelter ¶
type RecordDelter func(req *apiservice.DelHeartbeatsRequest) error
RecordDelter beat record deleter
type RecordGetter ¶
type RecordGetter func(req *apiservice.GetHeartbeatsRequest) (*apiservice.GetHeartbeatsResponse, error)
RecordGetter beat record getter
type RecordSaver ¶
type RecordSaver func(req *apiservice.HeartbeatsRequest) error
RecordSaver beat record saver
type RecordValue ¶
RecordValue heartbeat record value
func (RecordValue) String ¶
func (r RecordValue) String() string
type RemoteBeatRecordCache ¶
type RemoteBeatRecordCache struct {
// contains filtered or unexported fields
}
RemoteBeatRecordCache
func (*RemoteBeatRecordCache) Clean ¶
func (lc *RemoteBeatRecordCache) Clean()
func (*RemoteBeatRecordCache) Del ¶
func (rc *RemoteBeatRecordCache) Del(key ...string) error
func (*RemoteBeatRecordCache) Get ¶
func (rc *RemoteBeatRecordCache) Get(keys ...string) (map[string]*ReadBeatRecord, error)
func (*RemoteBeatRecordCache) Ping ¶ added in v1.18.0
func (rc *RemoteBeatRecordCache) Ping() error
func (*RemoteBeatRecordCache) Put ¶
func (rc *RemoteBeatRecordCache) Put(records ...WriteBeatRecord) error
func (*RemoteBeatRecordCache) Snapshot ¶
func (lc *RemoteBeatRecordCache) Snapshot() map[string]*ReadBeatRecord
type RemotePeer ¶
type RemotePeer struct {
// contains filtered or unexported fields
}
RemotePeer Heartbeat data storage node
func (*RemotePeer) DelFunc ¶
func (p *RemotePeer) DelFunc(req *apiservice.DelHeartbeatsRequest) error
func (*RemotePeer) GetFunc ¶
func (p *RemotePeer) GetFunc(req *apiservice.GetHeartbeatsRequest) (*apiservice.GetHeartbeatsResponse, error)
func (*RemotePeer) Host ¶
func (p *RemotePeer) Host() string
func (*RemotePeer) Initialize ¶
func (p *RemotePeer) Initialize(conf Config)
func (*RemotePeer) IsAlive ¶ added in v1.18.0
func (p *RemotePeer) IsAlive() bool
func (*RemotePeer) Ping ¶ added in v1.18.0
func (p *RemotePeer) Ping() error
func (*RemotePeer) PutFunc ¶
func (p *RemotePeer) PutFunc(req *apiservice.HeartbeatsRequest) error
func (*RemotePeer) Serve ¶
func (p *RemotePeer) Serve(ctx context.Context, checker *LeaderHealthChecker, listenIP string, listenPort uint32) error
func (*RemotePeer) Storage ¶ added in v1.17.1
func (p *RemotePeer) Storage() BeatRecordCache
type WriteBeatRecord ¶
type WriteBeatRecord struct { Record RecordValue Key string }
WriteBeatRecord Heartbeat record to be written (the key and the record value passed to Put)