Documentation ¶
Index ¶
- type DefaultResolverHandler
- type ResolverHandler
- type ResolverMgrServer
- type ResolverMgrServerAPI
- func (api *ResolverMgrServerAPI) CallMetaServer(serviceMethod string, args any, reply any) error
- func (api *ResolverMgrServerAPI) Shutdown(args *protocol.ResolverShutdownArgs, reply *protocol.ResolverShutdownReply) error
- func (api *ResolverMgrServerAPI) Start(args *protocol.ResolverStartArgs, reply *protocol.ResolverStartReply) error
- type ResolverServer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultResolverHandler ¶
type DefaultResolverHandler struct {
// contains filtered or unexported fields
}
DefaultResolverHandler 是 ResolverHandler 的默认实现,完成所有 snapshot 的重排序和处理工作。resolver 保证处理是串行的,handler 无需考虑并发问题。以 data package(dp)为单位进行重排序,以 snapshot pair 为单位进行处理,处理结果写入 collector。
- dp 间,根据 day 和 seq 排序(http 和 rpc 乱序,通过 map 维护滑动窗口解决);
- dp 内,根据 timestamp 排序(logger 本身乱序,通过 sort 解决);
- dp 间,根据 timestamp 排序,两两组装 dp(logger 本身乱序,且乱序之处恰好处于两个 dp 间,通过 skiplist 解决);
- 流式读取 snapshot,组装成对;
- 处理每一对 snapshot,异步写入 collector。
func NewDefaultResolverHandler ¶
func NewDefaultResolverHandler(exporterTag, metaServerAddr string, dbConf config.DatabaseConfig) *DefaultResolverHandler
func (*DefaultResolverHandler) CallMetaServer ¶
func (h *DefaultResolverHandler) CallMetaServer(serviceMethod string, args any, reply any) error
func (*DefaultResolverHandler) OnReceive ¶
func (h *DefaultResolverHandler) OnReceive(dp protocol.ReceiverDataPackage)
func (*DefaultResolverHandler) OnResolve ¶
func (h *DefaultResolverHandler) OnResolve(snapshotPairList []*snapshotPair)
type ResolverHandler ¶
type ResolverHandler interface {
	// OnReceive 接收到 dp,实现各种重排序任务,然后将排序好的 snapshot 组装成 pair,写入 stream
	OnReceive(dp protocol.ReceiverDataPackage)
	// OnResolve 从 stream 读取 snapshot pair,进行处理工作,并写入 collector
	OnResolve(snapshotPairList []*snapshotPair)
}
ResolverHandler 完成所有 snapshot 的重排序和处理工作。resolver 保证处理是串行的,handler 无需考虑并发问题。以 data package(dp)为单位进行重排序,以 snapshot pair 为单位进行处理,处理结果写入 collector。
- dp 间,根据 day 和 seq 排序(http 和 rpc 乱序,通过 map 维护滑动窗口解决);
- dp 内,根据 timestamp 排序(logger 本身乱序,通过 sort 解决);
- dp 间,根据 timestamp 排序,两两组装 dp(logger 本身乱序,且乱序之处恰好处于两个 dp 间,通过 skiplist 解决);
- 流式读取 snapshot,组装成对;
- 处理每一对 snapshot,异步写入 collector。
type ResolverMgrServer ¶
type ResolverMgrServer struct {
// contains filtered or unexported fields
}
func NewResolverMgrServer ¶
func NewResolverMgrServer(addr string, conf config.ResolverMgrConfig) *ResolverMgrServer
func (*ResolverMgrServer) Address ¶
func (s *ResolverMgrServer) Address() string
func (*ResolverMgrServer) Run ¶
func (s *ResolverMgrServer) Run() error
type ResolverMgrServerAPI ¶
ResolverMgrServerAPI 目前只提供同步的接口
func NewResolveMgrServerAPI ¶
func NewResolveMgrServerAPI(conf config.ResolverConfig) *ResolverMgrServerAPI
func (*ResolverMgrServerAPI) CallMetaServer ¶
func (api *ResolverMgrServerAPI) CallMetaServer(serviceMethod string, args any, reply any) error
func (*ResolverMgrServerAPI) Shutdown ¶
func (api *ResolverMgrServerAPI) Shutdown(args *protocol.ResolverShutdownArgs, reply *protocol.ResolverShutdownReply) error
func (*ResolverMgrServerAPI) Start ¶
func (api *ResolverMgrServerAPI) Start(args *protocol.ResolverStartArgs, reply *protocol.ResolverStartReply) error
type ResolverServer ¶
type ResolverServer struct {
// contains filtered or unexported fields
}
func NewResolverServer ¶
func NewResolverServer(conf config.ResolverConfig, resolverTag, exporterTag string, resolverHandler ResolverHandler) *ResolverServer
func (*ResolverServer) CallMetaServer ¶
func (s *ResolverServer) CallMetaServer(serviceMethod string, args any, reply any) error
func (*ResolverServer) CallMqServer ¶
func (s *ResolverServer) CallMqServer(serviceMethod string, args any, reply any) error
func (*ResolverServer) Shutdown ¶
func (s *ResolverServer) Shutdown(lazyShutdown bool)
func (*ResolverServer) Start ¶
func (s *ResolverServer) Start()