Documentation ¶
Overview ¶
客户端实现
各种处理器(Handler)定义
定义logger接口,并提供默认实现:输出到stdout,stderr
协议实现的核心代码
服务器实现
channel, connection, client, server的状态统计信息
通用性函数与类型定义
Index ¶
- Constants
- Variables
- func CheckClientPacketStatus(prev, current byte) error
- func CheckServerPacketStatus(prev, current byte) error
- func CreateNetPacket(pkt *Packet) ([]byte, error)
- func HttpHeaderToIIPArg(h http.Header) (string, error)
- func IIPArgToHttpHeader(argValue string) (http.Header, error)
- func SetLogger(logger Logger)
- func ValidatePath(path string) bool
- func WritePacket(pkt *Packet, writer io.Writer) (int, error)
- type AddrWeightClient
- type Channel
- type Client
- type ClientChannel
- func (m *ClientChannel) Close(err error)
- func (m *ClientChannel) DoRequest(path string, request Request, timeout time.Duration) ([]byte, error)
- func (m *ClientChannel) DoStreamRequest(path string, request Request) error
- func (m *ClientChannel) GetCtx(key string) interface{}
- func (m *ClientChannel) SetCtx(key string, value interface{})
- type ClientConfig
- type ClientPathHandler
- type ClientPathHandlerManager
- type Connection
- type ConnectionSatis
- type Context
- type Count
- type DefaultClientPathHandler
- type DefaultContext
- type DefaultErrorHolder
- type DefaultLogger
- func (m *DefaultLogger) Debug(s string)
- func (m *DefaultLogger) Debugf(format string, args ...interface{})
- func (m *DefaultLogger) Error(s string)
- func (m *DefaultLogger) Errorf(format string, args ...interface{})
- func (m *DefaultLogger) GetLevel() LogLevel
- func (m *DefaultLogger) Log(s string)
- func (m *DefaultLogger) Logf(format string, args ...interface{})
- func (m *DefaultLogger) SetLevel(v LogLevel)
- func (m *DefaultLogger) Warn(s string)
- func (m *DefaultLogger) Warnf(format string, args ...interface{})
- type DefaultRequest
- type EnsureTimeRangeFunc
- type Error
- type ErrorHolder
- type EvaluatedClient
- type Handler
- type LoadBalanceClient
- type LogLevel
- type Logger
- type Measure
- type Packet
- type Request
- type Response
- type ResponseDeleteChannel
- type ResponseHandleFail
- type ResponseNewChannel
- type Server
- func (m *Server) AddCount(path string, count Count)
- func (m *Server) AddMeasure(path string, reqCount int64, duration time.Duration)
- func (m *Server) GetConnectionStatis() (respData []byte, e error)
- func (m *Server) GetListener() net.Listener
- func (m *Server) GetStatis(timeUnitJson []byte) (respData []byte, e error)
- func (m *Server) Handle(path string, queryParams url.Values, requestData []byte, dataCompleted bool) (respData []byte, e error)
- func (m *Server) Listener() net.Listener
- func (m *Server) RegisterHandler(path string, handler ServerPathHandler, timeCountRangeFunc EnsureTimeRangeFunc) error
- func (m *Server) Serve(listener net.Listener, isTls bool) error
- func (m *Server) StartListen() error
- func (m *Server) StartListenTLS(certFile, keyFile string) error
- func (m *Server) Stop(err error)
- func (m *Server) UnRegisterHandler(path string)
- type ServerConfig
- type ServerPathHandler
- type ServerPathHandlerManager
- type TimeCount
- type TimeRange
Constants ¶
const ( MaxPathLen uint32 = 2048 //packet的path字段最大字节数 MaxPacketSize uint32 = 16 * 1024 * 1024 //packet最大字节数 PacketReadBufSize uint32 = 512 * 1024 //从他tcp fd读取数据用于缓存解析的缓冲区的大小 // 系统路径 PathNewChannel string = "/sys/new_channel" PathDeleteChannel string = "/sys/delete_channel" PathServerCountJson string = "/sys/server_count" //获取服务器统计信息 PathServerMeasureJson string = "/sys/server_measure" //获取服务器测量信息 PathServerPathCountJson string = "/sys/path_count" //获取指定接口统计信息 PathServerPathMeasureJson string = "/sys/path_measure" //获取指定接口测量信息 PathServerStatis string = "/sys/statis" //获取服务器完整测量统计信息 PathServerConnectionStatis string = "/sys/connection_statis" //获取服务器的所有tcp连接信息 // 系统参数 // 以http server作为iip server的前端反向代理,http header打包为一个map[string]string marshal后的的json, // 以url-safe-base64编码,通过uri(/path?__http_hdr__=xxx)的参数__http_hdr__传递给后端的iip server ArgHttpHeader string = "__http_hdr__" //角色 RoleClient byte = 0 RoleServer byte = 4 //packet类型 PacketTypeRequest byte = 0 PacketTypeResponse byte = 4 //packet.status StatusC0 byte = 0 // 请求首帧,请求未完成 StatusC1 byte = 1 // 请求首帧,请求完成 StatusC2 byte = 2 // 请求后续帧,请求未完成 StatusC3 byte = 3 // 请求后续帧,请求完成 StatusS4 byte = 4 // 响应首帧,响应未完成 StatusS5 byte = 5 // 表示响应首帧,响应完成 StatusS6 byte = 6 // 表示响应后续帧,响应未完成 StatusS7 byte = 7 // 表示响应后续帧,响应完成 Status8 byte = 8 // 关闭连接 //系统Context常量 CtxServer string = "/ctx/sys/server" CtxClient string = "/ctx/sys/client" CtxResponseChan string = "/ctx/sys/response_chan" CtxUncompletedRequestChan string = "/ctx/sys/uncreq_chan" // 见Client.uncompletedRequestQueue CtxRequest string = "/ctx/sys/request" // 在client handle函数里,可以通过channel.GetCtxData(CtxRequest)获得当前响应对应的请求 CtxClientChannel string = "/ctx/sys/client_chan" // CtxLblClientChannel string = "/ctx/sys/lbl_client_chan" // )
系统常量定义
Variables ¶
var ( DefaultResponseData = []byte(`{"code": -1, "message": "unknown"}`) LogClosing bool = true //channel, connection关闭的时候记录日志 ErrPacketContinue error = &Error{Code: 100, Message: "packet uncompleted"} ErrHandleNoResponse error = &Error{Code: 101, Message: "handle no response"} ErrHandleError error = &Error{Code: 102, Message: "handle error"} ErrRequestTimeout error = &Error{Code: 103, Message: "request timeout"} ErrUnknown error = &Error{Code: 104, Message: "unknown"} ErrResponseHandlerNotImplement error = &Error{Code: 105, Message: "response handler not implement"} ErrChannelCreateLimited error = &Error{Code: 106, Message: "channel create limited"} ErrServerConnectionsLimited error = &Error{Code: 107, Message: "server connections limited"} ErrClientConnectionsLimited error = &Error{Code: 108, Message: "client connections limited"} )
系统变量定义
Functions ¶
func CheckClientPacketStatus ¶
检查来自client端的Packet的Status是否合法
func CheckServerPacketStatus ¶
检查来自server端的Packet的Status是否合法
func CreateNetPacket ¶
根据一个Packet对象创建一个用于tcp发送的网络数据包
func IIPArgToHttpHeader ¶ added in v1.3.4
func ValidatePath ¶
Types ¶
type AddrWeightClient ¶
type Channel ¶
type Channel struct { DefaultErrorHolder DefaultContext Id uint32 NewTime time.Time Count *Count // contains filtered or unexported fields }
channel的实现 chuannel由框架内部使用
func (*Channel) SendPacket ¶
type Client ¶
type Client struct { DefaultErrorHolder DefaultContext Count *Count Measure *Measure // contains filtered or unexported fields }
func NewClient ¶
func NewClient(config ClientConfig, serverAddr string, timeCountRangeFunc EnsureTimeRangeFunc) (*Client, error)
创建一个新的client
func NewClientTLS ¶
func NewClientTLS(config ClientConfig, serverAddr string, timeCountRangeFunc EnsureTimeRangeFunc, certFile, keyFile string) (*Client, error)
创建一个新的client, TLS模式
func (*Client) GetConnectionStatis ¶ added in v1.0.1
func (*Client) NewChannel ¶
func (m *Client) NewChannel() (*ClientChannel, error)
创建一个新的channel 每个connection会默认建立一个ID为0的信道,用于基础通讯功能,创建一个新的channel就是通过这个0号channel实现的: 创建channel的流程由client发起,服务器返回新创建的channel id,后续的业务通讯(request/response)应该在新创建的channel上进行
func (*Client) RegisterHandler ¶
func (m *Client) RegisterHandler(path string, handler ClientPathHandler) error
注册Path-Handler iip协议中包含一个path字段,该字段一般用来代表具体的服务器接口和资源 client和server通过注册对path的处理函数,以实现基于iip框架的开发
type ClientChannel ¶
type ClientChannel struct {
// contains filtered or unexported fields
}
func (*ClientChannel) DoRequest ¶
func (m *ClientChannel) DoRequest(path string, request Request, timeout time.Duration) ([]byte, error)
用于"消息式"请求/响应(系统自动将多个部分的响应数据合成为一个完整的响应,并通过这个阻塞的函数返回)
func (*ClientChannel) DoStreamRequest ¶
func (m *ClientChannel) DoStreamRequest(path string, request Request) error
用于于流式请求/响应(用户自己注册处理Handler,每接收到一部分响应数据,系统会调用Handler一次,这个调用是异步的,发送函数立即返回)
func (*ClientChannel) GetCtx ¶
func (m *ClientChannel) GetCtx(key string) interface{}
func (*ClientChannel) SetCtx ¶
func (m *ClientChannel) SetCtx(key string, value interface{})
type ClientConfig ¶
type ClientConfig struct { MaxConnections int //单client最大连接数 MaxChannelsPerConn int //单connection最大channel数 ChannelPacketQueueLen uint32 //channel的packet接收队列长度 TcpWriteQueueLen uint32 //connection的packet写队列长度 TcpConnectTimeout time.Duration //服务器连接超时限制 TcpReadBufferSize int //内核socket读缓冲区大小 TcpWriteBufferSize int //内核socket写缓冲区大小 WaitResponseTimeout time.Duration }
type ClientPathHandler ¶
type ClientPathHandlerManager ¶
type ClientPathHandlerManager struct { HanderMap map[string]ClientPathHandler sync.Mutex }
管理ClientPathHandler,从属于一个client
type Connection ¶
type Connection struct { DefaultErrorHolder DefaultContext Role byte //0 client, 4 server Client *Client //not nil if client side Server *Server //not nil if server side Channels map[uint32]*Channel MaxChannelId uint32 FreeChannleId map[uint32]struct{} ChannelsLock sync.RWMutex Count *Count // contains filtered or unexported fields }
func NewConnection ¶
func NewConnection(client *Client, server *Server, netConn net.Conn, role byte, writeQueueLen int) (*Connection, error)
创建一个Connection对象,由Client或Server内部调用
func (*Connection) Close ¶
func (m *Connection) Close(err error)
type ConnectionSatis ¶
type Count ¶
type DefaultClientPathHandler ¶
type DefaultClientPathHandler struct { }
type DefaultContext ¶
type DefaultContext struct {
// contains filtered or unexported fields
}
func (*DefaultContext) GetCtxData ¶
func (m *DefaultContext) GetCtxData(key string) interface{}
func (*DefaultContext) RemoveCtxData ¶
func (m *DefaultContext) RemoveCtxData(key string)
func (*DefaultContext) SetCtxData ¶
func (m *DefaultContext) SetCtxData(key string, value interface{})
type DefaultErrorHolder ¶
type DefaultErrorHolder struct {
// contains filtered or unexported fields
}
func (*DefaultErrorHolder) GetError ¶
func (m *DefaultErrorHolder) GetError() error
func (*DefaultErrorHolder) SetError ¶
func (m *DefaultErrorHolder) SetError(err error)
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
func (*DefaultLogger) Debug ¶
func (m *DefaultLogger) Debug(s string)
func (*DefaultLogger) Debugf ¶
func (m *DefaultLogger) Debugf(format string, args ...interface{})
func (*DefaultLogger) Error ¶
func (m *DefaultLogger) Error(s string)
func (*DefaultLogger) Errorf ¶
func (m *DefaultLogger) Errorf(format string, args ...interface{})
func (*DefaultLogger) GetLevel ¶
func (m *DefaultLogger) GetLevel() LogLevel
func (*DefaultLogger) Log ¶
func (m *DefaultLogger) Log(s string)
func (*DefaultLogger) Logf ¶
func (m *DefaultLogger) Logf(format string, args ...interface{})
func (*DefaultLogger) SetLevel ¶
func (m *DefaultLogger) SetLevel(v LogLevel)
func (*DefaultLogger) Warn ¶
func (m *DefaultLogger) Warn(s string)
func (*DefaultLogger) Warnf ¶
func (m *DefaultLogger) Warnf(format string, args ...interface{})
type DefaultRequest ¶
type DefaultRequest struct { DefaultContext // contains filtered or unexported fields }
func (*DefaultRequest) Data ¶
func (m *DefaultRequest) Data() []byte
type EnsureTimeRangeFunc ¶
type ErrorHolder ¶
type EvaluatedClient ¶
type EvaluatedClient struct {
// contains filtered or unexported fields
}
type LoadBalanceClient ¶
type LoadBalanceClient struct {
// contains filtered or unexported fields
}
由一组server提供无状态服务的场景下,LoadBalanceClient根据可配置的负载权重、keepalive检测,自动调节对不同server的请求频率 LoadBalanceClient内部管理多个client
func NewLoadBalanceClient ¶
func NewLoadBalanceClient(serverKeepConns int, serverMaxConns int, serverList string) (*LoadBalanceClient, error)
创建一个LoadBalanceClient, severList格式:ip:port#weight,ip:port#weight,ip:port#weight,..., 0<weight<100,表明server的权重。weight大于1,相当于增加weight-1个相同地址的server 每个server地址保持serverKeepConns个活跃连接,如果并发突增超过serverKeepConns,则自动创建新连接。 serverMaxConns指定单个server最大连接数,所有的server都超出serverMaxConns,则返回ErrClientConnectionsLimited错误,否则以其他有空的server代其服务 server任务分配以轮转模式运行 由于load balance客户端一般式针对同类型的业务,因此没有必要区分多个channel,内部每个connection开启一个channel.
func (*LoadBalanceClient) GetConnectionStatis ¶ added in v1.0.1
func (m *LoadBalanceClient) GetConnectionStatis() ([]byte, error)
func (*LoadBalanceClient) Status ¶
func (m *LoadBalanceClient) Status() string
type Logger ¶
type Measure ¶
type Measure struct { sync.RWMutex FiveMinuteRequests [301]int64 `json:"five_minute_requests"` //最近5分钟的请求数 FiveMinuteDuration [301]int64 `json:"five_minute_duration"` //最近5分钟的请求处理耗时 AllRequests int64 `json:"all_requests"` //全部请求数 AllDuration int64 `json:"all_duration"` //全部请求处理耗时 FiveMinuteTimeCount [301]*TimeCount `json:"five_minute_timecount"` //最近5分钟请求的耗时区间分段统计 AllTimeCount *TimeCount `json:"all_timecount"` //全部请求的耗时区间分段统计 // contains filtered or unexported fields }
分“最近5分钟”和“运行以来全部时间”两个维度的性能测量统计
func NewMesure ¶
func NewMesure(timeRangeEnsureFunc EnsureTimeRangeFunc) *Measure
timeUnit: 时间单位粒度(μs/ms) 1秒=1000ms=1000000μs
func (*Measure) SetTimeRangeFunc ¶
func (m *Measure) SetTimeRangeFunc(v EnsureTimeRangeFunc)
type Packet ¶
type Request ¶
func NewDefaultRequest ¶
构建一个iip请求,请求数据从data传入,如果传入nil,则函数自动填入"{}", iip协议不允许请求数据空白
type ResponseDeleteChannel ¶
type ResponseHandleFail ¶
func ErrorResponse ¶
func ErrorResponse(err *Error) *ResponseHandleFail
func (*ResponseHandleFail) Data ¶
func (m *ResponseHandleFail) Data() []byte
type ResponseNewChannel ¶
type Server ¶
type Server struct { DefaultErrorHolder DefaultContext // contains filtered or unexported fields }
func NewServer ¶
func NewServer(config ServerConfig, listenAddr string, timeCountRangeFunc EnsureTimeRangeFunc) (*Server, error)
func (*Server) AddMeasure ¶
func (*Server) GetConnectionStatis ¶ added in v1.0.1
func (*Server) GetListener ¶ added in v1.0.1
func (*Server) GetStatis ¶ added in v1.0.1
requestData format: {"time_unit": "microsecond|millisecond|second|nanosecond"}
func (*Server) RegisterHandler ¶
func (m *Server) RegisterHandler(path string, handler ServerPathHandler, timeCountRangeFunc EnsureTimeRangeFunc) error
func (*Server) StartListen ¶
listen socket and start server process
func (*Server) StartListenTLS ¶
listen socket and start server process in TLS mode
func (*Server) UnRegisterHandler ¶
type ServerConfig ¶
type ServerPathHandler ¶
type ServerPathHandler interface { // 一个request有可能由于size过大而被自动分割为多个packet传输,requestDataCompleted指示request是否已接收完整 // 为什么框架不等接收完整才调用handle呢,主要考虑在大数据量的传输场景中,不一定要接收完整才进行数据处理,可以边接收边处理 Handle(path string, queryParams url.Values, requestData []byte, requestDataCompleted bool) (responseData []byte, e error) }
type ServerPathHandlerManager ¶
type ServerPathHandlerManager struct { HandlerMap map[string]ServerPathHandler sync.Mutex }
管理ServerPathHandler,从属于一个server
type TimeCount ¶
分区间统计
func (*TimeCount) Record ¶
func (m *TimeCount) Record(duration int64, ensureFunc EnsureTimeRangeFunc)
type TimeRange ¶
type TimeRange int64
耗时区间