Documentation ¶
Index ¶
- Variables
- func NewGrpcPushCenter(store *core.NacosDataStorage, sender Sender) (core.PushCenter, error)
- func ValueClientIP(ctx context.Context) string
- func ValueConnID(ctx context.Context) string
- func WithAuthSvr(userSvr auth.UserServer, checkerSvr auth.StrategyServer) option
- func WithConnLimitConfig(connLimitConfig *connlimit.Config) option
- func WithDiscoverSvr(discoverSvr service.DiscoverServer) option
- func WithHealthSvr(healthSvr *healthcheck.Server) option
- func WithNamespaceSvr(namespaceSvr namespace.NamespaceOperateServer) option
- func WithOriginDiscoverSvr(discoverSvr service.DiscoverServer) option
- func WithTLS(tlsInfo *secure.TLSInfo) option
- func WithVirtualStreamBaseServer(server *NacosV2Server) initVirtualStream
- func WithVirtualStreamLogger(log *commonlog.Scope) initVirtualStream
- func WithVirtualStreamMethod(method string) initVirtualStream
- func WithVirtualStreamPostProcessFunc(postprocess PostProcessFunc) initVirtualStream
- func WithVirtualStreamPreProcessFunc(preprocess PreProcessFunc) initVirtualStream
- func WithVirtualStreamServerStream(stream grpc.ServerStream) initVirtualStream
- type Checker
- type CheckerLeaderSubscriber
- type Client
- type ClientConnectionInterceptor
- type ClientIPKey
- type ClientInFlights
- type ConnIDKey
- type ConnectionClient
- type ConnectionClientManager
- type ConnectionEvent
- type ConnectionInfoKey
- type ConnectionManager
- func (h *ConnectionManager) GetClient(id string) (*Client, bool)
- func (h *ConnectionManager) GetClientByAddr(addr string) (*Client, bool)
- func (h *ConnectionManager) GetStream(connID string) *SyncServerStream
- func (h *ConnectionManager) HandleConn(ctx context.Context, s stats.ConnStats)
- func (h *ConnectionManager) HandleRPC(ctx context.Context, s stats.RPCStats)
- func (h *ConnectionManager) OnAccept(conn net.Conn)
- func (h *ConnectionManager) OnClose()
- func (h *ConnectionManager) OnRelease(conn net.Conn)
- func (h *ConnectionManager) RefreshClient(ctx context.Context)
- func (h *ConnectionManager) RegisterConnection(ctx context.Context, payload *nacospb.Payload, ...) error
- func (h *ConnectionManager) TagConn(ctx context.Context, connInfo *stats.ConnTagInfo) context.Context
- func (h *ConnectionManager) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context
- func (h *ConnectionManager) UnRegisterConnection(connID string)
- type ConnectionMeta
- type EventType
- type GRPCNotifier
- type GrpcPushCenter
- type InFlight
- type InFlights
- type NacosV2Server
- func (h *NacosV2Server) AllowAccess(method string) bool
- func (h *NacosV2Server) ConvertContext(ctx context.Context) context.Context
- func (n *NacosV2Server) DescribeConnectionDetail(req *restful.Request, rsp *restful.Response)
- func (n *NacosV2Server) DescribeConnections(req *restful.Request, rsp *restful.Response)
- func (h *NacosV2Server) EnterRatelimit(ip string, method string) uint32
- func (h *NacosV2Server) GetPort() uint32
- func (h *NacosV2Server) GetProtocol() string
- func (h *NacosV2Server) HandleClientConnect(ctx context.Context, client *ConnectionClient)
- func (h *NacosV2Server) HandleClientDisConnect(ctx context.Context, client *ConnectionClient)
- func (h *NacosV2Server) Initialize(ctx context.Context, option map[string]interface{}, port uint32, ...) error
- func (h *NacosV2Server) MarshalPayload(valu interface{}) (*nacospb.Payload, error)
- func (n *NacosV2Server) RegistryDebugRoute()
- func (h *NacosV2Server) Request(ctx context.Context, payload *nacospb.Payload) (*nacospb.Payload, error)
- func (h *NacosV2Server) RequestBiStream(svr nacospb.BiRequestStream_RequestBiStreamServer) error
- func (h *NacosV2Server) Run(errCh chan error)
- func (h *NacosV2Server) Stop(protocol string)
- func (h *NacosV2Server) UnmarshalPayload(payload *nacospb.Payload) (RequestHandler, nacospb.CustomerPayload, error)
- type PostProcessFunc
- type PreProcessFunc
- type RequestHandler
- type RequestHandlerWarrper
- type Sender
- type SyncServerStream
- type VirtualStream
- func (v *VirtualStream) Context() context.Context
- func (v *VirtualStream) RecvMsg(m interface{}) error
- func (v *VirtualStream) SendHeader(md metadata.MD) error
- func (v *VirtualStream) SendMsg(m interface{}) error
- func (v *VirtualStream) SetHeader(md metadata.MD) error
- func (v *VirtualStream) SetTrailer(md metadata.MD)
Constants ¶
This section is empty.
Variables ¶
var ( ErrorNoSuchPayloadType = errors.New("not such payload type") ErrorInvalidRequestBodyType = errors.New("invalid request body type") )
Functions ¶
func NewGrpcPushCenter ¶
func NewGrpcPushCenter(store *core.NacosDataStorage, sender Sender) (core.PushCenter, error)
func ValueClientIP ¶
func ValueConnID ¶
func WithAuthSvr ¶
func WithAuthSvr(userSvr auth.UserServer, checkerSvr auth.StrategyServer) option
func WithConnLimitConfig ¶
func WithDiscoverSvr ¶
func WithDiscoverSvr(discoverSvr service.DiscoverServer) option
func WithHealthSvr ¶
func WithHealthSvr(healthSvr *healthcheck.Server) option
func WithNamespaceSvr ¶
func WithNamespaceSvr(namespaceSvr namespace.NamespaceOperateServer) option
func WithOriginDiscoverSvr ¶
func WithOriginDiscoverSvr(discoverSvr service.DiscoverServer) option
func WithVirtualStreamBaseServer ¶
func WithVirtualStreamBaseServer(server *NacosV2Server) initVirtualStream
WithVirtualStreamBaseServer 设置 BaseGrpcServer
func WithVirtualStreamLogger ¶
WithVirtualStreamLogger 设置 Logger
func WithVirtualStreamMethod ¶
func WithVirtualStreamMethod(method string) initVirtualStream
WithVirtualStreamMethod 设置 method
func WithVirtualStreamPostProcessFunc ¶
func WithVirtualStreamPostProcessFunc(postprocess PostProcessFunc) initVirtualStream
WithVirtualStreamPostProcessFunc 设置 PostProcessFunc
func WithVirtualStreamPreProcessFunc ¶
func WithVirtualStreamPreProcessFunc(preprocess PreProcessFunc) initVirtualStream
WithVirtualStreamPreProcessFunc 设置 PreProcessFunc
func WithVirtualStreamServerStream ¶
func WithVirtualStreamServerStream(stream grpc.ServerStream) initVirtualStream
WithVirtualStreamServerStream 设置 grpc.ServerStream
Types ¶
type Checker ¶
type Checker struct {
// contains filtered or unexported fields
}
func (*Checker) OnBatchCreated ¶
func (c *Checker) OnBatchCreated(value interface{})
OnBatchCreated callback when cache value created
func (*Checker) OnBatchDeleted ¶
func (c *Checker) OnBatchDeleted(value interface{})
OnBatchDeleted callback when cache value deleted
func (*Checker) OnBatchUpdated ¶
func (c *Checker) OnBatchUpdated(value interface{})
OnBatchUpdated callback when cache value updated
func (*Checker) OnCreated ¶
func (c *Checker) OnCreated(value interface{})
OnCreated callback when cache value created
type CheckerLeaderSubscriber ¶
type CheckerLeaderSubscriber struct {
// contains filtered or unexported fields
}
CheckerLeaderSubscriber
func (*CheckerLeaderSubscriber) OnEvent ¶
func (c *CheckerLeaderSubscriber) OnEvent(ctx context.Context, i interface{}) error
OnEvent event trigger
func (*CheckerLeaderSubscriber) PreProcess ¶
func (c *CheckerLeaderSubscriber) PreProcess(ctx context.Context, value any) any
PreProcess do preprocess logic for event
type Client ¶
type Client struct { ID string `json:"id"` Addr *net.TCPAddr `json:"addr"` ConnMeta ConnectionMeta `json:"conn_meta"` // contains filtered or unexported fields }
Client
type ClientConnectionInterceptor ¶
type ClientConnectionInterceptor interface { // HandleClientConnect . HandleClientConnect(ctx context.Context, client *ConnectionClient) // HandleClientDisConnect . HandleClientDisConnect(ctx context.Context, client *ConnectionClient) }
ClientConnectionInterceptor
type ClientIPKey ¶
type ClientIPKey struct{}
type ClientInFlights ¶
type ClientInFlights struct {
// contains filtered or unexported fields
}
ClientInFlights
type ConnectionClient ¶
type ConnectionClient struct { // ConnID 物理连接唯一ID标识 ConnID string // PublishInstances 这个连接上发布的实例信息 PublishInstances map[model.ServiceKey]map[string]struct{} // contains filtered or unexported fields }
ConnectionClient .
func (*ConnectionClient) Destroy ¶
func (c *ConnectionClient) Destroy()
func (*ConnectionClient) RangePublishInstance ¶
func (c *ConnectionClient) RangePublishInstance(f func(svc model.ServiceKey, ids []string))
type ConnectionClientManager ¶
type ConnectionClientManager struct {
// contains filtered or unexported fields
}
ConnectionClientManager
func (*ConnectionClientManager) OnEvent ¶
func (c *ConnectionClientManager) OnEvent(ctx context.Context, a any) error
OnEvent event process logic
func (*ConnectionClientManager) PreProcess ¶
func (cm *ConnectionClientManager) PreProcess(_ context.Context, a any) any
PreProcess do preprocess logic for event
type ConnectionEvent ¶
ConnectionEvent
type ConnectionInfoKey ¶
type ConnectionInfoKey struct{}
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
func (*ConnectionManager) GetClient ¶
func (h *ConnectionManager) GetClient(id string) (*Client, bool)
func (*ConnectionManager) GetClientByAddr ¶
func (h *ConnectionManager) GetClientByAddr(addr string) (*Client, bool)
func (*ConnectionManager) GetStream ¶
func (h *ConnectionManager) GetStream(connID string) *SyncServerStream
func (*ConnectionManager) HandleConn ¶
func (h *ConnectionManager) HandleConn(ctx context.Context, s stats.ConnStats)
HandleConn processes the Conn stats.
func (*ConnectionManager) HandleRPC ¶
func (h *ConnectionManager) HandleRPC(ctx context.Context, s stats.RPCStats)
HandleRPC processes the RPC stats.
func (*ConnectionManager) OnAccept ¶
func (h *ConnectionManager) OnAccept(conn net.Conn)
OnAccept call when net.Conn accept
func (*ConnectionManager) OnClose ¶
func (h *ConnectionManager) OnClose()
OnClose call when net.Listener close
func (*ConnectionManager) OnRelease ¶
func (h *ConnectionManager) OnRelease(conn net.Conn)
OnRelease call when net.Conn release
func (*ConnectionManager) RefreshClient ¶
func (h *ConnectionManager) RefreshClient(ctx context.Context)
func (*ConnectionManager) RegisterConnection ¶
func (h *ConnectionManager) RegisterConnection(ctx context.Context, payload *nacospb.Payload, req *nacospb.ConnectionSetupRequest) error
func (*ConnectionManager) TagConn ¶
func (h *ConnectionManager) TagConn(ctx context.Context, connInfo *stats.ConnTagInfo) context.Context
TagConn can attach some information to the given context. The returned context will be used for stats handling. For conn stats handling, the context used in HandleConn for this connection will be derived from the context returned. For RPC stats handling,
- On server side, the context used in HandleRPC for all RPCs on this
connection will be derived from the context returned.
- On client side, the context is not derived from the context returned.
func (*ConnectionManager) TagRPC ¶
func (h *ConnectionManager) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context
TagRPC can attach some information to the given context. The context used for the rest lifetime of the RPC will be derived from the returned context.
func (*ConnectionManager) UnRegisterConnection ¶
func (h *ConnectionManager) UnRegisterConnection(connID string)
type ConnectionMeta ¶
type ConnectionMeta struct { ConnectType string ClientIp string RemoteIp string RemotePort int LocalPort int Version string ConnectionId string CreateTime time.Time AppName string Tenant string Labels map[string]string ClientAttributes nacospb.ClientAbilities }
ConnectionMeta
func ValueConnMeta ¶
func ValueConnMeta(ctx context.Context) ConnectionMeta
type EventType ¶
type EventType int32
const ( ClientConnectionEvent = "ClientConnectionEvent" EventClientConnected EventType EventClientDisConnected )
type GRPCNotifier ¶
type GRPCNotifier struct {
// contains filtered or unexported fields
}
func (*GRPCNotifier) Close ¶
func (c *GRPCNotifier) Close() error
func (*GRPCNotifier) IsZombie ¶
func (c *GRPCNotifier) IsZombie() bool
type GrpcPushCenter ¶
type GrpcPushCenter struct { *core.BasePushCenter // contains filtered or unexported fields }
func (*GrpcPushCenter) AddSubscriber ¶
func (p *GrpcPushCenter) AddSubscriber(s core.Subscriber)
func (*GrpcPushCenter) EnablePush ¶
func (p *GrpcPushCenter) EnablePush(s core.Subscriber) bool
func (*GrpcPushCenter) RemoveSubscriber ¶
func (p *GrpcPushCenter) RemoveSubscriber(s core.Subscriber)
func (*GrpcPushCenter) Type ¶
func (p *GrpcPushCenter) Type() core.PushType
type InFlight ¶
type InFlight struct { ConnID string RequestID string Callback func(nacospb.BaseResponse, error) }
InFlight
type InFlights ¶
type InFlights struct {
// contains filtered or unexported fields
}
InFlights
func (*InFlights) AddInFlight ¶
AddInFlight 添加一个待回调通知的 InFligjt
func (*InFlights) NotifyInFlight ¶
func (i *InFlights) NotifyInFlight(connID string, resp nacospb.BaseResponse)
type NacosV2Server ¶
NacosV2Server gRPC API服务器
func NewNacosV2Server ¶
func NewNacosV2Server(v1Svr *v1.NacosV1Server, store *core.NacosDataStorage, options ...option) *NacosV2Server
func (*NacosV2Server) AllowAccess ¶
func (h *NacosV2Server) AllowAccess(method string) bool
AllowAccess api allow access
func (*NacosV2Server) ConvertContext ¶
func (h *NacosV2Server) ConvertContext(ctx context.Context) context.Context
ConvertContext 将GRPC上下文转换成内部上下文
func (*NacosV2Server) DescribeConnectionDetail ¶
func (n *NacosV2Server) DescribeConnectionDetail(req *restful.Request, rsp *restful.Response)
DescribeConnectionDetail 查询某一个连接ID的详细信息
func (*NacosV2Server) DescribeConnections ¶
func (n *NacosV2Server) DescribeConnections(req *restful.Request, rsp *restful.Response)
DescribeConnections 查询 V2 客户端的连接
func (*NacosV2Server) EnterRatelimit ¶
func (h *NacosV2Server) EnterRatelimit(ip string, method string) uint32
EnterRatelimit api ratelimit
func (*NacosV2Server) GetPort ¶
func (h *NacosV2Server) GetPort() uint32
GetPort get the connector listen port value
func (*NacosV2Server) GetProtocol ¶
func (h *NacosV2Server) GetProtocol() string
GetProtocol 获取Server的协议
func (*NacosV2Server) HandleClientConnect ¶
func (h *NacosV2Server) HandleClientConnect(ctx context.Context, client *ConnectionClient)
func (*NacosV2Server) HandleClientDisConnect ¶
func (h *NacosV2Server) HandleClientDisConnect(ctx context.Context, client *ConnectionClient)
func (*NacosV2Server) Initialize ¶
func (h *NacosV2Server) Initialize(ctx context.Context, option map[string]interface{}, port uint32, apiConf map[string]apiserver.APIConfig) error
Initialize 初始化HTTP API服务器
func (*NacosV2Server) MarshalPayload ¶
func (h *NacosV2Server) MarshalPayload(valu interface{}) (*nacospb.Payload, error)
MarshalPayload .
func (*NacosV2Server) RegistryDebugRoute ¶
func (n *NacosV2Server) RegistryDebugRoute()
func (*NacosV2Server) RequestBiStream ¶
func (h *NacosV2Server) RequestBiStream(svr nacospb.BiRequestStream_RequestBiStreamServer) error
func (*NacosV2Server) Stop ¶
func (h *NacosV2Server) Stop(protocol string)
Stop stopping the gRPC server
func (*NacosV2Server) UnmarshalPayload ¶
func (h *NacosV2Server) UnmarshalPayload(payload *nacospb.Payload) (RequestHandler, nacospb.CustomerPayload, error)
UnmarshalPayload .
type PostProcessFunc ¶
type PostProcessFunc func(stream *VirtualStream, m interface{})
PostProcessFunc postprocess function define
type PreProcessFunc ¶
type PreProcessFunc func(stream *VirtualStream, isPrint bool) error
PreProcessFunc preprocess function define
type RequestHandler ¶
type RequestHandler func(context.Context, nacospb.BaseRequest, nacospb.RequestMeta) (nacospb.BaseResponse, error)
RequestHandler
type RequestHandlerWarrper ¶
type RequestHandlerWarrper struct { Handler RequestHandler PayloadBuilder func() nacospb.CustomerPayload }
RequestHandlerWarrper
type SyncServerStream ¶
type SyncServerStream struct {
// contains filtered or unexported fields
}
SyncServerStream
func (*SyncServerStream) Context ¶
func (s *SyncServerStream) Context() context.Context
Context returns the context for this stream.
func (*SyncServerStream) SendMsg ¶
func (s *SyncServerStream) SendMsg(m interface{}) error
type VirtualStream ¶
type VirtualStream struct { Method string ClientAddress string ClientIP string UserAgent string RequestID string Code int StartTime time.Time // contains filtered or unexported fields }
VirtualStream 虚拟Stream 继承ServerStream
func (*VirtualStream) Context ¶
func (v *VirtualStream) Context() context.Context
Context returns the context for this stream.
func (*VirtualStream) RecvMsg ¶
func (v *VirtualStream) RecvMsg(m interface{}) error
RecvMsg blocks until it receives a message into m or the stream is done. It returns io.EOF when the client has performed a CloseSend. On any non-EOF error, the stream is aborted and the error contains the RPC status.
It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call RecvMsg on the same stream in different goroutines.
func (*VirtualStream) SendHeader ¶
func (v *VirtualStream) SendHeader(md metadata.MD) error
SendHeader sends the header metadata. The provided md and headers set by SetHeader() will be sent. It fails if called multiple times.
func (*VirtualStream) SendMsg ¶
func (v *VirtualStream) SendMsg(m interface{}) error
SendMsg sends a message. On error, SendMsg aborts the stream and the error is returned directly.
SendMsg blocks until:
- There is sufficient flow control to schedule m with the transport, or
- The stream is done, or
- The stream breaks.
SendMsg does not wait until the message is received by the client. An untimely stream closure may result in lost messages.
It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines.
func (*VirtualStream) SetHeader ¶
func (v *VirtualStream) SetHeader(md metadata.MD) error
SetHeader sets the header metadata. It may be called multiple times. When call multiple times, all the provided metadata will be merged. All the metadata will be sent out when one of the following happens:
- ServerStream.SendHeader() is called;
- The first response is sent out;
- An RPC status is sent out (error or success).
func (*VirtualStream) SetTrailer ¶
func (v *VirtualStream) SetTrailer(md metadata.MD)
SetTrailer sets the trailer metadata which will be sent with the RPC status. When called more than once, all the provided metadata will be merged.