net_websocket

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

* net_websocket
websocket support
** 选项 - io层
#+begin_src go
// ServerOption
//go:generate gogen option -n ServerOption -o option.server.go
func walleServer() interface{} {
	return map[string]interface{}{
		// Addr Server Addr. websocket监听地址
		"Addr": string(":8080"),
		// WsPath websocket server path。 路由
		"WsPath": string("/ws"),
		// Upgrade websocket upgrade
		"Upgrade": (*websocket.Upgrader)(DefaultUpgrade),
		// UpgradeFail upgrade fail notify.
		"UpgradeFail": func(w http.ResponseWriter, r *http.Request, reason error) {},
		// accepted load limit.链接数量限制
		"AcceptLoadLimit": func(sess Session, cnt int64) bool { return false },
		// Process Options 传递给process的选项。
		"ProcessOptions": []process.ProcessOption{},
		// process router。路由
		"Router": Router(nil),
		// SessionRouter custom session router。定制路由
		"SessionRouter": func(sess Session, global Router) (r Router) { return global },
		// log interface 日志接口
		"Logger": (*zaplog.Logger)(zaplog.Default),
		// SessionLogger custom session logger。定制日志接口
		"SessionLogger": func(sess Session, global *zaplog.Logger) (r *zaplog.Logger) { return global },
		// NewSession custom session。新链接通知
		"NewSession": func(in Session, r *http.Request) (Session, error) { return in, nil },
		// StopImmediately when session finish,business finish immediately.
		// 链接断开后,是否停止处理流程。 应该根据业务分析决定。
		"StopImmediately": false,
		// ReadTimeout read timetout
		"ReadTimeout": time.Duration(0),
		// WriteTimeout write timeout
		"WriteTimeout": time.Duration(0),
		// MaxMessageLimit limit message size
		"MaxMessageLimit": int(0),
		// Write network data method.
		"WriteMethods": WriteMethod(WriteAsync),
		// SendQueueSize async send queue size
		"SendQueueSize": int(1024),
		// Heartbeat use websocket ping/pong.
		"Heartbeat": time.Duration(0),
		// HttpServeMux custom set mux
		"HttpServeMux": (*http.ServeMux)(http.DefaultServeMux),
	}
}
#+end_src

** 服务器
#+begin_src go
import (
	"fmt"

	"github.com/walleframe/net_websocket"
	"github.com/walleframe/walle/process"
	"github.com/walleframe/walle/testpkg/wpb"
	"github.com/walleframe/walle/zaplog"
)

func main() {
	p := 12345
	fmt.Println("port:", p)

	zaplog.SetFrameLogger(zaplog.NoopLogger)
	zaplog.SetLogicLogger(zaplog.NoopLogger)
	// zaplog.SetFrameLogger(zaplog.GetLogicLogger())

	wpb.RegisterWSvcService(process.GetRouter(), &wpb.WPBSvc{})
	svc := net_websocket.NewServer()
	svc.Run(fmt.Sprintf(":%d", p))
}
#+end_src

** 客户端
#+begin_src go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/walleframe/net_websocket"
	"github.com/walleframe/walle/testpkg/wpb"
	"github.com/walleframe/walle/util"
	"github.com/walleframe/walle/zaplog"
)

func main() {
	zaplog.SetFrameLogger(zaplog.NoopLogger)
	zaplog.SetLogicLogger(zaplog.NoopLogger)
	// zaplog.SetFrameLogger(zaplog.GetLogicLogger())

	cli, err := net_websocket.NewClient(
		fmt.Sprintf("ws://localhost:%d/ws", 12345), nil,
	)
	if err != nil {
		util.PanicIfError(err)
	}
	time.Sleep(time.Second)

	wcli := wpb.NewWSvcClient(cli)
	ctx := context.Background()

	rs, err := wcli.Add(ctx, &wpb.AddRq{Params: []int64{100}})
	fmt.Println(rs, err)
}

#+end_src

Documentation

Index

Constants

View Source
const (
	WriteAsync       = network.WriteAsync
	WriteImmediately = network.WriteImmediately
)

import const value

Variables

View Source
var DefaultUpgrade = &websocket.Upgrader{
	ReadBufferSize:  4096,
	WriteBufferSize: 4096,
}
View Source
var GoServerContextPool process.ContextPool = &goServerContextPool{
	Pool: sync.Pool{
		New: func() interface{} {
			return &sessionCtx{}
		},
	},
}

Functions

func InstallServerOptionsWatchDog

func InstallServerOptionsWatchDog(dog func(cc *ServerOptions))

InstallServerOptionsWatchDog install watch dog

func NewService

func NewService(name string, opt ...ServerOption) app.Service

Types

type Client

type Client = network.Client

import type

type ClientContext

type ClientContext = network.ClientContext

import type

type Router

type Router = process.Router

import type

type Server

type Server = network.Server

import type

type ServerOption

type ServerOption func(cc *ServerOptions) ServerOption

ServerOption option define

func WithAcceptLoadLimit

func WithAcceptLoadLimit(v func(sess Session, cnt int64) bool) ServerOption

accepted load limit

func WithAddr

func WithAddr(v string) ServerOption

Addr Server Addr

func WithFrameLogger

func WithFrameLogger(v *zaplog.Logger) ServerOption

frame log

func WithHeartbeat

func WithHeartbeat(v time.Duration) ServerOption

Heartbeat use websocket ping/pong.

func WithHttpServeMux

func WithHttpServeMux(v *http.ServeMux) ServerOption

HttpServeMux custom set mux

func WithMaxMessageLimit

func WithMaxMessageLimit(v int) ServerOption

MaxMessageLimit limit message size

func WithNewSession

func WithNewSession(v func(in Session, r *http.Request) (Session, error)) ServerOption

NewSession custom session

func WithProcessOptions

func WithProcessOptions(v ...process.ProcessOption) ServerOption

Process Options

func WithReadTimeout

func WithReadTimeout(v time.Duration) ServerOption

ReadTimeout read timetou

func WithRouter

func WithRouter(v Router) ServerOption

process router

func WithSendQueueSize

func WithSendQueueSize(v int) ServerOption

SendQueueSize async send queue size

func WithSessionLogger

func WithSessionLogger(v func(sess Session, global *zaplog.Logger) (r *zaplog.Logger)) ServerOption

SessionLogger custom session logger

func WithSessionRouter

func WithSessionRouter(v func(sess Session, global Router) (r Router)) ServerOption

SessionRouter custom session router

func WithStopImmediately

func WithStopImmediately(v bool) ServerOption

StopImmediately when session finish,business finish immediately.

func WithUpgrade

func WithUpgrade(v *websocket.Upgrader) ServerOption

Upgrade websocket upgrade

func WithUpgradeFail

func WithUpgradeFail(v func(w http.ResponseWriter, r *http.Request, reason error)) ServerOption

SessoinFilter

func WithWriteMethods

func WithWriteMethods(v WriteMethod) ServerOption

Write network data method.

func WithWriteTimeout

func WithWriteTimeout(v time.Duration) ServerOption

WriteTimeout write timeout

func WithWsPath

func WithWsPath(v string) ServerOption

WsPath websocket server path

type ServerOptions

type ServerOptions struct {
	// Addr Server Addr
	Addr string
	// WsPath websocket server path
	WsPath string
	// Upgrade websocket upgrade
	Upgrade *websocket.Upgrader
	// SessoinFilter
	UpgradeFail func(w http.ResponseWriter, r *http.Request, reason error)
	// accepted load limit
	AcceptLoadLimit func(sess Session, cnt int64) bool
	// Process Options
	ProcessOptions []process.ProcessOption
	// process router
	Router Router
	// SessionRouter custom session router
	SessionRouter func(sess Session, global Router) (r Router)
	// frame log
	FrameLogger *zaplog.Logger
	// SessionLogger custom session logger
	SessionLogger func(sess Session, global *zaplog.Logger) (r *zaplog.Logger)
	// NewSession custom session
	NewSession func(in Session, r *http.Request) (Session, error)
	// StopImmediately when session finish,business finish immediately.
	StopImmediately bool
	// ReadTimeout read timetou
	ReadTimeout time.Duration
	// WriteTimeout write timeout
	WriteTimeout time.Duration
	// MaxMessageLimit limit message size
	MaxMessageLimit int
	// Write network data method.
	WriteMethods WriteMethod
	// SendQueueSize async send queue size
	SendQueueSize int
	// Heartbeat use websocket ping/pong.
	Heartbeat time.Duration
	// HttpServeMux custom set mux
	HttpServeMux *http.ServeMux
}

ServerOption

func NewServerOptions

func NewServerOptions(opts ...ServerOption) *ServerOptions

NewServerOptions create options instance.

func (*ServerOptions) ApplyOption

func (cc *ServerOptions) ApplyOption(opts ...ServerOption)

ApplyOption modify options

func (*ServerOptions) GetSetOption

func (cc *ServerOptions) GetSetOption(opt ServerOption) ServerOption

GetSetOption modify and get last option

func (*ServerOptions) SetOption

func (cc *ServerOptions) SetOption(opt ServerOption)

SetOption modify options

type Session

type Session = network.Session

import type

type SessionContext

type SessionContext = network.SessionContext

import type

type WriteMethod

type WriteMethod = network.WriteMethod

import type

type WsServer

type WsServer struct {
	// contains filtered or unexported fields
}

WsServer websocket server

func NewServer

func NewServer(opts ...ServerOption) *WsServer

func (*WsServer) Broadcast

func (s *WsServer) Broadcast(uri interface{}, msg interface{}, md metadata.MD) error

func (*WsServer) BroadcastFilter

func (s *WsServer) BroadcastFilter(filter func(Session) bool, uri interface{}, msg interface{}, md metadata.MD) error

func (*WsServer) ForEach

func (s *WsServer) ForEach(f func(Session))

func (*WsServer) HttpServeWs

func (s *WsServer) HttpServeWs(w http.ResponseWriter, r *http.Request)

serveWs handles websocket requests from the peer.

func (*WsServer) Run

func (s *WsServer) Run(addr string) (err error)

func (*WsServer) Serve

func (s *WsServer) Serve(ln net.Listener) (err error)

func (*WsServer) Shutdown

func (s *WsServer) Shutdown(ctx context.Context) (err error)

type WsService

type WsService struct {
	// contains filtered or unexported fields
}

WsService implement app.Service interface

func (*WsService) Finish

func (svc *WsService) Finish()

func (*WsService) Init

func (svc *WsService) Init(s app.Stoper) (err error)

func (*WsService) Name

func (svc *WsService) Name() string

func (*WsService) Start

func (svc *WsService) Start(s app.Stoper) (err error)

func (*WsService) Stop

func (svc *WsService) Stop()

type WsSession

type WsSession struct {
	// process
	*rpc.RPCProcess
	// contains filtered or unexported fields
}

server session

func NewClient

func NewClient(addr string, head http.Header, opts ...process.ProcessOption) (*WsSession, error)

NewClient 新建客户端。NOTE: websocket socket 客户端不支持自动重连.仅用于测试

func NewClientEx

func NewClientEx(addr string, head http.Header,
	inner *process.InnerOptions,
	svr *ServerOptions,
) (cli *WsSession, err error)

NewClientEx 创建客户端。NOTE: websocket socket 客户端不支持自动重连.仅用于测试 inner *process.InnerOptions 选项应该由上层ClientProxy去决定如何设置。 svr 内部应该设置链接相关的参数。比如读写超时,如何发送数据 opts 业务方决定

func (*WsSession) AddCloseClientFunc

func (sess *WsSession) AddCloseClientFunc(f func(Client))

func (*WsSession) AddCloseSessionFunc

func (sess *WsSession) AddCloseSessionFunc(f func(sess Session))

func (*WsSession) ClientValid

func (sess *WsSession) ClientValid() bool

func (*WsSession) Close

func (sess *WsSession) Close() (err error)

func (*WsSession) GetConn

func (sess *WsSession) GetConn() interface{}

GetConn get raw conn(net.Conn,websocket.Conn...)

func (*WsSession) GetServer

func (sess *WsSession) GetServer() Server

GetServer get raw server(*WsServer,*TcpServer...)

func (*WsSession) Run

func (sess *WsSession) Run()

Run run client

func (*WsSession) SessionValue

func (sess *WsSession) SessionValue(key interface{}) interface{}

Value wrap context.Context.Value

func (*WsSession) WithSessionValue

func (sess *WsSession) WithSessionValue(key, value interface{})

WithValue wrap context.WithValue

func (*WsSession) Write

func (sess *WsSession) Write(in []byte) (n int, err error)

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL