wsPool

package module
v1.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2020 License: MIT Imports: 16 Imported by: 0

README

wsPool

介绍

golang websocket 连接池 v1.1.4

依赖了goframe 框架(很优秀的框架)的部分库 基于gorilla/websocket和protobuf实现

同时支持各种类型的数据交互

examples
func main() {
	flag.Parse()
	//初骀化连接池
	wsPool.InitWsPool(func(err interface{}) {
		//接收连接池中的运行时错误信息
		log.Panicln(err)
	})
	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		pcol := r.Header.Get("Sec-Websocket-Protocol")
		list:=strings.Split(pcol, "_")
		head := http.Header{}
		head.Add("Sec-Websocket-Protocol", pcol)

		//实例化连接对象
		client:=wsPool.NewClient(&wsPool.Config{
			Id:list[0], //连接标识
			Type:"ws", //连接类型
			Channel:list[1:], //指定频道
		})

		//连接成功回调
		client.OnOpen(func() {
			log.Panicln("连接己开启"+client.Id)
		})

		//接收消息
		client.OnMessage(func(msg *wsPool.SendMsg) {
			log.Panicln(msg)
			if msg.ToClientId!="" {
				//发送消息给指定的ToClientID连接
				wsPool.Send(msg)
				//发送消息给当前连接对象
				client.Send(msg)
			}
			if len(msg.Channel)>0{
				//按频道广播,可指定多个频道[]string
				client.Broadcast(msg) //或者 wsPool.Broadcast(msg)
			}
			//或都全局广播,所有连接都进行发送
			wsPool.BroadcastAll(msg)

		})
		//连接断开回调
		client.OnClose(func() {
			log.Panicln("连接己经关闭"+client.Id)
		})
		client.OnError(func(err error) {
			log.Panicln("连接",client.Id,"错误信息:",err)
		})

		//开启连接
		client.OpenClient(w,r,head)

	})
	err := http.ListenAndServe(*addr, nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}


基层的protobuf格式

除 toClientId,fromClientId,channel外 其它都可以随意定议,增减都可以。


message SendMsg {
  int32 cmd =1;  //指令编号
  int64 timestamp  = 2; //消息发送时间
  string fromClientId =3;  //指令消息的来源。发送者的连接ID
  string toClientId = 4;  //指令消息的接收者。发送给对应的客户端连接ID
  bytes cmdData =5;  //对应指令的CmdData1的protobuf的message
  int32 status=6;  //消息发送响应状态
  int32 priority=7; //用于处理指令队列的优先级的权重值
  string localId = 8; //客户端标识消息的id,主要区分相同cmd的不同消息,方便收到回复分发处理等效果,考虑长度问题定义成string
  string serverId = 9; //服务端发送消息的ID,主要区分相同cmd的不同消息,方便服务端收到回复分发处理等效果 考虑长度问题定义成string
  repeated string channel = 10; //指定需要广播的频道,可以指定一个或多个频道
  string msg =11; //一般用于json数据传递,或消息发送响应内容
  string Desc=12; //消息介绍内容,或其它数据
}


作者很懒惰!!

其它看源码和例子,有些注释,很简单 !

Documentation

Overview

Package grpool implements a goroutine reusable pool.

Index

Constants

This section is empty.

Variables

View Source
var (
	//最大连接池缓冲处理连接对像管道长度
	Max_client_channel_len = 10240
	//最大全局广播缓冲处理管道长度
	Max_broadcastQueue_len = 4096
	//最大频道广播缓冲处理管道长度
	Max_chanBroadcastQueue_len = 4096

	//最大接收消息缓冲处理管道长度
	Max_recvCh_len = 10240
	//最大发送消息缓冲处理管道长度
	Max_sendCh_len = 10240
)
View Source
var (
	ErrInvalidLengthSendMsg        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowSendMsg          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupSendMsg = fmt.Errorf("proto: unexpected end of group")
)

Functions

func Broadcast

func Broadcast(msg *SendMsg) error

func BroadcastAll

func BroadcastAll(msg *SendMsg) error

全局广播 广播一定要设置toClientId,这个对象可以确定广播超时消息回复对像 通过此方法进行广播的消息体,会对所有的类型和频道都进行广播

func InitWsPool

func InitWsPool(errfun func(err interface{}))

初始化执行连接池对象 参数为接收连接池中运行时的一些错误信息的回调方法

func Send

func Send(msg *SendMsg) error

包级的公开方法 所有包级的发送如果连接断开,消息会丢失

// 发送消息 只从连接池中按指定的toClientId的连接对象发送出消息 在此方法中sendMsg.Channel指定的值不会处理

Types

type Client

type Client struct {
	CloseTime time.Time //连接断开的时间

	Id      string    //标识连接的名称
	IsClose chan bool //连接的状态。true为关闭
	// contains filtered or unexported fields
}

Client is a middleman between the websocket connection and the hub.

func NewClient

func NewClient(conf *Config) *Client

第一步,实例化连接对像

func (*Client) Close

func (c *Client) Close()

服务主动关闭连接

func (*Client) GetRuntimeInfo

func (c *Client) GetRuntimeInfo() *RuntimeInfo

获取连接对像运行过程中的信息

func (*Client) OnClose

func (c *Client) OnClose(h func())

监听连接对象的连接open成功的事件

func (*Client) OnError

func (c *Client) OnError(h func(err error))

监听连接对象的错误信息

func (*Client) OnMessage

func (c *Client) OnMessage(h func(msg *SendMsg))

监听连接对象的连接open成功的事件

func (*Client) OnOpen

func (c *Client) OnOpen(h func())

监听连接对象的连接open成功的事件

func (*Client) OpenClient

func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.Header)

开启连接 serveWs handles websocket requests from the peer.

func (*Client) Send

func (c *Client) Send(msg *SendMsg) error

单个连接发送消息

type Config

type Config struct {
	Id        string   //标识连接的名称
	Type      string   //连接类型或path
	Channel   []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
	Goroutine int      //每个连接开启的go程数里 默认为10
}

连接参数结构体

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Goroutine Pool

func New

func New(limit ...int) *Pool

New creates and returns a new goroutine pool object. The parameter <limit> is used to limit the max goroutine count, which is not limited in default.

func (*Pool) Add

func (p *Pool) Add(f func()) error

Add pushes a new job to the pool. The job will be executed asynchronously.

func (*Pool) Cap

func (p *Pool) Cap() int

Cap returns the capacity of the pool. This capacity is defined when pool is created. If it returns -1 means no limit.

func (*Pool) Close

func (p *Pool) Close()

Close closes the goroutine pool, which makes all goroutines exit.

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed returns if pool is closed.

func (*Pool) Jobs

func (p *Pool) Jobs() int

Jobs returns current job count of the pool.

func (*Pool) Size

func (p *Pool) Size() int

Size returns current goroutine count of the pool.

type RuntimeInfo

type RuntimeInfo struct {
	Id              string //标识连接的名称
	Type            string //连接类型或path
	Ip              string
	Channel         []string  //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
	OpenTime        time.Time //连接打开时间
	LastReceiveTime time.Time //最后一次接收到数据的时间
	LastSendTime    time.Time //最后一次发送数据的时间
}

type SendMsg

type SendMsg struct {
	Cmd                  int32    `protobuf:"varint,1,opt,name=cmd,proto3" json:"cmd,omitempty"`
	Timestamp            int64    `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	FromClientId         string   `protobuf:"bytes,3,opt,name=fromClientId,proto3" json:"fromClientId,omitempty"`
	ToClientId           string   `protobuf:"bytes,4,opt,name=toClientId,proto3" json:"toClientId,omitempty"`
	CmdData              []byte   `protobuf:"bytes,5,opt,name=cmdData,proto3" json:"cmdData,omitempty"`
	Status               int32    `protobuf:"varint,6,opt,name=status,proto3" json:"status,omitempty"`
	Priority             int32    `protobuf:"varint,7,opt,name=priority,proto3" json:"priority,omitempty"`
	LocalId              string   `protobuf:"bytes,8,opt,name=localId,proto3" json:"localId,omitempty"`
	ServerId             string   `protobuf:"bytes,9,opt,name=serverId,proto3" json:"serverId,omitempty"`
	Channel              []string `protobuf:"bytes,10,rep,name=channel,proto3" json:"channel,omitempty"`
	Msg                  string   `protobuf:"bytes,11,opt,name=msg,proto3" json:"msg,omitempty"`
	Desc                 string   `protobuf:"bytes,12,opt,name=Desc,proto3" json:"Desc,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*SendMsg) Descriptor

func (*SendMsg) Descriptor() ([]byte, []int)

func (*SendMsg) GetChannel

func (m *SendMsg) GetChannel() []string

func (*SendMsg) GetCmd

func (m *SendMsg) GetCmd() int32

func (*SendMsg) GetCmdData

func (m *SendMsg) GetCmdData() []byte

func (*SendMsg) GetDesc

func (m *SendMsg) GetDesc() string

func (*SendMsg) GetFromClientId

func (m *SendMsg) GetFromClientId() string

func (*SendMsg) GetLocalId

func (m *SendMsg) GetLocalId() string

func (*SendMsg) GetMsg

func (m *SendMsg) GetMsg() string

func (*SendMsg) GetPriority

func (m *SendMsg) GetPriority() int32

func (*SendMsg) GetServerId

func (m *SendMsg) GetServerId() string

func (*SendMsg) GetStatus

func (m *SendMsg) GetStatus() int32

func (*SendMsg) GetTimestamp

func (m *SendMsg) GetTimestamp() int64

func (*SendMsg) GetToClientId

func (m *SendMsg) GetToClientId() string

func (*SendMsg) Marshal added in v1.1.6

func (m *SendMsg) Marshal() (dAtA []byte, err error)

func (*SendMsg) MarshalTo added in v1.1.6

func (m *SendMsg) MarshalTo(dAtA []byte) (int, error)

func (*SendMsg) MarshalToSizedBuffer added in v1.1.6

func (m *SendMsg) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*SendMsg) ProtoMessage

func (*SendMsg) ProtoMessage()

func (*SendMsg) Reset

func (m *SendMsg) Reset()

func (*SendMsg) Size added in v1.1.6

func (m *SendMsg) Size() (n int)

func (*SendMsg) String

func (m *SendMsg) String() string

func (*SendMsg) Unmarshal added in v1.1.6

func (m *SendMsg) Unmarshal(dAtA []byte) error

func (*SendMsg) XXX_DiscardUnknown

func (m *SendMsg) XXX_DiscardUnknown()

func (*SendMsg) XXX_Marshal

func (m *SendMsg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SendMsg) XXX_Merge

func (m *SendMsg) XXX_Merge(src proto.Message)

func (*SendMsg) XXX_Size

func (m *SendMsg) XXX_Size() int

func (*SendMsg) XXX_Unmarshal

func (m *SendMsg) XXX_Unmarshal(b []byte) error

type Server

type Server struct {
	ErrFun func(err interface{}) //用于接收ws连接池内代码运行时错误信息
	// contains filtered or unexported fields
}

连接池的结构体

Directories

Path Synopsis
util
grpool
Package glist provides a concurrent-safe/unsafe doubly linked list.
Package glist provides a concurrent-safe/unsafe doubly linked list.
queue
bill 2018.1.8 优先级队列[同级别先进先出]权重值越大越优先
bill 2018.1.8 优先级队列[同级别先进先出]权重值越大越优先

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL