ws

package
v1.1.35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2023 License: MIT Imports: 23 Imported by: 0

README

ws模块用法

protoc --go_out=. ws/msg.proto

//lib js  (msg_pb_libs.js+google-protobuf.js)
protoc --js_out=library=msg_pb_libs,binary:ws/js  ws/msg.proto

//commonjs  (msg_pb_dist.js or msg_pb_dist.min.js)
cd ws
protoc --js_out=import_style=commonjs,binary:js  msg.proto

cd js
npm i google-protobuf
npm i -g browserify
npm i -g minifier
browserify msg_pb.js <custom_pb.js> -o  msg_pb_dist.js
minify msg_pb_dist.js   //msg_pb_dist.min.js

https://www.npmjs.com/package/google-protobuf

demo

//TestWssRun.go
InitServer() //server invoke 服务端调用
InitClient() //client invoke 客户端调用
ctx := context.Background()

const (
    C2S_REQ  = 1
    S2C_RESP = 2
)
const pullMsgFromDB = 1

//server reg handler
RegisterHandler(C2S_REQ, func(ctx context.Context, connection *Connection, message *Message) error {
    log.Info(ctx, "server recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
    packet := GetPoolMessage(S2C_RESP)
    packet.PMsg().Data = []byte("server response")
    connection.SendMsg(ctx, packet, nil)

    connection.SendPullNotify(ctx, pullMsgFromDB)
    return nil
})

//server start
var createSrvPullerFunc = func(conn *Connection, pullChannelId int) Puller {
    return NewDefaultPuller(conn, pullChannelId, func(ctx context.Context, pullConn *Connection) {
        packet := GetPoolMessage(S2C_RESP)
        packet.PMsg().Data = []byte("first msg from db")
        pullConn.SendMsg(ctx, packet, nil)
    }, func(ctx context.Context, pullConn *Connection) {
        //msg from db...
        time.Sleep(time.Second * 1)

        packet := GetPoolMessage(S2C_RESP)
        packet.PMsg().Data = []byte("pull msg from db")
        pullConn.SendMsg(ctx, packet, nil)
    })
}

e := gin.New()
e.GET("/join", func(ctx *gin.Context) {
    connMeta := ConnectionMeta{
        UserId:   ctx.DefaultQuery("uid", ""),
        Typed:    0,
        DeviceId: "",
        Version:  0,
        Charset:  0,
    }
    _, err := AcceptGin(ctx, connMeta, DebugOption(true),
        SrvUpgraderCompressOption(true),
        CompressionLevelOption(2),
        ConnEstablishHandlerOption(func(ctx context.Context, conn *Connection) {
            log.Info(ctx, "server conn establish: %v", conn.Id())

            puller := createSrvPullerFunc(conn, pullMsgFromDB)
            safego.Go(func() {
                puller.PullSend()
            })
        }),
        ConnClosingHandlerOption(func(ctx context.Context, conn *Connection) {
            log.Info(ctx, "server conn closing: %v", conn.Id())
        }),
        ConnClosedHandlerOption(func(ctx context.Context, conn *Connection) {
            log.Info(ctx, "server conn closed: %v", conn.Id())
        }),
        SrvPullChannelsOption([]int{pullMsgFromDB}))
    if err != nil {
        log.Error(ctx, "Accept client connection failed. error: %v", err)
        return
    }
})
go e.Run(":8003")

//client reg handler
RegisterHandler(S2C_RESP, func(ctx context.Context, connection *Connection, message *Message) error {
    log.Info(ctx, "client recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
    return nil
})
//client connect
uid := "100"
url := "ws://127.0.0.1:8003/join?uid=" + uid
conn, _ := DialConnect(context.Background(), url, http.Header{},
    DebugOption(true),
    ClientIdOption("server1"),
    ClientDialWssOption(url, false),
    ClientDialCompressOption(true),
    CompressionLevelOption(2),
    ConnEstablishHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn establish: %v", conn.Id())
    }),
    ConnClosingHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn closing: %v", conn.Id())
    }),
    ConnClosedHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn closed: %v", conn.Id())
    }),
)
log.Info(ctx, "%v", conn)
time.Sleep(time.Second * 5)

packet := GetPoolMessage(C2S_REQ)
packet.PMsg().Data = []byte("client request")
conn.SendMsg(context.Background(), packet, nil)

time.Sleep(time.Second * 10)

//client connect displace
conn2, _ := DialConnect(context.Background(), url, http.Header{},
    DebugOption(true),
    ClientIdOption("server2"),
    ClientDialWssOption(url, false),
    ConnEstablishHandlerOption(func(ctx context.Context, conn *Connection) {
        log.Info(ctx, "client conn establish: %v", conn.Id())
    }),
)
time.Sleep(time.Second)
packet = GetPoolMessage(C2S_REQ)
packet.PMsg().Data = []byte("client request2")
conn2.SendMsg(context.Background(), packet, nil)

time.Sleep(time.Minute * 1)

Documentation

Index

Constants

View Source
const (
	CONN_KIND_CLIENT = 0
	CONN_KIND_SERVER = 1
)
View Source
const (
	CHARSET_UTF8 = 0
	CHARSET_GBK  = 1
)

Variables

View Source
var (
	P_BASE_name = map[int32]string{
		0:  "raw_bytes_msg",
		-1: "s2c_err_displace",
	}
	P_BASE_value = map[string]int32{
		"raw_bytes_msg":    0,
		"s2c_err_displace": -1,
	}
)

Enum value maps for P_BASE.

View Source
var File_ws_msg_proto protoreflect.FileDescriptor

Functions

func AutoReDialConnect added in v1.1.7

func AutoReDialConnect(ctx context.Context, sUrl string, header http.Header, cancelAutoConn chan interface{}, connInterval time.Duration,
	opts ...ConnOption)

func InitClient added in v1.0.15

func InitClient()

client invoke 客户端调用

func InitServer added in v1.0.15

func InitServer()

server invoke 服务端调用

func InitServerWithOpt added in v1.1.21

func InitServerWithOpt(serverOpt ServerOption)

func RegisterDataMsgType added in v1.1.4

func RegisterDataMsgType(protocolId int32, pMsg IDataMessage)

注册数据消息类型[P_MESSAGE.Data],功能可选,当需要使用框架提供的池功能时使用

func RegisterHandler

func RegisterHandler(protocolId int32, h MsgHandler)

注册消息处理器

Types

type ConnOption

type ConnOption func(*Connection)

连接动态参数选项

func ClientAutoReconHandlerOption added in v1.1.22

func ClientAutoReconHandlerOption(handler EventHandler) ConnOption

func ClientDialCompressOption added in v1.1.5

func ClientDialCompressOption(compress bool) ConnOption

func ClientDialConnFailedHandlerOption added in v1.1.13

func ClientDialConnFailedHandlerOption(handler EventHandler) ConnOption

func ClientDialHandshakeTimeoutOption added in v1.1.5

func ClientDialHandshakeTimeoutOption(handshakeTimeout time.Duration) ConnOption

func ClientDialOption added in v1.1.5

func ClientDialOption(dialer *websocket.Dialer) ConnOption

func ClientDialRetryOption added in v1.1.5

func ClientDialRetryOption(retryNum int, retryInterval time.Duration) ConnOption

func ClientDialWssOption added in v1.1.5

func ClientDialWssOption(sUrl string, secureWss bool) ConnOption

func ClientIdOption added in v1.1.5

func ClientIdOption(id string) ConnOption

客户端专用 默认使用时间戳来记录客户端所连服务器的id

func CompressionLevelOption added in v1.1.5

func CompressionLevelOption(compressionLevel int) ConnOption

func ConnClosedHandlerOption added in v1.1.8

func ConnClosedHandlerOption(handler EventHandler) ConnOption

func ConnClosingHandlerOption added in v1.1.8

func ConnClosingHandlerOption(handler EventHandler) ConnOption

func ConnEstablishHandlerOption added in v1.1.8

func ConnEstablishHandlerOption(handler EventHandler) ConnOption

callback

func DebugOption added in v1.1.8

func DebugOption(debug bool) ConnOption

func NetMaxFailureRetryOption added in v1.1.5

func NetMaxFailureRetryOption(maxFailureRetry int) ConnOption

func NetReadWaitOption added in v1.1.5

func NetReadWaitOption(readWait time.Duration) ConnOption

func NetTemporaryWaitOption added in v1.1.5

func NetTemporaryWaitOption(temporaryWait time.Duration) ConnOption

func NetWriteWaitOption added in v1.1.5

func NetWriteWaitOption(writeWait time.Duration) ConnOption

func RecvPingHandlerOption added in v1.1.8

func RecvPingHandlerOption(handler EventHandler) ConnOption

func RecvPongHandlerOption added in v1.1.8

func RecvPongHandlerOption(handler EventHandler) ConnOption

func SendBufferOption

func SendBufferOption(bufferSize int) ConnOption

func SrvCheckOriginOption added in v1.1.5

func SrvCheckOriginOption(checkOrigin func(r *http.Request) bool) ConnOption

func SrvPullChannelsOption

func SrvPullChannelsOption(pullChannelIds []int) ConnOption

为每种消息拉取逻辑分别注册不同的通道

func SrvUpgraderCompressOption added in v1.1.5

func SrvUpgraderCompressOption(compress bool) ConnOption

func SrvUpgraderOption

func SrvUpgraderOption(upgrader *websocket.Upgrader) ConnOption

服务端特有 upgrader定制

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

websocket连接封装

func Accept

func AcceptGin

func AcceptGin(ctx *gin.Context, meta ConnectionMeta, opts ...ConnOption) (*Connection, error)

func DialConnect added in v1.1.5

func DialConnect(ctx context.Context, sUrl string, header http.Header, opts ...ConnOption) (*Connection, error)

func (*Connection) Charset

func (c *Connection) Charset() int

func (*Connection) ClientIp added in v1.1.10

func (c *Connection) ClientIp() string

func (*Connection) DeviceId added in v1.1.14

func (c *Connection) DeviceId() string

func (*Connection) DisplaceClientByIp added in v1.1.23

func (c *Connection) DisplaceClientByIp(ctx context.Context, displaceIp string)

func (*Connection) GetCommDataValue

func (c *Connection) GetCommDataValue(key string) (interface{}, bool)

连接数据存储结构

func (*Connection) GetPullChannel

func (c *Connection) GetPullChannel(pullChannelId int) (chan struct{}, bool)

func (*Connection) Id

func (c *Connection) Id() string

func (*Connection) IncrCommDataValueBy

func (c *Connection) IncrCommDataValueBy(key string, delta int)

func (*Connection) IsDisplaced

func (c *Connection) IsDisplaced() bool

func (*Connection) IsStopped

func (c *Connection) IsStopped() bool

func (*Connection) KickClient

func (c *Connection) KickClient(displace bool)

func (*Connection) KickServer

func (c *Connection) KickServer()

func (*Connection) RefreshDeadline

func (c *Connection) RefreshDeadline()

func (*Connection) RemoveCommDataValue

func (c *Connection) RemoveCommDataValue(key string)

func (*Connection) Reset added in v1.1.14

func (c *Connection) Reset()

func (*Connection) SendMsg

func (c *Connection) SendMsg(ctx context.Context, payload *Message, sc SendCallback) (err error)

func (*Connection) SendPullNotify

func (c *Connection) SendPullNotify(ctx context.Context, pullChannelId int) (err error)

通知指定消息通道转发消息

func (*Connection) SetCommDataValue

func (c *Connection) SetCommDataValue(key string, value interface{})

func (*Connection) Type

func (c *Connection) Type() int

func (*Connection) UserId

func (c *Connection) UserId() string

func (*Connection) Version

func (c *Connection) Version() int

type ConnectionMeta

type ConnectionMeta struct {
	UserId   string //userId
	Typed    int    //客户端类型枚举
	DeviceId string //设备ID
	Version  int    //版本
	Charset  int    //客户端使用的字符集
	// contains filtered or unexported fields
}

func (*ConnectionMeta) BuildConnId

func (m *ConnectionMeta) BuildConnId() string

type EventHandler added in v1.1.8

type EventHandler func(context.Context, *Connection)

客户端事件处理函数。ConnEstablishHandlerOption:sync(阻塞主流程);ConnClosingHandlerOption:sync(阻塞主流程);ConnClosedHandlerOption:async。

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

连接管理器

func (*Hub) ConnectionIds

func (h *Hub) ConnectionIds() []string

func (*Hub) Find

func (h *Hub) Find(id string) (*Connection, error)

func (*Hub) RangeConnsByFunc

func (h *Hub) RangeConnsByFunc(f func(string, *Connection) bool)

type HubOption added in v1.1.21

type HubOption func(IHub)

func HubShardOption added in v1.1.21

func HubShardOption(cnt uint16) HubOption

type IConnection added in v1.1.21

type IConnection interface {
	Id() string
	UserId() string
	Type() int
	DeviceId() string
	Version() int
	Charset() int
	ClientIp() string
	Reset()
	IsStopped() bool
	IsDisplaced() bool
	RefreshDeadline()
	SendMsg(ctx context.Context, payload IMessage, sc SendCallback) error

	KickClient(displace bool) //server side invoke
	KickServer()              //client side invoke

	GetPullChannel(pullChannelId int) (chan struct{}, bool)
	SendPullNotify(ctx context.Context, pullChannelId int) error

	GetCommDataValue(key string) (interface{}, bool)
	SetCommDataValue(key string, value interface{})
	RemoveCommDataValue(key string)
	IncrCommDataValueBy(key string, delta int)
}

type IDataMessage added in v1.1.4

type IDataMessage interface {
	proto.Message
	Reset()
}

P_MESSAGE.Data类型的接口

type IHub added in v1.1.21

type IHub interface {
	Find(string) (*Connection, error)
	RangeConnsByFunc(func(string, *Connection) bool)
	ConnectionIds() []string
	// contains filtered or unexported methods
}
var ClientConnHub IHub //服务端管理的来自客户端的连接
var ServerConnHub IHub //客户端管理的连向服务端的连接

type IMessage added in v1.1.21

type IMessage interface {
	PMsg() *P_MESSAGE
	DataMsg() IDataMessage
}

type Message added in v1.1.3

type Message struct {
	// contains filtered or unexported fields
}

不能手动创建,必须使用 NewMessage() 或 GetPoolMessage()

func GetPoolMessage added in v1.1.3

func GetPoolMessage(protocolId int32) *Message

pool message 对象池消息

func NewMessage added in v1.1.3

func NewMessage() *Message

normal message 普通消息

func (*Message) DataMsg added in v1.1.4

func (t *Message) DataMsg() IDataMessage

func (*Message) Marshal added in v1.1.4

func (t *Message) Marshal() ([]byte, error)

func (*Message) PMsg added in v1.1.3

func (t *Message) PMsg() *P_MESSAGE

func (*Message) Unmarshal added in v1.1.3

func (t *Message) Unmarshal(payload []byte) error

type MsgHandler added in v1.1.21

type MsgHandler func(context.Context, *Connection, *Message) error

客户端消息处理函数对象 use RegisterHandler(protocolId, MsgHandler)

type P_BASE added in v1.1.25

type P_BASE int32

基础协议

const (
	P_BASE_raw_bytes_msg    P_BASE = 0  //raw bytes
	P_BASE_s2c_err_displace P_BASE = -1 //被顶号
)

func (P_BASE) Descriptor added in v1.1.25

func (P_BASE) Descriptor() protoreflect.EnumDescriptor

func (P_BASE) Enum added in v1.1.25

func (x P_BASE) Enum() *P_BASE

func (P_BASE) EnumDescriptor deprecated added in v1.1.25

func (P_BASE) EnumDescriptor() ([]byte, []int)

Deprecated: Use P_BASE.Descriptor instead.

func (P_BASE) Number added in v1.1.25

func (x P_BASE) Number() protoreflect.EnumNumber

func (P_BASE) String added in v1.1.25

func (x P_BASE) String() string

func (P_BASE) Type added in v1.1.25

func (P_BASE) Type() protoreflect.EnumType

type P_DISPLACE added in v1.1.11

type P_DISPLACE struct {
	OldIp []byte `protobuf:"bytes,1,opt,name=old_ip,json=oldIp,proto3" json:"old_ip,omitempty"`
	NewIp []byte `protobuf:"bytes,2,opt,name=new_ip,json=newIp,proto3" json:"new_ip,omitempty"`
	Ts    int64  `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"`
	// contains filtered or unexported fields
}

func (*P_DISPLACE) Descriptor deprecated added in v1.1.11

func (*P_DISPLACE) Descriptor() ([]byte, []int)

Deprecated: Use P_DISPLACE.ProtoReflect.Descriptor instead.

func (*P_DISPLACE) GetNewIp added in v1.1.11

func (x *P_DISPLACE) GetNewIp() []byte

func (*P_DISPLACE) GetOldIp added in v1.1.11

func (x *P_DISPLACE) GetOldIp() []byte

func (*P_DISPLACE) GetTs added in v1.1.11

func (x *P_DISPLACE) GetTs() int64

func (*P_DISPLACE) ProtoMessage added in v1.1.11

func (*P_DISPLACE) ProtoMessage()

func (*P_DISPLACE) ProtoReflect added in v1.1.11

func (x *P_DISPLACE) ProtoReflect() protoreflect.Message

func (*P_DISPLACE) Reset added in v1.1.11

func (x *P_DISPLACE) Reset()

func (*P_DISPLACE) String added in v1.1.11

func (x *P_DISPLACE) String() string

type P_MESSAGE

type P_MESSAGE struct {
	ProtocolId int32  `protobuf:"varint,1,opt,name=protocol_id,json=protocolId,proto3" json:"protocol_id,omitempty"` //消息协议ID
	Data       []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`                                //内容-自定义消息
	// contains filtered or unexported fields
}

通用消息体封装

func (*P_MESSAGE) Descriptor deprecated

func (*P_MESSAGE) Descriptor() ([]byte, []int)

Deprecated: Use P_MESSAGE.ProtoReflect.Descriptor instead.

func (*P_MESSAGE) GetData

func (x *P_MESSAGE) GetData() []byte

func (*P_MESSAGE) GetProtocolId

func (x *P_MESSAGE) GetProtocolId() int32

func (*P_MESSAGE) ProtoMessage

func (*P_MESSAGE) ProtoMessage()

func (*P_MESSAGE) ProtoReflect

func (x *P_MESSAGE) ProtoReflect() protoreflect.Message

func (*P_MESSAGE) Reset

func (x *P_MESSAGE) Reset()

func (*P_MESSAGE) String

func (x *P_MESSAGE) String() string

type Puller added in v1.1.23

type Puller interface {
	PullSend()
}

func NewDefaultPuller added in v1.1.23

func NewDefaultPuller(conn *Connection, pullChannelId int, firstPullFunc, pullFunc func(context.Context, *Connection)) Puller

type SendCallback

type SendCallback func(ctx context.Context, c *Connection, err error)

消息发送回调接口

type ServerOption added in v1.1.21

type ServerOption struct {
	HubOpts []HubOption
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL