Documentation ¶
Index ¶
- Constants
- Variables
- func Deregister(instance *dm.Instance) error
- func GetService(serviceName string) (*dm.Service, error)
- func GetServiceNames() ([]string, error)
- func InitServiceBook(opts ...OpOption) error
- func InitSessionBook()
- func KeepAlive(instance *dm.Instance, registryTTL time.Duration) error
- func Polls(stream pb.Discovery_PollsServer) error
- func Register(instance *dm.Instance, registryTTL time.Duration) error
- func SetProperty(property *dm.Property) error
- type OpOption
- type Server
- func (s *Server) Deregister(ctx context.Context, req *pb.DeregisterReq) (*empty.Empty, error)
- func (s *Server) KeepAlive(ctx context.Context, req *pb.KeepAliveReq) (*empty.Empty, error)
- func (s *Server) Polls(stream pb.Discovery_PollsServer) error
- func (s *Server) Register(ctx context.Context, req *pb.RegisterReq) (*empty.Empty, error)
- type ServiceBook
- type ServiceBooker
- type Session
- type SessionBook
- type SessionBooker
Constants ¶
View Source
const (
ErrCodeInternal = 500 //内部错误
)
Variables ¶
View Source
var ( ErrServiceNotFound = errors.New("service not found") ErrZoneNotFound = errors.New("zone not found") ErrInstanceNotFound = errors.New("instance not found") ErrInvalidEventData = errors.New("event data is invalid") )
View Source
var (
ErrEmptyService = errors.New("service is empty")
)
Functions ¶
func InitServiceBook ¶
初始化services 如果discovery server重启后,service的version字段也需要初始化,目前etcd版本中,读取key的最大modVersion作为service version
func InitSessionBook ¶
func InitSessionBook()
Types ¶
type Server ¶
type Server struct{}
func (*Server) Deregister ¶
服务解除注册接口,服务下线时调用
type ServiceBook ¶
所有服务列表
func (*ServiceBook) FindInstance ¶
func (book *ServiceBook) FindInstance(serviceName, zone, id string) (*dm.Instance, error)
从ServiceBook中查找对应的instance
func (*ServiceBook) GetUpgradedServices ¶
func (book *ServiceBook) GetUpgradedServices(serviceVersions map[string]int64, reconnect bool) map[string]*dm.Service
根据版本号获取已更新的service,reconnect为true时,代表重新建立连接,此时推全量数据 TODO reconnect
func (*ServiceBook) Watch ¶
func (book *ServiceBook) Watch(discoverySessionBook SessionBooker)
监听服务配置
type ServiceBooker ¶
type Session ¶
type Session struct { //唯一id,server端接收到请求时生成 Id string //stream流信息 Stream pb.Discovery_PollsServer //订阅者 Subscriber string //订阅的服务列表 ServiceNames []string //变动的服务列表 Services chan map[string]*dm.Service //session.Loop消费 CloseCh chan struct{} //error,主要由stream Recv、Send产生,在rpc接口中消费,一旦出错,则关闭当前session ErrCh chan error }
会话信息
func NewSession ¶
func NewSession(subscriber string, stream pb.Discovery_PollsServer, serviceNames []string, serviceBook ServiceBooker) *Session
创建session,生成唯一的id
func (*Session) CheckUpgradedService ¶
func (session *Session) CheckUpgradedService(serviceBook ServiceBooker, req *pb.PollsReq, reconnect bool)
根据client的请求参数,检查需要更新的service,如果reconnect时,推全量数据
type SessionBook ¶
会话列表信息
func (*SessionBook) GetSubscribers ¶
func (book *SessionBook) GetSubscribers(serviceName string) ([]string, error)
获取某个服务的订阅列表,如果后台需要改数据,需要考虑分布式多节点情况,最好周期存库,方便处理
func (*SessionBook) Push ¶
func (book *SessionBook) Push(upgradeServices map[string]*dm.Service) error
推送service变更
func (book *SessionBook) Push(serviceName string) error { book.RLock() defer book.RUnlock() sessions, ok := book.sessions[serviceName] if !ok { return nil } //获取已经更新的服务配置项 upgradeServices := registryServiceBook.GetUpgradedServices(map[string]int64{serviceName: 0}, false) if len(upgradeServices) == 0 { logger.Logex.Error("SessionBook Push GetUpgradedSe"+ "rvices error:", serviceName) return ErrEmptyService } for _, session := range sessions { session.Services <- upgradeServices } return nil }
减少外部依赖,便于添加单元测试,同时支持批量push
Click to show internal directories.
Click to hide internal directories.