gws

package module
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

README

中文

GWS

logo

Simple, Fast, Reliable WebSocket Server & Client

awesome codecov Go Test go-reportcard HelloGithub license go-version

Introduction

GWS (Go WebSocket) is a very simple, fast, reliable and feature-rich WebSocket implementation written in Go. It is designed to be used in highly-concurrent environments, and it is suitable for building API, Proxy, Game, Live Video, Message, etc. It supports both server and client side with a simple API which mean you can easily write a server or client by yourself.

GWS developed base on Event-Driven model. every connection has a goroutine to handle the event, and the event is able to be processed in a non-blocking way.

Why GWS

  • Simplicity and Ease of Use

    • User-Friendly: Simple and clear WebSocket Event API design makes server-client interaction easy.
    • Code Efficiency: Minimizes the amount of code needed to implement complex WebSocket solutions.
  • High-Performance

    • High IOPS Low Latency: Designed for rapid data transmission and reception, ideal for time-sensitive applications.
    • Low Memory Usage: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership.
  • Reliability and Stability

    • Robust Error Handling: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation.
    • Well-Developed Test Cases: Passed all Autobahn test cases, fully compliant with RFC 7692. Unit test coverage is almost 100%, covering all conditional branches.

Benchmark

IOPS (Echo Server)

GOMAXPROCS=4, Connection=1000, CompressEnabled=false

performance

GoBench

go test -benchmem -run=^$ -bench . github.com/L1LSunflower/gws
goos: linux
goarch: amd64
pkg: github.com/L1LSunflower/gws
cpu: AMD Ryzen 5 PRO 4650G with Radeon Graphics
BenchmarkConn_WriteMessage/compress_disabled-12                  5263632               232.3 ns/op            24 B/op          1 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-12                     99663             11265 ns/op             386 B/op          1 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-12                   7809654               152.4 ns/op             8 B/op          0 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-12                     326257              3133 ns/op              81 B/op          1 allocs/op 
PASS
ok      github.com/L1LSunflower/gws    17.231s

Index

Feature

  • Event API
  • Broadcast
  • Dial via Proxy
  • Context-Takeover
  • Concurrent & Asynchronous Non-Blocking Write
  • Segmented Writing of Large Files
  • Passed Autobahn Test Cases Server / Client

Attention

  • The errors returned by the gws.Conn export methods are ignorable, and are handled internally.
  • Transferring large files with gws tends to block the connection.
  • If HTTP Server is reused, it is recommended to enable goroutine, as blocking will prevent the context from being GC.

Install

go get -v github.com/L1LSunflower/gws@latest

Event

type Event interface {
    OnOpen(socket *Conn)                        // connection is established
    OnClose(socket *Conn, err error)            // received a close frame or input/output error occurs
    OnPing(socket *Conn, payload []byte)        // received a ping frame
    OnPong(socket *Conn, payload []byte)        // received a pong frame
    OnMessage(socket *Conn, message *Message)   // received a text/binary frame
}

Quick Start

package main

import "github.com/L1LSunflower/gws"

func main() {
	gws.NewServer(&gws.BuiltinEventHandler{}, nil).Run(":6666")
}

Best Practice

package main

import (
	"github.com/L1LSunflower/gws"
	"net/http"
	"time"
)

const (
	PingInterval = 5 * time.Second
	PingWait     = 10 * time.Second
)

func main() {
	upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
		ParallelEnabled:  true,                                 // Parallel message processing
		Recovery:          gws.Recovery,                         // Exception recovery
		PermessageDeflate: gws.PermessageDeflate{Enabled: true}, // Enable compression
	})
	http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
		socket, err := upgrader.Upgrade(writer, request)
		if err != nil {
			return
		}
		go func() {
			socket.ReadLoop() // Blocking prevents the context from being GC.
		}()
	})
	http.ListenAndServe(":6666", nil)
}

type Handler struct{}

func (c *Handler) OnOpen(socket *gws.Conn) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
	_ = socket.WritePong(nil)
}

func (c *Handler) OnPong(socket *gws.Conn, payload []byte) {}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
	defer message.Close()
	socket.WriteMessage(message.Opcode, message.Bytes())
}

More Examples

KCP

  • server
package main

import (
	"log"
	"github.com/L1LSunflower/gws"
	kcp "github.com/xtaci/kcp-go"
)

func main() {
	listener, err := kcp.Listen(":6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app := gws.NewServer(&gws.BuiltinEventHandler{}, nil)
	app.RunListener(listener)
}
  • client
package main

import (
	"github.com/L1LSunflower/gws"
	kcp "github.com/xtaci/kcp-go"
	"log"
)

func main() {
	conn, err := kcp.Dial("127.0.0.1:6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app, _, err := gws.NewClientFromConn(&gws.BuiltinEventHandler{}, nil, conn)
	if err != nil {
		log.Println(err.Error())
		return
	}
	app.ReadLoop()
}

Proxy

Dial via proxy, using socks5 protocol.

package main

import (
	"crypto/tls"
	"github.com/L1LSunflower/gws"
	"golang.org/x/net/proxy"
	"log"
)

func main() {
	socket, _, err := gws.NewClient(new(gws.BuiltinEventHandler), &gws.ClientOption{
		Addr:      "wss://example.com/connect",
		TlsConfig: &tls.Config{InsecureSkipVerify: true},
		NewDialer: func() (gws.Dialer, error) {
			return proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, nil)
		},
		PermessageDeflate: gws.PermessageDeflate{
			Enabled:               true,
			ServerContextTakeover: true,
			ClientContextTakeover: true,
		},
	})
	if err != nil {
		log.Println(err.Error())
		return
	}
	socket.ReadLoop()
}

Broadcast

Create a Broadcaster instance, call the Broadcast method in a loop to send messages to each client, and close the broadcaster to reclaim memory. The message is compressed only once.

func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) {
    var b = gws.NewBroadcaster(opcode, payload)
    defer b.Close()
    for _, item := range conns {
        _ = b.Broadcast(item)
    }
}

WriteWithTimeout

SetDeadline covers most of the scenarios, but if you want to control the timeout for each write, you need to encapsulate the WriteWithTimeout function, the creation and destruction of the timer will incur some overhead.

func WriteWithTimeout(socket *gws.Conn, p []byte, timeout time.Duration) error {
	var sig = atomic.Uint32{}
	var timer = time.AfterFunc(timeout, func() {
		if sig.CompareAndSwap(0, 1) {
			socket.WriteClose(1000, []byte("write timeout"))
		}
	})
	var err = socket.WriteMessage(gws.OpcodeText, p)
	if sig.CompareAndSwap(0, 1) {
		timer.Stop()
	}
	return err
}

Pub / Sub

Use the event_emitter package to implement the publish-subscribe model. Wrap gws.Conn in a structure and implement the GetSubscriberID method to get the subscription ID, which must be unique. The subscription ID is used to identify the subscriber, who can only receive messages on the subject of his subscription.

This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message.

package main

import (
    "github.com/lxzan/event_emitter"
    "github.com/L1LSunflower/gws"
)

type Subscriber gws.Conn

func NewSubscriber(conn *gws.Conn) *Subscriber { return (*Subscriber)(conn) }

func (c *Subscriber) GetSubscriberID() int64 {
    userId, _ := c.GetMetadata().Load("userId")
    return userId.(int64)
}

func (c *Subscriber) GetMetadata() event_emitter.Metadata { return c.Conn().Session() }

func (c *Subscriber) Conn() *gws.Conn { return (*gws.Conn)(c) }

func Subscribe(em *event_emitter.EventEmitter[int64, *Subscriber], s *Subscriber, topic string) {
    em.Subscribe(s, topic, func(msg any) {
        _ = msg.(*gws.Broadcaster).Broadcast(s.Conn())
    })
}

func Publish(em *event_emitter.EventEmitter[int64, *Subscriber], topic string, msg []byte) {
    var broadcaster = gws.NewBroadcaster(gws.OpcodeText, msg)
    defer broadcaster.Close()
    em.Publish(topic, broadcaster)
}

Autobahn Test

cd examples/autobahn
mkdir reports
docker run -it --rm \
    -v ${PWD}/config:/config \
    -v ${PWD}/reports:/reports \
    crossbario/autobahn-testsuite \
    wstest -m fuzzingclient -s /config/fuzzingclient.json

Communication

微信需要先添加好友再拉群, 请注明来自 GitHub

WeChat      QQ

Buy me a coffee

WeChat

Acknowledgments

The following project had particular influence on gws's design.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrUnauthorized 未通过鉴权认证
	// Failure to pass forensic authentication
	ErrUnauthorized = errors.New("unauthorized")

	// ErrHandshake 握手错误, 请求头未通过校验
	// Handshake error, request header does not pass checksum.
	ErrHandshake = errors.New("handshake error")

	// ErrCompressionNegotiation 压缩拓展协商失败, 请尝试关闭压缩
	// Compression extension negotiation failed, please try to disable compression.
	ErrCompressionNegotiation = errors.New("invalid compression negotiation")

	// ErrSubprotocolNegotiation 子协议协商失败
	// Sub-protocol negotiation failed
	ErrSubprotocolNegotiation = errors.New("sub-protocol negotiation failed")

	// ErrTextEncoding 文本消息编码错误(必须是utf8编码)
	// Text message encoding error (must be utf8)
	ErrTextEncoding = errors.New("invalid text encoding")

	// ErrMessageTooLarge 消息体积过大
	// message is too large
	ErrMessageTooLarge = errors.New("message too large")

	// ErrConnClosed 连接已关闭
	// Connection closed
	ErrConnClosed = net.ErrClosed

	// ErrUnsupportedProtocol 不支持的网络协议
	// Unsupported network protocols
	ErrUnsupportedProtocol = errors.New("unsupported protocol")
)

Functions

func Recovery

func Recovery(logger Logger)

Recovery 异常恢复,并记录错误信息 Exception recovery with logging of error messages

func SetBufferThreshold

func SetBufferThreshold(x uint32)

SetBufferThreshold 设置buffer阈值, x=pow(2,n), 超过x个字节的buffer不会被回收 Set the buffer threshold, x=pow(2,n), that buffers larger than x bytes are not reclaimed.

Types

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

func NewBroadcaster

func NewBroadcaster(opcode Opcode, payload []byte) *Broadcaster

NewBroadcaster 创建广播器 Creates a broadcaster 相比循环调用 WriteAsync, Broadcaster 只会压缩一次消息, 可以节省大量 CPU 开销. Instead of calling WriteAsync in a loop, Broadcaster compresses the message only once, saving a lot of CPU overhead.

func (*Broadcaster) Broadcast

func (c *Broadcaster) Broadcast(socket *Conn) error

Broadcast 广播 向客户端发送广播消息 Send a broadcast message to a client.

func (*Broadcaster) Close

func (c *Broadcaster) Close() error

Close 释放资源 Releases resources 在完成所有 Broadcast 调用之后执行 Close 方法释放资源。 Call the Close method after all the Broadcasts have been completed to release the resources.

type BuiltinEventHandler

type BuiltinEventHandler struct{}

func (BuiltinEventHandler) OnClose

func (b BuiltinEventHandler) OnClose(socket *Conn, err error)

func (BuiltinEventHandler) OnMessage

func (b BuiltinEventHandler) OnMessage(socket *Conn, message *Message)

func (BuiltinEventHandler) OnOpen

func (b BuiltinEventHandler) OnOpen(socket *Conn)

func (BuiltinEventHandler) OnPing

func (b BuiltinEventHandler) OnPing(socket *Conn, payload []byte)

func (BuiltinEventHandler) OnPong

func (b BuiltinEventHandler) OnPong(socket *Conn, payload []byte)

type ClientOption

type ClientOption struct {
	// 写缓冲区的大小, v1.4.5版本此参数被废弃
	// Deprecated: Size of the write buffer, v1.4.5 version of this parameter is deprecated
	WriteBufferSize int

	// PermessageDeflate 配置
	// PermessageDeflate configuration
	PermessageDeflate PermessageDeflate

	// 是否启用并行处理
	// Whether parallel processing is enabled
	ParallelEnabled bool

	// 并行协程限制
	// Parallel goroutine limit
	ParallelGolimit int

	// 读取最大负载大小
	// Maximum payload size for reading
	ReadMaxPayloadSize int

	// 读取缓冲区大小
	// Read buffer size
	ReadBufferSize int

	// 写入最大负载大小
	// Maximum payload size for writing
	WriteMaxPayloadSize int

	// 是否启用 UTF-8 检查
	// Whether UTF-8 check is enabled
	CheckUtf8Enabled bool

	// 日志记录器
	// Logger
	Logger Logger

	// 恢复函数
	// Recovery function
	Recovery func(logger Logger)

	// 连接地址, 例如 wss://example.com/connect
	// Server address, e.g., wss://example.com/connect
	Addr string

	// 额外的请求头
	// Extra request headers
	RequestHeader http.Header

	// 握手超时时间
	// Handshake timeout duration
	HandshakeTimeout time.Duration

	// TLS 设置
	// TLS configuration
	TlsConfig *tls.Config

	// 拨号器
	// 默认是返回 net.Dialer 实例, 也可以用于设置代理.
	// The default is to return the net.Dialer instance.
	// Can also be used to set a proxy, for example:
	// NewDialer: func() (proxy.Dialer, error) {
	//     return proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, nil)
	// },
	NewDialer func() (Dialer, error)

	// 创建 session 存储空间
	// 用于自定义 SessionStorage 实现
	// For custom SessionStorage implementations
	NewSession func() SessionStorage
}

ClientOption 客户端配置 Client configurations

type CloseError

type CloseError struct {
	// 关闭代码,表示关闭连接的原因
	// Close code, indicating the reason for closing the connection
	Code uint16

	// 关闭原因,详细描述关闭的原因
	// Close reason, providing a detailed description of the closure
	Reason []byte
}

func (*CloseError) Error

func (c *CloseError) Error() string

Error 关闭错误的描述 Returns a description of the close error

type ConcurrentMap

type ConcurrentMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

ConcurrentMap 并发安全的映射结构 concurrency-safe map structure

func NewConcurrentMap

func NewConcurrentMap[K comparable, V any](size ...uint64) *ConcurrentMap[K, V]

NewConcurrentMap 创建一个新的并发安全映射 creates a new concurrency-safe map arg0 表示分片的数量;arg1 表示分片的初始化容量 arg0 represents the number of shardings; arg1 represents the initialized capacity of a sharding.

func (*ConcurrentMap[K, V]) Delete

func (c *ConcurrentMap[K, V]) Delete(key K)

Delete 删除键对应的值 Delete deletes the value for a key

func (*ConcurrentMap[K, V]) GetSharding

func (c *ConcurrentMap[K, V]) GetSharding(key K) *Map[K, V]

GetSharding 返回一个键的分片映射 returns a map sharding for a key 分片中的操作是无锁的,需要手动加锁 The operations inside the sharding are lockless and need to be locked manually.

func (*ConcurrentMap[K, V]) Len

func (c *ConcurrentMap[K, V]) Len() int

Len 返回映射中的元素数量 Len returns the number of elements in the map

func (*ConcurrentMap[K, V]) Load

func (c *ConcurrentMap[K, V]) Load(key K) (value V, ok bool)

Load 返回映射中键对应的值,如果不存在则返回 nil returns the value stored in the map for a key, or nil if no value is present ok 结果表示是否在映射中找到了值 The ok result indicates whether the value was found in the map

func (*ConcurrentMap[K, V]) Range

func (c *ConcurrentMap[K, V]) Range(f func(key K, value V) bool)

Range 遍历 如果 f 返回 false,遍历停止 If f returns false, range stops the iteration

func (*ConcurrentMap[K, V]) Store

func (c *ConcurrentMap[K, V]) Store(key K, value V)

Store 设置键对应的值 sets the value for a key

type Config

type Config struct {

	// 是否开启并行消息处理
	// Whether to enable parallel message processing
	ParallelEnabled bool

	// (单个连接)用于并行消息处理的协程数量限制
	// Limit on the number of concurrent goroutines used for parallel message processing (single connection)
	ParallelGolimit int

	// 最大读取的消息内容长度
	// Maximum read message content length
	ReadMaxPayloadSize int

	// 读缓冲区的大小
	// Size of the read buffer
	ReadBufferSize int

	// 最大写入的消息内容长度
	// Maximum length of written message content
	WriteMaxPayloadSize int

	// 写缓冲区的大小, v1.4.5版本此参数被废弃
	// Deprecated: Size of the write buffer, v1.4.5 version of this parameter is deprecated
	WriteBufferSize int

	// 是否检查文本utf8编码, 关闭性能会好点
	// Whether to check the text utf8 encoding, turn off the performance will be better
	CheckUtf8Enabled bool

	// 消息回调(OnMessage)的恢复程序
	// Message callback (OnMessage) recovery program
	Recovery func(logger Logger)

	// 日志工具
	// Logging tools
	Logger Logger
	// contains filtered or unexported fields
}

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn WebSocket连接 WebSocket connection

func NewClient

func NewClient(handler Event, option *ClientOption) (*Conn, *http.Response, error)

NewClient 创建一个新的 WebSocket 客户端连接 Creates a new WebSocket client connection

func NewClientFromConn

func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (*Conn, *http.Response, error)

NewClientFromConn 通过外部连接创建客户端, 支持 TCP/KCP/Unix Domain Socket Create new client via external connection, supports TCP/KCP/Unix Domain Socket.

func (*Conn) Async

func (c *Conn) Async(f func())

Async 异步 将任务加入发送队列(并发度为1), 执行异步操作 注意: 不要加入长时间阻塞的任务 Add the task to the send queue (concurrency 1), perform asynchronous operation. Note: Don't add tasks that are blocking for a long time.

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr 返回本地网络地址 Returns the local network address

func (*Conn) NetConn

func (c *Conn) NetConn() net.Conn

NetConn 获取底层的 TCP/TLS/KCP 等连接 Gets the underlying TCP/TLS/KCP... connection

func (*Conn) ReadLoop

func (c *Conn) ReadLoop()

ReadLoop 循环读取消息. 如果复用了HTTP Server, 建议开启goroutine, 阻塞会导致请求上下文无法被GC. Read messages in a loop. If HTTP Server is reused, it is recommended to enable goroutine, as blocking will prevent the context from being GC.

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr 返回远程网络地址 Returns the remote network address

func (*Conn) Session

func (c *Conn) Session() SessionStorage

Session 获取会话存储 Gets the session storage

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

SetDeadline 设置连接的截止时间 Sets the deadline for the connection

func (*Conn) SetNoDelay

func (c *Conn) SetNoDelay(noDelay bool) error

SetNoDelay 设置无延迟 控制操作系统是否应该延迟数据包传输以期望发送更少的数据包(Nagle算法). 默认值是 true(无延迟),这意味着数据在 Write 之后尽快发送. Controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

SetReadDeadline 设置读取操作的截止时间 Sets the deadline for read operations

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

SetWriteDeadline 设置写入操作的截止时间 Sets the deadline for write operations

func (*Conn) SubProtocol

func (c *Conn) SubProtocol() string

SubProtocol 获取协商的子协议 Gets the negotiated sub-protocol

func (*Conn) WriteAsync

func (c *Conn) WriteAsync(opcode Opcode, payload []byte, callback func(error))

WriteAsync 异步写 Writes messages asynchronously 异步非阻塞地将消息写入到任务队列, 收到回调后才允许回收payload内存 Write messages to the task queue asynchronously and non-blockingly, allowing payload memory to be recycled only after receiving the callback

func (*Conn) WriteClose

func (c *Conn) WriteClose(code uint16, reason []byte) error

WriteClose 发送关闭帧并断开连接 没有特殊需求的话, 推荐code=1000, reason=nil Send shutdown frame, active disconnection If you don't have any special needs, we recommend code=1000, reason=nil https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes

func (*Conn) WriteFile

func (c *Conn) WriteFile(opcode Opcode, payload io.Reader) error

WriteFile 大文件写入 采用分段写入技术, 减少写入过程中的内存占用 Segmented write technology to reduce memory usage during write process

func (*Conn) WriteMessage

func (c *Conn) WriteMessage(opcode Opcode, payload []byte) error

WriteMessage 写入文本/二进制消息, 文本消息应该使用UTF8编码 Writes text/binary messages, text messages should be encoded in UTF8.

func (*Conn) WritePing

func (c *Conn) WritePing(payload []byte) error

WritePing 写入Ping消息, 携带的信息不要超过125字节 Control frame length cannot exceed 125 bytes

func (*Conn) WritePong

func (c *Conn) WritePong(payload []byte) error

WritePong 写入Pong消息, 携带的信息不要超过125字节 Control frame length cannot exceed 125 bytes

func (*Conn) WriteString

func (c *Conn) WriteString(s string) error

WriteString 写入文本消息, 使用UTF8编码. Write text messages, should be encoded in UTF8.

func (*Conn) Writev

func (c *Conn) Writev(opcode Opcode, payloads ...[]byte) error

Writev 类似 WriteMessage, 区别是可以一次写入多个切片 Writev is similar to WriteMessage, except that you can write multiple slices at once.

func (*Conn) WritevAsync

func (c *Conn) WritevAsync(opcode Opcode, payloads [][]byte, callback func(error))

WritevAsync 类似 WriteAsync, 区别是可以一次写入多个切片 It's similar to WriteAsync, except that you can write multiple slices at once.

type ConnInterface

type ConnInterface interface {
	ReadLoop()
	SetDeadline(t time.Time) error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	NetConn() net.Conn
	SetNoDelay(noDelay bool) error
	SubProtocol() string
	Session() SessionStorage
	WriteClose(code uint16, reason []byte) error
	WritePing(payload []byte) error
	WritePong(payload []byte) error
	WriteString(s string) error
	WriteMessage(opcode Opcode, payload []byte) error
	WriteAsync(opcode Opcode, payload []byte, callback func(error))
	Writev(opcode Opcode, payloads ...[]byte) error
	WritevAsync(opcode Opcode, payloads [][]byte, callback func(error))
	Async(f func())
}

type Dialer

type Dialer interface {
	// Dial 连接到指定网络上的地址
	// Connects to the address on the named network
	Dial(network, addr string) (c net.Conn, err error)
}

Dialer 拨号器接口 Dialer interface

type Event

type Event interface {
	// OnOpen 建立连接事件
	// WebSocket connection was successfully established
	OnOpen(socket ConnInterface)

	// OnClose 关闭事件
	// 接收到了网络连接另一端发送的关闭帧, 或者IO过程中出现错误主动断开连接
	// 如果是前者, err可以断言为*CloseError
	// Received a close frame from the other end of the network connection, or disconnected voluntarily due to an error in the IO process
	// In the former case, err can be asserted as *CloseError
	OnClose(socket ConnInterface, err error)

	// OnPing 心跳探测事件
	// Received a ping frame
	OnPing(socket ConnInterface, payload []byte)

	// OnPong 心跳响应事件
	// Received a pong frame
	OnPong(socket ConnInterface, payload []byte)

	// OnMessage 消息事件
	// 如果开启了ParallelEnabled, 会并行地调用OnMessage; 没有做recover处理.
	// If ParallelEnabled is enabled, OnMessage is called in parallel. No recover is done.
	OnMessage(socket ConnInterface, message *Message)
}

type Logger

type Logger interface {
	// Error 打印错误日志
	// Printing the error log
	Error(v ...any)
}

Logger 日志接口 Logger interface

type Map

type Map[K comparable, V any] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Map 线程安全的泛型映射类型. thread-safe generic map type.

func NewMap

func NewMap[K comparable, V any](size ...int) *Map[K, V]

NewMap 创建一个新的 Map 实例 creates a new instance of Map size 参数用于指定初始容量,如果未提供则默认为 0 The size parameter is used to specify the initial capacity, defaulting to 0 if not provided.

func (*Map[K, V]) Delete

func (c *Map[K, V]) Delete(key K)

Delete 从 Map 中删除指定键的值. deletes the value for the specified key from the Map.

func (*Map[K, V]) Len

func (c *Map[K, V]) Len() int

Len 返回 Map 中的元素数量. Len returns the number of elements in the Map.

func (*Map[K, V]) Load

func (c *Map[K, V]) Load(key K) (value V, ok bool)

Load 从 Map 中加载指定键的值. Load loads the value for the specified key from the Map.

func (*Map[K, V]) Range

func (c *Map[K, V]) Range(f func(K, V) bool)

Range 遍历 如果函数返回 false,遍历将提前终止. If the function returns false, the iteration stops early.

func (*Map[K, V]) Store

func (c *Map[K, V]) Store(key K, value V)

Store 将指定键值对存储到 Map 中. stores the specified key-value pair in the Map.

type Message

type Message struct {

	// 操作码
	// opcode of the message
	Opcode Opcode

	// 消息内容
	// content of the message
	Data *bytes.Buffer
	// contains filtered or unexported fields
}

func (*Message) Bytes

func (c *Message) Bytes() []byte

Bytes 返回消息的数据缓冲区的字节切片 Returns the byte slice of the message's data buffer

func (*Message) Close

func (c *Message) Close() error

Close 关闭消息, 回收资源 Close message, recycling resources

func (*Message) Read

func (c *Message) Read(p []byte) (n int, err error)

Read 从消息中读取数据到给定的字节切片 p 中 Reads data from the message into the given byte slice p

type Opcode

type Opcode uint8

Opcode 操作码

const (
	OpcodeContinuation    Opcode = 0x0 // 	继续
	OpcodeText            Opcode = 0x1 // 	文本
	OpcodeBinary          Opcode = 0x2 // 	二级制
	OpcodeCloseConnection Opcode = 0x8 // 	关闭
	OpcodePing            Opcode = 0x9 // 	心跳探测
	OpcodePong            Opcode = 0xA //	心跳回应
)

type PermessageDeflate

type PermessageDeflate struct {
	// 是否开启压缩
	// Whether to turn on compression
	Enabled bool

	// 压缩级别
	// Compress level
	Level int

	// 压缩阈值, 长度小于阈值的消息不会被压缩, 仅适用于无上下文接管模式.
	// Compression threshold, messages below the threshold will not be compressed, only for context-free takeover mode.
	Threshold int

	// 压缩器内存池大小
	// 数值越大竞争的概率越小, 但是会耗费大量内存
	// Compressor memory pool size
	// The higher the value the lower the probability of competition, but it will consume a lot of memory
	PoolSize int

	// 服务端上下文接管
	// Server side context takeover
	ServerContextTakeover bool

	// 客户端上下文接管
	// Client side context takeover
	ClientContextTakeover bool

	// 服务端滑动窗口指数
	// 取值范围 8<=n<=15, 表示pow(2,n)个字节
	// The server-side sliding window index
	// Range 8<=n<=15, means pow(2,n) bytes.
	ServerMaxWindowBits int

	// 客户端滑动窗口指数
	// 取值范围 8<=x<=15, 表示pow(2,n)个字节
	// The client-side sliding window index
	// Range 8<=n<=15, means pow(2,n) bytes.
	ClientMaxWindowBits int
}

PermessageDeflate 压缩拓展配置 对于gws client, 建议开启上下文接管, 不修改滑动窗口指数, 提供最好的兼容性. 对于gws server, 如果开启上下文接管, 每个连接会占用更多内存, 合理配置滑动窗口指数. For gws client, it is recommended to enable contextual takeover and not modify the sliding window index to provide the best compatibility. For gws server, if you turn on context-side takeover, each connection takes up more memory, configure the sliding window index appropriately.

type Server

type Server struct {

	// 错误处理回调函数
	// Error handling callback function
	OnError func(conn net.Conn, err error)

	// 请求处理回调函数
	// Request handling callback function
	OnRequest func(conn net.Conn, br *bufio.Reader, r *http.Request)
	// contains filtered or unexported fields
}

Server WebSocket服务器 Websocket server

func NewServer

func NewServer(eventHandler Event, option *ServerOption) *Server

NewServer 创建一个新的 WebSocket 服务器实例 Creates a new WebSocket server instance

func (*Server) GetUpgrader

func (c *Server) GetUpgrader() *Upgrader

GetUpgrader 获取服务器的升级器实例 Retrieves the upgrader instance of the server

func (*Server) Run

func (c *Server) Run(addr string) error

Run 启动 WebSocket 服务器,监听指定地址 Starts the WebSocket server and listens on the specified address

func (*Server) RunListener

func (c *Server) RunListener(listener net.Listener) error

RunListener 使用指定的监听器运行 WebSocket 服务器 Runs the WebSocket server using the specified listener

func (*Server) RunTLS

func (c *Server) RunTLS(addr string, certFile, keyFile string) error

RunTLS 启动支持 TLS 的 WebSocket 服务器,监听指定地址 Starts the WebSocket server with TLS support and listens on the specified address

type ServerOption

type ServerOption struct {

	// 写缓冲区的大小, v1.4.5版本此参数被废弃
	// Deprecated: Size of the write buffer, v1.4.5 version of this parameter is deprecated
	WriteBufferSize int

	// PermessageDeflate 配置
	// PermessageDeflate configuration
	PermessageDeflate PermessageDeflate

	// 是否启用并行处理
	// Whether parallel processing is enabled
	ParallelEnabled bool

	// 并行协程限制
	// Parallel goroutine limit
	ParallelGolimit int

	// 读取最大负载大小
	// Maximum payload size for reading
	ReadMaxPayloadSize int

	// 读取缓冲区大小
	// Read buffer size
	ReadBufferSize int

	// 写入最大负载大小
	// Maximum payload size for writing
	WriteMaxPayloadSize int

	// 是否启用 UTF-8 检查
	// Whether UTF-8 check is enabled
	CheckUtf8Enabled bool

	// 日志记录器
	// Logger
	Logger Logger

	// 恢复函数
	// Recovery function
	Recovery func(logger Logger)

	// TLS 设置
	// TLS configuration
	TlsConfig *tls.Config

	// 握手超时时间
	// Handshake timeout duration
	HandshakeTimeout time.Duration

	// WebSocket 子协议, 握手失败会断开连接
	// WebSocket sub-protocol, handshake failure disconnects the connection
	SubProtocols []string

	// 额外的响应头(可能不受客户端支持)
	// Additional response headers (may not be supported by the client)
	// https://www.rfc-editor.org/rfc/rfc6455.html#section-1.3
	ResponseHeader http.Header

	// 鉴权函数,用于连接建立的请求
	// Authentication function for connection establishment requests
	Authorize func(r *http.Request, session SessionStorage) bool

	// 创建 session 存储空间,用于自定义 SessionStorage 实现
	// Create session storage space for custom SessionStorage implementations
	NewSession func() SessionStorage
	// contains filtered or unexported fields
}

ServerOption 服务端配置 Server configurations

type SessionStorage

type SessionStorage interface {
	// Len 返回存储中的键值对数量
	// Returns the number of key-value pairs in the storage
	Len() int

	// Load 根据键获取值,如果键存在则返回值和 true,否则返回 nil 和 false
	// retrieves the value for a given key. If the key exists, it returns the value and true; otherwise, it returns nil and false
	Load(key string) (value any, exist bool)

	// Delete 根据键删除存储中的键值对
	// removes the key-value pair from the storage for a given key
	Delete(key string)

	// Store 存储键值对
	// saves the key-value pair in the storage
	Store(key string, value any)

	// Range 遍历
	// 如果函数返回 false,遍历将提前终止.
	// If the function returns false, the iteration stops early.
	Range(f func(key string, value any) bool)
}

SessionStorage 会话存储

type Upgrader

type Upgrader struct {
	// contains filtered or unexported fields
}

func NewUpgrader

func NewUpgrader(eventHandler Event, option *ServerOption) *Upgrader

NewUpgrader 创建一个新的 Upgrader 实例 Creates a new instance of Upgrader

func (*Upgrader) Upgrade

func (c *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request) (*Conn, error)

Upgrade 升级 HTTP 连接到 WebSocket 连接 Upgrades the HTTP connection to a WebSocket connection

func (*Upgrader) UpgradeFromConn

func (c *Upgrader) UpgradeFromConn(conn net.Conn, br *bufio.Reader, r *http.Request) (*Conn, error)

UpgradeFromConn 从现有的网络连接升级到 WebSocket 连接 Upgrades from an existing network connection to a WebSocket connection

Directories

Path Synopsis
examples
wss

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL