ws

package
v1.1.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2022 License: MIT Imports: 22 Imported by: 0

README

ws模块用法

protoc --go_out=. ws/msg.proto

//lib js  (msg_pb_libs.js+google-protobuf.js)
protoc --js_out=library=msg_pb_libs,binary:ws/js  ws/msg.proto

//commonjs  (msg_pb_dist.js or msg_pb_dist.min.js)
cd ws
protoc --js_out=import_style=commonjs,binary:js  msg.proto

cd js
npm i -g google-protobuf
npm i -g browserify
browserify msg_pb.js <custom_pb.js> -o  msg_pb_dist.js

https://www.npmjs.com/package/google-protobuf

demo

//TestWssRun.go
InitServer() //server invoke 服务端调用
InitClient() //client invoke 客户端调用
ctx := context.Background()

const (
    C2S_REQ  = 1
    S2C_RESP = 2
)
const pullMsgFromDB = 1

//server reg handler
RegisterHandler(C2S_REQ, func(ctx context.Context, connection *Connection, message *Message) error {
    log.Info(ctx, "server recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
    packet := GetPoolMessage(S2C_RESP)
    packet.PMsg().Data = []byte("server response")
    connection.SendMsg(ctx, packet, nil)

    connection.SendPullNotify(ctx, pullMsgFromDB)
    return nil
})

//server start
var createSrvPullerFunc = func(conn *Connection, pullChannelId int) SrvPuller {
    return NewDefaultSrvPuller(conn, pullChannelId, func(ctx context.Context, pullConn *Connection) {
        packet := GetPoolMessage(S2C_RESP)
        packet.PMsg().Data = []byte("first msg from db")
        pullConn.SendMsg(ctx, packet, nil)
    }, func(ctx context.Context, pullConn *Connection) {
        //msg from db...
        time.Sleep(time.Second * 1)

        packet := GetPoolMessage(S2C_RESP)
        packet.PMsg().Data = []byte("pull msg from db")
        pullConn.SendMsg(ctx, packet, nil)
    })
}

e := gin.New()
e.GET("/join", func(ctx *gin.Context) {
    connMeta := ConnectionMeta{
        UserId:   ctx.DefaultQuery("uid", ""),
        Typed:    0,
        DeviceId: "",
        Version:  0,
        Charset:  0,
    }
    _, err := AcceptGin(ctx, connMeta, DebugOption(true),
        SrvUpgraderCompressOption(true),
        CompressionLevelOption(2),
        ConnEstablishHandlerOption(func(ctx context.Context, conn *Connection) {
            log.Info(ctx, "server conn establish: %v", conn.Id())

            puller := createSrvPullerFunc(conn, pullMsgFromDB)
            safego.Go(func() {
                puller.PullSend()
            })
        }),
        ConnClosingHandlerOption(func(ctx context.Context, conn *Connection) {
            log.Info(ctx, "server conn closing: %v", conn.Id())
        }),
        ConnClosedHandlerOption(func(ctx context.Context, conn *Connection) {
            log.Info(ctx, "server conn closed: %v", conn.Id())
        }),
        SrvPullChannelsOption([]int{pullMsgFromDB}))
    if err != nil {
        log.Error(ctx, "Accept client connection failed. error: %v", err)
        return
    }
})
go e.Run(":8003")

//client reg handler
RegisterHandler(S2C_RESP, func(ctx context.Context, connection *Connection, message *Message) error {
    log.Info(ctx, "client recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
    return nil
})
//client connect
uid := "100"
url := "ws://127.0.0.1:8003/join?uid=" + uid
conn, _ := DialConnect(context.Background(), url, http.Header{},
    DebugOption(true),
    ClientIdOption("server1"),
    ClientDialWssOption(url, false),
    ClientDialCompressOption(true),
    CompressionLevelOption(2),
    ConnEstablishHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn establish: %v", conn.Id())
    }),
    ConnClosingHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn closing: %v", conn.Id())
    }),
    ConnClosedHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn closed: %v", conn.Id())
    }),
)
log.Info(ctx, "%v", conn)
time.Sleep(time.Second * 5)

packet := GetPoolMessage(C2S_REQ)
packet.PMsg().Data = []byte("client request")
conn.SendMsg(context.Background(), packet, nil)

time.Sleep(time.Second * 10)

//client connect displace
conn2, _ := DialConnect(context.Background(), url, http.Header{},
    DebugOption(true),
    ClientIdOption("server2"),
    ClientDialWssOption(url, false),
    ConnEstablishHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn establish: %v", conn.Id())
    }),
)
time.Sleep(time.Second)
packet = GetPoolMessage(C2S_REQ)
packet.PMsg().Data = []byte("client request2")
conn2.SendMsg(context.Background(), packet, nil)

time.Sleep(time.Minute * 1)

Documentation

Index

Constants

View Source
const (
	CONN_KIND_CLIENT = 0
	CONN_KIND_SERVER = 1
)
View Source
const (
	CHARSET_UTF8 = 0
	CHARSET_GBK  = 1
)

Variables

View Source
var (
	P_S2C_name = map[int32]string{
		0:  "s2c_connected",
		-1: "s2c_err_displace",
	}
	P_S2C_value = map[string]int32{
		"s2c_connected":    0,
		"s2c_err_displace": -1,
	}
)

Enum value maps for P_S2C.

View Source
var ClientConnHub = newHub()
View Source
var File_ws_msg_proto protoreflect.FileDescriptor
View Source
var (
	Handlers = make(map[int32]Handler)
)
View Source
var ServerConnHub = newHub()

Functions

func AutoReDialConnect added in v1.1.7

func AutoReDialConnect(ctx context.Context, sUrl string, header http.Header, cancelAutoConn chan interface{}, connInterval time.Duration,
	opts ...ConnOption)

func InitClient added in v1.0.15

func InitClient()

func InitServer added in v1.0.15

func InitServer()

func RegisterDataMsgType added in v1.1.4

func RegisterDataMsgType(protocolId int32, pMsg IDataMessage)

注册数据消息类型[P_MESSAGE.Data],功能可选,当需要使用框架提供的池功能时使用

func RegisterHandler

func RegisterHandler(cmd int32, h Handler)

注册消息处理器

Types

type ConnOption

type ConnOption func(*Connection)

连接动态参数选项

func ClientDialCompressOption added in v1.1.5

func ClientDialCompressOption(compress bool) ConnOption

func ClientDialConnFailedHandlerOption added in v1.1.13

func ClientDialConnFailedHandlerOption(handler EventHandler) ConnOption

func ClientDialHandshakeTimeoutOption added in v1.1.5

func ClientDialHandshakeTimeoutOption(handshakeTimeout time.Duration) ConnOption

func ClientDialOption added in v1.1.5

func ClientDialOption(dialer *websocket.Dialer) ConnOption

func ClientDialRetryOption added in v1.1.5

func ClientDialRetryOption(retryNum int, retryInterval time.Duration) ConnOption

func ClientDialWssOption added in v1.1.5

func ClientDialWssOption(sUrl string, secureWss bool) ConnOption

func ClientIdOption added in v1.1.5

func ClientIdOption(id string) ConnOption

客户端专用 默认使用时间戳来记录客户端所连服务器的id

func CompressionLevelOption added in v1.1.5

func CompressionLevelOption(compressionLevel int) ConnOption

func ConnClosedHandlerOption added in v1.1.8

func ConnClosedHandlerOption(handler EventHandler) ConnOption

func ConnClosingHandlerOption added in v1.1.8

func ConnClosingHandlerOption(handler EventHandler) ConnOption

func ConnEstablishHandlerOption added in v1.1.8

func ConnEstablishHandlerOption(handler EventHandler) ConnOption

callback

func DebugOption added in v1.1.8

func DebugOption(debug bool) ConnOption

func NetMaxFailureRetryOption added in v1.1.5

func NetMaxFailureRetryOption(maxFailureRetry int) ConnOption

func NetReadWaitOption added in v1.1.5

func NetReadWaitOption(readWait time.Duration) ConnOption

func NetTemporaryWaitOption added in v1.1.5

func NetTemporaryWaitOption(temporaryWait time.Duration) ConnOption

func NetWriteWaitOption added in v1.1.5

func NetWriteWaitOption(writeWait time.Duration) ConnOption

func RecvPingHandlerOption added in v1.1.8

func RecvPingHandlerOption(handler EventHandler) ConnOption

func RecvPongHandlerOption added in v1.1.8

func RecvPongHandlerOption(handler EventHandler) ConnOption

func SendBufferOption

func SendBufferOption(bufferSize int) ConnOption

func SrvCheckOriginOption added in v1.1.5

func SrvCheckOriginOption(checkOrigin func(r *http.Request) bool) ConnOption

func SrvPullChannelsOption

func SrvPullChannelsOption(channels []int) ConnOption

为每种消息拉取逻辑分别注册不同的通道

func SrvUpgraderCompressOption added in v1.1.5

func SrvUpgraderCompressOption(compress bool) ConnOption

func SrvUpgraderOption

func SrvUpgraderOption(upgrader *websocket.Upgrader) ConnOption

服务端特有 upgrader定制

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

websocket连接封装

func Accept

func AcceptGin

func AcceptGin(ctx *gin.Context, meta ConnectionMeta, opts ...ConnOption) (*Connection, error)

func Connect

func Connect(ctx context.Context, sId, sUrl string, secureWss bool, header http.Header, opts ...ConnOption) (*Connection, error)

func DialConnect added in v1.1.5

func DialConnect(ctx context.Context, sUrl string, header http.Header, opts ...ConnOption) (*Connection, error)

func (*Connection) Charset

func (c *Connection) Charset() int

func (*Connection) ClientIp added in v1.1.10

func (c *Connection) ClientIp() string

func (*Connection) DeviceId added in v1.1.14

func (c *Connection) DeviceId() string

func (*Connection) GetCommDataValue

func (c *Connection) GetCommDataValue(key string) (interface{}, bool)

连接数据存储结构

func (*Connection) GetPullChannel

func (c *Connection) GetPullChannel(pullChannelId int) (chan struct{}, bool)

func (*Connection) Id

func (c *Connection) Id() string

func (*Connection) IncrCommDataValueBy

func (c *Connection) IncrCommDataValueBy(key string, delta int)

func (*Connection) IsDisplaced

func (c *Connection) IsDisplaced() bool

func (*Connection) IsStopped

func (c *Connection) IsStopped() bool

func (*Connection) KickClient

func (c *Connection) KickClient(displace bool)

displace=true,通常在集群环境下,踢掉在其他集群节点建立的连接,当前节点不需要主动调用

func (*Connection) KickServer

func (c *Connection) KickServer(displace bool)

displace=true,通常在集群环境下,踢掉在其他集群节点建立的连接,当前节点不需要主动调用

func (*Connection) RefreshDeadline

func (c *Connection) RefreshDeadline()

func (*Connection) RemoveCommDataValue

func (c *Connection) RemoveCommDataValue(key string)

func (*Connection) Reset added in v1.1.14

func (c *Connection) Reset()

func (*Connection) SendMsg

func (c *Connection) SendMsg(ctx context.Context, payload *Message, sc SendCallback) (err error)

func (*Connection) SendPullNotify

func (c *Connection) SendPullNotify(ctx context.Context, pullChannelId int) (err error)

通知指定消息通道转发消息

func (*Connection) SetCommDataValue

func (c *Connection) SetCommDataValue(key string, value interface{})

func (*Connection) Type

func (c *Connection) Type() int

func (*Connection) UserId

func (c *Connection) UserId() string

func (*Connection) Version

func (c *Connection) Version() int

type ConnectionMeta

type ConnectionMeta struct {
	UserId   string //userId
	Typed    int    //客户端类型枚举
	DeviceId string //设备ID
	Version  int    //版本
	Charset  int    //客户端使用的字符集
	// contains filtered or unexported fields
}

func (*ConnectionMeta) BuildConnId

func (m *ConnectionMeta) BuildConnId() string

type EventHandler added in v1.1.8

type EventHandler func(context.Context, *Connection)

客户端事件处理函数 ConnEstablishHandlerOption sync(阻塞主流程) ConnClosingHandlerOption sync(阻塞主流程) ConnClosedHandlerOption async

type Handler

type Handler func(context.Context, *Connection, *Message) error

客户端消息处理函数对象 use RegisterHandler(constant...., func(context.Context,*Connection,*Message) error {})

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

连接管理器

func (*Hub) ConnectionIds

func (h *Hub) ConnectionIds() []string

func (*Hub) Find

func (h *Hub) Find(id string) (*Connection, error)

func (*Hub) RangeConnsByFunc

func (h *Hub) RangeConnsByFunc(f func(string, *Connection) bool)

type IDataMessage added in v1.1.4

type IDataMessage interface {
	proto.Message
	Reset()
}

P_MESSAGE.Data类型的接口

type Message added in v1.1.3

type Message struct {
	// contains filtered or unexported fields
}

不能手动创建,必须使用 NewMessage() 或 GetPoolMessage()

func GetPoolMessage added in v1.1.3

func GetPoolMessage(protocolId int32) *Message

对象池消息

func NewMessage added in v1.1.3

func NewMessage() *Message

普通消息

func (*Message) DataMsg added in v1.1.4

func (t *Message) DataMsg() IDataMessage

func (*Message) Marshal added in v1.1.4

func (t *Message) Marshal() ([]byte, error)

func (*Message) PMsg added in v1.1.3

func (t *Message) PMsg() *P_MESSAGE

func (*Message) Unmarshal added in v1.1.3

func (t *Message) Unmarshal(payload []byte) error

type P_DISPLACE added in v1.1.11

type P_DISPLACE struct {
	OldIp []byte `protobuf:"bytes,1,opt,name=old_ip,json=oldIp,proto3" json:"old_ip,omitempty"`
	NewIp []byte `protobuf:"bytes,2,opt,name=new_ip,json=newIp,proto3" json:"new_ip,omitempty"`
	Ts    int64  `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"`
	// contains filtered or unexported fields
}

func (*P_DISPLACE) Descriptor deprecated added in v1.1.11

func (*P_DISPLACE) Descriptor() ([]byte, []int)

Deprecated: Use P_DISPLACE.ProtoReflect.Descriptor instead.

func (*P_DISPLACE) GetNewIp added in v1.1.11

func (x *P_DISPLACE) GetNewIp() []byte

func (*P_DISPLACE) GetOldIp added in v1.1.11

func (x *P_DISPLACE) GetOldIp() []byte

func (*P_DISPLACE) GetTs added in v1.1.11

func (x *P_DISPLACE) GetTs() int64

func (*P_DISPLACE) ProtoMessage added in v1.1.11

func (*P_DISPLACE) ProtoMessage()

func (*P_DISPLACE) ProtoReflect added in v1.1.11

func (x *P_DISPLACE) ProtoReflect() protoreflect.Message

func (*P_DISPLACE) Reset added in v1.1.11

func (x *P_DISPLACE) Reset()

func (*P_DISPLACE) String added in v1.1.11

func (x *P_DISPLACE) String() string

type P_MESSAGE

type P_MESSAGE struct {
	ProtocolId int32  `protobuf:"varint,1,opt,name=protocol_id,json=protocolId,proto3" json:"protocol_id,omitempty"` //消息协议ID
	Data       []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`                                //内容-自定义消息
	// contains filtered or unexported fields
}

通用消息体封装

func (*P_MESSAGE) Descriptor deprecated

func (*P_MESSAGE) Descriptor() ([]byte, []int)

Deprecated: Use P_MESSAGE.ProtoReflect.Descriptor instead.

func (*P_MESSAGE) GetData

func (x *P_MESSAGE) GetData() []byte

func (*P_MESSAGE) GetProtocolId

func (x *P_MESSAGE) GetProtocolId() int32

func (*P_MESSAGE) ProtoMessage

func (*P_MESSAGE) ProtoMessage()

func (*P_MESSAGE) ProtoReflect

func (x *P_MESSAGE) ProtoReflect() protoreflect.Message

func (*P_MESSAGE) Reset

func (x *P_MESSAGE) Reset()

func (*P_MESSAGE) String

func (x *P_MESSAGE) String() string

type P_S2C

type P_S2C int32

基础协议

const (
	P_S2C_s2c_connected    P_S2C = 0  //连接成功建立
	P_S2C_s2c_err_displace P_S2C = -1 //被顶号
)

func (P_S2C) Descriptor

func (P_S2C) Descriptor() protoreflect.EnumDescriptor

func (P_S2C) Enum

func (x P_S2C) Enum() *P_S2C

func (P_S2C) EnumDescriptor deprecated

func (P_S2C) EnumDescriptor() ([]byte, []int)

Deprecated: Use P_S2C.Descriptor instead.

func (P_S2C) Number

func (x P_S2C) Number() protoreflect.EnumNumber

func (P_S2C) String

func (x P_S2C) String() string

func (P_S2C) Type

func (P_S2C) Type() protoreflect.EnumType

type SendCallback

type SendCallback func(ctx context.Context, c *Connection, err error)

消息发送回调接口

type SrvPuller added in v1.1.14

type SrvPuller interface {
	PullSend()
}

func NewDefaultSrvPuller added in v1.1.14

func NewDefaultSrvPuller(conn *Connection, pullChannelId int, firstPullFunc, pullFunc func(context.Context, *Connection)) SrvPuller

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL