Documentation ¶
Index ¶
- Constants
- Variables
- func CloneClientOutgoingData(ctx context.Context) metadata.MD
- func CloneServerIncomingData(ctx context.Context) metadata.MD
- func ConvertNormalError(svrErr error) (gst *status.Status)
- func GetClientIP(ctx context.Context) (string, error)
- func GetGlobalReqIDFromContext(ctx context.Context) string
- func IsContextError(err error) bool
- func Recovery(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (resp interface{}, err error)
- func TimingOld(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (resp interface{}, err error)
- func ToErrEcode(gst *status.Status) errcode.Codes
- func TransContextErr2GrpcErr(err error) error
- type AtreusReqId
- type Client
- func (c *Client) AddCliOpt(opts ...grpc.DialOption) *Client
- func (c *Client) BuildUnaryInterceptorChain2() grpc.UnaryClientInterceptor
- func (c *Client) CircuitBreaker() grpc.UnaryClientInterceptor
- func (c *Client) ClientCallTimeout(timeout time.Duration) grpc.UnaryClientInterceptor
- func (c *Client) ClientValidator() grpc.UnaryClientInterceptor
- func (c *Client) DoClientRetry(optFuncs ...retrys.CallOption) grpc.UnaryClientInterceptor
- func (c *Client) IsBreakerNeedError(err error) bool
- func (c *Client) IsBreakerNeedErrorV2(err error) bool
- func (c *Client) OpenTracingForClient() grpc.UnaryClientInterceptor
- func (c *Client) Recovery() grpc.UnaryClientInterceptor
- func (c *Client) Timing() grpc.UnaryClientInterceptor
- func (c *Client) TransError() grpc.UnaryClientInterceptor
- func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client
- type Graceful
- type GracefulGrpcAppserver
- type Limiter
- type Server
- func (s *Server) Authorize() grpc.UnaryServerInterceptor
- func (s *Server) BuildUnaryInterceptorChain(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor
- func (s *Server) BuildUnaryInterceptorChain2(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, ...) (interface{}, error)
- func (s *Server) ExitWithSignalHandler()
- func (s *Server) GetServer() *grpc.Server
- func (s *Server) Limit(limiter Limiter) grpc.UnaryServerInterceptor
- func (s *Server) LimitStream(limiter Limiter) grpc.StreamServerInterceptor
- func (s *Server) Metrics2Prometheus() grpc.UnaryServerInterceptor
- func (s *Server) OpenTracingForServer() grpc.UnaryServerInterceptor
- func (s *Server) Recovery() grpc.UnaryServerInterceptor
- func (s *Server) ReloadConfig() error
- func (s *Server) RetryChecking() grpc.UnaryServerInterceptor
- func (s *Server) Run() error
- func (s *Server) Serve(lis net.Listener) error
- func (s *Server) ServerDealTimeout(timeout time.Duration) grpc.UnaryServerInterceptor
- func (s *Server) ServerStat() grpc.UnaryServerInterceptor
- func (s *Server) Shutdown(ctx context.Context) error
- func (s *Server) SrcIpFilter() grpc.UnaryServerInterceptor
- func (s *Server) SrvValidator() grpc.UnaryServerInterceptor
- func (s *Server) Timing() grpc.UnaryServerInterceptor
- func (s *Server) TransError() grpc.UnaryServerInterceptor
- func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server
- func (s *Server) UseStreamInterceptor(handlers ...grpc.UnaryServerInterceptor) *Server
- func (s *Server) XRequestId() grpc.UnaryServerInterceptor
- type WMetadata
- func (m *WMetadata) Add(key string, value string)
- func (m WMetadata) Copy() *WMetadata
- func (m *WMetadata) Del(key string)
- func (m *WMetadata) FromIncoming(ctx context.Context) bool
- func (m *WMetadata) FromOutgoing(ctx context.Context) bool
- func (m *WMetadata) Get(key string) string
- func (m *WMetadata) GetArr(key string) []string
- func (m *WMetadata) Set(key string, value string)
- func (m *WMetadata) ToIncoming(ctx context.Context) context.Context
- func (m *WMetadata) ToOutgoing(ctx context.Context) context.Context
- type XRateLimiter
Constants ¶
const ( DefaultAtreusReqIDKey = "atreus-requestid" DefaultAtreusReqIDVal = "atreus-requestid-value" DefaultAtreusReqIDName = "atreus-reqid-name" )
const ( DEFAULT_ATREUS_SERVICE_NAME = "atreus_svc" DEFAULT_TIME_TO_QUIT = 5 * time.Second )
const (
MAX_STACK_SIZE = 2048
)
Variables ¶
var DefaultAtreusReqIDSKey = globalReqIDKey{}
Functions ¶
func CloneClientOutgoingData ¶
metadata API for client
func CloneServerIncomingData ¶
metadata API for server
func ConvertNormalError ¶
ConvertNormalError converts an error from a service reply and tries to convert it to a grpc status.Status.
func IsContextError ¶
判断 err 是否为 ctx 错误(DeadlineExceeded || Canceled)
func Recovery ¶
func Recovery(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)
Recovery interceptor:必须放在第 0 号链位置
func TimingOld ¶
func TimingOld(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)
计时(最后一个拦截器)
func ToErrEcode ¶
ToErrEcode converts a grpc status.Status to errcode.Codes (外部接口).
Types ¶
type AtreusReqId ¶
type AtreusReqId string
func NewAtreusReqId ¶
func NewAtreusReqId(name string) AtreusReqId
type Client ¶
type Client struct { Logger *zap.Logger Conf *config.AtreusCliConfig //客户端配置 Lock *sync.RWMutex DialOpts []grpc.DialOption //grpc-客户端option InnerHandlers []grpc.UnaryClientInterceptor //GRPC拦截器数组 CliResolver dis.ServiceResolverWrapper //sony breaker CbBreakerMap map[string]*gobreaker.CircuitBreaker CbBreakerConfig gobreaker.Settings //这里暂时全局配置 //retry MaxRetry int RpcPersistClient *grpc.ClientConn // contains filtered or unexported fields }
客户端封装结构
func (*Client) BuildUnaryInterceptorChain2 ¶
func (c *Client) BuildUnaryInterceptorChain2() grpc.UnaryClientInterceptor
实现链式的客户端拦截器
func (*Client) CircuitBreaker ¶
func (c *Client) CircuitBreaker() grpc.UnaryClientInterceptor
客户端熔断拦截器
func (*Client) ClientCallTimeout ¶
func (c *Client) ClientCallTimeout(timeout time.Duration) grpc.UnaryClientInterceptor
客户端超时调用处理
func (*Client) ClientValidator ¶
func (c *Client) ClientValidator() grpc.UnaryClientInterceptor
客户端参数校验
func (*Client) DoClientRetry ¶
func (c *Client) DoClientRetry(optFuncs ...retrys.CallOption) grpc.UnaryClientInterceptor
客户端重试
func (*Client) IsBreakerNeedError ¶
IsBreakerNeedError checks whether or not an error is acceptable: 根据服务端错误的返回,来判断哪些错误才进入熔断计算逻辑 https://grpc.github.io/grpc/core/md_doc_statuscodes.html https://github.com/sony/gobreaker/blob/master/gobreaker.go#L113
func (*Client) IsBreakerNeedErrorV2 ¶
如果框架使用grpc的原生错误,那么必须使用status.Code(err)方法对errors进行转换
func (*Client) OpenTracingForClient ¶
func (c *Client) OpenTracingForClient() grpc.UnaryClientInterceptor
用于客户端及服务端的tracing拦截器(jaeger)
func (*Client) Timing ¶
func (c *Client) Timing() grpc.UnaryClientInterceptor
客户端接口调用耗时拦截器 Timing is an interceptor that logs the processing time (for client)
func (*Client) TransError ¶
func (c *Client) TransError() grpc.UnaryClientInterceptor
客户端错误统一处理,将服务端返回的 err 类型(status.Status)统一转换为 errcode.Codes 类型 因为熔断器需要 errcode.Codes 类型
type GracefulGrpcAppserver ¶
type GracefulGrpcAppserver struct { AtreusServer *Server Addr string Listener net.Listener //当前GracefulGrpcAppserver对应的listener Logger *zap.Logger }
func NewGracefulGrpcAppserver ¶
func NewGracefulGrpcAppserver(srv *Server, new_bindaddr string) (*GracefulGrpcAppserver, error)
以GracefulGrpcAppserver启动并创建Listener
func (*GracefulGrpcAppserver) RunServer ¶
func (g *GracefulGrpcAppserver) RunServer() error
使用GracefulGrpcAppserver的run方法启动服务,代替Server启动
type Server ¶
type Server struct { Logger *zap.Logger Conf *config.AtreusSvcConfig //TODO:hot loading ConfLock *sync.RWMutex EtcdClient *etcdv3.Client InnerHandlers []grpc.UnaryServerInterceptor //拦截器数组 InnerStreamHandlers []grpc.StreamServerInterceptor //stream拦截器数组 ServiceReg dis.ServiceRegisterWrapper //auth Auther *auth.Authenticator //通用的验证接口 //limiter Limiters *XRateLimiter //context Ctx context.Context //acl CallerIp []string //max retry limit MaxRetry int //Log sampling Sampling float64 Proba *xmath.XProbability //wrapper Server RpcServer *grpc.Server //原生Server // contains filtered or unexported fields }
grpc-server核心结构(封装)
func NewServer ¶
func NewServer(conf *config.AtreusSvcConfig, opt ...grpc.ServerOption) *Server
func (*Server) Authorize ¶
func (s *Server) Authorize() grpc.UnaryServerInterceptor
Authorize :Server端认证的一元拦截器
func (*Server) BuildUnaryInterceptorChain ¶
func (s *Server) BuildUnaryInterceptorChain(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor
opt = append(opt, grpc.UnaryInterceptor(BuildUnaryInterceptorChain(Interceptor1, Interceptor2, Interceptor3, Interceptor4)))
func (*Server) BuildUnaryInterceptorChain2 ¶
func (s *Server) BuildUnaryInterceptorChain2(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)
func (*Server) ExitWithSignalHandler ¶
func (s *Server) ExitWithSignalHandler()
func (*Server) Limit ¶
func (s *Server) Limit(limiter Limiter) grpc.UnaryServerInterceptor
Limit returns a new unary server interceptor that performs request rate limiting.
func (*Server) LimitStream ¶
func (s *Server) LimitStream(limiter Limiter) grpc.StreamServerInterceptor
LimitStream returns a new stream server interceptor that performs rate limiting on the request.
func (*Server) Metrics2Prometheus ¶
func (s *Server) Metrics2Prometheus() grpc.UnaryServerInterceptor
func (*Server) OpenTracingForServer ¶
func (s *Server) OpenTracingForServer() grpc.UnaryServerInterceptor
必须实现`metadata.TextMapReader`公共接口:https://pkg.go.dev/github.com/opentracing/opentracing-go#TextMapReader
func (*Server) Recovery ¶
func (s *Server) Recovery() grpc.UnaryServerInterceptor
NICE:将 recovery 作为 Server 拦截器,调用,打印崩溃异常的 stack 信息 在 Server 初始化时,这样调用,s.Use(s.Recovery(),...)
func (*Server) ReloadConfig ¶
func (*Server) RetryChecking ¶
func (s *Server) RetryChecking() grpc.UnaryServerInterceptor
服务端重试检测:检测ctx中的重传次数是否满足服务端限制
func (*Server) ServerDealTimeout ¶
func (s *Server) ServerDealTimeout(timeout time.Duration) grpc.UnaryServerInterceptor
Server 端超时调用处理
func (*Server) ServerStat ¶
func (s *Server) ServerStat() grpc.UnaryServerInterceptor
将cpu数据作为拦截器,每一次rpc调用都采集并返回客户端
INFO 05/18-06:44:36.358 grpc-access-log ret=0 path=/testproto.Greeter/SayHello ts=0.000648521 args=name:"tom" age:23 ip=127.0.0.1:8081 get reply: {hello tom from 127.0.0.1:8081 false} map[cpu_usage:[36] serverinfo:[enjoy]]
func (*Server) SrcIpFilter ¶
func (s *Server) SrcIpFilter() grpc.UnaryServerInterceptor
func (*Server) SrvValidator ¶
func (s *Server) SrvValidator() grpc.UnaryServerInterceptor
func (*Server) Use ¶
func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server
Use attaches one or more global unary interceptors to the server.
func (*Server) UseStreamInterceptor ¶
func (s *Server) UseStreamInterceptor(handlers ...grpc.UnaryServerInterceptor) *Server
TODO:添加stream interceptors chain
func (*Server) XRequestId ¶
func (s *Server) XRequestId() grpc.UnaryServerInterceptor
type WMetadata ¶
func (*WMetadata) FromIncoming ¶
从服务端ctx获取metadata
func (*WMetadata) FromOutgoing ¶
从客户端ctx获取metadata
func (*WMetadata) ToIncoming ¶
服务端注入数据
type XRateLimiter ¶
type XRateLimiter struct { RateStore map[string]*rate.Limiter //按照RPC-method限流 LogTime int64 Rate rate.Limit BucketSize int }
提供基础xrate的限速实现
func NewXRateLimiter ¶
func NewXRateLimiter(rates rate.Limit, size int) *XRateLimiter
func (*XRateLimiter) Allow ¶
func (x *XRateLimiter) Allow(method string) bool
true:限速,请求丢弃 false:请求放过