iip

package module
v1.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2021 License: MIT Imports: 20 Imported by: 0

README

GoDoc

IIP是什么?

基于TCP的基础通讯协议及框架(IIP,Internal Interaction Protocol),该协议可作为RPC接口调用的底层协议,如同http2之于gRPC,本项目基于该协议实现了client/server的基础框架。

使用说明

benchmark对比测试

  • BenchmarkPFEchoClientServer: 普通iip client单个channel
  • BenchmarkPFEchoNetHttp:标准库 net/http
  • BenchmarkPFIIPBalanceClient:iip load balance client
  • 运行benchmark前需要先编译运行启动server端,server代码在example/echo_server/ 编译:$ go build ./echo_server.go 启动: $ ./echo_server
  • 单核:
$ GOMAXPROCS=1 go test -bench=. -naddr="192.168.2.98:9091" -lbcaddr="192.168.2.98:9090#2" -iipaddr="192.168.2.98:9090" -run="PF.*" -benchmem -benchtime=10s
BenchmarkPFEchoClientServer 	    5418	   1944837 ns/op	   99428 B/op	      16 allocs/op
BenchmarkPFEchoNetHttp      	    3510	   3342903 ns/op	   51476 B/op	      64 allocs/op
BenchmarkPFIIPBalanceClient 	    6043	   1942451 ns/op	   92033 B/op	      16 allocs/op
  • 四核:
$ GOMAXPROCS=4 go test -bench=. -naddr="192.168.2.98:9091" -lbcaddr="192.168.2.98:9090#2" -iipaddr="192.168.2.98:9090" -run="PF.*" -benchmem -benchtime=10s
BenchmarkPFEchoClientServer-4   	    7243	   1589468 ns/op	   99418 B/op	      16 allocs/op
BenchmarkPFEchoNetHttp-4        	   13854	    868070 ns/op	   51576 B/op	      64 allocs/op
BenchmarkPFIIPBalanceClient-4   	   19844	    590167 ns/op	   85720 B/op	      15 allocs/op
  • 八核:
$ GOMAXPROCS=8 go test -bench=. -naddr="192.168.2.98:9091" -lbcaddr="192.168.2.98:9090#2" -iipaddr="192.168.2.98:9090" -run="PF.*" -benchmem -benchtime=10s
BenchmarkPFEchoClientServer-8   	    7090	   1630507 ns/op	   99443 B/op	      16 allocs/op
BenchmarkPFEchoNetHttp-8        	   27556	    428245 ns/op	   52198 B/op	      65 allocs/op
BenchmarkPFIIPBalanceClient-8   	   36670	    305865 ns/op	   84436 B/op	      15 allocs/op

典型案例

image
百寻广告流量交易平台日处理数十亿次广告流量请求,峰值qps 4w+, 采用iip承载其内部核心交易系统的微服务。

IIP架构

image

为什么开发IIP?从http1.1说起

一、 http1的缺点,以及对应的http2的优化

1. http1是文本协议,“文本协议”的意思是其传输的数据流(包括header和body)必须先转换为ascii码的可见字符。为什么要这样呢?因为他是以\n换行符来进行数据分隔的。如果是传输带额数据是以原本的二进制内码的形式,则会和\n产生冲突,无法解析。而采用文本形式则势必需要对原数据进行文本化编码,比如url-encode,base64,等等,无论哪种编码,都会导致数据的体积增大。

http2是二进制协议,无需进行文本化编码,不会导致体积增大。

http2只有在第一次请求的时候通过http1的头部字段upgrade来请求升级到http2协议,后续的请求响应全部以http2的二进制格式的帧进行传输,帧的精心设计,避免了如http1那样的头部信息冗余。而对应于http1的header信息,在http2的header帧中保留了一个伪首部的区域以供承载 ,且这个伪首部可以采用HPACK高效压缩。

3. http1是“单路的”,意思是一条底层的tcp连接上同一时刻只能跑一个请求或响应。比如说,有r1,r2,r3三个请求,在同一个连接上,必须依次串行执行,先发出r1,收到完整的响应后,再发出r2。。。这样的单路模式,对于高并发的后端服务器来说,带来几个问题:

操作内核短时间产生大量的tcp连接,可能触发资源上限,包括连接数的上限,和大量端口占用; tcp的连接的建立、断开分别需要连接双方进行3次、4次握手,这个过程是缓慢的。同时tcp连接关闭以后,需要经历一段时间(十几秒到几十秒)的静默期,在此期间端口是不能复用的,这是tcp协议设计的要求,为了保证可靠性; 在这两者的基础上,对上层业务带来的影响就是突发性的流量请求,或者说非持久连接型的应用类型,一方面会给服务器带来更大的负载,另一方面会显著的增加响应的时延。 http2是多路复用的,意思是一条tcp连接上可以同时跑多个请求和响应,也就是说r1,r2,r3可以同时发起,其原理是http2数据帧的格式里定义标识了不同的请求r1,r2,r3的字段。这样采用数据帧交替传输,实现逻辑上的并发。物理上当然还是串行执行的,但是他减少了tcp连接,从而大幅度降低了连接断开的延时,显著避免系统内核资源的上限,也降低了系统的负载。 值得一提的是由于多个请求在同一条tcp连接上传输,http2还在数据帧上设计了优先级字段,以使某些请求可以优先传输。同时还在数据帧上设计了流量控制字段。等等,可谓用心。

4. http1只能以请求响应的模式运行,http2基于请求响应,同时支持服务端主动推送(一个客户端请求,返回多个响应)。

二、http2的不足

1. http2是基于tcp协议的,tcp协议在设计之初并非是以现在优质、高带宽的网络基础设施基础上设计的,他充分考虑了数据的可靠性,显得小心谨慎,在传输速率的表现,也已经跟不上现时的网络基础设施。未来是否有更优化的网络层协议发展出来,可以拭目以待,包括像基于udp协议的QUIC协议。个人认为下层协议还是由os内核来实现比较好,QUIC协议实现在应用层而非操作系统的内核层,始终是一个软肋。
2. 大部分的http2实现和应用(包括浏览器和web服务器),事实上都必须基于TLS(SSL)安全套接层。对于一个承载互联网内容的基础协议来说,这样的安全考量是合理的,也是必须的。有利就有弊,TLS的握手和数据加密过程必然给通信及通信双方带来一些负担。这些负担和额外的耗费,对于一些内部应用,比如说后端的微服务之间,有时候并不是必须的,是可以简化的。
3. 由于现实世界已经基于http1建立起来,一些通讯链路上的基础设施,比如说http代理,暂不能支持http2,因此这会对http2的铺开造成阻碍,且这种阻碍可能是长期的过程。
4. 由于http2是二进制的,传输又是多路复用的,在不同帧的设计上考虑到了压缩、优先级控制、流量控制、服务端推送,这导致http2的协议可以说比较复杂了。因此在协议的实现、应用的调试上将显然会比简单明文的http1增加一些难度。简单和直观,对于人类来说,具有天生的亲和力。

三、鱼和熊掌可兼得否?

我研究http2的初心起源于我想实现一组用于后端系统内部、微服务之间的通信协议,及其对应的客户端和服务器。由于手上现有的系统都是基于http1,自然是想标准的http2是否能直接满足我的需求:规避http1那些缺点,以“不支持多路复用”尤甚(这种情况在突发的网络流量下弊端尤为明显)。http2是可以满足我的需求的,且其在安全性,功能细节考虑得更完善更先进。但是,正是由于这些优势、由于http2作为互联网标准的基础协议的负担,其设计上带来的这种必然的复杂性,使得其在这种内部微服务间的确定性的应用场景中,并非最好的选择。于是,在这个场景中,基于简单,高效的原则,自行实现一个这样用于系统内部服务器节点间的应用通讯协议(IIP,Internal Interaction Protocol)的构想就产生了,该协议可作为RPC接口调用的底层协议,如同http2之于gRPC, 其主要特征:

* 基于tcp独立实现。
* 不考虑服务端推送,这种需求场景通过建立两条对向的tcp连接来实现。
* 协议基于“请求/响应”模式,每一个请求有且只有一个响应。请求和响应底层由一个或多个帧(packet)组成。
* 基于帧的“多路复用”,统一的帧格式, 依次由如下部分组成:

帧格式:

  • 1字节数据帧状态标识:
    • 0表示请求首帧,请求未完成
    • 1表示请求首帧,请求完成
    • 2表示请求后续帧,请求未完成
    • 3表示请求后续帧,请求完成;
    • 4表示响应首帧,响应未完成
    • 5表示响应首帧,响应完成
    • 6表示响应后续帧,响应未完成
    • 7表示响应后续帧,响应完成
    • 8关闭连接
  • 文本路径(URL兼容,用于指明请求的路径及参数, 如 /manager/picture/get?id=123&name=abc)
  • \0
  • 4字节channel识符(多路复用的流身份ID,无符号整数,请求方发起创建,server返回新channel的唯一ID)
  • 4字节数据长度(限制一个帧的数据长度不能大于16MB)
  • 数据
* 对于协议的扩展性考虑,按分层的思想,可以在上层通过对数据字段进行进一步的协议定义来满足,本协议保持简单性,不提供额外的冗余扩展字段。

比如示例中的文件下载,每次请求响应传输文件的其中一块,文件的整体组合由一个简单的上层协议实现。

Documentation

Overview

客户端实现

各种处理器(Handler)定义

定义logger接口,并提供默认实现:输出到stdout,stderr

协议实现的核心代码

服务器实现

channel, connection, client, server的状态统计信息

通用性函数与类型定义

Index

Constants

View Source
const (
	MaxPathLen        uint32 = 2048             //packet的path字段最大字节数
	MaxPacketSize     uint32 = 16 * 1024 * 1024 //packet最大字节数
	PacketReadBufSize uint32 = 512 * 1024       //从他tcp fd读取数据用于缓存解析的缓冲区的大小

	// 系统路径
	PathNewChannel             string = "/sys/new_channel"
	PathDeleteChannel          string = "/sys/delete_channel"
	PathServerCountJson        string = "/sys/server_count"      //获取服务器统计信息
	PathServerMeasureJson      string = "/sys/server_measure"    //获取服务器测量信息
	PathServerPathCountJson    string = "/sys/path_count"        //获取指定接口统计信息
	PathServerPathMeasureJson  string = "/sys/path_measure"      //获取指定接口测量信息
	PathServerStatis           string = "/sys/statis"            //获取服务器完整测量统计信息
	PathServerConnectionStatis string = "/sys/connection_statis" //获取服务器的所有tcp连接信息

	// 系统参数
	// 以http server作为iip server的前端反向代理,http header打包为一个map[string]string marshal后的的json,
	// 以url-safe-base64编码,通过uri(/path?__http_hdr__=xxx)的参数__http_hdr__传递给后端的iip server
	ArgHttpHeader string = "__http_hdr__"

	//角色
	RoleClient byte = 0
	RoleServer byte = 4

	//packet类型
	PacketTypeRequest  byte = 0
	PacketTypeResponse byte = 4

	//packet.status
	StatusC0 byte = 0 // 请求首帧,请求未完成
	StatusC1 byte = 1 // 请求首帧,请求完成
	StatusC2 byte = 2 // 请求后续帧,请求未完成
	StatusC3 byte = 3 // 请求后续帧,请求完成
	StatusS4 byte = 4 // 响应首帧,响应未完成
	StatusS5 byte = 5 // 表示响应首帧,响应完成
	StatusS6 byte = 6 // 表示响应后续帧,响应未完成
	StatusS7 byte = 7 // 表示响应后续帧,响应完成
	Status8  byte = 8 // 关闭连接

	//系统Context常量
	CtxServer                 string = "/ctx/sys/server"
	CtxClient                 string = "/ctx/sys/client"
	CtxResponseChan           string = "/ctx/sys/response_chan"
	CtxUncompletedRequestChan string = "/ctx/sys/uncreq_chan"     // 见Client.uncompletedRequestQueue
	CtxRequest                string = "/ctx/sys/request"         // 在client handle函数里,可以通过channel.GetCtxData(CtxRequest)获得当前响应对应的请求
	CtxClientChannel          string = "/ctx/sys/client_chan"     //
	CtxLblClientChannel       string = "/ctx/sys/lbl_client_chan" //
)

系统常量定义

Variables

View Source
var (
	DefaultResponseData      = []byte(`{"code": -1, "message": "unknown"}`)
	LogClosing          bool = true //channel, connection关闭的时候记录日志

	ErrPacketContinue              error = &Error{Code: 100, Message: "packet uncompleted"}
	ErrHandleNoResponse            error = &Error{Code: 101, Message: "handle no response"}
	ErrHandleError                 error = &Error{Code: 102, Message: "handle error"}
	ErrRequestTimeout              error = &Error{Code: 103, Message: "request timeout"}
	ErrUnknown                     error = &Error{Code: 104, Message: "unknown"}
	ErrResponseHandlerNotImplement error = &Error{Code: 105, Message: "response handler not implement"}
	ErrChannelCreateLimited        error = &Error{Code: 106, Message: "channel create limited"}
	ErrServerConnectionsLimited    error = &Error{Code: 107, Message: "server connections limited"}
	ErrClientConnectionsLimited    error = &Error{Code: 108, Message: "client connections limited"}
)

系统变量定义

Functions

func CheckClientPacketStatus

func CheckClientPacketStatus(prev, current byte) error

检查来自client端的Packet的Status是否合法

func CheckServerPacketStatus

func CheckServerPacketStatus(prev, current byte) error

检查来自server端的Packet的Status是否合法

func CreateNetPacket

func CreateNetPacket(pkt *Packet) ([]byte, error)

根据一个Packet对象创建一个用于tcp发送的网络数据包

func HttpHeaderToIIPArg added in v1.3.4

func HttpHeaderToIIPArg(h http.Header) (string, error)

func IIPArgToHttpHeader added in v1.3.4

func IIPArgToHttpHeader(argValue string) (http.Header, error)

func SetLogger

func SetLogger(logger Logger)

func ValidatePath

func ValidatePath(path string) bool

func WritePacket

func WritePacket(pkt *Packet, writer io.Writer) (int, error)

向网络发送Packet

Types

type AddrWeightClient

type AddrWeightClient struct {
	Addr   string
	Weight int
	// contains filtered or unexported fields
}

type Channel

type Channel struct {
	DefaultErrorHolder
	DefaultContext
	Id      uint32
	NewTime time.Time

	Count *Count
	// contains filtered or unexported fields
}

channel的实现 chuannel由框架内部使用

func (*Channel) Close

func (m *Channel) Close(err error)

关闭channel

func (*Channel) SendPacket

func (m *Channel) SendPacket(pkt *Packet) error

type Client

type Client struct {
	DefaultErrorHolder
	DefaultContext

	Count   *Count
	Measure *Measure
	// contains filtered or unexported fields
}

func NewClient

func NewClient(config ClientConfig, serverAddr string, timeCountRangeFunc EnsureTimeRangeFunc) (*Client, error)

创建一个新的client

func NewClientTLS

func NewClientTLS(config ClientConfig, serverAddr string, timeCountRangeFunc EnsureTimeRangeFunc, certFile, keyFile string) (*Client, error)

创建一个新的client, TLS模式

func (*Client) Close

func (m *Client) Close()

func (*Client) GetConnectionStatis added in v1.0.1

func (m *Client) GetConnectionStatis() (respData []byte, e error)

func (*Client) NewChannel

func (m *Client) NewChannel() (*ClientChannel, error)

创建一个新的channel 每个connection会默认建立一个ID为0的信道,用于基础通讯功能,创建一个新的channel就是通过这个0号channel实现的: 创建channel的流程由client发起,服务器返回新创建的channel id,后续的业务通讯(request/response)应该在新创建的channel上进行

func (*Client) RegisterHandler

func (m *Client) RegisterHandler(path string, handler ClientPathHandler) error

注册Path-Handler iip协议中包含一个path字段,该字段一般用来代表具体的服务器接口和资源 client和server通过注册对path的处理函数,以实现基于iip框架的开发

func (*Client) UnRegisterHandler

func (m *Client) UnRegisterHandler(path string)

取消注册Path-Handler

type ClientChannel

type ClientChannel struct {
	// contains filtered or unexported fields
}

func (*ClientChannel) Close

func (m *ClientChannel) Close(err error)

关闭channel

func (*ClientChannel) DoRequest

func (m *ClientChannel) DoRequest(path string, request Request, timeout time.Duration) ([]byte, error)

用于"消息式"请求/响应(系统自动将多个部分的响应数据合成为一个完整的响应,并通过这个阻塞的函数返回)

func (*ClientChannel) DoStreamRequest

func (m *ClientChannel) DoStreamRequest(path string, request Request) error

用于于流式请求/响应(用户自己注册处理Handler,每接收到一部分响应数据,系统会调用Handler一次,这个调用是异步的,发送函数立即返回)

func (*ClientChannel) GetCtx

func (m *ClientChannel) GetCtx(key string) interface{}

func (*ClientChannel) SetCtx

func (m *ClientChannel) SetCtx(key string, value interface{})

type ClientConfig

type ClientConfig struct {
	MaxConnections        int           //单client最大连接数
	MaxChannelsPerConn    int           //单connection最大channel数
	ChannelPacketQueueLen uint32        //channel的packet接收队列长度
	TcpWriteQueueLen      uint32        //connection的packet写队列长度
	TcpConnectTimeout     time.Duration //服务器连接超时限制
	TcpReadBufferSize     int           //内核socket读缓冲区大小
	TcpWriteBufferSize    int           //内核socket写缓冲区大小
	WaitResponseTimeout   time.Duration
}

type ClientPathHandler

type ClientPathHandler interface {
	// 一个response有可能由于size过大而被自动分割为多个packet传输,responseDataCompleted指示response是否已接收完整
	// 为什么框架不等接收完整才调用handle呢,主要考虑在大数据量的传输场景中,不一定要接收完整才进行数据处理,可以边接收边处理
	Handle(path string, request Request, responseData []byte, responseDataCompleted bool) error
}

type ClientPathHandlerManager

type ClientPathHandlerManager struct {
	HanderMap map[string]ClientPathHandler
	sync.Mutex
}

管理ClientPathHandler,从属于一个client

type Connection

type Connection struct {
	DefaultErrorHolder
	DefaultContext
	Role     byte    //0 client, 4 server
	Client   *Client //not nil if client side
	Server   *Server //not nil if server side
	Channels map[uint32]*Channel

	MaxChannelId  uint32
	FreeChannleId map[uint32]struct{}
	ChannelsLock  sync.RWMutex

	Count *Count
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(client *Client, server *Server, netConn net.Conn, role byte, writeQueueLen int) (*Connection, error)

创建一个Connection对象,由Client或Server内部调用

func (*Connection) Close

func (m *Connection) Close(err error)

type ConnectionSatis

type ConnectionSatis struct {
	Channels map[uint32]struct {
		ReceiveQueue int
		Count        *Count
	}
	WriteQueue int
	Count      *Count
}

type Context

type Context interface {
	GetCtxData(key string) interface{}
	SetCtxData(key string, value interface{})
	RemoveCtxData(key string)
}

type Count

type Count struct {
	PacketsSent         int64
	PacketReceived      int64
	WholePacketSent     int64
	WholePacketReceived int64
	BytesSent           int64
	BytesReceived       int64
}

func (*Count) Add

func (m *Count) Add(v Count)

func (*Count) AddPtr

func (m *Count) AddPtr(v *Count)

type DefaultClientPathHandler

type DefaultClientPathHandler struct {
}

func (*DefaultClientPathHandler) Handle

func (m *DefaultClientPathHandler) Handle(path string, request Request, responseData []byte, dataCompleted bool) error

type DefaultContext

type DefaultContext struct {
	// contains filtered or unexported fields
}

func (*DefaultContext) GetCtxData

func (m *DefaultContext) GetCtxData(key string) interface{}

func (*DefaultContext) RemoveCtxData

func (m *DefaultContext) RemoveCtxData(key string)

func (*DefaultContext) SetCtxData

func (m *DefaultContext) SetCtxData(key string, value interface{})

type DefaultErrorHolder

type DefaultErrorHolder struct {
	// contains filtered or unexported fields
}

func (*DefaultErrorHolder) GetError

func (m *DefaultErrorHolder) GetError() error

func (*DefaultErrorHolder) SetError

func (m *DefaultErrorHolder) SetError(err error)

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

func (*DefaultLogger) Debug

func (m *DefaultLogger) Debug(s string)

func (*DefaultLogger) Debugf

func (m *DefaultLogger) Debugf(format string, args ...interface{})

func (*DefaultLogger) Error

func (m *DefaultLogger) Error(s string)

func (*DefaultLogger) Errorf

func (m *DefaultLogger) Errorf(format string, args ...interface{})

func (*DefaultLogger) GetLevel

func (m *DefaultLogger) GetLevel() LogLevel

func (*DefaultLogger) Log

func (m *DefaultLogger) Log(s string)

func (*DefaultLogger) Logf

func (m *DefaultLogger) Logf(format string, args ...interface{})

func (*DefaultLogger) SetLevel

func (m *DefaultLogger) SetLevel(v LogLevel)

func (*DefaultLogger) Warn

func (m *DefaultLogger) Warn(s string)

func (*DefaultLogger) Warnf

func (m *DefaultLogger) Warnf(format string, args ...interface{})

type DefaultRequest

type DefaultRequest struct {
	DefaultContext
	// contains filtered or unexported fields
}

func (*DefaultRequest) Data

func (m *DefaultRequest) Data() []byte

type EnsureTimeRangeFunc

type EnsureTimeRangeFunc = func(duration int64) TimeRange

type Error

type Error struct {
	Code    int
	Message string
	Tm      time.Time
}

func (*Error) Error

func (m *Error) Error() string

type ErrorHolder

type ErrorHolder interface {
	GetError() error
	SetError(err error)
}

type EvaluatedClient

type EvaluatedClient struct {
	// contains filtered or unexported fields
}

func (*EvaluatedClient) DoRequest

func (m *EvaluatedClient) DoRequest(path string, request Request, timeout time.Duration) ([]byte, error)

type Handler

type Handler interface {
	Handle(c *Channel, request *Packet, dataCompleted bool) ([]byte, error)
}

packet handler接口

type LoadBalanceClient

type LoadBalanceClient struct {
	// contains filtered or unexported fields
}

由一组server提供无状态服务的场景下,LoadBalanceClient根据可配置的负载权重、keepalive检测,自动调节对不同server的请求频率 LoadBalanceClient内部管理多个client

func NewLoadBalanceClient

func NewLoadBalanceClient(serverKeepConns int, serverMaxConns int, serverList string) (*LoadBalanceClient, error)

创建一个LoadBalanceClient, severList格式:ip:port#weight,ip:port#weight,ip:port#weight,..., 0<weight<100,表明server的权重。weight大于1,相当于增加weight-1个相同地址的server 每个server地址保持serverKeepConns个活跃连接,如果并发突增超过serverKeepConns,则自动创建新连接。 serverMaxConns指定单个server最大连接数,所有的server都超出serverMaxConns,则返回ErrClientConnectionsLimited错误,否则以其他有空的server代其服务 server任务分配以轮转模式运行 由于load balance客户端一般式针对同类型的业务,因此没有必要区分多个channel,内部每个connection开启一个channel.

func (*LoadBalanceClient) DoRequest

func (m *LoadBalanceClient) DoRequest(path string, request Request, timeout time.Duration) ([]byte, error)

func (*LoadBalanceClient) GetConnectionStatis added in v1.0.1

func (m *LoadBalanceClient) GetConnectionStatis() ([]byte, error)

func (*LoadBalanceClient) Status

func (m *LoadBalanceClient) Status() string

type LogLevel

type LogLevel int
const (
	LogLevelError LogLevel = iota
	LoglevelWarn
	LogLevelLog
	LogLevelDebug
)

type Logger

type Logger interface {
	GetLevel() LogLevel
	SetLevel(v LogLevel)
	Error(s string)
	Errorf(format string, args ...interface{})
	Warn(s string)
	Warnf(format string, args ...interface{})
	Log(s string)
	Logf(format string, args ...interface{})
	Debug(s string)
	Debugf(format string, args ...interface{})
}

func GetLogger

func GetLogger() Logger

type Measure

type Measure struct {
	sync.RWMutex
	FiveMinuteRequests  [301]int64      `json:"five_minute_requests"`  //最近5分钟的请求数
	FiveMinuteDuration  [301]int64      `json:"five_minute_duration"`  //最近5分钟的请求处理耗时
	AllRequests         int64           `json:"all_requests"`          //全部请求数
	AllDuration         int64           `json:"all_duration"`          //全部请求处理耗时
	FiveMinuteTimeCount [301]*TimeCount `json:"five_minute_timecount"` //最近5分钟请求的耗时区间分段统计
	AllTimeCount        *TimeCount      `json:"all_timecount"`         //全部请求的耗时区间分段统计
	// contains filtered or unexported fields
}

分“最近5分钟”和“运行以来全部时间”两个维度的性能测量统计

func NewMesure

func NewMesure(timeRangeEnsureFunc EnsureTimeRangeFunc) *Measure

timeUnit: 时间单位粒度(μs/ms) 1秒=1000ms=1000000μs

func (*Measure) Add

func (m *Measure) Add(cnt int64, dur time.Duration)

func (*Measure) Json

func (m *Measure) Json() []byte

func (*Measure) SetTimeRangeFunc

func (m *Measure) SetTimeRangeFunc(v EnsureTimeRangeFunc)

func (*Measure) String

func (m *Measure) String(timeUnit time.Duration) string

timeUnit: time.Second,time.MicroSecond, time.MilliSecond,etc.

type Packet

type Packet struct {
	Type      byte   `json:"type"` //0 request, 4 response
	Status    byte   `json:"status"`
	Path      string `json:"path"`
	ChannelId uint32 `json:"channel_id"`
	Data      []byte `json:"data"`
	DontChunk bool   //禁止分成多个packet传输
	// contains filtered or unexported fields
}

func ReadPacket

func ReadPacket(reader io.Reader) (*Packet, error)

从网络数据中读取生成Packet对象

func (*Packet) PathParam added in v1.0.1

func (m *Packet) PathParam(key string) string

type Request

type Request interface {
	Context
	Data() []byte
}

func NewDefaultRequest

func NewDefaultRequest(data []byte) Request

构建一个iip请求,请求数据从data传入,如果传入nil,则函数自动填入"{}", iip协议不允许请求数据空白

type Response

type Response struct {
	Request Request
	Data    []byte
}

type ResponseDeleteChannel

type ResponseDeleteChannel struct {
	Code    int    `json:"code"`
	Message string `json:"message,omitempty"`
}

type ResponseHandleFail

type ResponseHandleFail struct {
	Code    int    `json:"code"`
	Message string `json:"message,omitempty"`
}

func ErrorResponse

func ErrorResponse(err *Error) *ResponseHandleFail

func (*ResponseHandleFail) Data

func (m *ResponseHandleFail) Data() []byte

type ResponseNewChannel

type ResponseNewChannel struct {
	Code      int    `json:"code"`
	Message   string `json:"message,omitempty"`
	ChannelId uint32 `json:"channel_id,omitempty"`
}

type Server

type Server struct {
	DefaultErrorHolder
	DefaultContext
	// contains filtered or unexported fields
}

func NewServer

func NewServer(config ServerConfig, listenAddr string, timeCountRangeFunc EnsureTimeRangeFunc) (*Server, error)

func (*Server) AddCount

func (m *Server) AddCount(path string, count Count)

func (*Server) AddMeasure

func (m *Server) AddMeasure(path string, reqCount int64, duration time.Duration)

func (*Server) GetConnectionStatis added in v1.0.1

func (m *Server) GetConnectionStatis() (respData []byte, e error)

func (*Server) GetListener added in v1.0.1

func (m *Server) GetListener() net.Listener

func (*Server) GetStatis added in v1.0.1

func (m *Server) GetStatis(timeUnitJson []byte) (respData []byte, e error)

requestData format: {"time_unit": "microsecond|millisecond|second|nanosecond"}

func (*Server) Handle

func (m *Server) Handle(path string, queryParams url.Values, requestData []byte, dataCompleted bool) (respData []byte, e error)

func (*Server) Listener

func (m *Server) Listener() net.Listener

func (*Server) RegisterHandler

func (m *Server) RegisterHandler(path string, handler ServerPathHandler, timeCountRangeFunc EnsureTimeRangeFunc) error

func (*Server) Serve

func (m *Server) Serve(listener net.Listener, isTls bool) error

func (*Server) StartListen

func (m *Server) StartListen() error

listen socket and start server process

func (*Server) StartListenTLS

func (m *Server) StartListenTLS(certFile, keyFile string) error

listen socket and start server process in TLS mode

func (*Server) Stop

func (m *Server) Stop(err error)

stop server

func (*Server) UnRegisterHandler

func (m *Server) UnRegisterHandler(path string)

type ServerConfig

type ServerConfig struct {
	MaxConnections        int
	MaxChannelsPerConn    int
	ChannelPacketQueueLen uint32
	TcpWriteQueueLen      uint32
	TcpReadBufferSize     int
	TcpWriteBufferSize    int
}

type ServerPathHandler

type ServerPathHandler interface {
	// 一个request有可能由于size过大而被自动分割为多个packet传输,requestDataCompleted指示request是否已接收完整
	// 为什么框架不等接收完整才调用handle呢,主要考虑在大数据量的传输场景中,不一定要接收完整才进行数据处理,可以边接收边处理
	Handle(path string, queryParams url.Values, requestData []byte, requestDataCompleted bool) (responseData []byte, e error)
}

type ServerPathHandlerManager

type ServerPathHandlerManager struct {
	HandlerMap map[string]ServerPathHandler
	sync.Mutex
}

管理ServerPathHandler,从属于一个server

type TimeCount

type TimeCount struct {
	sync.Mutex
	RangeCount [8]int64 `json:"range_count"`
}

分区间统计

func (*TimeCount) Clear

func (m *TimeCount) Clear()

func (*TimeCount) Record

func (m *TimeCount) Record(duration int64, ensureFunc EnsureTimeRangeFunc)

type TimeRange

type TimeRange int64

耗时区间

const (
	TimeRange1 TimeRange = iota
	TimeRange2
	TimeRange3
	TimeRange4
	TimeRange5
	TimeRange6
	TimeRange7
	TimeRangeOther
)

func EnsureTimeRangeMicroSecond

func EnsureTimeRangeMicroSecond(duration int64) TimeRange

func EnsureTimeRangeMilliSecond

func EnsureTimeRangeMilliSecond(duration int64) TimeRange

func EnsureTimeRangeSecond

func EnsureTimeRangeSecond(duration int64) TimeRange

Directories

Path Synopsis
example
echo_client
echo client
echo client

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL